本地主机上的 Django/Celery 多个队列 - 路由不起作用

2024-02-14

我跟着芹菜docs http://celery.readthedocs.org/en/latest/userguide/routing.html#manual-routing在我的开发机器上定义 2 个队列。

我的芹菜设置:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

我在项目的 virtualenv 中打开了两个终端窗口,并运行以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

我得到的是所有任务都由两个队列处理。

我的目标是拥有一个队列来仅处理中定义的一项任务CELERY_ROUTES和默认队列来处理所有其他任务。

我也关注了这个所以问题 https://stackoverflow.com/questions/10079816/route-celery-task-to-specific-queue, rabbitmqctl list_queues回报celery 0,并运行rabbitmqctl list_bindings回报exchange celery queue celery []两次。重新启动兔子服务器没有改变任何东西。


好吧,所以我想通了。以下是我的整个设置、设置以及如何运行 celery,对于那些可能想知道与我的问题相同的事情的人。

Settings

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}

如何运行芹菜?

终端 - 选项卡 1:

celery -A proj worker -Q default -l debug -n default_worker

这将启动第一个工作进程来消耗默认队列中的任务。笔记!-n default_worker对于第一个工作人员来说不是必须的,但如果您有任何其他启动并运行的 celery 实例,则这是必须的。环境-n worker_name是相同的--hostname=default@%h.

终端 - 选项卡 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

这将启动第二个工作人员,从提要队列中消费任务。注意-n feeds_worker,如果你正在运行-l debug(日志级别=调试),您将看到两个工作人员正在它们之间同步。

终端 - 选项卡 3:

celery -A proj beat -l debug

这将开始节拍,根据您的计划执行任务CELERYBEAT_SCHEDULE。 我不必改变任务,或者CELERYBEAT_SCHEDULE.

例如,这就是我的样子CELERYBEAT_SCHEDULE对于应该进入 feeds 队列的任务:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

如您所见,无需添加'options': {'routing_key': 'long_tasks'}或指定它应该进入哪个队列。另外,如果您想知道为什么Update是大写的,因为它是一个自定义任务,被定义为celery.Task.

更新芹菜5.0+

Celery 自版本 5 以来进行了一些更改,以下是任务路由的更新设置。

如何创建队列?

Celery 可以自动创建队列。它非常适合简单的情况,其中 celery 的默认路由值就可以了。

task_create_missing_queues=True或者,如果您使用 django 设置并且您正在为所有 celery 配置命名空间CELERY_ key, CELERY_TASK_CREATE_MISSING_QUEUES=True。请注意,它默认处于打开状态。

自动计划任务路由

配置 celery 应用程序后:

celery_app.conf.beat_schedule = {
  "some_scheduled_task": {
    "task": "module.path.some_task",
    "schedule": crontab(minute="*/10"),
    "options": {"queue": "queue1"}
  }
}

自动任务路由

Celery 应用程序仍然需要先配置,然后:

app.conf.task_routes = {
  "module.path.task2": {"queue": "queue2"},
}

手动分配任务

如果您想动态路由任务,则在发送任务时指定队列:

from module import task

def do_work():
  # do some work and launch the task
  task.apply_async(args=(arg1, arg2), queue="queue3")

重新路由的更多详细信息可以在这里找到:https://docs.celeryproject.org/en/stable/userguide/routing.html https://docs.celeryproject.org/en/stable/userguide/routing.html

关于此处的调用任务:https://docs.celeryproject.org/en/stable/userguide/calling.html https://docs.celeryproject.org/en/stable/userguide/calling.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

