如何解决 DataSet.toJSON 与结构化流不兼容的问题

2024-01-10

我想将 Twitter 中的数据写入 Kafka。出于教育目的,我尝试使用结构化流来做到这一点。我创建了一个基于套接字源的 Twitter 源,它运行良好。

我按如下方式设置来源:

val tweets = spark
  .readStream
  .format("twitter")
  .option("query", terms)
  .load()
  .as[SparkTweet]

这为我提供了一个很好的用于分析查询的数据集。伟大的!

接下来,我想将略微 Sparkified 模式中的每条推文持久保存到 Kafka 中:

val kafkaOutStream = tweets
  .toJSON.as("value")
  .writeStream
  .queryName("stream_to_kafka")
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime("1 second"))
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("topic","tweets")
  .start

这很容易!除此之外,它不起作用。在QueryExecution.scala呼叫传递到assertSupported最终被抛弃,因为

Exception in thread "main" org.apache.spark.sql.AnalysisException:
    Queries with streaming sources must be executed with writeStream.start();;

我没想到toJSON成为一个纯粹的批处理操作,但没有它,并使用 sayselect($"text" as "value")相反,该代码将起作用。

现在,我有点惊讶,希望有人能解释为什么 toJSON 不应该与流兼容(这是一个错误吗?缺少功能吗?),并告诉我是否有一种结构化流方式来获取序列化表示我的目标是卡夫卡。


这有点冗长,但是to_json函数应该可以解决问题:

import org.apache.spark.sql.functions.{to_json, struct, col}

tweets.select(to_json(struct(df.columns map col: _*)).alias("value"))
  .writeStream
  ...

问题在于toJSON似乎是转换为 RDD https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2738:

val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
  ...

和(正如所指出的maasg https://stackoverflow.com/users/764040/maasg in 评论 https://stackoverflow.com/questions/45614364/sparks-dataset-tojson-is-not-structuredstreaming-compatible-how-to-convert-a-d/45614984#comment78190232_45614984)似乎已经在开发版本中解决了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何解决 DataSet.toJSON 与结构化流不兼容的问题 的相关文章

随机推荐

  • Swift 仅针对某些错误类型组合重试

    我有一个自定义管道 我想对一些可恢复的错误代码进行 3 次重试 并且我想为可恢复的错误添加一些短暂的延迟 有人知道我该怎么做吗 func createRequest for message Message gt AnyPublisher
  • 编译期间未包含在目标中的 .h 文件会发生什么情况?

    我有一个 Common h 文件 其中存储了在我的项目中重复使用的所有字符串 namespace Common static const std string mystring IamAwesum 因此 在任何需要特定字符串的文件中 我都包
  • 哪些 std::async 实现使用线程池?

    使用的优点之一std async而不是手动创建std thread对象应该是std async可以在幕后使用线程池来避免超额订阅问题 但是哪些实现可以做到这一点呢 我的理解是微软的实现确实如此 但是其他的呢 async实施 Gnu 的 li
  • 使用 MapReduce 实施 PageRank

    我正在尝试解决使用 MapReduce 实现 PageRank 的理论问题 我有以下具有三个节点的简单场景 A B C 邻接矩阵在这里 A B C B A 例如 B 的 PageRank 等于 1 d N d PR A C A N numb
  • Matlab 中打印函数的 Ghostscript 错误

    我正在尝试使用 Matlab 保存图像print功能 myImage magic 500 myFigure figure visible off r 1 set myFigure PaperUnits inches PaperPositio
  • 业务对象、验证和异常

    我一直在阅读一些有关异常及其使用的问题和答案 似乎有一种强烈的观点认为 仅应针对异常 未处理的情况提出异常 因此 这让我想知道验证如何与业务对象一起工作 假设我有一个业务对象 其中包含对象属性的 getter setter 假设我需要验证该
  • 我正在尝试创建一个情节性的旭日图,但收到错误消息:“dtype:对象,”不是叶子。

    我正在尝试创建一个旭日图 其中不同的行具有不同的长度 并收到错误消息 dtype 对象 不是叶子 我读过这篇文章 请注意 None 条目的父项必须是叶子 即它不能有除 None 之外的其他子项 否则会引发 ValueError 在情节页面上
  • gulp-filter 过滤掉所有文件

    我正在努力将我的工作流程转移到 Gulp 到目前为止我很喜欢它 然而 我似乎误解了 gulp filter 插件的工作原理 我有以下任务 gulp task assets function var stylesFilter gulpFilt
  • YAML 中的管道符号有什么用?

    我是 yaml 新手 我对用于多行的管道符号 有疑问 YAML 是否有类似于下面的语法 test 6 在下面的两个 YAML 文件中 第一个有效 第二个无效 我不知道是什么原因造成的 第一个文件 Name testing val1 seco
  • 将一个 TForm 嵌入另一个 TForm 时如何避免出现问题?

    我经常嵌入一个TForm后代成为另一个TForm后代是这样的 var Form1 TForm1 Form2 TForm2 begin Form2 Parent Form1 Form2 BorderStyle bsNone Form2 Ali
  • AS400 角色扮演模拟器

    我有一个迫切的需求 从java调用一个RPG程序 正如本文中所建议的从 Java 访问 iSeries 上的 RPG https stackoverflow com questions 184864 accessing rpg on ise
  • 为自定义设计器编写 Visual Studio 扩展

    所以 我有一些我想尝试的东西 我的想法是拥有一个作为 Visual Studio 扩展的视觉设计器 我希望能够拖出事件处理程序并连接行为 任何曾经玩过 魔兽争霸 III 脚本编辑器的人都会很清楚我的意思 我想做 这种事情可以在 Visual
  • 如何连接浮点数和字符串?

    我试过这个 ostringstream myString float x string s if x myString lt
  • 包含列表的 Angular2 反应形式

    我正在尝试为用户创建一个表单 该表单允许将多个电话号码与该用户关联 这对于当前反应式表单的实现来说是可能的吗 例如 我希望下面的表格接受可能的许多电话号码 我的前端实现将显示电话号码字段 并且有一个按钮允许添加额外的电话号码字段 userF
  • 从 OpenCart 中删除 index.php?route=common/home

    我目前有User SEO URL s在 OpenCart 管理中设置为是 System gt Settings gt Store gt Server gt User SEO URL s 到目前为止 所有标签和 SEO 链接都正常工作 该命令
  • 如何在android中添加填充矢量文件

    如何在矢量文件中添加填充左 右 上和下变量 我变了 android 视口宽度 and android 视口高度 但一切都没有改变 我的矢量
  • 以编程方式将自定义 WCF 标头添加到端点以实现可靠会话

    我正在构建一个 WCF 路由器 我的客户端使用可靠会话 在这种情况下 当客户端打开通道时 会发送一条消息 建立可靠会话 其内容如下
  • 如何创建一个根据参数使用 $resource 返回数据的服务

    我想创建一个根据参数调用后端的服务 这段代码不起作用 但我希望它能显示我想要实现的目标 myproject factory Item function resource if id undefined return resource res
  • 如何强制 GPG 接受来自 STDIN 的输入而不是尝试打开文件?

    我正在尝试将 GPG 的文本清晰签名合并到 PHP 脚本的字符串中 我可以让 GPG 加密字符串中的文本 如下所示 encrypted shell exec echo text gpg e a r email protected cdn c
  • 如何解决 DataSet.toJSON 与结构化流不兼容的问题

    我想将 Twitter 中的数据写入 Kafka 出于教育目的 我尝试使用结构化流来做到这一点 我创建了一个基于套接字源的 Twitter 源 它运行良好 我按如下方式设置来源 val tweets spark readStream for