This is a very simple question: in Spark, broadcast can be used to send variables to executors efficiently. How does this work?
More precisely:
- When are values sent: as soon as I call broadcast, or when the values are used?
- Where exactly is the data sent: to all executors, or only to the ones that need it?
- Where is the data stored? In memory, or on disk?
- Is there a difference in how simple variables and broadcast variables are accessed? What happens under the hood when I call the .value method?
Short answer
- Values are sent the first time an executor needs them. Nothing is sent when sc.broadcast(variable) is called.
- Data is sent only to the nodes that host an executor that needs it.
- Data is stored in memory. If not enough memory is available, the disk is used.
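- Yes, there is a big difference between accessing a local variable and accessing a broadcast variable: a broadcast variable has to be downloaded the first time it is accessed.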
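The short answer can be illustrated with a toy, single-process model. This is a minimal sketch under stated assumptions, not Spark code. `ToyDriver`, `ToyBroadcast` and `ToyExecutor` are hypothetical names. "Sending" is simulated by counting bytes, and each executor's dict stands in for its block manager. Real Spark also lets executors fetch pieces from one another, which this sketch leaves out.

```python
import pickle


class ToyDriver:
    """Hypothetical driver: keeps serialized broadcast data and counts bytes it ships."""

    def __init__(self):
        self._store = {}       # broadcast id -> serialized bytes
        self._next_id = 0
        self.bytes_sent = 0    # simulated network traffic

    def broadcast(self, value):
        # Serializes and stores locally only; no executor is contacted here.
        bid = self._next_id
        self._next_id += 1
        self._store[bid] = pickle.dumps(value)
        return ToyBroadcast(bid, self)

    def fetch(self, bid):
        # Called by an executor the first time it needs the value.
        data = self._store[bid]
        self.bytes_sent += len(data)
        return data


class ToyBroadcast:
    """Lightweight handle shipped with tasks: just an id plus a route to the driver."""

    def __init__(self, bid, driver):
        self.bid = bid
        self.driver = driver


class ToyExecutor:
    """Hypothetical executor whose local dict stands in for its block manager."""

    def __init__(self):
        self._cache = {}

    def value(self, handle):
        if handle.bid not in self._cache:  # first access: download, then keep locally
            self._cache[handle.bid] = pickle.loads(handle.driver.fetch(handle.bid))
        return self._cache[handle.bid]


driver = ToyDriver()
lookup = driver.broadcast({"a": 1, "b": 2})
print(driver.bytes_sent)  # 0: nothing sent at broadcast time
busy, idle = ToyExecutor(), ToyExecutor()
busy.value(lookup)["a"]
busy.value(lookup)["b"]   # served from busy's local cache
print(driver.bytes_sent == len(pickle.dumps({"a": 1, "b": 2})))  # True: one download, idle got nothing
```

The executor that never reads the handle (`idle`) receives no data. A second read on the same executor does not cause any additional traffic.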
Long answer
The answer lies in Spark's source code, in TorrentBroadcast.scala https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala.
- When sc.broadcast is called, a new TorrentBroadcast object is instantiated from BroadcastFactory.scala. The following happens in writeBlocks() https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L98, which is called when the TorrentBroadcast object is initialized:
  - The object is cached, unserialized, locally using the MEMORY_AND_DISK http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence policy.
  - It is serialized.
  - The serialized version is split into 4MB blocks, which are compressed[0] and saved locally[1].
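  When new executors are created, they only have the lightweight TorrentBroadcast object, which contains only the broadcast object's identifier and its number of blocks.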
- The TorrentBroadcast object has a lazy[2] property that contains its value. When the value method is called, this lazy property is returned. So the first time the value method is called within a task, the following happens:
  - The blocks are fetched from the local block manager in random order and uncompressed.
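  - If they are not present in the local block manager, getRemoteBytes https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L567 is called on the block manager to fetch them. Network traffic happens only at that point.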
  - If a block was not present locally, it is cached using MEMORY_AND_DISK_SER http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
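The write and read paths described above can be sketched end to end as follows. This is a toy model that follows the steps as listed in this answer; it is not Spark's code, and details such as exactly where compression is applied may differ in the real `TorrentBroadcast`. Several pieces are stand-ins or assumptions:

- `write_blocks`, `read_blocks` and `piece_id` are hypothetical names.
- Plain dicts play the role of block managers.
- `pickle` stands in for Spark's serializer.
- `zlib` stands in for the default lz4 codec.

```python
import math
import pickle
import random
import zlib

DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4 MB, the default of spark.broadcast.blockSize


def piece_id(broadcast_id, index):
    # Hypothetical id scheme: derived from the broadcast id and the piece's offset.
    return f"broadcast_{broadcast_id}_piece{index}"


def write_blocks(broadcast_id, value, store, block_size=DEFAULT_BLOCK_SIZE):
    """Driver side: serialize, split into pieces, compress each piece, save locally."""
    data = pickle.dumps(value)
    num_blocks = max(1, math.ceil(len(data) / block_size))
    for i in range(num_blocks):
        chunk = data[i * block_size:(i + 1) * block_size]
        # zlib is a stdlib stand-in for Spark's default lz4 codec.
        store[piece_id(broadcast_id, i)] = zlib.compress(chunk)
    # Executors only need this pair (the "lightweight" handle), not the data itself.
    return broadcast_id, num_blocks


def read_blocks(broadcast_id, num_blocks, local_store, fetch_remote):
    """Executor side: collect every piece in random order, preferring the local store."""
    order = list(range(num_blocks))
    random.shuffle(order)
    pieces = [b""] * num_blocks
    for i in order:
        pid = piece_id(broadcast_id, i)
        block = local_store.get(pid)
        if block is None:
            block = fetch_remote(pid)   # the only step that touches the "network"
            local_store[pid] = block    # cache the piece locally once fetched
        pieces[i] = zlib.decompress(block)
    return pickle.loads(b"".join(pieces))


driver_store, executor_store, fetched = {}, {}, []

def fetch_from_driver(pid):
    fetched.append(pid)
    return driver_store[pid]

bid, n = write_blocks(0, list(range(1000)), driver_store, block_size=1024)
print(read_blocks(bid, n, executor_store, fetch_from_driver) == list(range(1000)))  # True
read_blocks(bid, n, executor_store, fetch_from_driver)  # second read is served locally
print(len(fetched) == n)  # True: each piece crossed the "network" once
```

With the default block size, a 10 MB serialized value would be split into ceil(10 / 4) = 3 pieces. Fetching the pieces in random order is what lets different executors pull different pieces from different peers at the same time in the real BitTorrent-like scheme. This sketch only fetches from a single source.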
[0] Compressed with lz4 by default. This can be tuned http://spark.apache.org/docs/latest/configuration.html#compression-and-serialization.
[1] The blocks are stored in the local block manager https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-blockmanager.html, using MEMORY_AND_DISK_SER http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence, which means that partitions that don't fit in memory are spilled to disk. Each block has a unique identifier, computed from the identifier of the broadcast variable and its offset. The size of blocks can be configured http://spark.apache.org/docs/latest/configuration.html#execution-behavior; it is 4MB by default.
[2] A lazy val in Scala is a variable whose value is evaluated the first time it is accessed, and then cached. See the documentation http://www.scala-lang.org/files/archive/spec/2.11/04-basic-declarations-and-definitions.html.
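For readers more familiar with Python, the lazy val behaviour in footnote [2] can be approximated with `functools.cached_property`. This is an analogy, not how Spark implements it. `LazyValue` is a hypothetical class, and the `compute` callback stands in for the block-reading step.

```python
from functools import cached_property


class LazyValue:
    """Hypothetical analogue of a Scala `lazy val`: computed on first access, then cached."""

    def __init__(self, compute):
        self._compute = compute
        self.evaluations = 0

    @cached_property
    def value(self):
        # Body runs only on the first access; cached_property then stores the result
        # on the instance, so later accesses skip this method entirely.
        self.evaluations += 1
        return self._compute()


v = LazyValue(lambda: sum(range(10)))
print(v.evaluations)  # 0: nothing computed yet
print(v.value)        # 45
print(v.value)        # 45, served from the cache
print(v.evaluations)  # 1
```

One difference worth keeping in mind: Scala's lazy val initialization is thread-safe. `cached_property`, by contrast, has not taken a lock since Python 3.12, so concurrent first accesses may evaluate the body more than once.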