如何确保在子任务失败时调用 Celery 和弦回调?

2024-03-03

我在 Celery 中使用 Chord 来进行回调,当一组并行任务完成执行时会调用该回调。具体来说,我有一组函数来包装对外部 API 的调用。我想等待所有这些返回,然后再处理结果并在 Chord 回调中更新我的数据库。我希望回调在所有 API 调用完成时执行,无论其状态如何。

我的问题是,只有当组的子任务都没有引发异常时,才会调用回调函数。但是,如果一个子任务引发异常,则可选的错误处理程序on_error()被调用时使用字符串表示形式task_id和弦的。组中的其余任务继续执行,但回调函数永远不会被调用。

我将用下面的例子来说明这一点:

@app.task
def maybe_succeed():
  divisor = randint(0, 10)
  return 1 / divisor

@app.task
def master_task():
 g = group([maybe_succeed.s() for i in range(100)])
 c = g | chord_callback.s()
 return c.delay()

@app.task
def chord_callback(results):
  print 'Made it here!'

在上面的例子中,调用master_task()将运行组中的所有任务,但是,回调将永远不会被调用,因为其中之一maybe_succeed()将会失败(除非你超级幸运!)。


现在,我正在通过捕获等效的所有异常来处理这个问题maybe_succeed()这样和弦就永远不会失败。我想这是一个很好的解决方案,但事实并非如此feel right.

所以,我的问题是: 有没有办法让 Celery Chord 回调执行,无论其组子任务的返回状态如何?


您可以尝试在 errback 中调用原始回调:

@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y


@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')


@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')


@celery.task
def on_chord_error(task_id, extra_info):
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    callme.delay(extra_info)


@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)

结果是:

Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task:tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7]  ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
   File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
    ret = j(timeout=3.0, propagate=True)
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
   interval=interval, no_ack=no_ack, on_interval=on_interval,
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
   self.maybe_throw(callback=callback)
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
   self.throw(value, self._to_remote_traceback(tb))
   File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
    self.on_ready.throw(*args, **kwargs)
   File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
   File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
    raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何确保在子任务失败时调用 Celery 和弦回调? 的相关文章

