Spark 3 KryoSerializer 问题 - 无法找到类:org.apache.spark.util.collection.OpenHashMap

2024-06-20

我正在将 Spark 2.4 项目升级到 Spark 3.x。我们遇到了一些现有 Spark-ml 代码的问题:

var stringIndexers = Array[StringIndexer]()
for (featureColumn <- FEATURE_COLS) {
    stringIndexers = stringIndexers :+ new StringIndexer().setInputCol(featureColumn).setOutputCol(featureColumn + "_index")
}
val pipeline = new Pipeline().setStages(stringIndexers)
val dfWithNumericalFeatures = pipeline.fit(decoratedDf).transform(decoratedDf)

具体来说,这一行:val dfWithNumericalFeatures = pipeline.fit(decoratedDf).transform(decoratedDf)现在 Spark 3 中会导致这个神秘的异常:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 1 times, most recent failure: Lost task 0.0 in stage 238.0 (TID 5589) (executor driver): com.esotericsoftware.kryo.KryoException: Unable to find class: org.apache.spark.util.collection.OpenHashMap$mcJ$sp$$Lambda$13346/2134122295
[info] Serialization trace:
[info] org$apache$spark$util$collection$OpenHashMap$$grow (org.apache.spark.util.collection.OpenHashMap$mcJ$sp)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:156)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
[info]  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
[info]  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
[info]  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
[info]  at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
[info]  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:396)
[info]  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:307)
[info]  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:790)
[info]  at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:397)
[info]  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
[info]  at org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression.deserialize(TypedAggregateExpression.scala:271)
[info]  at org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate.merge(interfaces.scala:568)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3(AggregationIterator.scala:199)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$1.$anonfun$applyOrElse$3$adapted(AggregationIterator.scala:199)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7(AggregationIterator.scala:213)
[info]  at org.apache.spark.sql.execution.aggregate.AggregationIterator.$anonfun$generateProcessRow$7$adapted(AggregationIterator.scala:207)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:151)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:77)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2(ObjectHashAggregateExec.scala:107)
[info]  at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$2$adapted(ObjectHashAggregateExec.scala:85)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:885)
[info]  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:885)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
[info]  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
[info]  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
[info]  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:131)
[info]  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
[info]  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]  at java.lang.Thread.run(Thread.java:750)
[info] Caused by: java.lang.ClassNotFoundException: org.apache.spark.util.collection.OpenHashMap$mcJ$sp$$Lambda$13346/2134122295
[info]  at java.lang.Class.forName0(Native Method)
[info]  at java.lang.Class.forName(Class.java:348)
[info]  at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:154)
[info]  ... 36 more

我四处搜索,发现的唯一相关问题是这个未回答的具有相同问题的问题:Spark Kryo 序列化问题 https://stackoverflow.com/questions/69759477/spark-kryo-serialization-issue.

OpenHashMap未在我的代码中使用,似乎在此期间 KryoSerializer 存在错误Pipeline.fit()功能。有什么想法可以解决这个问题吗?谢谢!

编辑:我还尝试在单元测试期间删除 KryoSerializer 的使用:

spark = SparkSession
      .builder
      .master("local[*]")
      .appName("UnitTest")
      .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .getOrCreate()

确认我正在使用 JavaSerializer:println(spark.conf.get("spark.serializer"))输出org.apache.spark.serializer.JavaSerializer。然而,即使不使用 KryoSerializer,仍然存在同样的问题。


尝试更改sparkVersion。
版本也有同样的问题3.1.0
变成3.3.2

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 3 KryoSerializer 问题 - 无法找到类:org.apache.spark.util.collection.OpenHashMap 的相关文章

