在 Spark 中将流式 XML 转换为 JSON

2024-05-28

我是 Spark 新手,正在开发一个简单的应用程序,将从 Kafka 接收的 XML 流转换为 JSON 格式

Using:

  • 火花2.4.5
  • 斯卡拉 2.11.12

在我的用例中,kafka 流采用 xml 格式)。以下是我尝试过的代码。


    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("Spark Demo")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    val inputStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "demo_topic_xml")
      .option("startingOffsets", "earliest") // From starting
      .load()

    inputStream.printSchema()


    val records = inputStream.selectExpr("CAST(value AS STRING)")
    //How to remove value column here while converting xml in to json?
    val jsons = records.toJSON

    jsons.writeStream
      .format("console")
      .option("truncate", false)
      .outputMode("append")
      .start()
      .awaitTermination()

然而,上面的代码在 json 输出中给出了“value”列标题作为字段名称,如下所示:{"value":"<?xml version=\"1.0\" encoding=\"utf-16\"?><employees><employee id=\"be129\"><firstname>Jane</firstname><lastname>Doe</lastname><title>Engineer</title><division>Materials</division><building>327</building><room>19</room><supervisor>be131</supervisor></employee><employees>"}

我真正需要的只是将 xml 有效负载转换为 json,而不需要“值”列部分。看起来我在这里遗漏了一些明显的东西。有人可以帮我吗?谢谢你的时间。


Use org.json.XML要转换的库XML数据到JSON.

检查下面的代码。

创造UDF

scala> import org.json.XML
import org.json.XML

scala> val parse = udf((value: String) => XML.toJSONObject(value).toString) // Defined UDF to parse xml to json
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

定义schema基于XML data.

scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Define Schema of your xml data in json.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}

scala> val schema = DataType.fromJson(schema_json).asInstanceOf[StructType] // Convert Json schema data to schema.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))

Final Schema

scala>
    inputStream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(parse($"data"),schema).as("emp_data"))
    .select($"emp_data.employees.employee.*")
    .printSchema

root
 |-- building: long (nullable = true)
 |-- division: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- room: long (nullable = true)
 |-- supervisor: string (nullable = true)
 |-- title: string (nullable = true)

写作转换为JSON数据到console.

