我是 Apache Spark 的新手,正在学习基本功能。
有一个小疑问。假设我有一个元组(键,值)的 RDD,并且想从中获取一些唯一的元组。我使用distinct()函数。我想知道该函数基于什么基础认为元组是不同的..?是基于键、值还是两者?
.distinct()
肯定是跨分区进行随机播放。要了解更多发生的情况,请运行.toDebugString
在你的 RDD 上。
val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)
对于我的 RDD 示例(myRDDPreStep 已按键进行哈希分区,由 StorageLevel.MEMORY_AND_DISK_SER 保存并设置检查点),返回:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
请注意,可能有更有效的方法来获得涉及更少洗牌的不同,特别是如果您的 RDD 已经以智能方式分区并且分区没有过度倾斜。
See 有没有办法重写Spark RDD unique以使用mapPartitions而不是distinct? https://stackoverflow.com/questions/31082066/is-there-a-way-to-rewrite-spark-rdd-distinct-to-use-mappartitions-instead-of-di
and
Apache Spark:使用 RDD.aggregateByKey() 的 RDD.groupByKey() 的等效实现是什么? https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)