将 celery 任务结果链接到通讯组中

2024-04-15

Like in 这另一个问题 https://stackoverflow.com/questions/13271056/how-to-chain-a-celery-task-that-returns-a-list-into-a-group,我想从 celery 任务返回的列表创建一个 celery 组。这个想法是,第一个任务将返回一个列表,第二个任务将该列表分解为列表中每个项目的并发任务。

计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是下载页面、处理页面,然后将其上传到 s3 的链。最后,一旦所有子页面完成,该网站就会在我们的数据库中标记为已完成。就像是:

chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() | 
        process_sub_page.s() | 
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)

到目前为止我看到的解决方案似乎在这方面做得足够好,但是当第二个任务是一个链时,由于以下问题而失败clone不进行深层复制(请参阅对此答案的评论 https://stackoverflow.com/q/13271056/64911详情):

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

它还存在一个问题,如果迭代的长度为 10,000 个项目,它将创建一个包含 10,000 个项目的组。正如您可以想象的那样,这会增加我们的内存使用量。

所以,我正在寻找一种方法dmap that:

  • 不会通过创建巨大的组来耗尽 RAM(也许有一种方法可以对可迭代对象进行分块?)
  • 适用于 celery 链,没有深度复制问题。

芹菜画布提供chunks https://celery.readthedocs.io/en/latest/userguide/canvas.html#chunks将任务分成块。不幸的是,这不适用于链、组等原语。

您可以使用 celery 信号来防止 dmap/clone 出现问题。

ch = chain(
    download_sub_page.s(),
    process_sub_page.s(),
    upload_sub_page.s(),
)

@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, headers=None, body=None, **kwargs):
    result = kwargs['result']    
    header = [ch(i) for i in result]
    callback = mark_website_done.si()
    chord(header)(callback)

创建一个用于处理页面的链,并使用弦将最后一个任务挂接到它。每当get_links_from_website运行成功。

根据链所花费的时间,您还可以保存结果get_links_from_website某处。然后迭代一批它们以对链进行排队,并且对于最后一批,您可以将回调挂钩到最后一个任务。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 celery 任务结果链接到通讯组中 的相关文章

