Celery/Redis 同一任务并行执行多次

2024-01-13

我有 2 个自定义任务(TaskA and TaskB),两者都继承自celery.Task。调度程序启动TaskA时不时地,并且TaskA发射N times TaskB每次都有不同的论据。但由于某种原因,有时是相同的TaskB使用相同的参数,同时执行两次,这会导致数据库出现不同的问题。

class TaskA(celery.Task):

    def run(self, *args, **kwargs):
        objects = MyModel.objects.filter(processed=False)\
                                 .values_list('id', flat=True)
        task_b = TaskB()
        for o in objects:
            o.apply_async(args=[o, ])

class TaskB(celery.Task):

    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj

我尝试过的事情

我尝试使用celery.group希望它能解决这些问题,但我得到的只是错误,说run需要 2 个参数,但没有提供任何参数。

这就是我尝试启动的方式TaskB using celery.group:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()

我也这样尝试过:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()

之前就在那里执行了任务g.apply_async().

Question

问题是出在我启动任务的方式上还是其他原因?这是正常行为吗?

附加信息

在我的本地机器上我运行celery 3.1.13 with RabbitMQ 3.3.4,并在服务器上celery 3.1.13与运行Redis 2.8.9。 在本地计算机上我没有看到这样的行为,每个任务都执行一次。在服务器上,我看到 1 - 10 个此类任务连续执行两次。

这是我在本地计算机和服务器上运行 celery 的方式:

celery_beat: celery -A proj beat -l info

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200

可行的解决方法

我引入了一个锁TaskB基于它收到的论据。经过大约 10 个小时的测试,我看到到底执行了两次,但锁可以防止数据库发生冲突。 这确实解决了我的问题,但我仍然想了解为什么会发生这种情况。


您是否设置了fanout_prefix and fanout_patterns如中所述使用Redis http://celery.readthedocs.org/en/latest/getting-started/brokers/redis.html#broker-redis芹菜的文档?我将 Celery 与 Redis 一起使用,但没有遇到这个问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Celery/Redis 同一任务并行执行多次 的相关文章

