Spark 任务无法使用滞后窗口函数进行序列化

2024-02-09

我注意到,在 DataFrame 上使用 Window 函数后,如果我使用函数调用 map(),Spark 将返回“任务不可序列化”异常 这是我的代码:

val hc:org.apache.spark.sql.hive.HiveContext =
    new org.apache.spark.sql.hive.HiveContext(sc)

import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def f() : String = "test"
case class P(name: String, surname: String)
val lag_result: org.apache.spark.sql.Column = 
    lag($"name",1).over(Window.partitionBy($"surname"))
val lista: List[P] = List(P("N1","S1"), P("N2","S2"), P("N2","S2"))
val data_frame: org.apache.spark.sql.DataFrame = 
    hc.createDataFrame(sc.parallelize(lista))

df.withColumn("lag_result", lag_result).map(x => f)

// This works
// df.withColumn("lag_result", lag_result).map{ case x =>
//     def f():String = "test";f}.collect

这是堆栈跟踪:

org.apache.spark.SparkException:任务无法序列化 org.apache.spark.util.ClosureCleaner$.ensureSerialized(ClosureCleaner.scala:304) 在 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 在 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 在 org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 处 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324) 在 org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323) 在... 以及更多原因:java.io.NotSerializedException: org.apache.spark.sql.Column 序列化堆栈:

  • 对象不可序列化(类:org.apache.spark.sql.Column,值:'lag(name,1,null)windowspecdefinition(surname,UnspecifiedFrame))
  • 字段(类:$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$ iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, 名称:lag_result,类型:class org.apache.spark.sql.Column) ... 和 更多的

lag回报o.a.s.sql.Column这是不可序列化的。同样的事情也适用于WindowSpec。在交互模式下,这些对象可以作为闭包的一部分包含在内map:

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
df: org.apache.spark.sql.DataFrame = [x: string, y: int]

scala> val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@307a0097

scala> val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)

scala> def f(x: Any) = x.toString
f: (x: Any)String

scala> df.select(lag_y).map(f _).first
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@307a0097)

一个简单的解决方案是将两者标记为瞬态:

scala> @transient val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dda1470

scala> @transient val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)

