KryoSerializer 找不到我的 SparkKryoRegistrator

2024-04-28

我在 Amazon emr-5.2.1 上以客户端模式使用 Spark 2.0.2。我使用 Kryo 序列化并在我们自己的 KryoRegistrator 中注册我们的类:

val sparkConf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", classOf[de.gaf.ric.workflow.RicKryoRegistrator].getName)
    .set("spark.kryo.registrationRequired", "true")
    .set("spark.kryoserializer.buffer.max", "512m")
implicit val sc = new SparkContext(sparkConf)

该过程开始正常,但几分钟后,我在执行器上收到以下异常:

17/02/02 16:22:34 ERROR RetryingBlockFetcher: Failed to fetch block rdd_3641_12, and will not retry (0 retries)
java.lang.RuntimeException: org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:129)
    at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:274)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:259)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:175)
    at org.apache.spark.serializer.SerializerManager.dataSerializeWithExplicitClassTag(SerializerManager.scala:141)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doGetLocalBytes(BlockManager.scala:499)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:474)
    at org.apache.spark.storage.BlockManager$$anonfun$getLocalBytes$2.apply(BlockManager.scala:474)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.storage.BlockManager.getLocalBytes(BlockManager.scala:474)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:280)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:60)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:60)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:60)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:159)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:107)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:119)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: de.gaf.ric.workflow.RicKryoRegistrator
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:124)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$5.apply(KryoSerializer.scala:124)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:124)
    ... 43 more

    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:189)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:121)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)

班上RicKryoRegistrator确实包含在我的 uber JAR 中,我仔细检查了这一点。它还被转移到执行者:

17/02/02 16:19:02 INFO Executor: Fetching spark://172.31.20.106:41032/jars/app-imageprocessing-0.13.0-20170202.112920-1.jar with timestamp 1486048690879
17/02/02 16:19:02 INFO TransportClientFactory: Successfully created connection to /172.31.20.106:41032 after 23 ms (0 ms spent in bootstraps)
17/02/02 16:19:02 INFO Utils: Fetching spark://172.31.20.106:41032/jars/app-imageprocessing-0.13.0-20170202.112920-1.jar to /mnt/yarn/usercache/hadoop/appcache/application_1486039395474_0012/spark-8172edd9-d1c7-40c9-ad9b-74b2bd9dbad9/fetchFileTemp6474512860106916303.tmp
17/02/02 16:19:03 INFO Utils: Copying /mnt/yarn/usercache/hadoop/appcache/application_1486039395474_0012/spark-8172edd9-d1c7-40c9-ad9b-74b2bd9dbad9/-68603321486048690879_cache to /mnt/yarn/usercache/hadoop/appcache/application_1486039395474_0012/container_1486039395474_0012_01_000011/./app-imageprocessing-0.13.0-20170202.112920-1.jar
17/02/02 16:19:03 INFO Executor: Adding file:/mnt/yarn/usercache/hadoop/appcache/application_1486039395474_0012/container_1486039395474_0012_01_000011/./app-imageprocessing-0.13.0-20170202.112920-1.jar to class loader

阅读源码org.apache.spark.serializer.KryoSerializer,我看到它使用以下类加载器:

val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader)

难道是defaultClassLoader未设置,并且我的 uber JAR 未包含在Thread.currentThread.getContextClassLoader?还有什么原因呢?


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

