ConcurrentModificationException when using Spark collectionAccumulator

2024-04-30

I am trying to run a Spark-based application on an Azure HDInsight on-demand cluster, and I am seeing a large number of SparkExceptions (caused by ConcurrentModificationException) being logged. The application runs without these errors when I start a local Spark instance.

I have seen reports of similar errors when using accumulators (https://issues.apache.org/jira/browse/SPARK-17463). My code does use a CollectionAccumulator, but I have wrapped every place it is used in a synchronized block, and that made no difference. The accumulator-related code looks like this:

import scala.collection.JavaConverters._
import org.apache.spark.SparkContext

class MySparkClass(sc: SparkContext) {
    // Driver-side accumulator that collects MyRecord instances emitted by tasks
    val myAccumulator = sc.collectionAccumulator[MyRecord]

    def add(record: MyRecord): Unit = {
        synchronized {
            myAccumulator.add(record)
        }
    }

    def endOfBatch(): Unit = {
        synchronized {
            // CollectionAccumulator.value returns a java.util.List, hence asScala
            myAccumulator.value.asScala.foreach((record: MyRecord) => {
                processIt(record)
            })
        }
    }
}

The exceptions do not cause the application to fail, but when endOfBatch is called and the code tries to read the values out of the accumulator, the list is empty and processIt is never called.

We are using HDInsight version 3.6 (https://learn.microsoft.com/en-us/azure/hdinsight/hdinsight-component-versioning) with Spark version 2.3.0. The stack trace looks like this:

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:770)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    ... 13 more

The following code is a more self-contained example that reproduces the problem. MyRecord is a simple case class containing only numeric values. The code runs locally without errors, but on the HDInsight cluster it produces the errors above.

import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession

object MainDemo {
    def main(args: Array[String]): Unit = {
        val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val myAccumulator = sparkContext.collectionAccumulator[MyRecord]

        // Runs on the executors; each partition adds 100,000 records
        sparkContext.binaryFiles("/my/files/here").foreach(_ => {
            for (i <- 1 to 100000) {
                val record = MyRecord(i, 0, 0)
                myAccumulator.add(record)
            }
        })

        // Runs on the driver after the job completes
        myAccumulator.value.asScala.foreach((record: MyRecord) => {
            // we expect this to be called once for each record that we 'add' above,
            // but it is never called
            println(record)
        })
    }
}
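
A side note (my addition, not part of the original post): giving the accumulator a name when creating it makes its per-task updates visible in the Spark UI, which is a quick way to check whether the executors' adds ever reach the driver. A minimal sketch, assuming the same MyRecord case class:

    // "myRecords" is an arbitrary label; named accumulators show up on the
    // stage pages of the Spark UI alongside their accumulated values.
    val myAccumulator = sparkContext.collectionAccumulator[MyRecord]("myRecords")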

I doubt the synchronized blocks really help. Custom accumulators, like all other accumulators, are not thread-safe. They do not actually need to be, because the DAGScheduler.updateAccumulators method that the Spark driver uses to update accumulator values once a task has finished (successfully or with a failure) executes only on the single thread that runs the scheduling loop. Beyond that, accumulators are write-only data structures from the workers' point of view: each worker holds its own local reference to the accumulator, and only the driver is allowed to read an accumulator's value.

As for it working in local mode: there everything runs in a single JVM, whereas in cluster mode the driver and executors are separate JVMs and Java instances, and RPC calls are triggered to enable communication between them.
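
Because the executor's heartbeat thread can serialize the accumulator's backing list while task threads are still adding to it, one way around the race is a custom AccumulatorV2 backed by a concurrency-safe list. Below is a minimal sketch of that idea (ThreadSafeListAccumulator is my name, not a Spark class); CopyOnWriteArrayList serializes a consistent snapshot, so concurrent add calls cannot trigger a ConcurrentModificationException:

    import java.util.concurrent.CopyOnWriteArrayList
    import org.apache.spark.util.AccumulatorV2

    // Hypothetical helper, not provided by Spark: a collection accumulator
    // whose backing list tolerates concurrent add() and serialization.
    class ThreadSafeListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
        private val list = new CopyOnWriteArrayList[T]()

        override def isZero: Boolean = list.isEmpty
        override def reset(): Unit = list.clear()
        // Copy-on-write makes each add atomic with respect to iteration/serialization
        override def add(v: T): Unit = list.add(v)

        override def copy(): ThreadSafeListAccumulator[T] = {
            val newAcc = new ThreadSafeListAccumulator[T]
            newAcc.list.addAll(list)
            newAcc
        }

        override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = {
            list.addAll(other.value)
        }

        override def value: java.util.List[T] = list
    }

You would create it on the driver and register it with sparkContext.register(acc, "myRecords") before using it in place of collectionAccumulator. The trade-off is that CopyOnWriteArrayList copies its backing array on every add, which is fine for modest volumes but costly for the 100,000 adds per partition in the repro above.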

It would also help to see what your MyRecord object looks like, and to try simply ending the line with .value instead of using an iterator on it. Give it a try:

myAccumulator.value
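
If you do need to iterate over the result on the driver, a defensive variant (my addition, not from the original answer) is to snapshot the value into a fresh list first, so nothing can mutate the collection mid-iteration:

    import scala.collection.JavaConverters._

    // Copy the accumulator's java.util.List into a new list before iterating
    val snapshot = new java.util.ArrayList[MyRecord](myAccumulator.value)
    snapshot.asScala.foreach(println)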