我假设您正在寻找下面显示的 JSON 输出。
import pyspark.sql.functions as psf
from pyspark.sql.functions import col, collect_list, struct
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)
# Sample data: one row per (user_id, product_id, price).
rows = [
    ('A', 'P1', 3000),
    ('A', 'P2', 1500),
    ('B', 'P1', 3000),
    ('B', 'P3', 2000),
]
df = sc.parallelize(rows).toDF(["user_id", "product_id", "price"])
Spark 2.0 及以上版本:
# Spark >= 2.0: collect_list over a struct keeps each product_id paired
# with its price inside a single array column.
contents = collect_list(struct(col("product_id"), col("price"))).alias("contents_json")
df1 = df.groupBy("user_id").agg(contents)
df1.show()
Spark 1.6:
# Pre-2.0 substitute for collect_list(struct(...)): a UDF that zips the
# separately collected product_id and price lists back into an array of
# (product_id, price) structs.
# Requires: import pyspark.sql.functions as psf, and the pyspark.sql.types
# names used below.
zipCols = psf.udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        # Adjust the field types here to match your actual column types.
        StructField("product_id", StringType()),
        StructField("price", IntegerType())
    ]))
)
# Spark 1.6: collect the two columns into separate lists per user,
# then zip them together with the UDF.
ids = collect_list(col("product_id"))
prices = collect_list(col("price"))
df1 = df.groupBy("user_id").agg(zipCols(ids, prices).alias("contents_json"))
# Print each aggregated row as a JSON string.
# Fixed: `print row` is Python-2-only syntax and the loop body was not
# indented; print(row) works on both Python 2 and 3.
for row in df1.toJSON().collect():
    print(row)
输出是:
{"user_id":"B","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P3","price":2000}]}
{"user_id":"A","contents_json":[{"product_id":"P1","price":3000},{"product_id":"P2","price":1500}]}