KryoSerializer 找不到我的 SparkKryoRegistrator 的相关文章

  • Spark:有没有办法打印出spark-shell和spark的类路径?

    我可以在 Spark shell 中成功运行 Spark 作业 但是当它打包并通过 Spark submit 运行时 我收到 NoSuchMethodError 这向我表明类路径存在某种不匹配 有没有办法可以比较两个类路径 某种日志记录语句
  • Twitter API 与 Scala 2.12 一起使用

    我正在使用 Scala 2 12 使用 SBT 构建 构建 Spark 3 0 0 流应用程序 鉴于所有用于执行此操作的库均适用于 Scala EDIT 我尝试使用库构建时得到的示例输出 object twitter is not a me
  • Spark - java.lang.OutOfMemoryError:请求的数组大小超出 VM 限制

    我正在尝试对 Cloudera 的 Spark 2 1 0 中的数据帧进行 groupBy 操作 该集群位于总 RAM 约为 512GB 的 7 节点集群上 我的代码如下 ndf ndf repartition 20000 by user
  • 将案例类传递给函数参数

    抱歉问了一个简单的问题 我想将案例类传递给函数参数 并且想在函数内部进一步使用它 到目前为止我已经尝试过这个TypeTag and ClassTag但由于某种原因 我无法正确使用它 或者可能是我没有看到正确的位置 用例与此类似 case c
  • Spark:Aggregator和UDAF有什么区别?

    在Spark的文档中 Aggregator 抽象类聚合器 IN BUF OUT 扩展可序列化 用户定义聚合的基类 可以是 在数据集操作中用于获取组中的所有元素并 将它们减少到单个值 用户定义的聚合函数是 抽象类 UserDefinedAgg
  • Delta Lake 独立于 Apache Spark?

    我一直在探索数据湖屋概念和 Delta Lake 它的一些功能看起来真的很有趣 就在项目主页上https delta io https delta io 有一个图表显示 Delta Lake 运行在 您现有的数据湖 上 但没有提及 Spar
  • 计算行的排名

    我想根据一个字段对用户 ID 进行排名 对于相同的字段值 排名应该相同 该数据位于 Hive 表中 e g user value a 5 b 10 c 5 d 6 Rank a 1 c 1 d 3 b 4 我怎样才能做到这一点 可以使用ra
  • Spark 中的广播 Annoy 对象(对于最近邻居)?

    由于 Spark 的 mllib 没有最近邻居功能 我正在尝试使用Annoy https github com spotify annoy为近似最近邻 我尝试广播 Annoy 对象并将其传递给工人 然而 它并没有按预期运行 下面是可重复性的
  • 在 Spark Dataframe 中提取数组索引

    我有一个带有数组类型列的数据框 例如 val df List a Array 1d 2d 3d b Array 4d 5d 6d toDF ID DATA df org apache spark sql DataFrame ID strin
  • Spark 执行器 STDOUT 到 Kubernetes STDOUT

    我在 Spark Worker 中运行的 Spark 应用程序将执行程序日志输出到特定文件路径 worker home directory app xxxxxxxx 0 stdout I used log4j properties将日志从
  • 对于“迭代算法”,转换为 RDD 然后再转换回 Dataframe 有什么优势

    我在读高性能火花作者提出以下主张 虽然 Catalyst 优化器非常强大 但它目前遇到挑战的情况之一是非常大的查询计划 这些查询计划往往是迭代算法的结果 例如图算法或机器学习算法 一个简单的解决方法是将数据转换为 RDD 并在每次迭代结束时
  • Spark Scala:按小时或分钟计算两列的 DateDiff

    我在数据框中有两个时间戳列 我想获取它们的分钟差异 或者小时差异 目前我可以通过四舍五入获得日差 val df2 df1 withColumn time datediff df1 ts1 df1 ts2 但是 当我查看文档页面时https
  • Spark:并行转换多个数据帧

    了解如何在并行转换多个数据帧时实现最佳并行性 我有一系列路径 val paths Array path1 path2 我从每个路径加载数据帧 然后转换并写入目标路径 paths foreach path gt val df spark re
  • 通过过滤对 Pyspark Dataframe 进行分组

    我有一个数据框如下 cust id req req met 1 r1 1 1 r2 0 1 r2 1 2 r1 1 3 r1 1 3 r2 1 4 r1 0 5 r1 1 5 r2 0 5 r1 1 我必须观察客户 看看他们有多少要求 看看
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • 如何设置SPARK_HOME变量?

    按照链接中的气泡水步骤进行操作http h2o release s3 amazonaws com sparkling water rel 2 2 0 index html http h2o release s3 amazonaws com
  • Scala Spark 包含与不包含

    我可以使用 contains 过滤 RDD 中的元组 如下所示 但是使用 不包含 来过滤 RDD 又如何呢 val rdd2 rdd1 filter x gt x 1 contains 我找不到这个的语法 假设这是可能的并且我没有使用Dat
  • Spark 2.2 无法将 df 写入 parquet

    我正在构建一个聚类算法 我需要存储模型以供将来加载 我有一个具有以下架构的数据框 val schema new StructType add StructField uniqueId LongType add StructField tim
  • 从 pyspark.sql 中的列表创建数据框

    我完全陷入了有线的境地 现在我有一个清单li li example data map lambda x get labeled prediction w x collect print li type li 输出就像 0 0 59 0 0

随机推荐