我已经阅读了 dask 文档、博客等,但我仍然不是 100% 清楚如何做到这一点。我的用例:
- 我有大约 10GB 的参考数据。一旦加载,它们就是只读的。通常我们将它们加载到 Dask/Pandas 数据框中
- 我需要这些参考数据来处理(丰富、修改、转换)每天大约 500 个 mio 事件(多个文件)
- “流程”是大约 40 个任务的管道。执行顺序是相关的(依赖性)。
- 每个单独的任务并不复杂或耗时,主要是查找、丰富、映射等。
- 事件之间不存在依赖性。理论上,我可以通过单独的线程处理每个事件,将输出合并到一个文件中,然后就完成了。输出事件甚至不需要与输入事件具有相同的顺序。
总之:
- 我们可以大规模并行化事件处理
- 每个并行线程都需要相同的 10 GB(原始)引用数据
- 处理单个事件意味着将 40 个任务的序列/管道应用于它们
- 每个单独的任务并不耗时(读取参考数据并修改事件)
可能的陷阱/问题:
- 花费更多的时间在序列化/反序列化上,而不是处理数据(我们在一些使用类似管道的方法的试验中确实经历过这种情况)
- 引用数据被多次加载,每个(并行)进程加载一次
- 最好我想在我的笔记本电脑上开发/测试它,但我没有足够的内存来加载参考数据。可能是解决方案是否会利用内存映射?
最有效的解决方案似乎是,如果我们只能将引用数据加载到内存中一次,则使其可供处理事件的多个其他进程只读
通过在每台计算机中加载参考数据来扩展到多台计算机。将文件名推送到计算机以执行。
知道如何实现这一目标吗?
非常感谢你的帮助
我还遇到过运行令人尴尬的并行作业的类似问题,这些作业都在同一个查找“引用”表(或并行进程的每个实例所需的任何大内存只读变量)中获取数据。在遵循“写时复制”语义的环境中(例如linux),将查找表放置在全局范围内总是非常有效,如下所示:多处理中的共享内存对象 https://stackoverflow.com/questions/10721915/shared-memory-objects-in-multiprocessing
这是一个简单的并行工作流程:
from multiprocessing import Pool
# Load your reference data, do that only once
# here in the parent process
my_ref_lookup = load_ref_data(your_data_file)
def your_parallel_function(my_file_path):
my_new_data = load_data(my_file_path)
# process my_new_data with some lookup in my_ref_lookup
# which is known from the parent process.
processed_data = do_stuff(my_new_data)
# you could here write something on disk
# and/or return the processed_data
return processed_data
with Pool(processes = 5) as Pool:
list_of_result = Pool.map(your_parallel_function, your_list_of_file_paths)
这里执行的是your_parallel_function
将并行执行,例如5个worker,在里面取5个文件your_list_of_file_paths
一次所有子进程都可以访问my_ref_lookup
无需复制它们。
在使用 Dask 和 bag 系列一段时间后,我从未发现过比这类似或更简单的行为。在我尝试使用 Dask 时,在全局范围内以这种方式共享的只读变量最终被尽可能多的需要它的工作人员复制,这导致内存爆炸并导致我的内核崩溃。我从未在 Dask 的任何文档中看到过这种情况的处理。 Dask 文档中唯一与此相关的远程参考是关于避免全局状态:https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-global-state https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-global-state但这显示了共享变量被延迟函数修改的情况,这与当前仅共享“只读”数据的问题不同。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)