我有一项工作需要在分区的 Spark 数据帧上运行,该过程如下所示:
rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
结果是rdd
of pandas.dataframe
,
type(rdd) => pyspark.rdd.PipelinedRDD
type(rdd.collect()[0]) => pandas.core.frame.DataFrame
and rdd.glom().collect()
返回结果如下:
[[df1], [df2], ...]
现在我希望将结果转换为spark dataframe,我所做的方式是:
sp = None
for i, partition in enumerate(rdd.collect()):
if i == 0:
sp = spark.createDataFrame(partition)
else:
sp = sp.union(spark.createDataFrame(partition))
return sp
然而,结果可能是巨大的rdd.collect()
可能超出驱动程序的内存,所以我需要避免collect()
手术。有办法解决这个问题吗?
提前致谢!
如果你想继续使用 rdd api。mapPartitions
接受一种类型的迭代器并期望另一种类型的迭代器作为结果。 pandas_df 不是迭代器类型mapPartitions
可以直接处理。如果你必须使用 pandas api,你可以从创建一个合适的生成器pandas.iterrows
这样你的整体mapPartitions
结果将是行类型的单个 rdd,而不是 pandas 数据帧的 rdd。这样的 rdd 可以通过动态模式发现无缝转换为数据帧
from pyspark.sql import Row
def some_fuction(iter):
pandas_df = some_pandas_result(iter)
for index, row in pandas_df.iterrows():
yield Row(id=index, foo=row['foo'], bar=row['bar'])
rdd = sp_df.repartition(n_partitions, partition_key).rdd.mapPartitions(lambda x: some_function(x))
df = spark.createDataFrame(rdd)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)