Apache Spark SQL get_json_object java.lang.String 无法转换为 org.apache.spark.unsafe.types.UTF8String

2023-12-09

我正在尝试使用结构化流从 Apache Spark 中的 MQTT 代理读取 json 流,读取传入 json 的一些属性并将它们输出到控制台。我的代码看起来像这样:

val spark = SparkSession
  .builder()
  .appName("BahirStructuredStreaming")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val topic = "temp"
val brokerUrl = "tcp://localhost:1883"
val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic).option("persistence", "memory")
  .load(brokerUrl)
  .toDF().withColumn("payload", $"payload".cast(StringType))

val jsonDF = lines.select(get_json_object($"payload", "$.eventDate").alias("eventDate"))

    val query = jsonDF.writeStream
      .format("console")
      .start()

    query.awaitTermination()

但是,当 json 到达时,我收到以下错误:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 14d28475-d435-49be-a303-8e47e2f907e3, runId = b5bd28bb-b247-48a9-8a58-cb990edaf139]
Current Committed Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho7247541031496]: -1}
Current Available Offsets: {MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho7247541031496]: 0}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [get_json_object(payload#22, $.id) AS eventDate#27]
+- Project [id#10, topic#11, cast(payload#12 as string) AS payload#22, timestamp#13]
   +- StreamingExecutionRelation MQTTStreamSource[brokerUrl: tcp://localhost:1883, topic: temp clientId: paho7247541031496], [id#10, topic#11, payload#12, timestamp#13]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:300)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3384)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2783)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3365)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:533)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:532)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 8, localhost, executor driver): java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1874)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
    ... 34 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我使用 mosquitto 代理发送 JSON 记录,它们如下所示:

mosquitto_pub -m '{"eventDate": "2020-11-11T15:17:00.000+0200"}' -t "temp"

似乎来自 Bahir 流源提供商的每个字符串都会引发此错误。例如以下代码也会引发此错误:

spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic).option("persistence", "memory")
  .load(brokerUrl)
  .select("topic")
  .writeStream
  .format("console")
  .start()

Spark 似乎无法识别来自 Bahir 的字符串,可能是某种奇怪的字符串类版本问题。我已尝试以下操作来使代码正常工作:

  • 将java版本设置为8
  • 将spark版本从2.4.0升级到2.4.7
  • 将 scala 版本设置为 2.11.12
  • use decode函数具有所有可能的编码组合,而不是.cast(StringType)将列“payload”转换为字符串
  • use 子串在“有效负载”列上使用函数来重新创建兼容的字符串。

最后,我通过使用构造函数和数据集重新创建字符串来获得工作代码:

val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", topic).option("persistence", "memory")
  .load(brokerUrl)
  .select("payload")
  .as[Array[Byte]]
  .map(payload => new String(payload))
  .toDF("payload")

这个解决方案相当丑陋,但至少它有效。

我相信问题中提供的代码没有任何问题,并且我怀疑 Bahir 或 Spark 方面存在错误,阻止 Spark 处理来自 Bahir 源的字符串。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Spark SQL get_json_object java.lang.String 无法转换为 org.apache.spark.unsafe.types.UTF8String 的相关文章

  • Swift 1.2 和 Swift 2.0 中的字符串长度[重复]

    这个问题在这里已经有答案了 在以前版本的 Swift 中 我有以下代码 func myfunc mystr String if mystr utf16Count gt 3 使用最新版本的 Swift 1 2 我现在收到以下错误 utf16C
  • C++ 中的字符串到 LPCWSTR

    我正在尝试从字符串转换为 LPCWSTR 我使用多位 1 例如 LPCWSTR ToLPCWSTR string text LPCWSTR sw LPCWSTR text c str return sw 2 返回中文字符 LPCWSTR T
  • 将 JSON 字符串传递给 Django 模板

    我一直在用头撞墙 试图找出为什么我无法将从 Django 模型生成的 JSON 字符串传递到模板的 javascript 静态文件中 事实证明 问题不在模型级别 使用serializers serialize 在脚本本身中放入相同的字符串将
  • 将表类型添加到 JSON 编辑器

    我想了解的代码这个 JSON 编辑器 http mb21 github io JSONedit 并修改它 In 指令 js https github com mb21 JSONedit blob gh pages js directives
  • string.empty 和 string[0] == '\0' 之间的区别

    假设我们有一个字符串 std string str some value is assigned 有什么区别str empty and str 0 0 C 11 及更高版本 string variable 0 如果字符串为空 则需要返回空字
  • 在大文件中查找重复字符串

    一个文件包含大量 例如100亿 字符串 您需要查找重复的字符串 您有 N 个可用系统 您将如何找到重复项 埃里克森的答案可能是提出这个问题的人所期望的 您可以将 N 台机器中的每台机器用作哈希表中的一个存储桶 对于每个字符串 按顺序说出字符
  • 性能方面插值(直接插入字符串)VS串联[关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 在python中将列表转换为字符串

    我对 python 语言相当陌生 我一直在寻找这个问题的答案 我需要一个如下所示的列表 Kevin went to his computer He sat down He fell asleep 转换为如下字符串 Kevin went to
  • std::regex 转义正则表达式中使用的特殊字符

    我是字符串来创建一个std regex FILE 作为单元测试的一部分 检查一些打印文件名的异常输出 在 Windows 上失败并显示 regex error error escape 表达式包含无效的转义字符或尾随转义 因为 FILE 宏
  • 如何在 ECMAScript 6 中导入 JSON 文件?

    如何访问 ECMAScript 6 中的 JSON 文件 以下不起作用 import config from config json 如果我尝试导入 JavaScript 文件 这可以正常工作 https www stefanjudis c
  • 使用 Javascript 删除字符串的最后一个字符

    我有一个DIV与一些字符 如何在每次单击时删除文本中的最后一个字符DIV itself 删除第一个字符 div on click function this text function index text return text repl
  • 如何从 mysql 数据库中提取数据并使用 D3.JS 进行可视化?

    我有一个数据库MySQL我想在其中可视化D3 JS 为了做到这一点 首先我想parse中的数据JSON格式 然后编写一个基本代码 从数据库中提取数据并使用D3 JS 我环顾四周 但找不到我想要的东西 因为我是新手D3 JS 我怎样才能做到这
  • 如何使用 Zend 2 http 发送 json 数据?

    我已经为此苦苦挣扎了几天 我需要将一组以 json 编码的数据发送到 api 我正在尝试使用 Zend 2 http 来实现这一点 但到目前为止我还没有运气 以下是 api 手册的内容 Bulk Create Contacts This c
  • 使用 Android 解析 JSON 的最有效方法

    我编写了一些代码来解析我的 Android 程序收到的 Google 距离矩阵 JSON 响应 我唯一感兴趣的数据是 距离 值 节点 我的代码可以工作 但似乎必须有一种更简单的方法来做到这一点 距离值节点在 JSON 中嵌套得很深 但是真的
  • 从 Spark 数据帧中过滤大量 ID

    我有一个大型数据框 其格式类似于 ID Cat date 12 A 201602 14 B 201601 19 A 201608 12 F 201605 11 G 201603 我需要根据大约 500 万个 Is 的列表来过滤行 最直接的方
  • Node.js 中的 JSON Zip 响应

    我对 node js 还很陌生 我正在尝试发回包含 JSON 结果的 zip 文件 我一直在尝试弄清楚如何去做 但还没有达到预期的结果 我正在使用 NodeJS ExpressJS LocomotiveJS Mongoose 和 Mongo
  • 将类型信息传递给 Scala 中的函数

    我有对 json 对象执行一些常见操作的代码 即提取 所以我想创建一个通用函数 它接受哪个类的类型参数 代码如下所示 def getMessageType T json JValue Either GenericError T try Ri
  • JSON 编码和大引号

    我在 PHP 5 的本机实现中遇到了一个有趣的行为json encode 显然 当将对象序列化为 json 字符串时 编码器将清空包含 卷曲 引号的字符串的任何属性 这种类型可能会在启用自动转换的情况下从 MS Word 文档中复制粘贴 这
  • Windows Phone 的 JSON 反序列化

    我正在尝试反序列化以下 JSON 但我真的不知道如何使用 JSON net 来完成这项工作 我正在使用 C 和 JSON Net 库 我的 JSON 如下 found 3 bounds 43 54919 172 62148 43 54487
  • Scalaz 状态 monad 示例

    我还没有看到很多 scalaz 状态单子的例子 有这个例子 http scalaz github com scalaz scalaz 2 9 1 6 0 2 doc sxr scalaz example ExampleState scala

随机推荐