我已经使用 Celery 一段时间了,在生产中我使用 RabbitMQ 作为代理,使用 Redis 作为 K8s 集群中的后端,到目前为止没有任何问题。在本地,我运行一个包含一些服务(Flask API、2 个不同的 Workers、Beat、Redis、Flower、Hasura)的 docker compose,使用 Redis 作为代理和后端。
在过去的几个月里,我没有遇到过这种设置的问题,但昨天我在访问任务结果时开始出现不稳定的行为。
任务被发送到队列,工作人员识别它并执行任务,但是在查询任务状态时我有时会得到DisabledBackend
。通常在第一次请求时,然后它就起作用了。无法找到何时有效、何时无效的模式,它是不稳定的。
我在某处读到 Celery 不能很好地与 Flask 的内置服务器一起工作,所以我切换到 uWSGI,其设置与我在生产中的设置几乎相同:
[uwsgi]
wsgi-file = app/uwsgi.py
callable = application
http = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true
buffer-size = 32768
enable-threads = true
req-logger = python:uwsgi
我见过一个类似的问题 https://stackoverflow.com/questions/67604229/disabledbackend-error-with-every-second-reload-with-mod-wsgi-django-celery-and在 Django 中,问题似乎出在带有 Apache 的 WSGI Mod 上,这不是我的情况,但行为似乎相似。我看到的所有其他问题都与后端配置错误有关,这不是我的情况。
关于可能导致这种情况的原因有什么想法吗?
谢谢。
所以看来我需要访问AsyncResult
仅通过我的 Celery 应用程序实例,而不是通过 Celery,或将 Celery 应用程序实例作为参数传递。
所以,这是行不通的:
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id)
return task.state
这有效:
from app import my_celery # Your own Celery Application Instance
@app.route('/status/<task_id>')
def get_status(task_id):
task = my_celery.AsyncResult(task_id)
return task.state
这也有效:
from app import my_celery
from celery.result import AsyncResult
@app.route('/status/<task_id>')
def get_status(task_id):
task = AsyncResult(task_id, app=my_celery)
return task.state
我猜发生的事情是通过调用AsyncResult
直接从 Celery 中,它不会访问 Celery 的配置,因此它认为没有配置用于查询结果的后端。
但这只能解释功能的完全失败,而不能解释不稳定的行为。我猜这是因为不同的线程,以及应用程序实例很重要的情况,所以 Celery 找到了它,但不太确定。
我已经运行了几次测试,在更改导入后似乎再次正常工作AsyncResult
,但我会继续挖掘。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)