Use the `org.json.XML` library to convert the XML data to JSON.
Check the code below.
Create a UDF:
scala> import org.json.XML
import org.json.XML
scala> val parse = udf((value: String) => Option(value).map(xml => XML.toJSONObject(xml).toString).orNull) // UDF converting an XML string to a JSON string; a null input yields null instead of throwing inside XML.toJSONObject and failing the query.
parse: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
Define the schema based on the XML data:
scala> val schema_json = """{"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}""" // Schema of the XML data written in Spark's JSON schema format: a struct `employees` holding a single struct `employee`. NOTE(review): if <employees> can contain several <employee> elements, the converted JSON will hold an array there and this should be an array of structs — confirm against real data.
schema_json: String = {"type":"struct","fields":[{"name":"employees","type":{"type":"struct","fields":[{"name":"employee","type":{"type":"struct","fields":[{"name":"building","type":"long","nullable":true,"metadata":{}},{"name":"division","type":"string","nullable":true,"metadata":{}},{"name":"firstname","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"lastname","type":"string","nullable":true,"metadata":{}},{"name":"room","type":"long","nullable":true,"metadata":{}},{"name":"supervisor","type":"string","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}
scala> val schema = DataType.fromJson(schema_json) match { case st: StructType => st; case other => throw new IllegalArgumentException(s"Expected a struct schema but got ${other.simpleString}") } // Parse the JSON schema definition into a StructType; fail with a descriptive error (not a ClassCastException) if it is not a struct.
schema: org.apache.spark.sql.types.StructType = StructType(StructField(employees,StructType(StructField(employee,StructType(StructField(building,LongType,true), StructField(division,StringType,true), StructField(firstname,StringType,true), StructField(id,StringType,true), StructField(lastname,StringType,true), StructField(room,LongType,true), StructField(supervisor,StringType,true), StructField(title,StringType,true)),true)),true))
Print the final schema:
scala>
inputStream
  .selectExpr("CAST(value AS STRING)")                        // leaves one string column, still named `value`
  .select(from_json(parse($"value"), schema).as("emp_data"))  // XML -> JSON string -> struct; there is no `data` column to read
  .select($"emp_data.employees.employee.*")                   // flatten the employee fields into top-level columns
  .printSchema
root
|-- building: long (nullable = true)
|-- division: string (nullable = true)
|-- firstname: string (nullable = true)
|-- id: string (nullable = true)
|-- lastname: string (nullable = true)
|-- room: long (nullable = true)
|-- supervisor: string (nullable = true)
|-- title: string (nullable = true)
Write the converted JSON data to the console:
scala>
inputStream
  .selectExpr("CAST(value AS STRING)")                        // leaves one string column, still named `value`
  .select(from_json(parse($"value"), schema).as("emp_data"))  // XML -> JSON string -> struct; there is no `data` column to read
  .select($"emp_data.employees.employee.*")                   // flatten the employee fields into top-level columns
  .writeStream
  .format("console")
  .option("truncate", false)                                  // print full column values
  .outputMode("append")
  .start()
  .awaitTermination()                                         // block until the streaming query stops