Like in 这另一个问题 https://stackoverflow.com/questions/13271056/how-to-chain-a-celery-task-that-returns-a-list-into-a-group,我想从 celery 任务返回的列表创建一个 celery 组。这个想法是,第一个任务将返回一个列表,第二个任务将该列表分解为列表中每个项目的并发任务。
计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是下载页面、处理页面,然后将其上传到 s3 的链。最后,一旦所有子页面完成,该网站就会在我们的数据库中标记为已完成。就像是:
chain(
get_links_from_website.si('https://www.google.com'),
dmap.s( # <-- Distributed map
download_sub_page.s() |
process_sub_page.s() |
upload_sub_page_to_s3.s()
),
mark_website_done.s()
)
到目前为止我看到的解决方案似乎在这方面做得足够好,但是当第二个任务是一个链时,由于以下问题而失败clone
不进行深层复制(请参阅对此答案的评论 https://stackoverflow.com/q/13271056/64911详情):
@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
它还存在一个问题,如果迭代的长度为 10,000 个项目,它将创建一个包含 10,000 个项目的组。正如您可以想象的那样,这会增加我们的内存使用量。
所以,我正在寻找一种方法dmap
that:
- 不会通过创建巨大的组来耗尽 RAM(也许有一种方法可以对可迭代对象进行分块?)
- 适用于 celery 链,没有深度复制问题。
芹菜画布提供chunks https://celery.readthedocs.io/en/latest/userguide/canvas.html#chunks将任务分成块。不幸的是,这不适用于链、组等原语。
您可以使用 celery 信号来防止 dmap/clone 出现问题。
ch = chain(
download_sub_page.s(),
process_sub_page.s(),
upload_sub_page.s(),
)
@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, headers=None, body=None, **kwargs):
result = kwargs['result']
header = [ch(i) for i in result]
callback = mark_website_done.si()
chord(header)(callback)
创建一个用于处理页面的链,并使用弦将最后一个任务挂接到它。每当get_links_from_website
运行成功。
根据链所花费的时间,您还可以保存结果get_links_from_website
某处。然后迭代一批它们以对链进行排队,并且对于最后一批,您可以将回调挂钩到最后一个任务。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)