我正在使用 pyspark 来处理我的数据,最后我需要使用 rdd.collect() 从 rdd 收集数据。然而,由于内存问题,我的 Spark 崩溃了。我尝试了很多方法,但没有成功。我现在运行以下代码,为每个分区处理一小块数据:
def make_part_filter(index):
def part_filter(split_index, iterator):
if split_index == index:
for el in iterator:
yield el
return part_filter
for part_id in range(rdd.getNumPartitions()):
part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
myCollection = part_rdd.collect()
for row in myCollection:
#Do something with each row
我当前使用的新代码不会崩溃,但似乎会永远运行。
有没有更好的方法从大型 rdd 中收集数据?
我不知道这是否是最好的方法,但这是我尝试过的最好的方法。不知道比你的好还是差。同样的想法,将其分成块,但是您可以更灵活地设置块大小。
def rdd_iterate(rdd, chunk_size=1000000):
indexed_rows = rdd.zipWithIndex().cache()
count = indexed_rows.count()
print("Will iterate through RDD of count {}".format(count))
start = 0
end = start + chunk_size
while start < count:
print("Grabbing new chunk: start = {}, end = {}".format(start, end))
chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect()
for row in chunk:
yield row[0]
start = end
end = start + chunk_size
示例用法,我想将一个巨大的 RDD 附加到磁盘上的 CSV 文件,而不用整个 RDD 填充 Python 列表:
def rdd_to_csv(fname, rdd):
import csv
f = open(fname, "a")
c = csv.writer(f)
for row in rdd_iterate(rdd): # with abstraction, iterates through entire RDD
c.writerows([row])
f.close()
rdd_to_csv("~/test.csv", my_really_big_rdd)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)