我有一个来自 Google Analytics 的 Spark 数据框,如下所示:
id customDimensions (Array<Struct>)
100 [ {"index": 1, "value": "Earth"}, {"index": 2, "value": "Europe"}]
101 [ {"index": 1, "value": "Mars" }]
我还有一个“自定义维度元数据”数据框,如下所示:
index name
1 planet
2 continent
我将使用元数据 df 中的索引,以便将自定义维度扩展到列中。结果应如下所示:
id planet continent
100 Earth Europe
101 Mars null
我尝试过以下方法,效果很好,但性能极差。我想知道是否有更好的方法。
# Select the two relevant columns
cd = df.select('id', 'customDimensions')
# Explode customDimensions so that each row now has a {index, value}
cd = cd.withColumn('customDimensions', F.explode(cd.customDimensions))
# Put the index and value into their own columns
cd = cd.select('id', 'customDimensions.index', 'customDimensions.value')
# Join with metadata to obtain the name from the index
metadata = metadata.select('index', 'name')
cd = (cd
.join(metadata, cd.index == metadata.index, 'left')
.drop(metadata.index))
# Pivot cd so that each row has the id, and we have columns for each custom dimension
piv = cd.groupBy('id').pivot('name').agg(F.first(F.col('value')))
# Join back to restore the other columns
return df.join(piv, df.id == piv.id).drop(piv.id)
假设:
- 最多有 250 个自定义维度索引,并且名称只能通过元数据数据框得知
- 原始数据框还有我想维护的其他几个列(因此在我的解决方案末尾加入了连接)