Celery + SQS 两次接收相同的任务,同时具有相同的任务 ID

2024-01-17

在 Flask 应用程序中使用带有 SQS 的 celery
but celery 同时接收两次具有相同任务 ID 的相同任务,

像这样运行工人,
celery worker -A app.jobs.run -l info --pidfile=/var/run/celery/celery.pid --logfile=/var/log/celery/celery.log --time-limit=7200 --concurrency=8

这是芹菜的日志

[2019-11-29 08:07:35,464: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,465: INFO/MainProcess] Received task: app.jobs.booking.bookFlightTask[657985d5-c3a3-438d-a524-dbb129529443]  
[2019-11-29 08:07:35,471: WARNING/ForkPoolWorker-4] in booking funtion1
[2019-11-29 08:07:35,473: WARNING/ForkPoolWorker-3] in booking funtion1
[2019-11-29 08:07:35,537: WARNING/ForkPoolWorker-3] book_request_pp
[2019-11-29 08:07:35,543: WARNING/ForkPoolWorker-4] book_request_pp

同一任务收到两次并且两者同时运行,

在Python Flask中使用 celery==4.4.0rc4 , boto3==1.9.232, kombu==4.6.6 和 SQS 。
在SQS中,默认可见性超时为30分钟,我的任务是没有ETA并且没有确认

我的任务.py

from app import app as flask_app
from app.jobs.run import capp
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(flask_app)

class BookingTasks:
    def addBookingToTask(self):
        request_data = request.json
        print ('in addBookingToTask',request_data['request_id'])
        print (request_data)
        bookFlightTask.delay(request_data)
        return 'addBookingToTask added'

@capp.task(max_retries=0)
def bookFlightTask(request_data):
    task_id = capp.current_task.request.id
    try:
        print ('in booking funtion1')
        ----

我的配置文件,config.py

import os
from urllib.parse import quote_plus

aws_access_key = quote_plus(os.getenv('AWS_ACCESS_KEY'))
aws_secret_key = quote_plus(os.getenv('AWS_SECRET_KEY'))

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)
imports = ('app.jobs.run',)


## Using the database to store task state and results.
result_backend = 'db' + '+' + os.getenv('SQLALCHEMY_DATABASE_URI')

最后是我的芹菜应用程序文件 run.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from flask import Flask
from app import app as flask_app
import sqlalchemy
capp = Celery()

capp.config_from_object('app.jobs.config')

# Optional configuration, see the capplication user guide.
capp.conf.update(
    result_expires=3600,
)
 
# SQS_QUEUE_NAME is like 'celery_test.fifo' , .fifo is required
capp.conf.task_default_queue = os.getenv('FLIGHT_BOOKINNG_SQS_QUEUE_NAME')
if __name__ == '__main__':
    capp.start()

默认 SQS visiblity_timeout 为 30 秒。您需要更新 celery 配置值:broker_transport_options={'visibility_timeout': 3600}.

当 celery 创建队列时,它会将可见性超时设置为 1 小时。

注意:如果指定了task_default_queue,并且队列已经创建而没有指定broker_transport_options={'visibility_timeout': 3600}, celery 在重新启动时不会更新可见性超时broker_transport_options={'visibility_timeout': 3600}。您将需要删除队列并让 celery 重新创建它。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Celery + SQS 两次接收相同的任务,同时具有相同的任务 ID 的相关文章

