I am trying to parse a wide, nested XML file into a DataFrame using the spark-xml library.
Here is the abbreviated schema definition (XSD):
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="ItemExport">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="Item">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="ITEM_ID" type="xs:integer" />
              <xs:element name="CONTEXT" type="xs:string" />
              <xs:element name="TYPE" type="xs:string" />
              ...
              <xs:element name="CLASSIFICATIONS">
                <xs:complexType>
                  <xs:sequence>
                    <xs:element maxOccurs="unbounded" name="CLASSIFICATION">
                      <xs:complexType>
                        <xs:sequence>
                          <xs:element name="CLASS_SCHEME" type="xs:string" />
                          <xs:element name="CLASS_LEVEL" type="xs:string" />
                          <xs:element name="CLASS_CODE" type="xs:string" />
                          <xs:element name="CLASS_CODE_NAME" type="xs:string" />
                          <xs:element name="EFFECTIVE_FROM" type="xs:dateTime" />
                          <xs:element name="EFFECTIVE_TO" type="xs:dateTime" />
                        </xs:sequence>
                      </xs:complexType>
                    </xs:element>
                  </xs:sequence>
                </xs:complexType>
              </xs:element>
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>
The XML file containing the data looks like this:
<?xml version="1.0" encoding="utf-8"?>
<ItemExport>
  <TIMEZONE>PT</TIMEZONE>
  <Item>
    <ITEM_ID>56</ITEM_ID>
    <CONTEXT>Sample</CONTEXT>
    <TYPE>Product</TYPE>
  </Item>
  ...
  <Item>
    <ITEM_ID>763</ITEM_ID>
    <CONTEXT>Sample</CONTEXT>
    <TYPE>Product</TYPE>
    <CLASSIFICATIONS>
      <CLASSIFICATION>
        <CLASS_SCHEME>AAU</CLASS_SCHEME>
        <CLASS_LEVEL>1</CLASS_LEVEL>
        <CLASS_CODE>14</CLASS_CODE>
        <CLASS_CODE_NAME>BizDev</CLASS_CODE_NAME>
        <EFFECTIVE_FROM />
        <EFFECTIVE_TO />
      </CLASSIFICATION>
    </CLASSIFICATIONS>
  </Item>
</ItemExport>
Now, it is clear that the rowTag needs to be Item, but I am running into a problem with the XSD: the row schema is wrapped inside the document schema.
import com.databricks.spark.xml.util.XSDToSchema
import com.databricks.spark.xml._
import java.nio.file.Paths
import org.apache.spark.sql.functions._
val inputFile = "dbfs:/samples/ItemExport.xml"
val schema = XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))

val df1 = spark.read.option("rowTag", "Item").xml(inputFile) // schema inferred from the data
val df2 = spark.read.schema(schema).xml(inputFile)           // schema taken from the XSD
What I basically want is the struct under Item beneath the root element, rather than the schema of the whole document.
schema.printTreeString
root
|-- ItemExport: struct (nullable = false)
| |-- Item: struct (nullable = false)
| | |-- ITEM_ID: integer (nullable = false)
| | |-- CONTEXT: string (nullable = false)
| | |-- TYPE: string (nullable = false)
...(a few more fields...)
| | |-- CLASSIFICATIONS: struct (nullable = false)
| | | |-- CLASSIFICATION: array (nullable = false)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- CLASS_SCHEME: string (nullable = false)
| | | | | |-- CLASS_LEVEL: string (nullable = false)
| | | | | |-- CLASS_CODE: string (nullable = false)
| | | | | |-- CLASS_CODE_NAME: string (nullable = false)
| | | | | |-- EFFECTIVE_FROM: timestamp (nullable = false)
| | | | | |-- EFFECTIVE_TO: timestamp (nullable = false)
In the example above, parsing with the document schema yields an empty DataFrame:
df2.show()
+-----------+
| ItemExport|
+-----------+
+-----------+
The inferred schema is essentially correct, but inference can only discover the nested columns when they actually occur in the data (which is not always the case):
df1.show()
+----------+--------------------+----------+---------------+
| ITEM_ID| CONTEXT| TYPE|CLASSIFICATIONS|
+----------+--------------------+----------+---------------+
| 56| Sample | Product| {null}|
| 57| Sample | Product| {null}|
| 59| Part | Component| {null}|
| 60| Part | Component| {null}|
| 61| Sample | Product| {null}|
| 62| Sample | Product| {null}|
| 63| Assembly | Product| {null}|
df1.printSchema
root
|-- ITEM_ID: long (nullable = true)
|-- CONTEXT: string (nullable = false)
|-- TYPE: string (nullable = true)
...
|-- CLASSIFICATIONS: struct (nullable = true)
| |-- CLASSIFICATION: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CLASS_CODE: long (nullable = true)
| | | |-- CLASS_CODE_NAME: string (nullable = true)
| | | |-- CLASS_LEVEL: long (nullable = true)
| | | |-- CLASS_SCHEME: string (nullable = true)
| | | |-- EFFECTIVE_FROM: string (nullable = true)
| | | |-- EFFECTIVE_TO: string (nullable = true)
As described here (https://stackoverflow.com/questions/67531343/spark-xml-receiving-only-null-when-parsing-xml-column-using-from-xml-function) and in the spark-xml documentation (https://github.com/databricks/spark-xml#features, "Path to an XSD file that is used to validate the XML for each row individually"), I can parse against a given row-level schema like so:
import org.apache.spark.sql.types._

val structschema = StructType(Array(
  StructField("ITEM_ID", IntegerType, false),
  StructField("CONTEXT", StringType, false),
  StructField("TYPE", StringType, false)
))

val df_struct = spark.read.schema(structschema).option("rowTag", "Item").xml(inputFile)
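Extending this schema by hand to cover the nested columns would mean duplicating the XSD in Scala, which is exactly what I want to avoid. For illustration, this is roughly what it would look like (the nullability flags here are my own guesses, not derived from the XSD; the row-level fields are repeated from the snippet above so the example stands alone):

```scala
import org.apache.spark.sql.types._

// Row-level fields, repeated from above
val structschema = StructType(Array(
  StructField("ITEM_ID", IntegerType, false),
  StructField("CONTEXT", StringType, false),
  StructField("TYPE", StringType, false)
))

// Hand-written copy of the CLASSIFICATIONS branch of the XSD
val classificationsType = StructType(Array(
  StructField("CLASSIFICATION", ArrayType(StructType(Array(
    StructField("CLASS_SCHEME", StringType, false),
    StructField("CLASS_LEVEL", StringType, false),
    StructField("CLASS_CODE", StringType, false),
    StructField("CLASS_CODE_NAME", StringType, false),
    StructField("EFFECTIVE_FROM", TimestampType, true),
    StructField("EFFECTIVE_TO", TimestampType, true)
  ))), true)
))

val rowSchema = structschema.add(StructField("CLASSIFICATIONS", classificationsType, true))
```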
Instead, I would like to get the schema of the nested columns from the XSD as well. Given the schema above, how can this be done?
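The closest I have come is unwrapping the two outer struct levels of the parsed schema by hand. This is only a sketch of my own: the asInstanceOf casts rest on my assumption about how XSDToSchema lays out the result, and they feel brittle:

```scala
import java.nio.file.Paths
import com.databricks.spark.xml.util.XSDToSchema
import org.apache.spark.sql.types.StructType

val docSchema = XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))

// Unwrap ItemExport -> Item to get the per-row struct
val itemSchema = docSchema("ItemExport").dataType
  .asInstanceOf[StructType]("Item").dataType
  .asInstanceOf[StructType]

val dfItems = spark.read
  .schema(itemSchema)
  .option("rowTag", "Item")
  .xml("dbfs:/samples/ItemExport.xml")
```

Is there a less fragile, supported way to get at the row-level struct?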
Version info: Scala 2.12, Spark 3.1.1, spark-xml 0.12.0