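I applied the algorithm from the question "Spark: How to transpose and explode columns with nested arrays" (https://stackoverflow.com/questions/69418239/spark-how-to-transpose-and-explode-columns-with-nested-arrays) to transpose and explode a nested Spark dataframe, but in my case the arrays have dynamic fields.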
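I added the row """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}""" to the dataframe. It introduces a new column c whose array elements carry an extra field, val_dynamic, which may or may not be present.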
I am looking for desired output 2 (transpose and explode), but even an example of desired output 1 (transpose) would be very helpful.
Input df:
+------------------+--------+-----------+---+
|                 a|       b|          c| id|
+------------------+--------+-----------+---+
|[{1, 1}, {11, 11}]|    null|       null|  1|
|              null|[{2, 2}]|       null|  2|
|              null|    null|[{3, 3, 3}]|  3| !!! NOTE: Added `val_dynamic`
+------------------+--------+-----------+---+
root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true) !!! NOTE: Added `val_dynamic`
 |-- id: long (nullable = true)
Desired output 1 (transpose_df):
+---+------+-------------------+
| id| cols | arrays |
+---+------+-------------------+
| 1| a | [{1, 1}, {11, 11}]|
| 2| b | [{2, 2}] |
| 3| c | [{3, 3, 3}] | !!! NOTE: Added `val_dynamic`
+---+------+-------------------+
Desired output 2 (explode_df):
+---+----+----+---+-----------+
| id|cols|date|val|val_dynamic|
+---+----+----+---+-----------+
| 1| a| 1| 1| null |
| 1| a| 11| 11| null |
| 2| b| 2| 2| null |
| 3| c| 3| 3| 3 | !!! NOTE: Added `val_dynamic`
+---+----+----+---+-----------+
Current code:
import pyspark.sql.functions as f

# Assumes an active SparkSession `spark` and SparkContext `sc` (e.g. in a notebook).
# Each row populates a different array column; only c carries val_dynamic.
df = spark.read.json(sc.parallelize([
    """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
    """{"id":2,"b":[{"date":2,"val":2}]}""",
    """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}"""
]))
df.show()

cols = ['a', 'b', 'c']
# Builds the string: stack(3,'a',a,'b',b,'c',c)
expr = f"stack({len(cols)}," + \
    ",".join([f"'{c}',{c}" for c in cols]) + \
    ")"
# Transpose: one row per (id, column name, array), dropping rows whose array is null.
transpose_df = df.selectExpr("id", expr) \
    .withColumnRenamed("col0", "cols") \
    .withColumnRenamed("col1", "arrays") \
    .filter("arrays is not null")
transpose_df.show()

# Explode: one row per struct, with the struct fields as columns.
explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
explode_df.show()
Current result:
AnalysisException: cannot resolve 'stack(3, 'a', `a`, 'b', `b`, 'c', `c`)' due to data type mismatch: Argument 2 (array<struct<date:bigint,val:bigint>>) != Argument 6 (array<struct<date:bigint,val:bigint,val_dynamic:bigint>>); line 1 pos 0;
'Project [id#2304L, unresolvedalias(stack(3, a, a#2301, b, b#2302, c, c#2303), Some(org.apache.spark.sql.Column$$Lambda$2580/0x00000008411d3040@4d9eefd0))]
+- LogicalRDD [a#2301, b#2302, c#2303, id#2304L], false
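The error says that stack needs every array argument to have exactly the same element type. One direction that might get past the mismatch is to rebuild each array so that all structs expose the same fields, filling the missing ones with typed nulls, before passing them to stack. The sketch below only builds the SQL expression string. The helper names padded_array_expr and build_stack_expr and the hard-coded field lists are illustrative assumptions, and it relies on the transform and named_struct SQL functions (Spark 2.4 or later). Treat it as a sketch rather than a verified fix.

```python
def padded_array_expr(col, present, all_fields):
    """Build a Spark SQL expression that rebuilds every struct in array
    column `col` with the full field list; fields that `col` does not
    have are filled with a typed NULL."""
    parts = []
    for name, sql_type in all_fields.items():
        value = f"x.{name}" if name in present else f"cast(null as {sql_type})"
        parts.append(f"'{name}', {value}")
    return f"transform({col}, x -> named_struct({', '.join(parts)}))"


def build_stack_expr(fields_by_col, all_fields):
    """Build a stack(...) expression over the padded arrays, naming the
    two output columns cols and arrays."""
    pairs = [
        f"'{col}', {padded_array_expr(col, present, all_fields)}"
        for col, present in fields_by_col.items()
    ]
    return f"stack({len(pairs)}, {', '.join(pairs)}) as (cols, arrays)"


# Assumption: field names and types are hard-coded here to mirror the
# schema printed above; they are not read from a live dataframe.
all_fields = {"date": "bigint", "val": "bigint", "val_dynamic": "bigint"}
fields_by_col = {
    "a": ["date", "val"],
    "b": ["date", "val"],
    "c": ["date", "val", "val_dynamic"],
}

stack_expr = build_stack_expr(fields_by_col, all_fields)
print(stack_expr)
```

The resulting string would be used as df.selectExpr("id", stack_expr).filter("arrays is not null"), after which selectExpr('id', 'cols', 'inline(arrays)') should expose a val_dynamic column that is null for a and b. The field lists could presumably be derived from df.schema instead of being written by hand.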
ref :