随机推荐

  • UIMenuController 在 iOS 13.2 中不可见

    我有长按处理程序显示UIMenuController 它在 private func longPressHandler sender UILongPressGestureRecognizer guard sender state began
  • HMM 如何用于手写识别?

    这个问题与传统的手写识别有点不同 我有一个包含数千个以下内容的数据集 对于一个绘制的角色 我有几个连续的 x y 按下笔的坐标 所以 这是一个顺序 时间 问题 我希望能够根据这些数据对手写字符进行分类 并且希望实现 HMM 来实现学习目的
  • Dapper 是否支持在单个查询中插入多行?

    Does StackExchange Dapper https github com StackExchange Dapper支持以下SQL语法吗 INSERT INTO MyTable a b c VALUES 1 2 3 4 5 6 7
  • 在 Visual Studio 2013 中禁用 git

    我有一个由 tfs 管理的存储库 然而在本地 我想通过 git 管理它 并将更改推送到 tfs 一旦我在团队资源管理器中创建 git 存储库 VS2013 就会显示该解决方案仅由 git 管理 如果我尝试编辑任何文件 它会抱怨该文件是只读的
  • 如何通过php按修改日期对文件进行排序

    背景 我有一个匿名登录 ftp 服务器 ftp nlist 仅按字母顺序列出文件 我想根据上次修改日期获取文件列表 最近的在前 我尝试了 ftp exec conn ls t 但出现了 权限被拒绝 错误 不知道为什么它不起作用 好吧 我正在
  • 使用 Plot 的线图重叠

    我有以下用于图表的选项 div div
  • 通过浏览器使用 PDO 将 MySQL 表中的数据存储为 CSV

    我有一个将数据写入 MySQL 数据库的表单 我希望用户能够下载他们的数据CSV最终提交后的格式 我的代码当前正在将数据库的内容转储到浏览器中 即它被写入页面 而不是写入 csv 文件 我想将他们发送到一个链接并提供下载文件的选项 这是我当
  • 从互联网读取数据

    我在网络服务器上有一个包含数据的远程文件夹 我使用以下方式访问数据 myData lt read table http myData csv sep header T 有没有办法对远程文件夹进行密码保护并在上述命令中输入授权 Thx 你可以
  • 使用不受支持的 WebKit 属性会产生什么影响?

    我有兴趣使用 webkit line clamp在混合 iOS 应用程序中 我已阅读苹果文档 https developer apple com library safari documentation AppleApplications
  • 合并时忽略文件/文件夹

    我目前正在使用 SVN 对我的软件项目进行版本控制 在一个正在进行的项目中 我有用于客户通用功能和规范的主干 以及用于客户特定功能和规范的分支 有什么方法可以标记一些文件 文件夹 这些文件 文件夹不应在每次执行此类操作时合并到分支中 我没有
  • Python 在调用之前修饰函数

    我有一个由其他人编写的相当复杂的装饰器 我想做的是根据决定一次调用该函数的修饰版本 或者另一次调用原始函数 未修饰 这可能吗 With decorator original function Without original functio
  • 如果输入等于字符串,则执行某些操作... python 2.7 [重复]

    这个问题在这里已经有答案了 使用以下代码时遇到问题 start over 1 question input Do you wish to try again y n if question y start over 1 else raise
  • 如何使用 xsmtp-api 和 php 库向 SendGrid 中的电子邮件主题添加替换标签

    我正在尝试在电子邮件的主题中设置我的客户的姓名 这对我的申请非常重要 从我在SendGrid API 文档 http sendgrid com docs API Reference SMTP API substitution tags ht
  • 将 JComboBox 放入 JTable 中

    我想将单独的 JComboBox 放入 JTable 的每个单元格中 IE 每个单元格的 JComboBox 内容都不相同 我基本上希望能够调用以下代码来将一行 JComboBox 添加到 JTable 中 有人有什么想法吗 谢谢 JCom
  • 如何在移动滑块时锁定页面滚动

    在 Flutter 中 我有一个应用程序 在自定义 ListView 小部件内有一个自定义滑块 唯一的问题是 当您移动滑块手柄 从左到右 时 由于 ListView 页面仍然会滚动 向上和向下 而我需要页面锁定到位 直到用户停止移动滑块 我
  • 读取 NTFS 格式的 MFT

    在网上寻找如何读 写 MFT 的解释时 我发现了以下部分 http www installsetupconfig com win32programming 1996 20AppE apnilife pdf http www installs
  • 将 nl2br 与 html 标签一起使用

    I use nl2br当显示保存在某处的一些信息时 但是当使用 HTML 标签时我不想添加 br 他们的标签 例如 如果我使用 table th th table 它将被转换为 table br th th br table br 这为这张
  • n 层架构 - BLL、DAL 和接口。什么是最佳实践?

    我有一个关于 n 层架构的问题 在问这个问题之前 我想了很久 因为这里已经有很多类似的问题了 但是 在看了一天半并阅读了其他答案之后 我仍然不确定 各种看似相似的术语和不同的方法让我感到困惑 如果我在不同的类库中有一个 BLL 和一个 DA
  • 购物车 API V3:无法为具有选项的产品创建购物车

    当我创建一个包含没有选项的产品的购物车时 一切正常 但如果任何产品有产品选项 则它不起作用 这里我得到了产品选项 它有一个 id 21 的选项 当我在创建 API 时使用此选项 id 时 它不起作用 如果您要将产品添加到购物车 并且该产品具
  • 将 celery 任务结果链接到通讯组中

    Like in 这另一个问题 https stackoverflow com questions 13271056 how to chain a celery task that returns a list into a group 我想