在下面的代码中,我尝试组合值:
val rdd: org.apache.spark.rdd.RDD[((String), Double)] =
sc.parallelize(List(
(("a"), 1.0),
(("a"), 3.0),
(("a"), 2.0)
))
val reduceByKey = rdd.reduceByKey((a , b) => String.valueOf(a) + String.valueOf(b))
reduceByValue
应该包含 (a , 1,3,2) 但收到编译时错误:
Multiple markers at this line - type mismatch; found : String required: Double - type mismatch; found : String
required: Double
什么决定了reduce函数的类型?类型不能转换吗?
我可以用groupByKey
达到相同的结果但只是想了解reduceByKey
.
不,给定一个 rdd 类型RDD[(K,V)]
, reduceByKey
将采用类型的关联函数(V,V) => V
.
如果我们想应用减少将值的类型更改为另一种任意类型,那么我们可以使用aggregateByKey
:
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)
使用zeroValue
和seqOp
函数,它在地图端提供类似折叠的操作,而关联函数combOp
结合了结果seqOp
到最终结果,就像reduceByKey 所做的那样。
正如我们从签名中可以看出的,虽然集合值是类型V
的结果aggregateByKey
将是任意类型U
应用到上面的例子中,aggregateByKey
看起来像这样:
rdd.aggregateByKey("")({case (aggr , value) => aggr + String.valueOf(value)}, (aggr1, aggr2) => aggr1 + aggr2)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)