coalesce(n, shuffle = true)
这也相当于repartition(n)
可能有,取决于什么mapping或您在父 RDD 中拥有的任何其他处理登录名,这对您的处理方式有相当大的影响job施行。
一般来说,当父分区中的数据分布均匀并且您没有大幅减少分区数量时,您应该避免使用shuffle使用时coalesce
.
但是,在您的情况下,这会大大减少分区数量,并且根据文档 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD@coalesce(numPartitions:Int,shuffle:Boolean,partitionCoalescer:Option%5Borg.apache.spark.rdd.PartitionCoalescer%5D)(implicitord:Ordering%5BT%5D):org.apache.spark.rdd.RDD%5BT%5D
然而,如果你正在进行剧烈的合并,例如至 numPartitions = 1,
这可能会导致您的计算发生在比
您喜欢(例如,在 numPartitions = 1 的情况下一个节点)。为了避免这种情况,
你可以通过 shuffle = true 。这将添加一个随机播放步骤,但意味着
当前的上游分区将并行执行(无论什么
当前的分区是)
鉴于此,现在您需要正确评估并做出选择
-
洗牌潜在的大量数据but在父分区中进行计算在平行下
- 将所有分区收集为一个没有完全改组(当然仍然会有数据移动)但是在内部进行计算单一任务
例如,考虑以下片段,这些片段与您可能拥有的实际逻辑相去甚远,但可以让您了解正在发生的事情
// fast
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = true)
.toDF.write.text("shuffleTrue")
// slow
sc.parallelize(0 to 1000000, 10)
.mapPartitions(it => {Thread.sleep(5000); it.map(_.toString)})
.coalesce(1, shuffle = false)
.toDF.write.text("shuffleFalse")
在我的集群上shuffle = true
显示总时间大约为5秒完成10个任务,在每个父分区上并行执行计算逻辑。
另一个与shuffle = false
大致有50 秒完成单个任务中的所有计算在一名执行人身上。