dask:并行模型中的共享内存

2024-01-10

我已经阅读了 dask 文档、博客等,但我仍然不是 100% 清楚如何做到这一点。我的用例:

  • 我有大约 10GB 的参考数据。一旦加载,它们就是只读的。通常我们将它们加载到 Dask/Pandas 数据框中
  • 我需要这些参考数据来处理(丰富、修改、转换)每天大约 500 个 mio 事件(多个文件)
  • “流程”是大约 40 个任务的管道。执行顺序是相关的(依赖性)。
  • 每个单独的任务并不复杂或耗时,主要是查找、丰富、映射等。
  • 事件之间不存在依赖性。理论上,我可以通过单独的线程处理每个事件,将输出合并到一个文件中,然后就完成了。输出事件甚至不需要与输入事件具有相同的顺序。

总之:

  • 我们可以大规模并行化事件处理
  • 每个并行线程都需要相同的 10 GB(原始)引用数据
  • 处理单个事件意味着将 40 个任务的序列/管道应用于它们
  • 每个单独的任务并不耗时(读取参考数据并修改事件)

可能的陷阱/问题:

  • 花费更多的时间在序列化/反序列化上,而不是处理数据(我们在一些使用类似管道的方法的试验中确实经历过这种情况)
  • 引用数据被多次加载,每个(并行)进程加载一次
  • 最好我想在我的笔记本电脑上开发/测试它,但我没有足够的内存来加载参考数据。可能是解决方案是否会利用内存映射?

最有效的解决方案似乎是,如果我们只能将引用数据加载到内存中一次,则使其可供处理事件的多个其他进程只读

通过在每台计算机中加载参考数据来扩展到多台计算机。将文件名推送到计算机以执行。

知道如何实现这一目标吗?

非常感谢你的帮助


我还遇到过运行令人尴尬的并行作业的类似问题,这些作业都在同一个查找“引用”表(或并行进程的每个实例所需的任何大内存只读变量)中获取数据。在遵循“写时复制”语义的环境中(例如linux),将查找表放置在全局范围内总是非常有效,如下所示:多处理中的共享内存对象 https://stackoverflow.com/questions/10721915/shared-memory-objects-in-multiprocessing

这是一个简单的并行工作流程:

from multiprocessing import Pool

# Load your reference data, do that only once 
# here in the parent process
my_ref_lookup = load_ref_data(your_data_file)

def your_parallel_function(my_file_path):
    my_new_data = load_data(my_file_path)
    # process my_new_data with some lookup in my_ref_lookup 
    # which is known from the parent process. 

    processed_data = do_stuff(my_new_data)

    # you could here write something on disk
    # and/or return the processed_data

    return processed_data

with Pool(processes = 5) as Pool:
   list_of_result = Pool.map(your_parallel_function, your_list_of_file_paths)

这里执行的是your_parallel_function将并行执行,例如5个worker,在里面取5个文件your_list_of_file_paths一次所有子进程都可以访问my_ref_lookup无需复制它们。

在使用 Dask 和 bag 系列一段时间后,我从未发现过比这类似或更简单的行为。在我尝试使用 Dask 时,在全局范围内以这种方式共享的只读变量最终被尽可能多的需要它的工作人员复制,这导致内存爆炸并导致我的内核崩溃。我从未在 Dask 的任何文档中看到过这种情况的处理。 Dask 文档中唯一与此相关的远程参考是关于避免全局状态:https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-global-state https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-global-state但这显示了共享变量被延迟函数修改的情况,这与当前仅共享“只读”数据的问题不同。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

