Spark - java.lang.OutOfMemoryError:请求的数组大小超出 VM 限制

2024-05-05

我正在尝试对 Cloudera 的 Spark (2.1.0) 中的数据帧进行 groupBy 操作,该集群位于总 RAM 约为 512GB 的 7 节点集群上。我的代码如下。

ndf = ndf.repartition(20000)
by_user_df = ndf.groupBy(ndf.name) \
            .agg(collect_list("file_name")) \
            .withColumnRenamed('collect_list(file_name)', 'file_names')


by_user_df = by_user_df.repartition(20000)    
by_user_df.count()

ndf 是一个包含 2 列、一个用户 ID 和一个文件名的数据框。我正在尝试按用户 ID 创建文件名列表,以传递给 CountVectorizer 和聚类。

我收到以下错误

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:73)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:166)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

据我所知,这是由于分配的数组大于虚拟机在连续内存中可以处理的大小,或者大于系统数组大小的最大值。许多建议是通过分成更多分区来实现更多并行化。

我有大约 6000 个用户和大约 7000 个文件名。我注意到死亡的执行者大部分时间都花在垃圾收集上。

到目前为止我已经尝试过以下操作:

  1. 重新分区 ndf 数据帧和生成的数据帧。我在每个重新分区参数中尝试了最多 60k。
  2. 我已将“spark.sql.shuffle.partitions”设置为最多 20000 个步骤
  3. 我已将执行程序内存提升至 25G
  4. 尽管死掉的执行程序似乎不是驱动程序,但我也将驱动程序内存增加到了 25G。

作为这个问题的更新:我意识到在这种情况下我正在对数据进行二进制聚类,所以我实际上只需要每个文件名之一。改变collect_list to collect_set给我留下了我需要的输出,并且显然足够小,可以在给定的参数内运行。我仍然会尝试修复原来的情况。


首先我不太明白为什么你需要这么高的分区值。我不知道 7 个工作线程中每个线程有多少个核心,但我怀疑你需要超过 200 个分区(你使用的分区数量非常多,这实际上可以解释为什么你的工作线程死于垃圾收集)

您的问题看起来像是 JVM 定义中的内存问题,因此我认为没有理由增加驱动程序或工作人员内存。

我认为您需要的是设置 Xss 和 Xmx 或 MaxPermSize,如下所述:如何修复 Java 中的“请求的数组大小超出 VM 限制”错误? https://stackoverflow.com/questions/5497259/how-to-fix-requested-array-size-exceeds-vm-limit-error-in-java

为此,您需要在运行 Spark 时使用 --conf spark.driver.extraJavaOptions 和 --conf spark.executor.extraJavaOptions。

例如:

--conf spark.driver.extraJavaOptions="-Xss10m -XX:MaxPermSize=512M " --conf spark.executor.extraJavaOptions="-Xss10m -XX:MaxPermSize=128M "
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark - java.lang.OutOfMemoryError:请求的数组大小超出 VM 限制 的相关文章

随机推荐