在数据库中保存 celery 任务(用于重新运行)

2023-12-19

我们的工作流程目前是围绕旧版本的 celery 构建的,因此请记住,事情已经不是最佳的。我们需要运行一个任务并将该任务运行的记录保存在数据库中。如果该任务失败或挂起(这种情况经常发生),我们希望重新运行,就像第一次运行一样。但这不应该自动发生。它需要根据故障的性质手动触发,并且需要将结果记录在数据库中以做出决定(通过前端)。

我们如何在数据库中保存任务的完整记录,以便后续进程可以获取该记录并运行新的相同任务?当前的实现保存了路径@task数据库中的装饰函数作为TaskInfo模型。当任务需要重新运行时,我们有一个get_task()方法上的TaskInfo从数据库获取路径的模型,使用以下命令导入它getattr,还有另一个rerun()再次运行任务的方法*args, **kwargs(也保存在数据库中)。

像这样(这些是方法TaskInfo模型实例):

def get_task(self):
    """Returns the task's decorated function, which can be delayed."""
    module_name, object_name = self.path.rsplit('.', 1)
    module = import_module(module_name)
    task = getattr(module, object_name)
    if inspect.isclass(task):
        task = task()
    # task = current_app.tasks[self.path]
    return task

 def rerun(self):
    """Re-run the task, and replace this one.

    - A new task is scheduled to run.
    - The new task's TaskInfo has the same parent as this TaskInfo.
    - This TaskInfo is deleted.
    """
    args, kwargs = self.get_arguments()
    celery_task = self.get_task()
    celery_task.delay(*args, **kwargs)
    defaults = {
        'path': self.path,
        'status': Status.PENDING,
        'timestamp': timezone.now(),
        'args': args,
        'kwargs': kwargs,
        'parent': self.parent,
    }
    TaskInfo.objects.update_or_create(task_id=celery_task.id, defaults=defaults)
    self.delete()

必须有一个更干净的解决方案来将任务保存在数据库中以便稍后重新运行,对吧?


最新版本的 Celery (4.4.0) 包含一个参数extended_result。您可以将其设置为 True,然后该表(它的名称为celery_taskmeta默认情况下)在Result Backend Database将存储args and kwargs的任务。

这是一个演示:

app = Celery('test_result_backend')

app.conf.update(
    broker_url='redis://localhost:6379/10',
    result_backend='db+mysql://root:passwd@localhost/celery_toys',
    result_extended=True
)


@app.task(bind=True, name='add')
def add(self, x, y): 
    self.request.task_name = 'add'  # For saving the task name.
    time.sleep(5)
    return x + y 

通过MySQL中记录的任务信息,您可以轻松地重新运行您的任务。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在数据库中保存 celery 任务(用于重新运行) 的相关文章

随机推荐