The Spark SQL equivalent in Python is pyspark.sql.functions.arrays_zip (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=arrays_zip#pyspark.sql.functions.arrays_zip):
pyspark.sql.functions.arrays_zip(*cols)
Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.
So if you already have two arrays:
from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    # Split each comma-separated string into an array, trimming surrounding whitespace
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*")))
you can apply arrays_zip to the result:
from pyspark.sql.functions import arrays_zip
df_zipped = df.withColumn(
"zipped", arrays_zip("column_1", "column_2")
)
df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+
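Note that arrays_zip names the struct fields after the input columns, which is what lets the transform below reference x.column_1 and x.column_2. You can verify with a schema check (output sketched; details such as nullability may vary):

df_zipped.select("zipped").printSchema()
# root
#  |-- zipped: array (nullable = true)
#  |    |-- element: struct
#  |    |    |-- column_1: string
#  |    |    |-- column_2: string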
Now to combine the results you can use transform
(How to use transform higher-order function? https://stackoverflow.com/q/53761600/10465355, TypeError: Column is not iterable - How to iterate over ArrayType()? https://stackoverflow.com/q/48993439/10465355):
from pyspark.sql.functions import expr

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)
df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+
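As a side note, on Spark 3.1+ the same transformation can be expressed with the Python-level transform function instead of an SQL expression; this is a sketch, not part of the original answer:

from pyspark.sql.functions import concat_ws, transform

# Equivalent to the expr() version above, using the Python lambda API (Spark 3.1+)
df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    transform("zipped", lambda x: concat_ws("_", x["column_1"], x["column_2"]))
)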
Note: The higher-order function transform and arrays_zip were both introduced in Apache Spark 2.4.
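If you are stuck on a version before 2.4, a plain Python UDF is a possible (slower) fallback; zip_concat below is a hypothetical helper, not from this answer:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Hypothetical fallback for Spark < 2.4: zip and join the arrays row by row in Python
@udf(ArrayType(StringType()))
def zip_concat(xs, ys):
    return ["_".join(pair) for pair in zip(xs, ys)]

df_fallback = df.withColumn("zipped_concat", zip_concat("column_1", "column_2"))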