我们正在运行以下阶段的 DAG,并且对于相对较小的 shuffle 数据大小(每个任务大约 19MB),经历了较长的 shuffle 读取时间
一个有趣的方面是每个执行器/服务器内的等待任务具有相同的随机读取时间。以下是其含义的示例:对于以下服务器,一组任务等待大约 7.7 分钟,另一组任务等待大约 26 秒。
这是同一阶段运行的另一个示例。该图显示了 3 个执行器/服务器,每个执行器/服务器都有统一的任务组,并且具有相同的随机读取时间。蓝色组代表由于推测执行而被终止的任务:
并不是所有的执行者都是这样。有些任务几乎均匀地在几秒钟内完成所有任务,并且这些任务的远程读取数据大小与在其他服务器上等待很长时间的任务相同。
此外,这种类型的阶段在我们的应用程序运行时运行两次。产生这些具有大量随机读取时间的任务组的服务器/执行器在每个阶段运行中都是不同的。
以下是其中一台服务器/主机的任务统计表示例:
看起来负责这个 DAG 的代码如下:
output.write.parquet("output.parquet")
comparison.write.parquet("comparison.parquet")
output.union(comparison).write.parquet("output_comparison.parquet")
val comparison = data.union(output).except(data.intersect(output)).cache()
comparison.filter(_.abc != "M").count()
我们非常感谢您对此的想法。
显然,问题出在 JVM 垃圾收集 (GC) 上。这些任务必须等待远程执行器上的 GC 完成。等效的随机读取时间是由于多个任务正在等待执行 GC 的单个远程主机这一事实造成的。我们遵循了发布的建议here https://stackoverflow.com/questions/38981772/spark-shuffle-operation-leading-to-long-gc-pause/39111205并且问题减少了一个数量级。远程主机上的 GC 时间和本地 shuffle 读取时间之间仍然存在很小的相关性。未来我们想尝试shuffle服务。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)