在 Spark UDF 中操作数据帧

2023-12-04

我有一个 UDF,可以从数据帧中过滤和选择值,但它遇到“对象不可序列化”错误。详细信息如下。

假设我有一个数据框 df1 ,其中包含名称为(“ID”,“Y1”,“Y2”,“Y3”,“Y4”,“Y5”,“Y6”,“Y7”,“Y8”,“Y9”的列”,“Y10”)。我想根据另一个数据帧 df2 中匹配的“ID”和“值”对“Y”列的子集进行求和。我尝试了以下方法:

val y_list = ("Y1", "Y2", "Y3", "Y4", "Y5", "Y6", "Y7", "Y8", "Y9", "Y10").map(c => col(c))

def udf_test(ID: String, value: Int): Double = {
  df1.filter($"ID" === ID).select(y_list:_*).first.toSeq.toList.take(value).foldLeft(0.0)(_+_)
}
sqlContext.udf.register("udf_test", udf_test _)

val df_result = df2.withColumn("Result", callUDF("udf_test", $"ID", $"Value"))

这给了我以下形式的错误:

java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: Y1)

我查了一下,发现 Spark Column 是不可序列化的。我想知道:

1) 有什么方法可以在 UDF 中操作数据帧吗?

2)如果不是,实现上述操作类型的最佳方法是什么?我的真实案例比这更复杂。它要求我根据大数据帧中的某些列从多个小数据帧中选择值,然后计算回大数据帧的值。

我使用的是 Spark 1.6.3。谢谢!


您不能在 UDF 内使用数据集操作。 UDF 只能操作现有列并生成一个结果列。它不能过滤数据集或进行聚合,但可以在过滤器内部使用。 UDAF 还可以聚合值。

相反,您可以使用.as[SomeCaseClass]从 DataFrame 生成数据集,并在过滤器、映射、归约中使用普通的强类型函数。

编辑:如果你想将你的bigDF与smallDFs列表中的每个小DF一起加入,你可以这样做:

import org.apache.spark.sql.functions._
val bigDF = // some processing
val smallDFs = Seq(someSmallDF1, someSmallDF2)
val joined = smallDFs.foldLeft(bigDF)((acc, df) => acc.join(broadcast(df), "join_column"))

broadcast是给小DF添加Broadcast Hint的功能,这样小DF会使用更高效的Broadcast Join而不是Sort Merge Join

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Spark UDF 中操作数据帧 的相关文章

