Apache Spark 中的混洗与非混洗合并

2024-04-30

在将 RDD 写入文件之前执行以下转换时,它们之间有什么区别?

  1. 合并(1,随机播放= true)
  2. 合并(1,随机播放=假)

代码示例:

val input = sc.textFile(inputFile)
val filtered = input.filter(doSomeFiltering)
val mapped = filtered.map(doSomeMapping)

mapped.coalesce(1, shuffle = true).saveAsTextFile(outputFile)
vs
mapped.coalesce(1, shuffle = false).saveAsTextFile(outputFile)

它与collect()相比如何?我完全知道 Spark 保存方法将使用 HDFS 样式的结构来存储它,但是我对收集()和混洗/非混洗合并()​​的数据分区方面更感兴趣。


coalesce(n, shuffle = true)这也相当于repartition(n)可能有,取决于什么mapping或您在父 RDD 中拥有的任何其他处理登录名,这对您的处理方式有相当大的影响job施行。

一般来说,当父分区中的数据分布均匀并且您没有大幅减少分区数量时,您应该避免使用shuffle使用时coalesce.

但是,在您的情况下,这会大大减少分区数量,并且根据文档 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@coalesce(numPartitions:Int,shuffle:Boolean,partitionCoalescer:Option%5Borg.apache.spark.rdd.PartitionCoalescer%5D)(implicitord:Ordering%5BT%5D):org.apache.spark.rdd.RDD%5BT%5D

然而,如果你正在进行剧烈的合并,例如至 numPartitions = 1, 这可能会导致您的计算发生在比 您喜欢(例如,在 numPartitions = 1 的情况下一个节点)。为了避免这种情况, 你可以通过 shuffle = true 。这将添加一个随机播放步骤,但意味着 当前的上游分区将并行执行(无论什么 当前的分区是)

鉴于此,现在您需要正确评估并做出选择

  • 洗牌潜在的大量数据but在父分区中进行计算在平行下
  • 将所有分区收集为一个没有完全改组(当然仍然会有数据移动)但是在内部进行计算单一任务

例如,考虑以下片段,这些片段与您可能拥有的实际逻辑相去甚远,但可以让您了解正在发生的事情

// fast
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
  .coalesce(1, shuffle = true)
  .toDF.write.text("shuffleTrue")
// slow
sc.parallelize(0 to 1000000, 10)
  .mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
  .coalesce(1, shuffle = false)
  .toDF.write.text("shuffleFalse")

在我的集群上shuffle = true显示总时间大约为5秒完成10个任务,在每个父分区上并行执行计算逻辑。 另一个与shuffle = false大致有50 秒完成单个任务中的所有计算在一名执行人身上。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Spark 中的混洗与非混洗合并 的相关文章

随机推荐