我的问题可能非常基本,但我仍然无法在官方文档中找到解决方案。我在 Django 应用程序中定义了一个 Celery 链,执行一组相互依赖的任务:
chain( tasks.apply_fetching_decision.s(x, y),
tasks.retrieve_public_info.s(z, x, y),
tasks.public_adapter.s())()
显然第二个和第三个任务需要父任务的输出,这就是我使用链的原因。
现在的问题是:我需要以编程方式revoke如果第一个任务中的测试条件失败,则执行第二个和第三个任务。如何以干净的方式做到这一点?我知道我可以从定义链的方法中撤销链的任务(请参阅this问题和this doc) but inside第一个任务我看不到后续任务或链本身。
临时解决方案
我目前的解决方案是skip后续任务中的计算基于前一个任务的结果:
@shared_task
def retrieve_public_info(result, x, y):
if not result:
return []
...
@shared_task
def public_adapter(result, z, x, y):
for r in result:
...
但这个“解决方法”有一些缺陷:
- 向每个任务添加不必要的逻辑(基于前一个任务的结果),从而影响重用
- Still executes后续任务以及由此产生的所有开销
我没有过多地将链的引用传递给任务,以免搞砸事情。我也承认我没有尝试过抛出异常的方法,因为我认为选择不继续执行链可能是一种功能性(因此非例外)场景......
感谢您的帮助!