scala> df.select(lag_y).map(f _).first
res1: String = [null]     
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 任务无法使用滞后窗口函数进行序列化 的相关文章

  • 使用 Pyspark 从 S3 读取时,内容长度分隔消息正文过早结束 SparkException

    我正在使用下面的代码来阅读S3 csv 文件从我的本地机器 from pyspark import SparkConf SparkContext from pyspark sql import SparkSession import con
  • Spark Dataframe 中的分析

    在这个问题中 我们有两个经理 M1 和 M2 在经理 M1 的团队中有两个员工 e1 和 e2 在 M2 的团队中有两个员工 e4 和 e5 以下是经理和员工的层次结构 1 M1 a e1 b e2 2 M2 a e4 b e5 我们有以下
  • Spark Dataframe/Parquet 中的枚举等效项

    我有一个包含数亿行的表 我想将其存储在 Spark 的数据帧中并作为 parquet 文件持久保存到磁盘 我的 Parquet 文件的大小现在超过 2TB 我想确保我已经对此进行了优化 这些列中很大一部分是字符串值 它们可能很长 但值通常也
  • 如何捕获反序列化异常?

    PHP 是否可以在以下情况下捕获异常 unserialize 产生错误 一个简单的方法是 ret unserialize foo if ret null Error case 但这不是最现代的解决方案 最好的方法是像前面提到的那样有一个自定
  • Akka Stream - 根据 Flow 中的元素选择 Sink

    我正在使用 Akka 流创建一个简单的消息传递服务 该服务就像邮件递送一样 其中来自源的元素包括destination and content like case class Message destination String conte
  • 通过 SSH 的 sbt (Scala) 结果找不到命令,但如果我自己这样做就可以工作

    所以我正在尝试做一些涉及跑步的事情sbt通过 SSH 命令 这就是我正在尝试的 ssh my username
  • Pyspark - 一次聚合数据帧的所有列[重复]

    这个问题在这里已经有答案了 我想将数据框分组到单个列上 然后对所有列应用聚合函数 例如 我有一个包含 10 列的 df 我希望对第一列 1 进行分组 然后对所有剩余列 均为数字 应用聚合函数 sum 与此等效的 R 是 summarise
  • 在 pyspark 中包装 java 函数

    我正在尝试创建一个用户定义的聚合函数 我可以从 python 调用它 我试图遵循答案this https stackoverflow com questions 33233737 spark how to map python with s
  • 如何不让 Gradle 立即退出 Scala 的 REPL?

    这些简单的线条在build gradle暴露一个repl理想情况下会启动 scala REPL 的任务 点燃并保持活力就是这样 repl 加载后 它立即收到 quit 命令并退出 的重要部分build gradle dependencies
  • 使用混淆的 Proto-buf 序列化

    我正在寻找一些有关使用带有混淆功能的 proto buf 网络 Dotfuscator 时发生的情况的指导 该项目的一半是 DLL 另一半是其他地方的 EXE 它们使用 proto buf NET 完美地交换数据 直到我混淆了 DLL 此时
  • 这种奇怪的 Scala 内存泄漏的原因是什么? [复制]

    这个问题在这里已经有答案了 即使有 7G 的堆空间 这也会耗尽内存 import scala collection mutable Set class Foo val anEmptySet Set Int Set def bar ints
  • 为 Apache Spark 示例运行 Cypher (CAPS)

    我知道这是一个广泛的问题 但这会对neo4j不属于某个领域的用户scala编程 我需要使用Apache Spark 项目的 Cypher https github com opencypher cypher for apache spark
  • 将spark.local.dir设置为不同的驱动器

    我正在尝试在 Windows 10 上设置独立 Spark 我想设置spark local dir to D spark tmp tmp 目前它似乎正在使用C Users
  • 具有定期更新的静态数据集的结构化流

    将流媒体与静态数据集合并是结构化流媒体的一个重要功能 但在每个批次中 数据集都会从数据源刷新 由于这些源并不总是那么动态 因此在指定的时间段 或批次数 内缓存静态数据集会提高性能 在指定的时间段 批次数之后 将从源重新加载数据集 否则从缓存
  • Scala 中的模式匹配是如何在字节码级别实现的?

    Scala 中的模式匹配是如何在字节码级别实现的 是不是像一系列if x instanceof Foo 构造 还是其他什么 它对性能有何影响 例如 给出以下代码 来自Scala 示例 http www scala lang org docu
  • 我们可以在 UDF 中使用关键字参数吗

    我的问题是我们可以像下面那样在 Pyspark 中使用关键字参数和 UDF 吗 conv 方法有一个关键字参数 conv type 默认情况下它被分配给特定类型的格式化程序 但是我想在某些地方指定不同的格式 由于关键字参数 这在 udf 中
  • 什么时候有2.13的sbt版本?

    我想开发一个sbt插件其依赖项仅适用于斯卡拉2 13 我发现https github com sbt sbt issues 5032 https github com sbt sbt issues 5032这个列表 SBT 0 x 仅在 S
  • 发送 FakeRequest 时如何为 akka.stream.Materializer 提供隐式值?

    我正在尝试理解下面看到的错误 并学习如何修复它 could not find implicit value for parameter materializer akka Stream Materializer val fut Future
  • Spark LDA 困境 - 预测和 OOM 问题

    我正在评估 Spark 1 6 0 来构建大型 数百万个文档 数百万个特征 数千个主题 LDA 模型并进行预测 这是我可以使用 Yahoo 轻松完成的任务 LDA 从小处开始 按照 Java 示例 我使用分布式模型 EM 优化器构建了 10
  • 带有泛型参数的抽象类的 JsonFormat

    我正在尝试为具有通用参数的抽象类编写 JsonFormat 如下所示 abstract class Animal A def data A def otherStuff String stuff case class CatData cat

