pyspark:有效地将partitionBy写入与原始表相同数量的总分区

2024-02-17

我有一个与 pyspark 相关的问题repartitionBy()我最初在评论中发布的函数这个问题 https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/。我被要求将其作为一个单独的问题发布,所以这里是:

我明白那个df.partitionBy(COL)将写入每个值为COL到它们自己的文件夹,并且每个文件夹(假设这些行之前通过某个其他键分布在所有分区上)将具有与整个表中之前大致相同数量的文件。我觉得这种行为很烦人。如果我有一个包含 500 个分区的大表,并且我使用partitionBy(COL)在某些属性列上,我现在有 100 个文件夹,其中each包含 500 个(现在非常小)文件。

我想要的是partitionBy(COL)行为,但文件大小和文件数量与我原来的大致相同。

作为演示,上一个问题分享了一个玩具示例,其中您有一个包含 10 个分区的表并执行以下操作partitionBy(dayOfWeek)现在您有 70 个文件,因为每个文件夹中有 10 个文件。我想要大约 10 个文件,每天一个,如果数据较多,可能需要 2 或 3 个文件。

这可以轻松实现吗?就像是df.write().repartition(COL).partitionBy(COL)看起来它可能有效,但我担心(在一个非常大的表即将被分区为许多文件夹的情况下)必须首先将其组合到一些少量的分区beforepartitionBy(COL)似乎是个坏主意。

任何建议将不胜感激!


您有多种选择。在下面的代码中,我假设您想用镶木地板编写,但当然您可以更改它。

(1) df.repartition(numPartitions, *cols).write.partitionBy(*cols).parquet(writePath)

这将首先使用基于哈希的分区来确保来自 COL 的有限数量的值进入每个分区。取决于您选择的值numPartitions,某些分区可能是空的,而其他分区可能挤满了值——对于任何不确定原因的人,请阅读this https://stackoverflow.com/a/42780452/189336。然后,当你打电话时partitionBy在 DataFrameWriter 上,每个分区中的每个唯一值将放置在其自己的单独文件中。

警告:这种方法可能会导致分区大小和任务执行时间不平衡。当列中的值与许多行关联时(例如,城市列 - 纽约市的文件可能有很多行),而其他值数量较少(例如,小镇的值),就会发生这种情况。

(2) df.sort(sortCols).write.parquet(writePath)

当您希望 (1) 写入的文件大小几乎相等 (2) 精确控制写入的文件数量时,此选项非常有用。这种方法首先对数据进行全局排序,然后找到将数据分解为k大小均匀的分区,其中k在spark配置中指定spark.sql.shuffle.partitions。这意味着具有相同排序键值的所有值都彼此相邻,但有时它们会跨越一个分割,并位于不同的文件中。如果您的用例要求具有相同键的所有行位于同一分区中,则不要使用此方法。

有两个额外的好处:(1) 通过对数据进行排序,通常可以减小其在磁盘上的大小(例如,按 user_id 对所有事件进行排序,然后按时间排序将导致列值出现大量重复,这有助于压缩)和 (2 )如果您写入支持它的文件格式(例如 Parquet),则后续读取器可以通过使用谓词下推以最佳方式读取数据,因为 parquet 编写器将在元数据中写入每列的 MAX 和 MIN 值,从而允许如果查询指定的值超出分区的(最小,最大)范围,则读取器会跳过行。

请注意,Spark 中的排序比仅仅重新分区更昂贵,并且需要额外的阶段。在幕后,Spark 将首先确定一个阶段中的拆分,然后将数据混入另一个阶段中的这些拆分中。

(3) df.rdd.partitionBy(customPartitioner).toDF().write.parquet(writePath)

如果您在 Scala 上使用 Spark,那么您可以编写一个客户分区器,它可以克服基于哈希的分区器的烦人问题。不幸的是,pySpark 中没有这个选项。如果你真的想在 pySpark 中编写自定义分区器,我发现这是可能的,尽管有点尴尬,通过使用rdd.repartitionAndSortWithinPartitions:

df.rdd \
  .keyBy(sort_key_function) \  # Convert to key-value pairs
  .repartitionAndSortWithinPartitions(numPartitions=N_WRITE_PARTITIONS, 
                                      partitionFunc=part_func) \
  .values() # get rid of keys \
.toDF().write.parquet(writePath)

也许其他人知道在 pyspark 中的数据帧上使用自定义分区器的更简单方法?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

pyspark:有效地将partitionBy写入与原始表相同数量的总分区 的相关文章

随机推荐