随机推荐

  • 获取 FTP 服务器上的文件大小并将其放在标签上

    我正在尝试获取托管在FTP服务器并将其放入Label而 BackgroundWorker 在后台工作 我在用着 Try 来获取该值 但是该值在第一次尝试时被捕获 下载后 如果我按尝试再次获取它 那么它就可以工作 Note 第一次尝试时进度条
  • glibc 堆一致性检查

    根据2008年的帖子 我现在找不到 glibc 堆检查 http www gnu org s libc manual html node Heap Consistency Checking html在多线程环境中不起作用 现在还是2010年
  • 叠加 SKScene 未显示

    我正在尝试将 SKScene 覆盖在 SCNScene 上 当我在模拟器和 iPhone6 上运行我的应用程序时 overlayScene SKScene 按预期显示 但是当我尝试在 iPhone5 上运行它 尝试了 2 个不同的设备 时
  • 如何将 ascii 值列表转换为 python 中的字符串?

    我在 Python 程序中有一个列表 其中包含一系列数字 这些数字本身就是 ASCII 值 如何将其转换为可以在屏幕上回显的 常规 字符串 您可能正在寻找 chr gt gt gt L 104 101 108 108 111 44 32 1
  • Firebase ref.removeAllObservers() 是否也会递归删除子观察者?

    我看到了一些与此相关的问题 但没有一个真正证实了我的疑问 If I removeAllObservers 在父节点上 这是否也会递归地删除可能已附加在所有子节点和子节点的子节点等处的所有其他观察者 递归地 API 文档为removeAllO
  • 稍后解决承诺

    我想构建一个 Promise 但将解决方案推迟到以后 下面的代码创建了一个承诺 但它立即得到解决 我如何控制承诺何时被评估 var p new Promise resolve reject gt resolve 1 then p1 gt c
  • Doctrine EntityManager 清除嵌套实体中的方法

    我想用学说批量插入处理 http doctrine orm readthedocs org en latest reference batch processing html为了优化大量实体的插入 问题出在 Clear 方法上 它表示此方法
  • 在 Visual Studio 2010 中从 Fortran 调用 C++ 函数

    我想从 Fortran 调用 C 函数 为此 我在 Visual Studio 2010 中创建了一个 FORTRAN 项目 之后 我将一个 Cpp 项目添加到该 FORTRAN 项目中 当我要构建程序时出现以下错误 Error 1 unr
  • Javascript 数组到 VBScript

    我有一个使用 Javascript 构建的对象数组 我需要使用 VBScript 读取它 如下例所示 我找不到在 VbScript 代码中循环遍历数组的方法myArray object 这个例子是我的问题的简化 我无法更改页面的默认语言 这
  • 打印 OSGI 包类路径?

    在普通的 java 应用程序中 可以使用以下命令打印类路径的内容 String ss System getProperty java class path System out println ss 但是如何打印使用 eclipse PDE
  • cometd 和 jetty 的问题 6 / 7

    我正在尝试开始使用 cometd http cometd org http cometd org 和码头 6 或 7 但我似乎遇到了问题 我有一个 ant 脚本 它将我的代码打包成与 cometd 1 1 1 二进制文件和 jetty 二进
  • 如何在 Android 中从 WorkManager 取消工作?

    我已经保存了 WorkManagerUUID转换成String在领域数据库中 这是代码 Constraints constraints new Constraints Builder setRequiredNetworkType Netwo
  • 如何使用tampermonkey模拟react应用程序中的点击?

    我正在尝试使用 Tampermonkey 脚本模拟对 React 元素的点击 不幸的是 由于 React 有自己的影子 DOM 所以天真的方法使用document querySelector 不工作 我遇到了一些需要修改 React 组件本
  • 您使用什么物理 Android 设备进行测试?

    有什么好的推荐用于测试目的的物理 Android 设备吗 我正在苹果阵营寻找像 iPod touch 这样的设备 可以帮助 iOS 开发人员测试他们的东西 我知道有 Nexus One 但那东西相当昂贵 而且我并不真正关心手机的东西 而是可
  • 如何将 GAE 中一种 Kind 中的所有实体复制到另一种 Kind 中,而无需显式调用每个属性

    我们如何使用function clone entity 如中所述在 Python 中复制 Google App Engine 数据存储中的实体 而无需在 编译 时知道属性名称 https stackoverflow com question
  • pandas - 包含时间序列数据的堆积条形图

    我正在尝试使用时间序列数据在 pandas 中创建堆积条形图 DATE TYPE VOL 0 2010 01 01 Heavy 932 612903 1 2010 01 01 Light 370 612903 2 2010 01 01 Me
  • Pandas 组合不同索引的数据帧

    我有两个数据框df 1 and df 2具有不同的索引和列 但是 有一些索引和列重叠 我创建了一个数据框df索引和列的并集 因此不存在重复的索引或列 我想填写数据框df通过以下方式 for x in df index for y in df
  • qdbusxml2cpp 未知类型

    在使用 qdbusxml2cpp 程序将以下 xml 转换为 Qt 类时 我收到此错误 qdbusxml2cpp c ObjectManager a ObjectManager ObjectManager cpp xml object ma
  • CURL 中的 data-urlencode 是什么意思?

    我搜索了很多个小时试图弄清楚 php curl 中的 data urlencode 是什么 我尝试过这个 但我认为这是不对的 xmlpost object1 file https www lob com goblue pdf 在文档中是 d
  • Spark 3 KryoSerializer 问题 - 无法找到类:org.apache.spark.util.collection.OpenHashMap

    我正在将 Spark 2 4 项目升级到 Spark 3 x 我们遇到了一些现有 Spark ml 代码的问题 var stringIndexers Array StringIndexer for featureColumn lt FEAT