描述:
我们的spark版本是1.4.1
我们想要连接两个巨大的 RDD,其中之一带有倾斜数据。所以spark rdd操作join可能会导致内存问题。我们尝试将较小的一个分割成多个片段,然后分批广播它们。在每个广播回合中,我们尝试将较小的rdd的一部分收集到驱动程序,然后将其保存到HashMap,然后广播HashMap。每个执行器使用广播值对较大的rdd进行map操作。我们通过这种方式实现倾斜数据连接。
但是当它每轮处理广播值时。我们发现处理后我们不能破坏我们的广播值。如果我们使用broadcast.destroy(),下一轮我们处理数据将
触发错误。像这样:
java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)
我们查看了spark的源码,发现这个问题是由rdd依赖关系导致的。 if rdd3 -> rdd2 -> rdd1 (箭头显示依赖关系)。 rdd1是使用名为b1的广播变量生成的,rdd2使用b2生成的。生成rdd3时,源代码显示需要序列化b1和b2。如果b1或b2在rdd3生产过程之前被破坏。它将导致我上面列出的失败。
Question:
是否存在一种方法可以让rdd3忘记其依赖关系,使其在生成过程中不需要b1、b2,只需要rdd2?
或者是否存在处理倾斜连接问题的方法?
顺便说一句,我们为每个回合设置了检查点。并将spark.cleaner.ttl设置为600。问题仍然存在。如果我们不销毁广播变量,执行器将在第五回合中失败。
我们的代码是这样的:
for (int i = 0; i < times; i++) {
JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;
List<Tuple2<String, Double>> itemSplit = itemZippedRdd
.filter(new FilterByHashFunction(times, i))
.collect();
Map<String, Double> itemSplitMap = new HashMap<String, Double>();
for (Tuple2<String, Double> item : itemSplit) {
itemSplitMap.put(item._1(), item._2());
}
Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
.broadcast(itemSplitMap);
curItemPairRdd = prevItemPairRdd
.mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
.persist(StorageLevel.DISK_ONLY());
curItemPairRdd.count();
itemSplitBroadcast.destroy(true);
itemSplit.clear();
}