如何取消堆叠数据集(使用数据透视)?

2024-01-22

我在 1.6 上尝试了新的“枢轴”功能更大的堆叠数据集 https://www.kaggle.com/worldbank/world-development-indicators/downloads/world-development-indicators-release-2016-01-28-06-31-53.zip。它有 5,656,458 行,IndicatorCode列有 1344 个不同的代码。

这个想法是使用枢轴来“拆散”(用 pandas 术语)这个数据集,并为每个 IndicatorCode 有一列。

schema = StructType([ \
   StructField("CountryName", StringType(), True), \
   StructField("CountryCode", StringType(), True), \
   StructField("IndicatorName", StringType(), True), \
   StructField("IndicatorCode", StringType(), True), \
   StructField("Year", IntegerType(), True), \
   StructField("Value", DoubleType(), True)  \
])

data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv', 
                            format='com.databricks.spark.csv', 
                            header='true', 
                            schema=schema)

data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
                      .select(["CountryCode", "IndicatorCode2", "Year", "Value"])

columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]

data3 = data2.groupBy(["Year", "CountryCode"])\
             .pivot("IndicatorCode2", columns)\
             .max("Value")

虽然这成功返回了,data3.first()从未返回结果(10 分钟后我使用 3 个核心中断了我的独立操作)。

我的方法使用RDD and aggregateByKey效果很好,所以我不是在寻找有关如何执行此操作的解决方案,而是使用 DataFrames 进行透视是否也可以解决问题。


好吧,一般而言,旋转并不是一个非常有效的操作,并且您对此无能为力DataFrameAPI。你可以尝试的一件事是repartition您的数据:

(data2
  .repartition("Year", "CountryCode")
  .groupBy("Year", "CountryCode")
  .pivot("IndicatorCode2", columns)
  .max("Value"))

甚至聚合:

from pyspark.sql.functions import max

(df
    .groupBy("Year", "CountryCode", "IndicatorCode")
    .agg(max("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode", columns)
    .max("Value"))

申请之前pivot。两种解决方案背后的想法是相同的。而不是移动大扩展Rows移动狭窄的密集数据并在本地扩展。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何取消堆叠数据集(使用数据透视)? 的相关文章

随机推荐