所以,通过阅读一些有趣的东西here https://stackoverflow.com/questions/29483498/append-a-column-to-data-frame-in-apache-spark-1-3,我已经确定你不能真的只是将随机/任意列附加到给定的DataFrame
目的。看来你想要的更多的是zip
than a join
。我环顾四周发现这张票 https://issues.apache.org/jira/browse/SPARK-7460,这让我觉得你不能zip
鉴于你有DataFrame
而不是RDD
对象。
我能够解决你的问题的唯一方法就是离开这个世界DataFrame
对象并返回到RDD
对象。我还需要为连接目的创建一个索引,这可能适合也可能不适合您的用例。
l = sc.parallelize([1, 2, 3])
index = sc.parallelize(range(0, l.count()))
z = index.zip(l)
rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])
rdd_index = index.zip(rdd)
# just in case!
assert(rdd.count() == l.count())
# perform an inner join on the index we generated above, then map it to look pretty.
new_rdd = rdd_index.join(z).map(lambda (x, y): [y[0][0], y[0][1], y[1]])
new_df = new_rdd.toDF(["product", 'name', 'new_col'])
当我跑步时new_df.show()
, I get:
+-------+----+-------+
|product|name|new_col|
+-------+----+-------+
| p1| a| 1|
| p2| b| 2|
| p3| c| 3|
+-------+----+-------+
旁注:我真的很惊讶这不起作用。看起来像外连接?
from pyspark.sql import Row
l = sc.parallelize([1, 2, 3])
new_row = Row("new_col_name")
l_as_df = l.map(new_row).toDF()
new_df = df.join(l_as_df)
当我跑步时new_df.show()
,我得到:
+-------+----+------------+
|product|name|new_col_name|
+-------+----+------------+
| p1| a| 1|
| p1| a| 2|
| p1| a| 3|
| p2| b| 1|
| p3| c| 1|
| p2| b| 2|
| p2| b| 3|
| p3| c| 2|
| p3| c| 3|
+-------+----+------------+