从 Spark rdd 收集大型数据集的最佳实践是什么?

2023-12-19

我正在使用 pyspark 来处理我的数据,最后我需要使用 rdd.collect() 从 rdd 收集数据。然而,由于内存问题,我的 Spark 崩溃了。我尝试了很多方法,但没有成功。我现在运行以下代码,为每个分区处理一小块数据:

def make_part_filter(index):
    def part_filter(split_index, iterator):
        if split_index == index:
            for el in iterator:
                yield el
    return part_filter


for part_id in range(rdd.getNumPartitions()):
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
    myCollection = part_rdd.collect()
    for row in myCollection:
          #Do something with each row

我当前使用的新代码不会崩溃,但似乎会永远运行。

有没有更好的方法从大型 rdd 中收集数据?


我不知道这是否是最好的方法,但这是我尝试过的最好的方法。不知道比你的好还是差。同样的想法,将其分成块,但是您可以更灵活地设置块大小。

def rdd_iterate(rdd, chunk_size=1000000):
    indexed_rows = rdd.zipWithIndex().cache()
    count = indexed_rows.count()
    print("Will iterate through RDD of count {}".format(count))
    start = 0
    end = start + chunk_size
    while start < count:
        print("Grabbing new chunk: start = {}, end = {}".format(start, end))
        chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect()
        for row in chunk:
            yield row[0]
        start = end
        end = start + chunk_size

示例用法,我想将一个巨大的 RDD 附加到磁盘上的 CSV 文件,而不用整个 RDD 填充 Python 列表:

def rdd_to_csv(fname, rdd):
    import csv
    f = open(fname, "a")
    c = csv.writer(f)
    for row in rdd_iterate(rdd): # with abstraction, iterates through entire RDD
        c.writerows([row])
    f.close()

rdd_to_csv("~/test.csv", my_really_big_rdd)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Spark rdd 收集大型数据集的最佳实践是什么? 的相关文章

随机推荐