随机推荐

  • 如何从 MVC 控制器在 debian linux 中打印 pdf

    AsP NET MVC 4 应用程序使用 Mono 在 Debian Squeeze Linux 中运行 控制器使用下面的代码从 html 创建 pdf 文件 如何将 pdf 文件打印到连接到服务器的三星打印机 有一些可执行文件可以用于此目
  • Select2 - formatNoMatches 上的链接

    我在用着Select2 http ivaynberg github com select2 来增强我的
  • 在 Python 中验证 HTML/RDFa

    编写可以验证 HTML 尤其是使用嵌入式 RDFa 的 python 模块的最佳方法是什么 我熟悉 validator w3 org 并且有兴趣编写一个执行类似功能的自定义验证器 但针对使用 RDFa 作为元素元数据的不同标准 有哪些值得查
  • (NextAuth) 类型错误:类型“{}”上不存在属性“会话”

    我在 NextJs 项目上使用 NextAuth 时收到错误 Type error Property session does not exist on type 我正在添加session按照此处的建议将属性添加到我的 app tsx 中
  • 四组设计模式如何融入 MVC 范式?

    我仔细考虑过设计模式一段时间以来 我刚刚开始考虑如何真正开始将其中一些更有意地融入到我的开发工作中 然而 我仍然对他们在本书开头对 MVC 的处理以及它与本书其余部分的关系感到困惑 我使用过的大多数框架 Spring Yii ASP NET
  • AWS Cognito - JavaScript 中的开发人员身份验证(浏览器)

    我在浏览器脚本中获取凭据时遇到问题 身份验证服务器返回 cognito identityId 和 cognito token 然后我设置了一个Cookie cookie cognitoidentityId cookie cognito to
  • 如何为弹跳球创建碰撞检测?

    我已经为三个沙滩球在屏幕上弹跳编写了一个动画 用Python 我现在希望它们全部碰撞并能够相互弹开 我非常感谢您能提供的任何帮助 import pygame import random import sys class Ball def i
  • 准备好使用移动应用程序的后端了吗? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 从 github 添加厨师食谱

    我使用以下命令从 opscode 网站下载了许多食谱 Knife Cookbook网站安装git例如 但是现在我想从 github 上的存储库下载一本说明书到我当前的说明书目录中 我该怎么做呢 我应该将存储库克隆到我的食谱目录中吗 谢谢你
  • 在 javax.scripting javascript 环境中导入地图

    我在 javax scripting 地图实现中看到一些奇怪的行为 在线示例显示example http java sun com developer technicalArticles J2SE Desktop scripting 添加到
  • 两个对象上的 Linq 完全外连接

    我有两个名为 CountryMobility 的对象 我相信我需要将它们与完整的外部联接结合起来 我该如何使用 linq 来做到这一点 public class CountryMobility public string countryCo
  • 评论解释的代码和性能

    我总是 尽力 评论我的代码 我已将服务器配置为在交付前删除这些注释 额外的空白 在实时系统代码 Javascript php 中不要添加注释 从而减少这种开销或者删除或解释是否会更好 如果是这样 我怎样才能鱼与熊掌兼得呢 对于 PHP 来说
  • UIScrollView 与居中的 UIImageView,如照片应用程序

    我想要带有图像内容视图的滚动视图 该图像实际上是比屏幕大得多的地图 地图最初应位于滚动视图的中心 就像当您将 iPhone 转向横向时照片应用程序中的照片一样 我没有设法将地图置于中心并同时进行正确的缩放和滚动 假设地图图像从屏幕顶部开始
  • 在spark中设置textinputformat.record.delimiter

    在 Spark 中 可以设置一些 hadoop 配置设置 例如 System setProperty spark hadoop dfs replication 1 这有效 复制因子设置为 1 假设是这种情况 我认为这种模式 在常规 hado
  • 以编程方式检测 Android 中的 USB 连接类型

    是否可以以编程方式检测 USB 连接的类型 是否仅充电 MTP PTP 等 我知道如何检测它是否已连接 几乎每个线程都在谈论这一点 我尝试寻找一些 Intent 事件来注册接收器 但找不到任何合适的事件 注意 我不想以编程方式更改它 我想在
  • 如何将 Sprite 纹理更改为动画

    我有一个每秒生成的精灵 我不想做的是将精灵纹理更改为动画 并且当它被触摸时它将恢复为正常纹理 public void draw SpriteBatch batch enemyIterator enemies iterator arrayli
  • 我可以选择第 n 个 css 列吗?

    我有一个div有 4 个 CSScolumns我想选择第三列和第四列以使文本稍微变暗 因为文本和文本之间没有很好的对比度background image 这可能吗 我可以接受任何 css 或 js 解决方案 这是demo http jsfi
  • Jfreechart - 多个 XY 图表的任何选项(如多重饼图)?

    有没有类似于 multiPiePlot Chart 但用于 xy 图的东西 我有一个应用程序需要在一页上打印两个或三个 xy 图 我知道您可以将多个数据集放在同一个绘图上 但要求指定每个数据集必须是同一页面上的单独图表 是的 只需添加您的C
  • 无法使用 NGXS 更新存储来修补状态。我不断看到类型错误:无法冻结

    我正在使用一个基本的 Angular 11 应用程序 该应用程序已实现身份验证 使用 AWS Cognito 和 Amplify 我在这里想做的事情非常简单 我正在使用内置的 AWS Amplify 方法进行身份验证 我正在使用 NGXS
  • 如何确保在子任务失败时调用 Celery 和弦回调?

    我在 Celery 中使用 Chord 来进行回调 当一组并行任务完成执行时会调用该回调 具体来说 我有一组函数来包装对外部 API 的调用 我想等待所有这些返回 然后再处理结果并在 Chord 回调中更新我的数据库 我希望回调在所有 AP