关于这个问题的帖子很多,但没有一个回答我的问题。
我遇到了OutOfMemoryError
在 PySpark 中尝试将许多不同的数据帧连接在一起。
我的本地机器有 16GB 内存,我的 Spark 配置如下:
class SparkRawConsumer:
def __init__(self, filename, reference_date, FILM_DATA):
self.sparkContext = SparkContext(master='local[*]', appName='my_app')
SparkContext.setSystemProperty('spark.executor.memory', '3g')
SparkContext.setSystemProperty('spark.driver.memory', '15g')
显然有很多很多关于 Spark 中 OOM 错误的帖子,但基本上大多数都说要增加你的内存属性。
我本质上是从 50-60 个较小的数据帧执行连接,这些数据帧有两列uid
, and data_in_the_form_of_lists
(通常,它是 Python 字符串的列表)。我要加入的主数据框有大约 10 列,但还包含uid
专栏(我正在加入)。
我只尝试连接 1,500 行数据。但是,当显然所有这些数据都可以放入内存时,我会频繁遇到 OutOfMemory 错误。我通过查看存储中的 SparkUI 来确认这一点:
在代码中,我的连接如下所示:
# lots of computations to read in my dataframe and produce metric1, metric2, metric3, .... metric 50
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric1, schema=["uid", "metric1"]), on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric2, schema=["uid", "metric2"]),
on="gid_value")
metrics_df.repartition("gid_value")
metrics_df = metrics_df.join(
self.sqlContext.createDataFrame(metric3, schema=["uid", "metric3"]),
on="uid")
metrics_df.count()
metrics_df.repartition("gid_value")
Where metric1
, metric2
and metric3
是我在连接之前转换为数据帧的 RDD(请记住,实际上有 50 个较小的 RDD)metric
我正在加入 dfs)。
I call metric.count()
强制评估,因为它似乎有助于防止内存错误(否则在尝试最终收集时我会遇到更多驱动程序错误)。
这些错误是不确定的。我没有看到它们始终出现在我的连接中的任何特定位置,有时似乎出现在我的最后一个位置metrics_df.collect()
调用,有时在较小的连接期间。
我真的怀疑任务序列化/反序列化存在一些问题。例如,当我查看典型阶段的事件时间线时,我发现其中大部分由任务反序列化占用:
我还注意到垃圾收集时间很大:
垃圾收集是导致内存错误的问题吗?还是任务序列化?
编辑回答评论问题
我一直在将 Spark 作业作为更大的 PyCharm 项目的一部分来运行(因此 Spark 上下文被包裹在一个类中)。我使用以下 Spark 提交重构了代码以将其作为脚本运行:
spark-submit spark_consumer.py \
--driver-memory=10G \
--executor-memory=5G \
--conf spark.executor.extraJavaOptions='-XX:+UseParallelGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'