对于 Spark 专家来说,这是一个很好的问题。
我正在处理数据map
操作(RDD)。在映射器函数中,我需要查找类的对象A
用于处理 RDD 中的元素。
由于这将在执行器上执行并创建类型的元素A
(将被查找)恰好是一个昂贵的操作,我想在每个执行器上预加载和缓存这些对象。最好的方法是什么?
理想情况下,我想通过驱动程序启动期间在任何操作之前可用的参数来指定将在执行器上加载一次的内容(包括流式处理的情况,以便查找表在批次之间保留在内存中)。数据得到处理。
是否有一种干净而优雅的方法来做到这一点,或者是不可能实现的?
这正是目标用例broadcast.
广播变量传输一次并使用 torrent 有效地移动到所有执行器,并保留在内存/本地磁盘中,直到您不再需要它们。
在使用其他人的接口时,序列化经常会成为一个问题。如果您可以强制您使用的对象是可序列化的,那么这将是最好的解决方案。如果这是不可能的,你的生活就会变得更加复杂。如果您无法序列化A
对象,那么您必须在每个任务的执行器上创建它们。如果它们存储在某个文件中,则如下所示:
rdd.mapPartitions { it =>
val lookupTable = loadLookupTable(path)
it.map(elem => fn(lookupTable, elem))
}
请注意,如果您使用此模型,则必须为每个任务加载一次查找表 - 您无法从广播变量的跨任务持久性中受益。
编辑:这是另一个模型,我相信它可以让您在每个 JVM 的任务之间共享查找表。
class BroadcastableLookupTable {
@transient val lookupTable: LookupTable[A] = null
def get: LookupTable[A] = {
if (lookupTable == null)
lookupTable = < load lookup table from disk>
lookupTable
}
}
这个类可以被广播(不传输任何实质性内容),并且第一次每个 JVM 调用它时,您将加载查找表并返回它。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)