本地主机上的 Django/Celery 多个队列 - 路由不起作用 的相关文章

  • 如何配置散景图以具有响应宽度和固定高度

    我使用通过组件功能嵌入的散景 实际上我使用 plot sizing mode scale width 它根据宽度进行缩放并保持纵横比 但我想要一个响应宽度但固定或最大高度 这怎么可能实现呢 有stretch both and scale b
  • 使用 GeoDjango 在坐标系之间进行转换

    我正在尝试将坐标信息添加到我的数据库中 添加django contrib gis支持我的应用程序 我正在写一个south数据迁移 从数据库中获取地址 并向 Google 询问坐标 到目前为止 我认为我最好的选择是使用geopy为了这 接下来
  • 同情因子简单关系

    我在 sympy 中有一个简单的因式分解问题 无法解决 我在 sympy 处理相当复杂的积分方面取得了巨大成功 但我对一些简单的事情感到困惑 如何得到 phi 2 2 phi phi 0 phi 0 2 8 因式分解 phi phi 0 2
  • ModuleNotFoundError:没有名为:crispy_forms的模块[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我可以导入 Cripy forms 但是当我运行时python3 manage py runserver 它说没有名为 Cripy
  • 将 stdout 重定向到 Python 中的文件? [复制]

    这个问题在这里已经有答案了 如何将 stdout 重定向到 Python 中的任意文件 当长时间运行的 Python 脚本 例如 Web 应用程序 从 ssh 会话内启动并处于后台 并且 ssh 会话关闭时 应用程序将引发 IOError
  • 代理阻止网络套接字?如何绕行

    我有一个用 Python 编写的正在运行的 websocket 服务器 来自https github com opiate SimpleWebSocketServer https github com opiate SimpleWebSoc
  • python 语言环境奇怪的错误。这究竟是怎么回事?

    所以今天我升级到了 bazaar 2 0 2 我开始收到这条消息 顺便说一句 我在雪豹上 bzr warning unknown locale UTF 8 Could not determine what text encoding to
  • Python/Flask:应用程序在关闭后正在运行

    我正在开发一个简单的 Flask Web 应用程序 我使用 Eclipse Pydev 当我开发该应用程序时 由于代码更改 我必须经常重新启动该应用程序 这就是问题所在 当我运行该应用程序时 我可以在本地主机上看到该框架 这很好 但是当我想
  • 超时时杀死或终止子进程?

    我想尽可能快地重复执行子进程 然而 有时这个过程会花费太长的时间 所以我想杀死它 我使用 signal signal 如下所示 ppid pipeexe pid signal signal signal SIGALRM stop handl
  • Python将csv数据导出到文件中

    我有以下运行良好的代码 但我无法修剪数据并将其存储在数据文件中 import nltk tweets love this car this view amazing not looking forward the concert def g
  • 从文档字符串生成 sphinx 文档不起作用

    我有一个具有以下结构的项目 我想保留 my project build here is where sphinx should dump into requirements txt make bat Makefile more config
  • 如何在 Numpy 中实现垃圾收集

    我有一个名为main py 它引用另一个文件Optimisers py它仅具有功能并用于for循环进入main py 这些函数都有不同的优化功能 This Optimisers py然后引用另外两个类似的文件 其中也只有函数 它们位于whi
  • 将 ASCII 字符转换为“”unicode 表示法的脚本

    我正在对 Linux 区域设置文件进行一些更改 usr share i18n locales like pt BR 并且需要格式化字符串 例如 d m Y H M 必须以 Unicode 指定 其中每个 在本例中为 ASCII 字符表示为
  • 如何在 Tkinter 的 Button 小部件中创建多个标签?

    我想知道如何在 Tkinter 中创建具有多个标签的按钮小部件 如下图所示 带有子标签的按钮 https i stack imgur com jOZRw jpg正如您所看到的 在某些按钮中有一个子标签 例如按钮 X 有另一个小标签 A 我试
  • 如何从c++调用python

    我是Python新手 我尝试像这样从 C 调用 python 脚本 在 Raspberry Pi 中 std string pythonCommand python Callee py a b int res system pythonCo
  • if/else 在 while 循环内正确缩进[关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我开始学习 Python 编程大约几周了 我遇到了一些麻烦 下面的代码是一个小程序 用于检查列表中是否有偶数 如果找到第一个偶数
  • 通过套接字发送字符串(python)

    我有两个脚本 Server py 和 Client py 我心中有两个目标 能够从客户端一次又一次地向服务器发送数据 能够将数据从服务器发送到客户端 这是我的 Server py import socket serversocket soc
  • python 的 fcntl.flock 函数是否提供文件访问的线程级锁定?

    Python 的 fcnt 模块提供了一种名为 flock 1 的方法来证明文件锁定 其描述如下 对文件执行锁定操作op 描述符 fd 文件对象提供 fileno 方法被接受为 出色地 请参阅 Unix 手册集群 2 了解详情 在某些系统上
  • 升级后 pip 损坏

    我做了 pip install U easyinstall 然后 pip install U pip 来升级我的 pip 但是 当我尝试使用 pip 时 我现在收到此错误 root d8fb98fc3a66 which pip usr lo
  • Elastic Beanstalk 上的 Django + MySQL - 查询 MySQL 时出错

    当我在 Elastic beanstalk 上托管的 Django 应用程序上查询 MySQL 时 出现错误 错误说 admin login 处出现操作错误 1045 用户 adminDB 172 30 23 5 的访问被拒绝 使用密码 Y

随机推荐