我们的工作流程目前是围绕旧版本的 celery 构建的,因此请记住,事情已经不是最佳的。我们需要运行一个任务并将该任务运行的记录保存在数据库中。如果该任务失败或挂起(这种情况经常发生),我们希望重新运行,就像第一次运行一样。但这不应该自动发生。它需要根据故障的性质手动触发,并且需要将结果记录在数据库中以做出决定(通过前端)。
我们如何在数据库中保存任务的完整记录,以便后续进程可以获取该记录并运行新的相同任务?当前的实现保存了路径@task
数据库中的装饰函数作为TaskInfo
模型。当任务需要重新运行时,我们有一个get_task()
方法上的TaskInfo
从数据库获取路径的模型,使用以下命令导入它getattr
,还有另一个rerun()
再次运行任务的方法*args, **kwargs
(也保存在数据库中)。
像这样(这些是方法TaskInfo
模型实例):
def get_task(self):
"""Returns the task's decorated function, which can be delayed."""
module_name, object_name = self.path.rsplit('.', 1)
module = import_module(module_name)
task = getattr(module, object_name)
if inspect.isclass(task):
task = task()
# task = current_app.tasks[self.path]
return task
def rerun(self):
"""Re-run the task, and replace this one.
- A new task is scheduled to run.
- The new task's TaskInfo has the same parent as this TaskInfo.
- This TaskInfo is deleted.
"""
args, kwargs = self.get_arguments()
celery_task = self.get_task()
celery_task.delay(*args, **kwargs)
defaults = {
'path': self.path,
'status': Status.PENDING,
'timestamp': timezone.now(),
'args': args,
'kwargs': kwargs,
'parent': self.parent,
}
TaskInfo.objects.update_or_create(task_id=celery_task.id, defaults=defaults)
self.delete()
必须有一个更干净的解决方案来将任务保存在数据库中以便稍后重新运行,对吧?