我正在尝试对 Cloudera 的 Spark (2.1.0) 中的数据帧进行 groupBy 操作,该集群位于总 RAM 约为 512GB 的 7 节点集群上。我的代码如下。
ndf = ndf.repartition(20000)
by_user_df = ndf.groupBy(ndf.name) \
.agg(collect_list("file_name")) \
.withColumnRenamed('collect_list(file_name)', 'file_names')
by_user_df = by_user_df.repartition(20000)
by_user_df.count()
ndf 是一个包含 2 列、一个用户 ID 和一个文件名的数据框。我正在尝试按用户 ID 创建文件名列表,以传递给 CountVectorizer 和聚类。
我收到以下错误
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
据我所知,这是由于分配的数组大于虚拟机在连续内存中可以处理的大小,或者大于系统数组大小的最大值。许多建议是通过分成更多分区来实现更多并行化。
我有大约 6000 个用户和大约 7000 个文件名。我注意到死亡的执行者大部分时间都花在垃圾收集上。
到目前为止我已经尝试过以下操作:
- 重新分区 ndf 数据帧和生成的数据帧。我在每个重新分区参数中尝试了最多 60k。
- 我已将“spark.sql.shuffle.partitions”设置为最多 20000 个步骤
- 我已将执行程序内存提升至 25G
- 尽管死掉的执行程序似乎不是驱动程序,但我也将驱动程序内存增加到了 25G。
作为这个问题的更新:我意识到在这种情况下我正在对数据进行二进制聚类,所以我实际上只需要每个文件名之一。改变collect_list
to collect_set
给我留下了我需要的输出,并且显然足够小,可以在给定的参数内运行。我仍然会尝试修复原来的情况。
首先我不太明白为什么你需要这么高的分区值。我不知道 7 个工作线程中每个线程有多少个核心,但我怀疑你需要超过 200 个分区(你使用的分区数量非常多,这实际上可以解释为什么你的工作线程死于垃圾收集)
您的问题看起来像是 JVM 定义中的内存问题,因此我认为没有理由增加驱动程序或工作人员内存。
我认为您需要的是设置 Xss 和 Xmx 或 MaxPermSize,如下所述:如何修复 Java 中的“请求的数组大小超出 VM 限制”错误? https://stackoverflow.com/questions/5497259/how-to-fix-requested-array-size-exceeds-vm-limit-error-in-java
为此,您需要在运行 Spark 时使用 --conf spark.driver.extraJavaOptions 和 --conf spark.executor.extraJavaOptions。
例如:
--conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M " --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=128M "
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)