当我编写 RDD 转换时,例如
val rdd = sc.parallelise(1 to 1000)
rdd.map(x => x * 3)
据我了解,关闭(x => x * 3
)这只是一个 Function1 需要可序列化,并且我想我在某处读过编辑:它就在文档中暗示的地方:http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark http://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark它被“发送”给工人执行。 (例如 Akka 向工作人员发送一段“可执行代码”来运行)
是这样的吗?
我参加的一次聚会上有人评论说,它实际上并没有发送任何序列化代码,但由于每个工作人员无论如何都会获得 jar 的“副本”,因此它只需要引用要运行的函数或类似的东西(但我不确定我是否正确引用了那个人的话)
我现在对它的实际工作原理感到非常困惑。
所以我的问题是
如何向工人发送转型关闭信息?通过akka连载?或者它们“已经在那里”,因为 Spark 将整个 uber jar 发送给每个工作人员(对我来说听起来不太可能......)
如果是这样,那么罐子的其余部分如何发送给工人呢?这是“cleanupClosure”在做什么吗?例如仅向工作人员发送相关字节码而不是整个 uberjar? (例如,只有闭包的依赖代码?)
总而言之,spark 是否在任何时候都会同步--jars
与工人的类路径以某种方式?或者它是否向工作人员发送“适量”的代码?如果它确实发送了闭包,它们是否会被缓存以供重新计算的需要?或者每次安排任务时都会发送带有任务的闭包?抱歉,如果这是愚蠢的问题,但我真的不知道。
如果可以的话,请添加来源作为您的答案,我在文档中找不到明确的内容,而且我太谨慎了,无法仅通过阅读代码来得出结论。
闭包肯定是在运行时序列化的。我在 pyspark 和 scala 中看到过很多在运行时出现 Closure Not Serialized 异常的实例。有一个复杂的代码称为
From ClosureCleaner.scala
def clean(
closure: AnyRef,
checkSerializable: Boolean = true,
cleanTransitively: Boolean = true): Unit = {
clean(closure, checkSerializable, cleanTransitively, Map.empty)
}
试图缩小正在序列化的代码。然后,代码通过网络发送 - 如果它是可序列化的。否则会抛出异常。
这是 ClosureCleaner 的另一个摘录,用于检查序列化传入函数的能力:
private def ensureSerializable(func: AnyRef) {
try {
if (SparkEnv.get != null) {
SparkEnv.get.closureSerializer.newInstance().serialize(func)
}
} catch {
case ex: Exception => throw new SparkException("Task not serializable", ex)
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)