我是 Spark 新手,正在使用 JSON,但我在做一些相当简单的事情时遇到了困难(我认为)。我尝试过使用类似问题的部分解决方案,但不太正确。我目前有一个 Spark 数据框,其中有几列代表变量。每行都是变量值的唯一组合。然后,我有一个应用于每一行的 UDF,它将每一列作为输入,进行一些分析,并以 JSON 字符串的形式输出每行的汇总表,并将这些结果保存在表的新列中。一些小样本数据如下所示:
+------+-----+------+-------------------------------------------------------------------
|Var 1 |Var 2|Var 3 |JSON Table
+------+------------+-------------------------------------------------------------------
|True |10% |200 |[{"Out_1": "Mean", "Out_2": "25"}, {"Out_1": "Median", "Out_2": "21"}]
|False |15% |150 |[{"Out_1": "Mean", "Out_2": "19"}, {"Out_1": "Median", "Out_2": "18"}]
|True |12% |100 |[{"Out_1": "Mean", "Out_2": "22"}, {"Out_1": "Median", "Out_2": "20"}]
我想将其转换为以下格式:
+------+-----+------+------+-----+
|Var 1 |Var 2|Var 3 |Out_1 |Out_2|
+------+------------+------+-----+
|True |10% |200 |Mean |25 |
|True |10% |200 |Median|21 |
|False |15% |150 |Mean |19 |
|False |15% |150 |Median|18 |
|True |12% |100 |Mean |22 |
|True |12% |100 |Median|20 |
实际上,有更多的变量、数百万行和更大的 JSON 字符串以及更多的输出,但核心问题仍然相同。我基本上尝试获取 JSON 模式并使用 from_json ,如下所示:
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = spark.read.json(df.rdd.map(lambda row: row["JSON Table"])).schema
df = df\
.withColumn("JSON Table", from_json("JSON Table", schema))\
.select(col('*'), col('JSON Table.*'))\
df.show()
这似乎正确地获取了 JSON 结构(尽管每个值都被读取为字符串,但大多数都是整数),但生成的数据帧是空的,尽管具有正确的列标题。关于如何处理这个问题有什么建议吗?