好吧,让你的数据集变得更有趣:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
我们有六个要素:
rdd.count
Long = 6
无分区器:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
和八个分区:
rdd.partitions.length
Int = 8
现在让我们定义一个小助手来计算每个分区的元素数量:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
由于我们没有分区器,我们的数据集在分区之间均匀分布(Spark 中的默认分区方案 https://stackoverflow.com/q/34491219/1560062):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
现在让我们重新分区我们的数据集:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
由于参数传递给HashPartitioner
定义我们期望一个分区的分区数量:
rddOneP.partitions.length
Int = 1
由于我们只有一个分区,因此它包含所有元素:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
请注意,随机播放后值的顺序是不确定的。
如果我们使用同样的方式HashPartitioner(2)
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
我们将得到 2 个分区:
rddTwoP.partitions.length
Int = 2
Since rdd
按关键数据分区将不再均匀分布:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
因为 with 具有三个键并且只有两个不同的值hashCode
mod numPartitions
这里没有什么意外的:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
只是为了确认以上内容:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
最后与HashPartitioner(7)
我们得到七个分区,其中三个非空,每个分区有 2 个元素:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
总结和注释
-
HashPartitioner
采用一个定义分区数量的参数
-
使用以下方法将值分配给分区hash
的钥匙。hash
函数可能因语言而异(Scala RDD 可能使用hashCode
, DataSets
使用 MurmurHash 3、PySpark、portable_hash https://github.com/apache/spark/blob/330c3e33bd10f035f49cf3d13357eb2d6d90dabc/python/pyspark/rdd.py#L59-L87).
在像这样的简单情况下,其中 key 是一个小整数,您可以假设hash
是一个身份(i = hash(i)
).
Scala API 使用nonNegativeMod https://github.com/apache/spark/blob/4e27578faa67c7a71a9b938aafbaf79bdbf36831/core/src/main/scala/org/apache/spark/util/Utils.scala#L1663-L1666根据计算的哈希值确定分区,
如果密钥的分布不均匀,您可能会遇到部分集群空闲的情况
-
键必须是可散列的。你可以查看我的回答作为 PySpark 的 reduceByKey 的键的列表 https://stackoverflow.com/a/31404405/1560062阅读有关 PySpark 特定问题的信息。另一个可能的问题突出显示HashPartitioner 文档 https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.HashPartitioner:
Java 数组的 hashCode 是基于数组的身份而不是其内容,因此尝试对 RDD[Array[]] 或 RDD[(数组[], _)] 使用 HashPartitioner 将产生意外或不正确的结果。
在 Python 3 中,您必须确保散列是一致的。看异常:应通过 PYTHONHASHSEED 禁用字符串哈希的随机性在 pyspark 中意味着什么? https://stackoverflow.com/q/36798833/1560062
哈希分区器既不是单射的也不是满射的。可以将多个键分配给单个分区,并且某些分区可以保留为空。
请注意,当前基于哈希的方法在与 REPL 定义的案例类结合使用时在 Scala 中不起作用(Apache Spark 中的案例类相等 https://stackoverflow.com/q/35301998/1560062).
HashPartitioner
(或任何其他Partitioner
) 打乱数据。除非在多个操作之间重用分区,否则它不会减少要洗牌的数据量。