如何使用 pyspark Collect_list 函数检索所有列

2023-12-10

我有一个 pyspark 2.0.1。我正在尝试对我的数据框进行分组并从我的数据框中检索所有字段的值。我找到

z=data1.groupby('country').agg(F.collect_list('names')) 

将为我提供国家/地区和名称属性的值以及名称属性的值,它将提供列标题collect_list(names)。但对于我的工作,我有大约 15 列的数据框,我将运行一个循环,每次在循环内更改 groupby 字段,并需要所有剩余字段的输出。您能否建议我如何使用collect_list( )或任何其他 pyspark 函数?

我也尝试过这段代码

from pyspark.sql import functions as F 
fieldnames=data1.schema.names 
names1= list() 
for item in names: 
   if item != 'names': 
     names1.append(item) 
 z=data1.groupby('names').agg(F.collect_list(names1)) 
 z.show() 

但收到错误消息

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace: py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist 

在调用 groupBy 之前使用 struct 组合列

假设你有一个数据框

df = spark.createDataFrame(sc.parallelize([(0,1,2),(0,4,5),(1,7,8),(1,8,7)])).toDF("a","b","c")

df = df.select("a", f.struct(["b","c"]).alias("newcol"))
df.show()
+---+------+
|  a|newcol|
+---+------+
|  0| [1,2]|
|  0| [4,5]|
|  1| [7,8]|
|  1| [8,7]|
+---+------+
df = df.groupBy("a").agg(f.collect_list("newcol").alias("collected_col"))
df.show()
+---+--------------+
|  a| collected_col|
+---+--------------+
|  0|[[1,2], [4,5]]|
|  1|[[7,8], [8,7]]|
+---+--------------+

聚合操作只能对单列进行。

聚合后,您可以收集结果并对其进行迭代以分离组合列,生成索引字典。或者你可以写一个 udf 来分隔合并的列。

from pyspark.sql.types import *
def foo(x):
    x1 = [y[0] for y in x]
    x2 = [y[1] for y in x]
    return(x1,x2)

st = StructType([StructField("b", ArrayType(LongType())), StructField("c", ArrayType(LongType()))])
udf_foo = udf(foo, st)
df = df.withColumn("ncol", 
                  udf_foo("collected_col")).select("a",
                  col("ncol").getItem("b").alias("b"), 
                  col("ncol").getItem("c").alias("c"))
df.show()

+---+------+------+
|  a|     b|     c|
+---+------+------+
|  0|[1, 4]|[2, 5]|
|  1|[7, 8]|[8, 7]|
+---+------+------+
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 pyspark Collect_list 函数检索所有列 的相关文章