DataFrame with a UDF gives Task not serializable exception

2024-04-16

I am trying to call the show() method on a DataFrame, and it throws a Task not serializable exception.

I tried making the enclosing object extend Serializable, but the error persists.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import scala.io.Source.fromFile

object App extends Serializable {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache").setLevel(Level.WARN)

    val spark = SparkSession.builder()
      .appName("LearningSpark")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Count pushes per GitHub user from the archive dump
    val inputPath = "./src/resources/2015-03-01-0.json"
    val ghLog = spark.read.json(inputPath)
    val pushes = ghLog.filter("type = 'PushEvent'")
    val grouped = pushes.groupBy("actor.login").count
    val ordered = grouped.orderBy(grouped("count").desc)
    ordered.show(5)

    // Broadcast the employee logins and keep only pushes made by employees
    val empPath = "./src/resources/ghEmployees.txt"
    val employees = Set() ++ (
      for {
        line <- fromFile(empPath).getLines
      } yield line.trim)
    val bcEmployees = sc.broadcast(employees)

    import spark.implicits._
    val isEmp = (user: String) => bcEmployees.value.contains(user)
    val isEmployee = spark.udf.register("SetContainsUdf", isEmp)
    val filtered = ordered.filter(isEmployee($"login"))
    filtered.show() // this is the App.scala:33 call in the stack trace below
  }
}

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/09/01 10:21:48 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

+------------------+-----+
|             login|count|
+------------------+-----+
|      greatfirebot|  192|
|diversify-exp-user|  146|
|     KenanSulayman|   72|
|        manuelrp07|   45|
|    mirror-updates|   42|
+------------------+-----+
only showing top 5 rows

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:850)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:151)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
    at App$.main(App.scala:33)
    at App.main(App.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
    - object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
    - element of array (index: 2)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.catalyst.expressions.ScalaUDF, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/catalyst/expressions/ScalaUDF.$anonfun$f$2:(Lscala/Function1;Lorg/apache/spark/sql/catalyst/expressions/Expression;Lscala/runtime/LazyRef;Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/catalyst/InternalRow;)Ljava/lang/Object;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2364/2031154005, org.apache.spark.sql.catalyst.expressions.ScalaUDF$$Lambda$2364/2031154005@1fd37440)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF:SetContainsUdf(actor#6.login))
    - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@3b65084e)
    - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
    - object (class scala.collection.immutable.$colon$colon, List(isnotnull(type#13), (type#13 = PushEvent), UDF:SetContainsUdf(actor#6.login)))
    - field (class: org.apache.spark.sql.execution.FileSourceScanExec, name: dataFilters, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.FileSourceScanExec, FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - field (class: org.apache.spark.sql.execution.FilterExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.FilterExec, Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
+- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - field (class: org.apache.spark.sql.execution.ProjectExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.ProjectExec, Project [actor#6]
+- Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
   +- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - field (class: org.apache.spark.sql.execution.aggregate.HashAggregateExec, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.aggregate.HashAggregateExec, HashAggregate(keys=[actor#6.login AS actor#6.login#53], functions=[partial_count(1)], output=[actor#6.login#53, count#43L])
+- Project [actor#6]
   +- Filter ((isnotnull(type#13) && (type#13 = PushEvent)) && UDF:SetContainsUdf(actor#6.login))
      +- FileScan json [actor#6,type#13] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/C:/Users/abhaydub/Scala-Spark-workspace/LearningSpark/src/resources/2015-..., PartitionFilters: [], PushedFilters: [IsNotNull(type), EqualTo(type,PushEvent)], ReadSchema: struct<actor:struct<avatar_url:string,gravatar_id:string,id:bigint,login:string,url:string>,type:...
)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 14)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 3)
    - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
    - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
    - writeReplace data (class: java.lang.invoke.SerializedLambda)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1297/815648243, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$1297/815648243@27438750)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 48 more

I am on Spark 2.4.4 and Scala "2.12.1". I hit the same problem (object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)) and it was driving me crazy. I changed the Scala version to "2.12.10" and the problem is now gone!
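In practice the fix above amounts to pinning the Scala patch version in the build. A minimal build.sbt sketch, assuming an sbt project (the project name and dependency list here are illustrative, not taken from the original post):

// build.sbt -- hypothetical minimal build for the code above
name := "LearningSpark"

scalaVersion := "2.12.10" // Scala 2.12.1 with Spark 2.4.x triggers the scala.runtime.LazyRef NotSerializableException

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"

If changing the Scala version is not an option, one possible workaround is to drop the Scala UDF and express the membership check with Column.isin, which avoids serializing a Scala lambda altogether. This is only a sketch, assuming the employee set is small enough to inline as literal values; it reuses the ordered and employees values from the question's code:

// Filter without registering a UDF: the Set is expanded into literal values in the plan
val filtered = ordered.filter($"login".isin(employees.toSeq: _*))
filtered.show()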