随机推荐

  • 进程是否在远程计算机上运行?

    我有三台远程连接的远程电脑 我正在尝试编写一个简单的 Windows 应用程序 该应用程序将在单个窗口中显示特定进程是否在任意一台机器上运行 例如 Server1 Chrome 未运行 Server2 Chrome 正在运行 Server3
  • 为 Android 创建 PDU

    我目前正在编写和应用程序 即发送 接收短信 出于单元测试的目的 我需要以编程方式创建 PDU 解码非常简单 Bundle bundle intent getExtras if bundle null Get all messages con
  • 有人尝试过用Z3本身来证明Z3吗?

    有没有人尝试证明Z3 http research microsoft com en us um redmond projects z3 与Z3本身 是否有可能使用 Z3 来证明 Z3 是正确的 更理论化的是 是否有可能使用 X 本身来证明工
  • 模型不会在 ng-if 内更新

    我在角度应用程序中遇到了奇怪的行为 我不知道这是一个错误还是已知的限制 use strict var ctrl function scope scope foo false div foo foo div style background
  • 使用另一个表中的数据创建 SQL 表

    如何使用另一个表 表的副本 中已存在的数据创建表 复制表的最便携方法是 使用 CREATE TABLE 语句创建新表 基于旧表中的 SELECT 使用 INSERT INSERT INTO new table SELECT FROM old
  • 以类似于 Windows 的 MessageBox() 的方式使用 UIAlertView?

    我是 iPhone 新手 我希望能够以类似于 Windows 的方式使用 UIAlertViewMessageBox or the MessageDlg in Delphi 例如 我有一个方法需要询问用户对某件事的确认 并根据他们的响应继续
  • 仅返回与 Solr 匹配足够 NGram 的结果

    为了使用 Solr 实现某种程度的容错 我开始使用NGramFilterFactory 以下是来自schema xml
  • RegAsm.exe 和 regsvr32 有什么区别?如何使用regsvr32生成tlb文件?

    谁能告诉我 regsvr32 和 RegAsm 之间有什么区别 我的 Dll 是 C 语言 那么如何将类导入到 C 中 regsvr32将加载库并尝试调用DllRegisterServer 从那个图书馆 它不在乎什么DllRegisterS
  • PostgreSQL 与 SQL Server NVARCHAR 等效的是什么?

    如果 Microsoft SQL Server 数据库中有 NVARCHAR 或 NTEXT 数据类型的字段 那么 PostgreSQL 数据库中的等效数据类型是什么 我很确定 postgres varchar 与 Oracle Sybas
  • 基本字符串输入

    我刚刚遇到了这段代码 它允许用户在命令提示符中输入字符串 我知道他们所做的一切 这一切都很棒 但我有一个关于 cin 和 getline 函数的问题 string name cout lt lt Please enter your full
  • 如何创建这样的渐变进度指示器?

    我注意到基本的CircularProgressIndicator小部件有很少的参数来定制它 我想要达到像 gif 上那样的结果 不幸的是 我的知识不足以从头开始创建这样一个指标 在 pub dev 上搜索没有带来任何结果 Make a Cu
  • PHP 返回错误 500,但没有记录任何内容

    当我有一个 php 应用程序返回内部服务器错误 500 但错误日志中没有显示任何内容时 我遇到了问题 现在我知道我正在尝试运行的内容有错误 我知道我丢失了一些文件以及什么没有 但是 apache 错误日志中应该显示一些内容 否则我应该如何确
  • 使用 opencv 3.0 的 cv2 中的 KNN train()

    我正在尝试使用 cv2 python 2 7 和 opencv 3 0 运行 k 最近邻 我使用类似的代码复制了相同的错误消息http docs opencv org 3 0 beta doc py tutorials py ml py k
  • 使用 AWS SimpleDB 从 Java Servlet 创建域

    我是一名大学生 正在从事一个研究项目 该项目涉及将 Web 应用程序迁移到亚马逊云 上学期我一直在研究 servlet 并且能够使用 Tomcat 中运行的 java servlet 来实现应用程序的大部分功能 我现在正在尝试集成 Amaz
  • 将 TYPO3 extbase storagePageIds / storagePid 设置为当前

    我正在使用 TYPO3 扩展 feupload 它依赖于 extbase 这是我第一次接触extbase 但问题是关于 extbase 的 我希望 TYPO3 在查询中默认包含通常的 IN 当前页面 pid 检查 除非另有说明 但在 ext
  • AngularJS 上传文件并将其发送到数据库

    我一直在努力得到ng文件上传 https github com danialfarid ng file upload工作 以便我可以上传图像并将它们发送到数据库 在我的例子中是mongoLab http mongolab com接受 JSO
  • vue-devtools 总是被 nuxt.js 禁用

    我正在使用 nuxt js 创建一个新项目v2 3 0 当我跑步时npm run dev在我的 IDE 控制台中 一切都可以正确编译 但是当我转到该页面时 出现以下错误 Nuxt js Vue js is detected on this
  • Python - 检查两个单词是否在字符串中

    我想检查Python数组的每个元素中是否有两个单词 汽车 和 摩托车 我知道如何检查一个单词in但不知道如何用两个词来做 非常感谢任何帮助 两字解决方案 for string in array if car in string and mo
  • 如何在 SharePoint 2010 中获取服务应用程序权限

    在 SharePoint 2010 中读取特定服务应用程序的帐户权限的最佳 最简单方法是什么 目前我一直在搞乱 var solution SPFarm Local Solutions Service App Name wsp var sol
  • Spark 任务无法使用滞后窗口函数进行序列化

    我注意到 在 DataFrame 上使用 Window 函数后 如果我使用函数调用 map Spark 将返回 任务不可序列化 异常 这是我的代码 val hc org apache spark sql hive HiveContext n