我正在一个相当小的数据集(HDFS 中的 80 个文件,总共很少)上执行一个简单的 groupBy。我在纱线集群中的 8 台低内存机器上运行 Spark,即:
spark-submit ... --master yarn-client --num-executors 8 --executor-memory 3000m --executor-cores 1
该数据集由长度为 500-2000 的字符串组成。
我正在尝试做一个简单的groupByKey
(见下文),但它失败了java.lang.OutOfMemoryError: GC overhead limit exceeded
例外
val keyvals = sc.newAPIHadoopFile("hdfs://...")
.map( someobj.produceKeyValTuple )
keyvals.groupByKey().count()
我可以使用以下方法计算组大小reduceByKey
没有问题,请确保问题不是由单个过大的组引起的,也不是由过多的组引起的:
keyvals.map(s => (s._1, 1)).reduceByKey((a,b) => a+b).collect().foreach(println)
// produces:
// (key1,139368)
// (key2,35335)
// (key3,392744)
// ...
// (key13,197941)
我尝试过重新格式化、重新排列和增加 groupBy 并行度:
keyvals.groupByKey(24).count // fails
keyvals.groupByKey(3000).count // fails
keyvals.coalesce(24, true).groupByKey(24).count // fails
keyvals.coalesce(3000, true).groupByKey(3000).count // fails
keyvals.coalesce(24, false).groupByKey(24).count // fails
keyvals.coalesce(3000, false).groupByKey(3000).count // fails
我尝试过玩spark.default.parallelism
,并且增加spark.shuffle.memoryFraction
to 0.8
同时降低spark.storage.memoryFraction
to 0.1
失败的阶段(计数)将在 3000 个任务中的第 2999 个任务上失败。
我似乎找不到任何表明 groupBy 不应该只溢出到磁盘而不是将内容保留在内存中的内容,但我就是无法让它正常工作,即使在相当小的数据集上也是如此。显然情况并非如此,我一定做错了什么,但我不知道从哪里开始调试!