我有一个在 Spark 上运行的迭代应用程序,我将其简化为以下代码:
var anRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize((0 to 1000))
var c: Long = Int.MaxValue
var iteration: Int = 0
while (c > 0) {
iteration += 1
// Manipulate the RDD and cache the new RDD
anRDD = anRDD.zipWithIndex.filter(t => t._2 % 2 == 1).map(_._1).cache() //.localCheckpoint()
// Actually compute the RDD and spawn a new job
c = anRDD.count()
println(s"Iteration: $iteration, Values: $c")
}
后续作业中的内存分配会发生什么情况?
- 目前是否
anRDD
“覆盖”以前的还是它们都保留在内存中?从长远来看,这可能会引发一些内存异常
- Do
localCheckpoint
and cache
有不同的行为?如果localCheckpoint
用于代替cache
, as localCheckpoint
截断 RDD 谱系,那么我希望之前的 RDD 会被覆盖
不幸的是 Spark 似乎不太适合这样的事情。
您最初的实现是不可行的,因为在每次迭代中,新的 RDD 都会对旧的 RDD 进行内部引用,因此所有 RDD 都会堆积在内存中。
localCheckpoint
是您想要实现的目标的近似值。它确实截断了 RDD 的谱系,但你失去了容错能力。该方法的文档中已明确说明。
checkpoint
也是一种选择。它是安全的,但它会在每次迭代时将数据转储到 hdfs。
考虑重新设计该方法。此类黑客迟早会造成损失。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)