自定义类型编码后无法操作? Spark数据集

2023-12-08

假设你有这个(编码自定义类型的解决方案来自这个线程):

// assume we handle custom type
class MyObj(val i: Int, val j: String)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
val ds = spark.createDataset(Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c")))

什么时候做一个ds.show, I got:

+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

据我所知,这是因为内容被编码为内部 Spark SQL 二进制表示形式。但是如何才能像这样显示解码后的内容呢?

+---+---+
| _1| _2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+

UPDATE1

显示内容并不是最大的问题,更重要的是它可能会导致处理数据集时出现问题,请考虑以下示例:

// continue with the above code
val ds2 = spark.createDataset(Seq(new MyObj(2, "a"),new MyObj(6, "b"),new MyObj(5, "c"))) 

ds.joinWith(ds2, ds("i") === ds2("i"), "inner") 
// this gives a Runtime error: org.apache.spark.sql.AnalysisException: Cannot resolve column name "i" among (value); 

意思是不是,kryo- 编码类型无法执行类似操作joinWith方便吗?

我们如何处理自定义类型Dataset then?
如果我们在编码后无法对其进行处理,那么这样做还有什么意义呢?kryo自定义类型的编码解决方案?!

(下面由 @jacek 提供的解决方案很值得了解case class输入,但仍然无法解码定制类型)


以下内容对我有用,但似乎使用高级 API 来执行低级(反序列化)工作。

这并不是说应该这样做,而是表明这是可能的。

我不知道为什么 KryoDeserializer 不将字节反序列化为字节来自的对象。就是这样。

你的类定义和我的类定义之间的一个主要区别是case这让我可以使用以下技巧。同样,不知道为什么它能够实现。

scala> println(spark.version)
3.0.1

// Note that case keyword
case class MyObj(val i: Int, val j: String)
import org.apache.spark.sql.Encoders
implicit val myObjEncoder = Encoders.kryo[MyObj]
// myObjEncoder: org.apache.spark.sql.Encoder[MyObj] = class[value[0]: binary]

val ds = (Seq(new MyObj(1, "a"),new MyObj(2, "b"),new MyObj(3, "c"))).toDS
// the Kryo deserializer gives bytes
scala> ds.printSchema
root
 |-- value: binary (nullable = true)

scala> :type sc
org.apache.spark.SparkContext

// Let's deserialize the bytes into an object
import org.apache.spark.serializer.KryoSerializer
val ks = new KryoSerializer(sc.getConf)
// that begs for a generic UDF
val deserMyObj = udf { value: Array[Byte] => 
  import java.nio.ByteBuffer
  ks.newInstance.deserialize(ByteBuffer.wrap(value)).asInstanceOf[MyObj] }

val solution = ds.select(deserMyObj('value) as "result").select($"result.*")
scala> solution.show
+---+---+
|  i|  j|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

自定义类型编码后无法操作? Spark数据集 的相关文章

随机推荐

  • 不要在提交表单时重定向

    我有一个简单的基于 HTML 的表单 如下所示 它会在单击时不断刷新页面
  • 我可以将跨站点 标记的数据作为 blob 获取吗?

    我正在尝试将网页链接到的几个图像保存到离线存储中 我在 Firefox 上使用 IndexedDB 在 Chrome 上使用 FileSystem API 我的代码实际上是一个扩展 因此在 Firefox 上我在 Greasemonkey
  • Mangle dll 使用 DEF 文件导出名称

    我正在尝试创建一个代理 dll 并遇到了这个问题 假设我有以下文件 a cpp extern C int declspec dllexport func int x return x a def EXPORTS func 4Test QAE
  • 使用Scrapy爬取本地XML文件-Start URL本地文件地址

    我想用 scrapy 抓取我位于下载文件夹中的本地 xml 文件 使用 xpath 提取相关信息 使用 scrapy 介绍作为guide 2016 01 24 12 38 53 scrapy DEBUG Retrying
  • 数据成员 int 引用

    我试图通过引用将 int 存储为类的数据成员 我预计对象会通过引用获取 int 而不是如果我从外部增加引用 则会增加对象内部的值 class A private int x public A int y x y cout lt lt A s
  • LLVM insertvalue 优化不佳?

    当我发出 LLVM 代码时 是否应该避免将 insertvalue 指令与加载和存储结合使用 当我使用它时 我总是得到糟糕的优化本机代码 看下面的例子 ModuleID mod target datalayout e p 64 64 64
  • 如何编码 WAV 以使用 SIPp 播放

    通过观察另一个SIPp相关问题我了解到现在可以使用rtp stream action 我尝试了几个不同的 WAV 文件 但没有成功 我听到的只是一些噪音 而不是预期的声音 在上述问题的一条评论中 有一条简单的指令将 WAV 文件转换为兼容格
  • SQL Server表默认是排序的

    我有一个简单的 SSIS 包 可以将平面文件中的数据导入到 SQL Server 表 SQL Server 005 中 文件包含 70k 行 表没有主键 导入成功 但是当我打开 SQL Server 表时 行的顺序与文件的顺序不同 仔细观察
  • SwiftUI TabView 在添加/删除 CoreData 元素期间给出错误消息

    我目前正在尝试将 TabView 与 CoreData 一起用于轮播视图 PageTabViewStyle 当我按如下顺序添加新页面时 不会发生错误 age name 3 page name 4 page name 5 但是 如果我将数字放
  • 如何从 Swift 中的时间服务器获取当前时区的当前日期?

    我正在开发应用程序 我想根据用户当前时区获取当前日期 即使用户从设备设置菜单更改日期后也是如此 获取我使用的当前时区 let timeZone TimeZone current print timeZone 例如 这里我得到 亚洲 加尔各答
  • 在 Java 中连接 WAV 文件

    这是我的代码 它连接四个 wav 文件并生成 wavAppend wav 这个串联的文件可以在 Windows Media Player 中很好地播放 但通过PlaySound类 只能听到one wav 有人可以帮忙吗 class Play
  • JSON Scraping - 通过 Javascript 将军事时间转换为标准时间

    我正在从 url 中抓取 JSON 数据 时间是军用时间 我想知道在客户端检索后是否有办法将其转换为标准时间 这是 JSON SaturdayClose 21 00 SaturdayOpen 10 00 SundayClose 12 00
  • 如何向直方图添加边缘颜色

    在使用 seaborn 和 Jupyter 笔记本做一些练习问题时 我意识到 distplot 图表在各个 bin 上没有文档中所有示例图表所具有的较暗轮廓 我尝试使用 Pycharm 创建图表并注意到同样的事情 我认为这是一个seabor
  • gvim 病原体问题

    我从github下载了pathogen vim并将其放在 vim下的 autoload 目录中 然而现在当我启动 gvim 并执行 helptags 时 它说 需要参数 我的 vimrc 文件的内容是 call pathogen runti
  • 命中测试 SVG 形状?

    已经实现了 SVG 规范部分内容的浏览器 Firefox 等 免费为我们进行命中测试 如果我在 SVG 对象上附加 mousedown 侦听器 则每当单击该形状时我都会收到通知 这是令人惊奇的 特别是对于复杂的多边形形状 我想知道是否有一种
  • AvalonDock 停靠一个窗口

    我正在尝试将 WPF 中的应用程序转换为可以使用AvalonDock 我有几个窗口 大约 10 个 和主窗体DockingManager 我想把那些窗户放在里面DockingManager 我试过这个
  • 无法在 Kotlin 中替换字符串内的字符串

    我正在尝试替换字符串中的一些子字符串 但我的代码似乎不起作用 val listOfMaleWords listOf him he his val listOfFemaleWords listOf her she her fun modify
  • 如何从Python扩展模块的C代码调用内置函数(或方法)?

    我目前想要完成的是调整Pythonitertools模块功能combinations对通过的进行排序iterable在创建组合之前 目的是对返回的组合进行排序 我是第一次开发 Python 扩展模块 到目前为止我唯一的经验是编写和编译一个像
  • 许多二进制文件同步

    我的办公服务器上有大约 100 000 个文件 图像 pdf 等 文件数量每天都会增加大约 100 500 个项目 并且大约有 20 50 个旧文件发生更改 将 Web 服务器与这些文件同步的最佳方法是什么 像 Mercurial GIT
  • 自定义类型编码后无法操作? Spark数据集

    假设你有这个 编码自定义类型的解决方案来自这个线程 assume we handle custom type class MyObj val i Int val j String implicit val myObjEncoder org