dask:并行模型中的共享内存 的相关文章

  • 如何从字典构造defaultdict?

    如果我有d dict zip range 1 10 range 50 61 我怎样才能建立一个collections defaultdict出于dict 唯一的论点defaultdict似乎采取的是工厂功能 我必须初始化然后再经历原来的d并
  • 字符串的“相关矩阵”。名义数据的相似度

    这是我的数据框 df store 1 store 2 store 3 store 4 0 banana banana plum banana 1 orange tangerine pear orange 2 apple pear melon
  • 通过pip安装lxml时出错:需要Microsoft Visual C++ 14.0

    我使用的是 Windows 10 机器 最近从 python 2 7 迁移到 3 5 当尝试通过 pip 安装 lxml 时 它会停止并抛出此错误消息 构建 lxml etree 扩展错误 需要 Microsoft Visual C 14
  • SQLAlchemy 闭包表关系定义

    我最近开始使用 SQL Alchemy 开展一个涉及攀岩区域和路线的项目 区域是分层的 因为单个区域可以包含多个区域 而多个区域又可以包含其他区域 路线直接与单个区域关联 但也与该区域的父区域关联 等等 为了实现这一点 我选择使用Bill
  • pandas 用 nan 值切割了一系列

    我想将 pandas cut 函数应用于包含 NaN 的序列 期望的行为是它对非 NaN 元素进行存储并为 NaN 元素返回 NaN import pandas as pd numbers with nan pd Series 3 1 2
  • Bokeh 相当于 matplotlib 子图

    我正在寻找一种方法来创建包含多个子图的绘图 例如 fig ax0 ax1 plt subplots nrows 2 sharex True 可以在 matplotlib 中完成 然后可以通过以下方式解决ax0 and ax1 有没有办法在
  • 使用数据库数据模型生成 SQLAlchemy 模型、架构和 JSON 响应

    将 Flask 和 SQLAlchemy 用于 Python Web 应用程序 我的目标是创建一个系统 在其中我可以 从现有 PostgreSQL 数据库导入数据模型 并将它们映射到相应 SQLAlchemy 模型中的字段 使用这些 SQL
  • 抓取多个帐户,即多次登录

    我可以成功抓取单个帐户的数据 我想在一个网站上抓取多个帐户 这意味着多次登录 如何管理登录 注销 您可以在每个帐户会话中使用多个 cookiejar 并行抓取多个帐户 请参阅 cookiejar 请求元密钥http doc scrapy o
  • Python 有哪些 SOAP 客户端库,它们的文档在哪里? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 在 matplotlib 中查看然后自动关闭图形?

    我必须检查我的参数设置是否正确 因此我需要绘制许多图 为了绘制这些图 我选择使用 matplotlib 每次检查后 我需要单击左上角的关闭按钮 这很微不足道 那么有没有什么方法可以让剧情在3 5秒左右显示并且无需点击就自动关闭呢 我知道关于
  • 我的本地 postgresql 数据库 url 的形式是什么?

    我正在学习 Flask sqlalchemy 教程https pythonhosted org Flask SQLAlchemy quickstart html a minimal application https pythonhoste
  • Pandas 无法读取使用 h5py 创建的 hdf5 文件

    当我尝试读取使用 h5py 创建的 HDF5 格式文件时 出现 pandas 错误 我想知道我是否只是做错了什么 import h5py import numpy as np import pandas as pd h5 file h5py
  • PySide2/QML 填充 Gridview 模型/委托并为其设置动画

    我是 QML 的新手 正在寻求以下几点帮助 如何基于 TextField 输入 如 Regex 通过 PySide2 过滤 Gridview 模型中的 QAbstractListModel 数据 标题 如何在鼠标悬停时为 Gridview
  • 如何用不同的颜色填充seaborn.distplot中的区域

    是否可以用颜色填充两条阈值线 line1 和 line2 之外的区域 并通过 distplot 绘制的 KDE 曲线限制 Y 轴 代表我的应用程序的 3 sigmas import pylab as pl import seaborn as
  • 随机数生成器每次仅返回一个数字

    Python 是否有一个随机数生成器 每次只返回一个随机整数next 函数被调用 数字不应该重复并且生成器应返回区间内的随机整数 1 1 000 000 这是独一无二的 我需要生成超过一百万个不同的数字 这听起来好像非常消耗内存 以防所有数
  • 在 matplotlib 中添加新的导航模式

    我正在编写一个 wx matplotlib 应用程序 并且在向 matplotlib 导航工具栏添加新工具时遇到相当大的困难 基本上我想添加选择工具 选取框 套索等 以切换受控子图的鼠标模式 到目前为止 我还没有找到任何功能可以让我轻松地做
  • 使用 PyODBC 选择表中的列名

    我正在编写一个 Python 程序 该程序使用 PyODBC 从 Microsoft Access mdb 文件中选择一些数据 我需要发现几个不同表的列名 在 SQL Server 中 这可以通过使用类似的查询来完成 SELECT c na
  • 如何从已安装的云端硬盘文件夹中永久删除?

    我编写了一个脚本 在每次迭代后将我的模型和训练示例上传到 Google Drive 以防发生崩溃或任何阻止笔记本运行的情况 如下所示 drive path drive My Drive Colab Notebooks models if p
  • 从 C++ 检索 Python 类型

    这个问题实际上是以下两个问题的延伸 如何在 Python 中实现 C 类 以供 C 调用 https stackoverflow com questions 9040669 how can i implement a c class in
  • Django migrate:不创建表

    经过一些错误后 我删除了数据库 删除了所有迁移文件 我留下了init py 现在 当我跑步时 python migrate py makemigrations It creates migrations correctly python m

随机推荐