首先,让我们澄清一下 celery 库(你可以用它得到的)之间的区别pip install
或者在你的setup.py
)和芹菜worker- 这是从代理中取出任务并处理它们的实际过程。当然你可能想要多个workers/processes(例如,用于将不同的任务分离给不同的工作人员)。
假设您有两个任务:calculate_recommendations_task
and periodic_update_task
并且您想在单独的工作人员上运行它们,即recommendation_worker
and periodic_worker
。
另一个过程将是celery beat
这只是排队periodic_update_task
每 x 小时进入经纪商。
另外,假设您有一个简单的 Web 服务器,使用以下方法实现bottle https://bottlepy.org/docs/dev/.
我假设您也想将 celery 代理和后端与 docker 一起使用,我将选择 celery 的推荐用法 -RabbitMQ https://www.rabbitmq.com/作为经纪人和Redis https://redis.io/作为后端。
现在我们有 6 个容器,我将它们写在docker-compose.yml
:
version: '2'
services:
rabbit:
image: rabbitmq:3-management
ports:
- "15672:15672"
- "5672:5672"
environment:
- RABBITMQ_DEFAULT_VHOST=vhost
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
redis:
image: library/redis
command: redis-server /usr/local/etc/redis/redis.conf
expose:
- "6379"
ports:
- "6379:6379"
recommendation_worker:
image: recommendation_image
command: celery worker -A recommendation.celeryapp:app -l info -Q recommendation_worker -c 1 -n recommendation_worker@%h -Ofair
periodic_worker:
image: recommendation_image
command: celery worker -A recommendation.celeryapp:app -l info -Q periodic_worker -c 1 -n periodic_worker@%h -Ofair
beat:
image: recommendation_image
command: <not sure>
web:
image: web_image
command: python web_server.py
两个 dockerfiles,它构建了recommendation_image
和web_image
应该安装芹菜library。只有recommendation_image
应该有任务代码,因为工作人员将处理这些任务:
推荐 Dockerfile:
FROM python:2.7-wheezy
RUN pip install celery
COPY tasks_src_code..
WebDocker 文件:
FROM python:2.7-wheezy
RUN pip install celery
RUN pip install bottle
COPY web_src_code..
其他图像(rabbitmq:3-management
& library/redis
可以从 docker hub 获得,当你运行时它们会自动拉取docker-compose up
).
现在事情是这样的:在你的网络服务器中,你可以通过字符串名称触发 celery 任务,并通过任务 ID 提取结果(无需共享代码)网络服务器.py:
import bottle
from celery import Celery
rabbit_path = 'amqp://guest:guest@rabbit:5672/vhost'
celeryapp = Celery('recommendation', broker=rabbit_path)
celeryapp.config_from_object('config.celeryconfig')
@app.route('/trigger_task', method='POST')
def trigger_task():
r = celeryapp.send_task('calculate_recommendations_task', args=(1, 2, 3))
return r.id
@app.route('/trigger_task_res', method='GET')
def trigger_task_res():
task_id = request.query['task_id']
result = celery.result.AsyncResult(task_id, app=celeryapp)
if result.ready():
return result.get()
return result.state
最后一个文件配置.celeryconfig.py:
CELERY_ROUTES = {
'calculate_recommendations_task': {
'exchange': 'recommendation_worker',
'exchange_type': 'direct',
'routing_key': 'recommendation_worker'
}
}
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']