随机推荐

  • 在 MVC6 中创建自定义模型绑定器的正确方法是什么?

    我正在尝试按照中的步骤操作本文使用 vNext 项目和 mvc 6 我一直在阅读代码here但仍然有点不确定如何实现这一点 有没有人有一个可以分享的可行示例或为我指明正确的方向 我特别想知道如何注册自定义绑定器 以及我将从哪些类继承 因为
  • 具有不同数据类型答案的调查的 SQL 设计

    我正在开展一项在线调查 大多数问题的答案等级为 1 5 如果我们需要向调查添加问题 我会使用一个简单的 Web 表单 该表单会插入到相应的表中 瞧 调查提出了新问题 没有新代码或对数据库结构进行更改 我们被要求添加可以有不同数据类型答案的调
  • 如何保存Jenkins配置?

    有没有办法在 Git 或其他任何地方保存管道配置或项目配置 以便当我的 Jenkins 机器崩溃时 我可以将保存的配置迁移到新的 Jenkins 实例中 我会 作为开始 让自己 https wiki jenkins io display J
  • 有没有一个库可以模拟 facebook 的“Link Detect”? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心以获得指导 我正在寻找编写一个库 可以
  • 寻找有 2 个点的向量

    我正在帮助我的朋友玩 pygame 但我们被困住了 所以我们正在尝试获取射弹的方向 但我们不知道如何获取 例如 1 1 将走向东南 1 1 将走向 NE 1 1 将走向西北 和 1 1 将去 SW 我们需要某种方程来获取玩家位置和鼠标位置并
  • 从 Google Assistant 启动我的应用程序(如果有)

    要求是如果用户命令 GA 启动我的 Android 应用程序查找附近的麦当劳餐厅其中 McDonald 是应用程序名称 由于用户没有提及他 她想要在 McDonald 应用程序中进行搜索 因此应用程序无法使用定义的意图过滤器来处理明确的意图
  • NSTimer 不会失效

    我在使计时器失效时遇到问题 property nonatomic strong NSTimer timer 在成功的块内 我在主线程上分配和设置计时器 dispatch async dispatch get main queue self
  • Mongoose(或 MongoDB)中的 TransientTransactionError 是什么?

    I have server js and db js The db js文件使用 Mongoose 与我的数据库交互 我使用server js从中调用函数db js var mongoose require mongoose mongoos
  • Cassandra Datastax 驱动程序在访问器上设置分页状态

    我正在使用 Datastax 可爱的 cassandra java 驱动程序 我试图将所有查询字符串封装到内置访问器中以进行映射 但我需要能够设置查询的分页状态 我发现这可以通过普通的语句 SimpleStatement 实现 但我还没有找
  • 如何以Google方式隐藏库源代码?

    例如 我有一个library我想保护源代码不被查看 我想到的第一个方法是为私有函数创建公共包装器 如下所示 function executeMyCoolFunction param1 param2 param3 return execute
  • 如何在xslt中提取这种格式

    我有一个 xml 结构
  • Bash 故障排除:不是有效的标识符

    初学者试图让管道在 bash 中工作 如果有人能明白为什么当我运行以下命令时我会得到 bash i not a valid identifier 这真的很有帮助 另外如果还有其他错误请告诉我 for i in home regionstex
  • Postgres 9.4 Django 1.9 获取所有 json 键

    我在 django 模型中有一个 JSONField 如下所示 from django db import models from django contrib postgres fields import JSONField class
  • JBoss / HotSpot JVM 崩溃

    我们有一个基于 Jboss 构建的 Web 应用程序 已经投入生产多年 在过去的 18 个月里 它从未下降过 然而 最近两天 jvm崩溃了4次 当 JVM 崩溃时 我们会收到一份我正在努力解读的错误报告 我看过其他几个崩溃报告 在这个网站和
  • Keras 中的自定义损失函数

    我正在研究一种图像类增量分类器方法 使用 CNN 作为特征提取器和全连接块进行分类 首先 我对 VGG 训练网络进行了微调以完成新任务 一旦网络针对新任务进行了训练 我就会为每个类别存储一些示例 以避免忘记新类别何时可用 当某些类可用时 我
  • 从类路径资源文件夹获取文件列表? [复制]

    这个问题在这里已经有答案了 我正在尝试从资源文件夹设置 JFX ImageView 图像 但似乎无法获得不会引发异常的适当 URL 字符串文件路径 var x getRandomImageFromPackage pictures toStr
  • 声明通用项数组时,如何允许推断通用参数?

    我遇到的情况是 我有一系列通用项目 Item 并且在项目本身内 我希望推断和具体的通用参数 也就是说 我想要一组通用项目 但每个项目都可以有不同的通用类型 应该保留这一点 type Item
  • Python 套接字。 OSError: [Errno 9] 错误的文件描述符

    这是我的客户 CLIENT import socket conne socket socket socket AF INET socket SOCK STREAM conne setsockopt socket SOL SOCKET soc
  • 我在 addCase 中指定什么“类型”来返回列?

    我正在尝试使用 case 语句进行查询 但无法弄清楚如何让 case 返回列值而不是常量 我的查询工作得很好 除了我为结果提供的列名被 Cake 或者 PDO 引用或错误处理在我无法挖掘的层中的某处 我已经了解了bindValue 但是我一
  • 在 Spark UDF 中操作数据帧

    我有一个 UDF 可以从数据帧中过滤和选择值 但它遇到 对象不可序列化 错误 详细信息如下 假设我有一个数据框 df1 其中包含名称为 ID Y1 Y2 Y3 Y4 Y5 Y6 Y7 Y8 Y9 的列 Y10 我想根据另一个数据帧 df2