现在我有3个这样的RDD:
rdd1:
1 2
3 4
5 6
7 8
9 10
rdd2:
11 12
13 14
rdd3:
15 16
17 18
19 20
我想这样做:
rdd1.zip(rdd2.union(rdd3))
我想要的结果是这样的:
1 2 11 12
3 4 13 14
5 6 15 16
7 8 17 18
9 10 19 20
但我有一个例外是这样的:
线程“main”中的异常 java.lang.IllegalArgumentException:无法压缩分区数量不等的 RDD
有人告诉我我可以毫无例外地做到这一点:
rdd1.zip(rdd2.union(rdd3).repartition(1))
不过好像是有点成本。所以我想知道是否还有其他方法可以解决这个问题。
我不确定你所说的“成本”是什么意思,但你的怀疑是对的repartition(1)
不是正确的解决方案。它将 RDD 重新分区为单个分区。
- 如果您的数据不适合单台机器,则此操作将会失败。
- 它仅在以下情况下有效
rdd1
有一个分区。当您拥有更多数据时,这可能不再成立。
-
repartition
执行一个shuffle,因此您的数据最终可能会以不同的方式排序。
我认为正确的解决方案是放弃使用zip
,因为您可能无法确保分区匹配。创建密钥并使用join
反而:
val indexedRDD1 = rdd1.zipWithIndex.map { case (v, i) => i -> v }
val indexedRDD2 = rdd2.zipWithIndex.map { case (v, i) => i -> v }
val offset = rdd2.count
val indexedRDD3 = rdd3.zipWithIndex.map { case (v, i) => (i + offset) -> v }
val combined =
indexedRDD1.leftOuterJoin(indexedRDD2).leftOuterJoin(indexedRDD3).map {
case (i, ((v1, v2Opt), v3Opt)) => i -> (v1, v2Opt.getOrElse(v3Opt.get))
}
无论分区如何,这都将起作用。如果愿意,您可以对结果进行排序并删除末尾的索引:
val unindexed = combined.sortByKey().values
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)