Spark:当我在 Range 中使用累加器时,它无法正常工作

2024-04-05

我不明白为什么 Spark 没有正确更新我的累加器。

object AccumulatorsExample extends App {
  val acc = sc.accumulator(0L, "acc")
  sc range(0, 20000, step = 25) map { _ => acc += 1 } count()
  assert(acc.value == 800) // not equals
}

我的火花配置:

setMaster("local[*]") // should use 8 cpu cores

我不确定 Spark 是否在每个核心上分配累加器的计算,也许这就是问题所在。

我的问题是如何汇总所有acc值在一个总和中并获得正确的累加器值(800)?

PS

如果我限制核心数量setMaster("local[1]")比一切都好。


这里有两个不同的问题:

  • 你正在延长App而不是实施main方法。有一些与此方法相关的已知问题,包括不正确的累加器行为,因此它不应该在 Spark 应用程序中使用 https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications。这很可能是问题的根源。

    参见示例SPARK-4170 https://issues.apache.org/jira/browse/SPARK-4170对于与扩展相关的其他可能的问题App.

  • 您在转换中使用累加器。这意味着累加器可以递增任意次数(当给定作业成功时至少一次)。

    一般来说,您需要精确的结果,您应该仅在类似的操作中使用累加器foreach and foreachPartition尽管您不太可能在像这样的玩具应用程序中遇到任何问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:当我在 Range 中使用累加器时,它无法正常工作 的相关文章

随机推荐