我正在尝试使用 repartition() 方法更改 RDD 的分区大小。 RDD 上的方法调用成功,但是当我使用 RDD 的 partition.size 属性显式检查分区大小时,我得到了与最初具有相同数量的分区:-
scala> rdd.partitions.size
res56: Int = 50
scala> rdd.repartition(10)
res57: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at repartition at <console>:27
在这个阶段,我执行一些像 rdd.take(1) 这样的操作只是为了强制评估,以防万一这很重要。然后我再次检查分区大小:-
scala> rdd.partitions.size
res58: Int = 50
正如人们所看到的,它没有改变。有人可以回答为什么吗?
首先,您运行一个操作确实很重要repartition
确实是懒。第二,repartition
返回一个新的RDD
分区已更改,因此您必须使用返回的RDD
否则您仍在使用旧的分区。最后,在缩小分区时,您应该使用coalesce
,因为这不会重新排列数据。相反,它将保留节点数量的数据并拉入剩余的孤立节点。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)