scala> 
    inputStream
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(parse($"data"),schema).as("emp_data"))
    .select($"emp_data.employees.employee.*")
    .writeStream
    .format("console")
    .option("truncate", false)
    .outputMode("append")
    .start()
    .awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Spark 中将流式 XML 转换为 JSON 的相关文章

  • 在 Scala 中定义具有多个隐式参数的函数

    如何定义具有多个隐式参数的函数 def myfun arg String implicit p1 String implicit p2 Int doesn t work 它们必须全部放入一个参数列表中 并且该列表必须是最后一个 def my
  • 抽象类型与类型参数

    在什么情况下抽象类型应该优先于类型参数 添加到我的之前关于抽象类型与参数的回答 https stackoverflow com questions 1154571 scala abstract types vs generics 11547
  • Scala Spark:将数据框中的双列转换为日期时间列

    我正在尝试编写代码来将日期时间列 date 和 last updated date 转换为 mm dd yyyy 格式以进行显示 它们实际上是 unix 时间转换为双精度数 我该怎么做呢 import org joda time impor
  • Spark sql 每组前 n 个

    我怎样才能获得每组的前n名 比如说前10名或前3名 spark sql http www xaprb com blog 2006 12 07 how to select the firstleastmax row per group in
  • 超时对“Future”进行排序

    我利用了TimeoutScheduler介绍于Scala Futures 内置超时 https stackoverflow com questions 16304471 scala futures built in timeout 但是 现
  • mssql 的 UUID 疯狂

    我的数据库条目有一个 UUID 及其值 使用 Microsoft SQL Server Management Studio 提取 CDF86F27 AFF4 2E47 BABB 2F46B079E98B 将其加载到我的 Scala 应用程序
  • Spark中DataFrame、Dataset、RDD的区别

    我只是想知道有什么区别RDD and DataFrame Spark 2 0 0 DataFrame 只是一个类型别名Dataset Row 在阿帕奇火花 你能将其中一种转换为另一种吗 首先是DataFrame是从SchemaRDD 是的
  • 使用 Scala 的解析器组合器时如何忽略不匹配的前面文本?

    我真的很喜欢解析器组合器 但是当我不关心相关文本之前的文本时 我对我提出的提取数据的解决方案并不满意 考虑这个小型解析器来获取货币金额 import scala util parsing combinator case class Amou
  • 如何列出Resources文件夹中的所有文件(java/scala)

    我正在编写一个函数 需要访问资源中的文件夹 并循环遍历所有文件名 如果这些文件符合条件 则加载这些文件 new File getClass getResource images sprites getPath listFiles 返回空指针
  • 选择排序通用类型实现

    我以自己的方式实现了选择和快速排序的递归版本 我试图以一种可以对任何泛型类型的列表进行排序的方式修改代码 我想假设提供的泛型类型可以转换为 Comparable at运行 有人有关于如何执行此操作的链接 代码或教程吗 我正在尝试修改这个特定
  • Spark 有效地过滤大数据框中存在于小数据框中的条目

    我有一个 Spark 程序 它读取一个相对较大的数据帧 3 2 TB 其中包含 2 列 id name 和另一个相对较小的数据帧 20k 条目 其中包含单个列 id 我想做的是从大数据框中获取 id 和名称 如果它们出现在小数据框中 我想知
  • 如何使用 monocle 修改嵌套映射和 scala 中的另一个字段

    我第一次尝试单片眼镜 这是案例类 case class State mem Map String Int pointer Int 当前的修改 使用标准 scala 我想做 def add1 s State gt s copy mem s m
  • scala案例类复制实现

    我找不到 scala 中案例类的复制是如何实现的 我可以以某种方式检查一下吗 我虽然 Intellij 可以指出我的实现 但它不想跳转 我不知道为什么 您可以使用以下命令检查 scala 案例类输出scalac print ClassNam
  • 为什么流式聚合总是延迟到两批数据?

    我使用 Spark 2 3 0 我的问题是 每当我在输入目录中添加第三批数据时 第一批数据就会被处理并打印到控制台 为什么 val spark SparkSession builder appName micro1 enableHiveSu
  • 我的sparkDF.persist(DISK_ONLY)数据存储在哪里?

    我想更多地了解spark中hadoop的持久化策略 当我使用 DISK ONLY 策略保存数据帧时 我的数据存储在哪里 路径 文件夹 我在哪里指定这个位置 对于简短的答案 我们可以看看文档 https spark apache org do
  • 在 Scala 中实现“.clone”

    我正在想办法 clone我自己的对象 在 Scala 中 这是为了模拟 因此可变状态是必须的 由此产生了克隆的全部需要 在提前模拟时间之前 我将克隆整个状态结构 这是我目前的尝试 abstract trait Cloneable A See
  • 结构化流式自定义重复数据删除

    我有一个从 kafka 进入 dataFrame 的流数据 我想根据 Id 删除重复项并根据时间戳保留最新记录 样本数据是这样的 Id Name count timestamp 1 Vikas 20 2018 09 19T10 10 10
  • enableHiveSupport 在 java Spark 代码中引发错误[重复]

    这个问题在这里已经有答案了 我有一个非常简单的应用程序 尝试使用 Spark 从 src main resources 读取 orc 文件 我不断收到此错误 无法实例化具有 Hive 支持的 SparkSession 因为找不到 Hive
  • Shapeless 中 TypeClass 特征的 emptyCoproduct 和 coproduct 方法的用途是什么

    我并不完全清楚这样做的目的是什么emptyCoProduct and coproduct的方法TypeClass无形中的特质 什么时候会使用TypeClass特质而不是ProductTypeClass 这两种方法的实施方式有哪些示例 假设我
  • 在 Scala Spark 和 PySpark 之间传递 SparkSession

    我的要求是从现有的 PySpark 程序调用 Spark Scala 函数 将 PySpark 程序中创建的 SparkSession 传递给 Scala 函数的最佳方法是什么 我将 scala jar 传递给 Pyspark 如下所示 s

随机推荐