我正在将 Spark 与 scala 一起使用,并且我有一个充满 tuple2 的 RDD,其中包含一个复杂对象作为键和一个 double 。目的是如果对象相同,则加倍(频率)。
为此,我将我的对象定义如下:
case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Ordered[SimpleCoocurrence]{
def compare(that: SimpleCoocurrence) = {
if(this.word.equals(that.word)&&this.word_pos.equals(that.word_pos)
&&this.cooc.equals(that.cooc)&&this.cooc_pos.equals(that.cooc_pos))
0
else
this.toString.compareTo(that.toString)
}
}
现在我尝试像这样使用reduceBykey:
val coocRDD = sc.parallelize(coocList)
println(coocRDD.count)
coocRDD.map(tup=>tup).reduceByKey(_+_)
println(coocRDD.count)
但是,结果表明,处理reducebykey之前和之后的RDD包含完全相同数量的元素。
如何使用 tuple2[SimpleCoocurrence,Double] 执行 reduceByKey ?
实现 Ordered 特征是告诉 Spark 如何比较我的对象的好方法吗?
我应该只使用 tuple2[String,Double] 吗?
thx,
reduceByKey
不使用排序,但是hashCode
and equals
确定哪些键是相同的。特别是,hashPartitioner
将按哈希对键进行分组,以便具有相同 hashCode 的键落在同一分区上,从而可以在每个分区上进一步减少。
案例类有一个默认实现equals
and hashCode
。可能使用的测试数据具有不同的字段值distance:Double
使每个实例成为唯一的对象。使用它作为键将导致仅相同的对象被减少为一个。
解决这个问题的一种方法是为您的case class
以及对象的添加方法,如下所示:
case class SimpleCoocurrence(word:String, word_pos:String, cooc:String, cooc_pos:String, distance:Double) extends Serializable {
val key = word + word_pos + cooc + cooc_pos
}
object SimpleCoocurrence {
val add: (SimpleCoocurrence, SimpleCoocurrence) => SimpleCoocurrence = ???
}
val coocList:List[SimpleCoocurrence] = ???
val coocRDD = sc.parallelize(coocList)
val coocByKey = coocRDD.keyBy(_.key)
val addedCooc = coocByKey.reduceByKey(SimpleCoocurrence.add)
(*) 作为指导示例提供的代码 - 未编译或测试。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)