我想使用 RDD 对Tuple2<byte[], obj>
, but byte[]
相同内容的内容由于参考值不同而被视为不同的值。
我没有看到任何可以传递自定义比较器的内容。我可以转换byte[]
into a String
具有明确的字符集,但我想知道是否有更有效的方法。
自定义比较器是不够的,因为 Spark 使用hashCode
用于组织分区中的键的对象的数量。 (至少 HashPartitioner 会做到这一点,您可以提供一个可以处理数组的自定义分区器)
包装数组以提供正确的equals
and hashCode
应该解决这个问题。
一个轻量级的包装器应该可以解决这个问题:
class SerByteArr(val bytes: Array[Byte]) extends Serializable {
override val hashCode = bytes.deep.hashCode
override def equals(obj:Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}
快速测试:
import scala.util.Random
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.
// let's use the wrapper instead
val keyable = rdd.map(elem => new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)