随机推荐

  • 从字节数组创建 8 位图像

    字节数组是这样获得的 BufferedImage image new Robot createScreenCapture new Rectangle screenDimension byte array DataBufferByte get
  • 目录最后修改日期

    我想知道目录上次修改日期何时更改 我修改了特定目录中的文件 通过 FTP 但该目录的 LMD 没有更改 它应该如何运作 当添加 删除或重命名文件或子目录时 目录本身的 mtime 修改时间 会发生变化 修改目录中文件的内容不会更改目录本身
  • iphone:从相机捕获的图像改变方向

    我制作了一个 iPhone 应用程序来从相机捕获图像并在下一个视图中设置该图像 但问题是图像被旋转了 即风景图像变成肖像 肖像图像变成风景 我参考了很多代码但无法得到解决方案 我的代码是 void btnCapturePressed if
  • rust 中的类型 `` 没有实现什么特征 `core::kinds::Sized` ?

    我预计这会起作用 trait Task
  • Java EE 7 CDI - 注入不起作用,发送 NullPointerException

    我的注射有问题 这是我第一次尝试 我正在使用 Wildfly 和 Java EE 7 我有一个NullPointerException当尝试访问时Authenticator实例在LoginController 我使用maven 我的bean
  • 迄今为止的 MySQL 字符串,具有包含时区说明符的给定格式

    我的数据库中有一个字符串列 Wed Aug 13 17 51 06 GMT 05 30 2014 我可以将其转换为日期并在 where 子句中使用它来获取记录吗 where Timecolumn gt CURDATE 7 请注意 时区说明符
  • 使用 NSBorderlessWindowMask 时出现灰色边框

    每当我尝试使用 NSBorderlessWindowMask 创建自定义窗口并将 NSView 例如 NSImageView 设置为其 contentView 时 我都会在 NSView 周围出现 1px 灰色边框 并且似乎无法摆脱它 我遵
  • 将多个文件中的空格转换为制表符 Sublime Text 2

    有没有办法将所有空格转换为制表符 而不是逐个文件转换 如果我打开一个文件并浏览View gt Indentation gt Convert Indentation to Tabs 它仅更改此文件 我想将整个项目中的缩进转换为制表符 Use
  • 如何在 django 中提供创建的临时文件

    我有一个远程存储项目 当用户请求他的文件时 django 服务器会在本地检索该文件 用于某些处理 并将其存储为临时文件 然后使用 mod x sendfile 将其提供给用户 我当然希望临时文件在提供给用户后被删除 文件指出NamedTem
  • Flutter,如何删除对话框周围的空白?

    我在从服务器获取数据时调用此对话框 该对话框周围有空格 我可以删除对话框周围的空白区域吗 这是我的代码 var bodyProgress new Container decoration new BoxDecoration color Co
  • 使

    我在 SO 上看到过类似的主题 但我的略有不同 我试图让我的父级 DIV 和子级 保持焦点 直到我将注意力从 div 上移开 但这似乎很难完成 This solution https stackoverflow com a 3089045
  • 使用 Cocoa 的 Accessibility API 获取应用程序的 Dock 图标的位置

    如何使用 Accessibility API 获取应用程序的 Dock 图标的位置 找到了 使用这个论坛帖子 http cocoadev com forums discussion 1431 getting dock icon positi
  • Elasticsearch.Net 和超时

    我有一个 4 节点的 Elasticsearch 集群 我有一个 net 控制台应用程序 旨在用来自 sql 的数据填充集群 只要我将添加 或删除 记录的速度保持在相当低的水平 一切都会正常 如果我最终增加线程数 我将从控制台应用程序中看到
  • 无法在 AngularJS 中从控制器设置日期选择器日期

    我正在尝试按按钮创建一个 AngularJS 日期选择器 我在用this http angular ui github io bootstrap datepickerbootstrap ui 控件 该控件可以工作 单击按钮就会弹出 我可以选
  • 通过 Go (go 1.18) 泛型创建类型化值的新对象

    我正在 go 1 18 的 beta 版本中使用泛型 下面示例中的创建函数应该创建新的实例 T 所以 Apple 我尝试使用反射包来实现这一点 但没有运气 你能告诉我如何更改功能吗Create从下面的示例中 它创建了实例T而不是返回 nil
  • 将所有代码从 master 转移到新分支并从 master 中删除代码

    我的代码位于项目的主分支中 我希望代码位于单独的分支中 而不是在主分支中 我可以创建一个新的分支master 但是在创建分支之后 是否可以从主分支中删除所有代码 这样如果我稍后重新调整或将我的新分支合并到主分支中 就不会导致任何问题 谢谢
  • NodeJS javascript 中的异步响应循环

    我有一个使用 Express 4 的 NodeJS API 我使用 Sequelize 连接到数据库 并多次调用一个查询 我想将结果累积到一个数组中 问题是res send不等待循环结束来发送答案 my code router post p
  • 具有 SSE4.1 内在函数的双线性滤波器

    我现在正在尝试找出一种一次仅针对一个过滤样本的相当快速的双线性过滤函数 作为习惯使用内在函数的练习 最高可达 SSE41 就可以了 到目前为止我有以下内容 inline m128i DivideBy255 8xUint16 const m1
  • 查找 XML 节点集的最低公共祖先

    我有一个使用 XSLT 中的 xsl key 结构构造的节点集 我想找到该节点集中所有节点的最低共同祖先 LCA 有什么想法吗 我了解 Kaysian intersects 和 XPath 的 intersect 函数 但这些似乎是为了找到
  • Celery + SQS 两次接收相同的任务,同时具有相同的任务 ID

    在 Flask 应用程序中使用带有 SQS 的 celery but celery 同时接收两次具有相同任务 ID 的相同任务 像这样运行工人 celery worker A app jobs run l info pidfile var