如果您使用 Spark 1.4+,这会变得非常非常容易,这要归功于数据帧API https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html。 (DataFrames 是在 Spark 1.3 中引入的,但是partitionBy()
,我们需要的是1.4中引入 http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.partitionBy.)
如果您开始使用 RDD,则首先需要将其转换为 DataFrame:
val people_rdd = sc.parallelize(Seq((1, "alice"), (1, "bob"), (2, "charlie")))
val people_df = people_rdd.toDF("number", "name")
在 Python 中,同样的代码是:
people_rdd = sc.parallelize([(1, "alice"), (1, "bob"), (2, "charlie")])
people_df = people_rdd.toDF(["number", "name"])
一旦有了 DataFrame,根据特定键写入多个输出就很简单了。更重要的是——这就是 DataFrame API 的美妙之处——Python、Scala、Java 和 R 中的代码几乎相同:
people_df.write.partitionBy("number").text("people")
如果需要,您可以轻松使用其他输出格式:
people_df.write.partitionBy("number").json("people-json")
people_df.write.partitionBy("number").parquet("people-parquet")
在每个示例中,Spark 都会为我们对 DataFrame 进行分区的每个键创建一个子目录:
people/
_SUCCESS
number=1/
part-abcd
part-efgh
number=2/
part-abcd
part-efgh