随机推荐

  • C# 相当于 Google Maps API 的computeArea

    Google 地图 Api 有一个google maps geometry spherical computeArea http code google com apis maps documentation javascript geom
  • 使用 Okhttp 在多部分 POST 中传递数组

    我正在构建一个需要 API 调用的应用程序 该调用在其 POST 正文请求中包含数组 我正在使用 OkHttp 2 6 来请求 API Postman 中的请求如下所示 我尝试过几种写作方式RequestBody为了实现这一目标 第一种方法
  • 如何摆脱旧的善变的头脑?

    您好 我希望您可以从我的 i o 中了解如何继续将所有内容合并到最新更新而不丢失我的更改 hg merge avbryter grenen default har 4 huvuden sammanfoga med en specifik r
  • 如何在 lambda 表达式中设置断点?

    我想调试在表达式树中调用的 lambda 不幸的是 断点永远不会被命中 这是一个完整的控制台程序 private static void Main var evalAndWrite EvalAndWrite x gt x 1 a break
  • 如何解析哈希的字符串表示形式

    我有这个字符串 我想知道如何将其转换为哈希值 account id gt 4444 deposit id gt 3333 miku的回答中建议的方法确实是最简单的unsafest DO NOT RUN IT eval surprise gt
  • 模块化 Selenium RC 测试脚本的最佳实践

    我正在 Visual Studio C 中创建 Selenium RC 测试脚本 我是 努力重构测试 我所有的测试都在一个 文件 我将不胜感激任何意见和 或网站 书籍 等等来了解模块化测试 我必须在不同的站点上运行相同的测试 相同的应用程序
  • 为什么mode_t使用4字节?

    我刚刚读到了有关 mode t 的内容 它基本上存储了以下信息 文件类型的 7 个布尔值 S IFREG S IFDIR S IFCHR S ISBLK S ISFIFO S ISLINK S ISSOCK 3 3 9 个布尔值 用于访问权
  • 无法使用 Selenium webdriver 将解压的扩展加载到 chrome 中

    我是使用网络驱动程序的新手 但我遵循了此处提到的内容 如何使用未打包的扩展程序启动 Chrome https stackoverflow com questions 18994519 how can i launch chrome with
  • Netbeans 自动缩进和大括号 }{

    是否可以将 NetBeans 编辑器设置为自动取消缩进右大括号 我要这个 if something do thing one do thing two Netbeans 给了我这个 if something do thing one do
  • 在 Python 文档字符串中嵌入 reStructuredText

    我希望在我的 Python 文档字符串中看到一些不错的语法突出显示和着色 它们 当然 是有效的 RESt 例如 A section an example some code rest of python code 我最接近的是我的 vim
  • 4.4 和 5.5 英寸的 UICollectionViewCell 动态调整大小

    我有 UICollectionView 与情节提要中的单元格 每个单元格的大小设置为 145x145 它们在 iPhone 4 5s 上看起来不错 但在 iPhone 6 和 6 上尺寸并没有按比例增加 我如何动态地设置而不是为每个设备手动
  • 使用 JSON 字符串搜索 MySQL 列中的特定值

    我目前有一个 MySQL 表 其中包含一个用于存储类别 ID 的列 这些 id 存储在 JSON 字符串中 我正在寻找最有效的方法来查询这些 JSON 字符串以获取特定 id 例如 Table posts Field cats 以下是 JS
  • 如何使用 GSON 创建 JSONArray

    您好 在我的项目中 我需要使用 GSON 类创建 JSONArray 类型 message msg 1 msg 2 msg 3 asec asec 1 asec 2 asec 3 我知道如何在 GSON 中创建带有键值的 JSONArray
  • 如何调试线性模型和预测的“因子具有新水平”错误[重复]

    这个问题在这里已经有答案了 我正在尝试制作并测试线性模型 如下所示 lm model lt lm Purchase data train lm prediction lt predict lm model test 这会导致以下错误 指出P
  • SQL Server 如果存在

    我必须将数据插入表中 但前提是它尚不存在 我使用以下方法检查现有行 IF EXISTS SELECT X FROM Table1 where id id 会使用 X 提高性能而不是使用列名 不 您可以使用 列名 NULL甚至1 0 根据 A
  • 如何将在执行同一数据流管道期间计算的架构写入 BigQuery?

    我的场景是此处讨论的场景的一种变体 如何使用数据流执行期间计算的架构写入 BigQuery https stackoverflow com questions 29440279 how do i write to bigquery usin
  • 通过宏从过滤范围中删除行

    我遇到以下代码问题 该代码试图过滤一组数据 然后删除可见行 目前 我收到 运行时 1004 错误 删除方法或范围类失败 该错误出现在代码的最后一行 我在网站上发现了一个类似的问题 答案似乎正是我在最后一行代码中得到的答案 Dim LastR
  • 如何在 GhostScript 中将 svg 转换为 eps

    你能帮我用 Ghostscript 将 svg 文件转换为 eps 的命令吗 我尝试在 Gimp 中打开 svg 文件 但它非常大 因此看起来 gimp 无法打开它或需要太长时间 我有两个要转换的文件 注意 我已经完成了从 png 到 sv
  • 如何重复 try- except 块

    我在 Python 3 3 中有一个 try except 块 我希望它无限期地运行 try imp int input Importance n t1 High n t2 Normal n t3 Low except ValueError
  • Celery/Redis 同一任务并行执行多次

    我有 2 个自定义任务 TaskA and TaskB 两者都继承自celery Task 调度程序启动TaskA时不时地 并且TaskA发射N times TaskB每次都有不同的论据 但由于某种原因 有时是相同的TaskB使用相同的参数