我在 1.6 上尝试了新的“枢轴”功能更大的堆叠数据集 https://www.kaggle.com/worldbank/world-development-indicators/downloads/world-development-indicators-release-2016-01-28-06-31-53.zip。它有 5,656,458 行,IndicatorCode
列有 1344 个不同的代码。
这个想法是使用枢轴来“拆散”(用 pandas 术语)这个数据集,并为每个 IndicatorCode 有一列。
schema = StructType([ \
StructField("CountryName", StringType(), True), \
StructField("CountryCode", StringType(), True), \
StructField("IndicatorName", StringType(), True), \
StructField("IndicatorCode", StringType(), True), \
StructField("Year", IntegerType(), True), \
StructField("Value", DoubleType(), True) \
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
format='com.databricks.spark.csv',
header='true',
schema=schema)
data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
.select(["CountryCode", "IndicatorCode2", "Year", "Value"])
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])\
.pivot("IndicatorCode2", columns)\
.max("Value")
虽然这成功返回了,data3.first()
从未返回结果(10 分钟后我使用 3 个核心中断了我的独立操作)。
我的方法使用RDD
and aggregateByKey
效果很好,所以我不是在寻找有关如何执行此操作的解决方案,而是使用 DataFrames 进行透视是否也可以解决问题。