另一种方法是使用aggregateByKey
,专门用于将值组合成与原始值不同的类型:
measures.keyBy(_.getId)
.aggregateByKey(List[Measure]())(_ :+ _, _ ++ _)
这会为每个分区中的每个键创建一个空列表,将所有值附加到每个分区中的这些值,然后最后打乱列表以连接每个键的所有值。
在 Scala 中向列表追加是 O(n),最好是在前面添加,即 O(1),但看起来不太干净:
measures.keyBy(_.getId)
.aggregateByKey(List[Measure]())(_.+:(_), _ ++ _)
or:
measures.keyBy(_.getId)
.aggregateByKey(List[Measure]())((l, v) => v +: l, _ ++ _)
这可能比你的更有效率reduceByKey
示例,但情况reduceByKey
and aggregateByKey
远远优于groupByKey
您可以首先大幅减少数据大小,然后仅对较小的结果进行打乱。在这种情况下,您没有这种减少:中间列表包含您开始使用的所有数据,因此当组合每个分区列表时,您仍在对完整数据集进行洗牌(这对于使用reduceByKey
).
此外,正如 Zero323 指出的那样,groupByKey
在这种情况下实际上更有效,因为它知道正在构建所有数据的列表,并且可以专门为此执行优化:
- 它禁用映射端聚合,从而阻止使用所有数据构建大哈希映射
- 它使用智能缓冲区(
CompactBuffer
),与逐一构建不可变列表相比,这显着减少了内存分配量。
另一种情况是两者之间的差异groupByKey
and reduceByKey
or aggregateByKey
当键的数量不比值的数量少很多时,可能是最小的。