aggregateByKey() 与reduceByKey 有很大不同。所发生的情况是,reduceByKey 是aggregateByKey 的一种特殊情况。
aggregateByKey() 将组合特定键的值,这种组合的结果可以是您指定的任何对象。您必须指定如何在一个分区(在同一节点中执行)内组合(“添加”)值以及如何组合来自不同分区(可能位于不同节点中)的结果。 reduceByKey 是一种特殊情况,因为组合的结果(例如求和)与值的类型相同,并且从不同分区组合时的操作也与组合内部值时的操作相同。分割。
一个例子:
想象一下你有一个配对列表。您将其并行化:
val pairs = sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
现在你想通过按键“组合”它们来产生总和。在这种情况下,reduceByKey 和aggregateByKey 是相同的:
val resReduce = pairs.reduceByKey(_ + _) //the same operation for everything
resReduce.collect
res3: Array[(String, Int)] = Array((b,7), (a,9))
//0 is initial value, _+_ inside partition, _+_ between partitions
val resAgg = pairs.aggregateByKey(0)(_+_,_+_)
resAgg.collect
res4: Array[(String, Int)] = Array((b,7), (a,9))
现在,假设您希望聚合是一组值,这是与整数不同的类型(整数之和也是整数):
import scala.collection.mutable.HashSet
//the initial value is a void Set. Adding an element to a set is the first
//_+_ Join two sets is the _++_
val sets = pairs.aggregateByKey(new HashSet[Int])(_+_, _++_)
sets.collect
res5: Array[(String, scala.collection.mutable.HashSet[Int])] =Array((b,Set(7)), (a,Set(1, 5, 3)))