在SQS队列中使用许多消费者

2023-11-22

我知道可以使用多个线程来使用 SQS 队列。我想保证每条消息都会被消耗一次。我知道可以更改消息的可见性超时,例如等于我的处理时间。如果我的进程花费的时间超过可见性超时(例如连接速度慢),其他线程可以使用相同的消息。

保证消息被处理一次的最佳方法是什么?


保证消息被处理一次的最佳方法是什么?

你要求一个保证 - 你不会得到一个。您可以将一条消息被多次处理的概率降低到非常少量,但你不会得到保证.

我将解释原因以及减少重复的策略。

重复从哪里来

  1. When you put a message in SQS, SQS might actually receive that message more than once
    • 例如:发送消息时出现轻微的网络故障,导致暂时性错误,并自动重试 - 从消息发送者的角度来看,它失败了一次,发送成功了一次,但 SQS 收到了这两条消息。
  2. SQS can internally generate duplicates
    • 与第一个示例类似 - 有很多计算机在幕后处理消息,SQS 需要确保不会丢失任何内容 - 消息存储在多个服务器上,这可能会导致重复。

在大多数情况下,通过利用SQS消息可见性超时,来自这些来源的重复的可能性已经非常小了——小到百分之零点几。

如果处理重复确实不是that bad (努力让你的消息消费幂等!),我认为这已经足够好了 - 进一步减少重复的机会很复杂并且可能很昂贵......


您的应用程序可以采取哪些措施来进一步减少重复?

好吧,我们在这里深入探讨......在较高的层次上,您将需要为您的消息分配唯一的 id,并在开始处理之前检查正在进行或已完成的 id 的原子缓存:

  1. Make sure your messages have unique identifiers provided at insertion time
    • 如果没有这个,您将无法区分重复项。
  2. Handle duplication at the 'end of the line' for messages.
    • 如果您的消息接收者需要将消息发送到机外进行进一步处理,那么它可能是另一个重复源(出于与上述类似的原因)
  3. You'll need somewhere to atomically store and check these unique ids (and flush them after some timeout). There are two important states: "InProgress" and "Completed"
    • InProgress 条目应该有一个超时,具体取决于处理失败时需要恢复的速度。
    • 已完成的条目应该有一个超时,具体取决于您希望重复数据删除窗口的时长
    • 最简单的可能是番石榴缓存,但只适用于单个处理应用程序。如果您有大量消息或分布式消费,请考虑使用数据库来完成此作业(使用后台进程来扫描过期条目)
  4. 在处理消息之前,尝试将 messageId 存储在“InProgress”中。如果它已经存在,请停止 - 您刚刚处理了一个重复项。
  5. 检查消息是否“已完成”(如果存在则停止)
  6. 您的线程现在对该 messageId 具有独占锁 - 处理您的消息
  7. Mark the messageId as "Completed" - As long as this messageId stays here, you won't process any duplicates for that messageId.
    • 不过,您可能买不起无限的存储空间。
  8. 从“InProgress”中删除 messageId(或者只是让它从这里过期)

一些笔记

  • Keep in mind that chances of duplicate without all of that is already pretty low. Depending on how much time and money deduplication of messages is worth to you, feel free to skip or modify any of the steps
    • 例如,您可以省略“InProgress”,但这会导致两个线程同时处理重复消息的可能性很小(第二个线程在第一个线程“完成”之前启动)
  • 您的重复数据删除窗口只要您可以将 messageId 保留在“已完成”中即可。由于您可能无法承受无限存储,因此请使其持续时间至少为 SQS 消息可见性超时的 2 倍;此后重复的机会就会减少(除了已经非常低的机会之外,但仍然not保证)。
  • Even with all this, there is still a chance of duplication - all the precautions and SQS message visibility timeouts help reduce this chance to very small, but the chance is still there:
    • 您的应用程序可能会在处理消息后、messageId“完成”之前崩溃/挂起/执行很长的GC(也许您正在使用数据库进行此存储,并且与它的连接已关闭)
    • 在这种情况下,“处理”最终将过期,另一个线程可以处理此消息(在 SQS 可见性超时也过期之后,或者因为 SQS 中有重复消息)。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在SQS队列中使用许多消费者 的相关文章

  • Azure 服务总线中的死信队列中的消息是否会过期?

    Azure 服务总线中的死信队列中的消息是否会过期 一些解释 我有这些队列设置 var queueDescription new QueueDescription MyTestQueue RequiresSession false Defa
  • C# 创建函数队列

    我写了一个名为 QueueManager 的类 class QueueManager Queue functionsQueue public bool IsEmpty get if functionsQueue Count 0 return
  • Spring JMS监听器即使在异常时也会确认

    我正在使用 JMS 向 SQS 队列发送 接收消息 但是即使在使用 client acknowledge 时出现异常 我也无法重新传递消息 如何实现这一目标 我尝试了一个简单的测试 JmsListener destination test
  • 有队列实现吗?

    任何人都可以建议使用 Go 容器来实现简单快速的 FIF 队列 Go 有 3 种不同的容器 heap list and vector 哪一种更适合实现队列 事实上 如果您想要的是一个基本且易于使用的 fifo 队列 slice 可以满足您所
  • 通过 Amazon SQS 将压缩文本从 PHP 发送到 NodeJS

    我似乎一直坚持通过 Amazon SQS 将压缩消息从 PHP 发送到 NodeJS 在 PHP 方面我有 SQS gt sendMessage Array QueueUrl gt queueUrl MessageBody gt artic
  • 是否可以更改队列中的元素?

    假设我有一个整数队列 或任何 T 类 我可以更改队列中元素的值吗 更具体地说 如果我将队列定义如下 Queue
  • 消息队列与套接字

    我没有太多的套接字编程经验 但我尝试阅读一些相关内容 我对 MDB 和消息队列非常熟悉 有人告诉我队列 例如 MDB 只不过是直接套接字连接 有人可以帮我比较一下这两个吗 两者是无与伦比的 因为它们代表不同的layers 这就像将关系数据库
  • PHP 中消息队列和工作系统的有效架构?

    我正在尝试了解我想要在 PHP 应用程序中实现的消息队列模型和作业 我的目标是卸载需要发送到多个第三方 API 的消息 数据 因此访问它们不会减慢客户端的速度 所以将数据发送到消息队列是理想的 我考虑过仅使用 Gearman 来保存 MQ
  • SNS和SQS访问问题,收不到消息

    我已经经历过AWS SNS SQS 订阅 https docs aws amazon com sns latest dg sns send message to sqs cross account html多次说明 并浏览了一些不同的博客和
  • queue.empty 并在空时执行 put

    假设我有一个包含两个元素的队列 我使用 get 循环遍历队列弹出项目 我担心一旦弹出第二个元素 循环就会停止循环 并且由于某些错误 我需要重新处理它 所以我把它放回队列中 但它不会 因为那时队列是空的 My loop while not q
  • 像 AMQP 这样的面向消息的中间件在哪些领域有用?

    MOM 面向消息的中间件 解决什么问题 可扩展性 一体化 它们通常在哪些领域使用以及它们通常在哪些领域not used 例如 Google 是否将此类解决方案用于其主要搜索引擎或为 GMail 提供支持 沃尔玛 eBay FedEx 几乎是
  • 单台机器最快的 Perl IPC/消息队列是多少?

    我正在开发一个 主要 Perl 项目 并希望使用消息队列来相互隔离进程 我有这样的工作流程 输入 gt 接收器 gt 处理器 gt 输出 我需要每秒处理数百笔交易 所以速度是我最大的动力 对于这种类型的设置来说 最快的消息队列系统是什么 我
  • Dart 中的 DoubleLinkedQueue 和 ListQueue 有什么区别?

    Dart 核心 API 有两个类实现Queue
  • 无法从 EMR 中运行的 Spark 应用程序删除 AWS SQS 消息

    我正在 AWS EMR 集群中运行 Apache Spark 应用程序 该应用程序从 AWS SQS 检索消息 根据消息数据进行一些计算 然后删除每条消息 我正在具有 NAT 实例的私有子网上的 VPC 中运行 EMR 集群 我面临的问题是
  • std::queue 初始化为 NULL

    是否可以初始化一个C std queue with a NULL像其他变量一样的值 像这样 HANDLE variable NULL class Test i e std queue
  • Laravel 作业推送至 Amazon SQS 但未处理

    我正在运行 Laravel 5 3 我正在尝试测试队列作业 并且我已将队列配置为使用 Amazon SQS 我的应用程序能够将作业推送到队列中 并且我可以在 SQS 中看到该作业 但它留在那里 从未被处理 我尝试过跑步php artisan
  • 使用 Matplotlib、PyQt 和 Threading 进行实时绘图导致 python 崩溃

    我一直在努力研究我的 Python 应用程序 但找不到任何答案 我有 PyQT GUI 应用程序 它使用 Matplotlib 小部件 GUI 启动一个新线程来处理 mpl 小部件的绘图 恐怕我现在通过从另一个线程访问 matplotlib
  • .NET 中 UniqueQueue 和 UniqueReplacementQueue 集合最有效的实现

    考虑到入队和出队操作的速度同样重要 NET 中 UniqueQueue 和 UniqueReplacementQueue 集合最有效 就速度而言 的实现是什么 UniqueQueue是一个不可能出现重复的队列 因此 如果我将一个元素推送到队
  • 具有延迟的简单可扩展工作/消息队列

    我需要设置一个作业 消息队列 并可以选择为任务设置延迟 以便空闲工作人员不会立即拾取它 而是在一定时间后 可能因任务而异 我研究了几个 Linux 队列解决方案 rabbitmq gearman memcacheq 但它们似乎都没有提供开箱
  • AWS Lambda。延迟调用

    我需要使用亚马逊工具构建 任务调度程序 主要问题是我需要执行一次任务 延迟很大 可能是几个小时或几周 我尝试研究如何使用 CloudWatch 和 Lambda 函数构建它 据我了解 我需要使用单独的规则来执行一次超时 但 AWS 只允许我

随机推荐

  • Celery 任务可以在重启后继续存在吗?

    我需要构建一个处理两种类型任务的系统 一种类型可以创建更多其自身或另一种类型的任务 工作人员数量很少 2 3 并且只有一名主机 最重要的要求是系统应该优雅地处理重新启动 即重新启动时 正在进行的任务应该从头开始 并且工作人员应该选择重新启动
  • 在 XAML 中使用矩形形状作为剪辑

    有没有办法可以使用普通的矩形 形状 作为 XAML 中另一个对象的剪辑的一部分 看起来我应该能够做到 但解决方案却让我困惑
  • 在pyspark中读取Excel(.xlsx)文件

    我正在尝试从 PySpark 中的本地路径读取 xlsx 文件 我写了下面的代码 from pyspark shell import sqlContext from pyspark sql import SparkSession spark
  • 无法附加或安装卷:等待条件超时

    我本地集群中的一个 pod 无法启动 因为我得到Unable to attach or mount volumes unmounted volumes nats data volume unattached volumes nats dat
  • GWT 2.1 编辑器框架

    我正在寻找一些有关如何使用的文档或示例GWT 2 1 编辑器框架 谷歌的文档呃 有点缺乏 从有限的可用文档中 我了解到编辑器 理论上 允许您更轻松地将 GUI 元素绑定到数据模型 这将减轻将数据复制到 TextArea ListBox Ch
  • 新升级的声纳不显示项目或用户

    您好 我遇到的问题是从 5 1 2 gt 5 6 gt 6 4 升级后 我相信我遵循了记录的升级路径 该系统在 5 1 2 和 5 6 上运行良好 但现在在 6 4 上 加载第一件事的初始项目页面是空的 上面写着 一旦你分析了一些项目 它们
  • require: 'ngModel' 是什么意思?

    这是我的指令的 HTML
  • 如何在 Chrome 打包应用程序中设置 script-src?

    我正在尝试从复杂的网络应用程序创建 Chrome 打包应用程序 我目前收到错误 拒绝执行内联事件处理程序 因为它违反了 以下内容安全策略指令 default src self chrome extension resource 请注意 sc
  • 为什么访问令牌会过期?

    我刚刚开始使用 Google API 和 OAuth2 当客户端授权我的应用程序时 我会获得一个 刷新令牌 和一个短暂的 访问令牌 现在 每次访问令牌过期时 我都可以将刷新令牌发布给 Google 他们会给我一个新的访问令牌 我的问题是访问
  • 如何从以下 .htaccess 重写规则中排除特定文件类型?

    如何从以下重写规则中排除特定文件类型 xml 和 txt RewriteEngine On RewriteBase RewriteCond REQUEST FILENAME f RewriteCond REQUEST URI Rewrite
  • 如何为 Windows 安装 libjpeg?

    我下载了一些使用 libjpeg 的代码 但源文件中不包含 dll 我试图使其正确编译 执行 我使用的是VS2010 在我的源文件中我有 include jpeglib h and in Linker gt Input gt Additio
  • 使用 C# 中的函数返回两个字符串[重复]

    这个问题在这里已经有答案了 我有一个函数想要返回两个值 这可能吗 这是我的代码 但它似乎不喜欢我想返回两个值 public string PlayerCards string player1C1 string player1C2 gener
  • 无法连接到 Kubernetes 集群中的 mongodb 服务

    我在 Google Cloud 上有一个 Kubernetes 集群 有一个数据库服务 它在 mongodb 部署之前运行 我还有一系列微服务 它们正在尝试连接到该数据存储 然而 他们似乎找不到主人 apiVersion v1 kind S
  • Intellij 13 鼠标悬停解决方案上弹出的自动文档不再起作用[重复]

    这个问题在这里已经有答案了 我一直在使用这个解决方案适用于 IntelliJ 12 用于在鼠标悬停时自动弹出快速文档 但是新版本的 IntelliJ 13 当你设置auto show quick doc true in the idea p
  • Durandal:在合成过程中显示“正在加载...”

    当 activate 方法执行其操作时 我可以轻松地显示加载消息 如下所示 div div class text center style margin 75px i class fa fa spinner fa spin i div di
  • 使用 mamp 在本地测试电子邮件

    首先 我对这种本地主机服务器设置完全陌生 过去几天我一直在试图解决这个问题 但没有运气 我正在开发一个带有电子邮件表单的 php 网站 我在一个MAC using codekit and 免费的MAMP一切正常 现在我的问题是 是否可以在本
  • .Net 超时:WaitForSingleObject 与 Timer

    我正在异步操作 一系列网络 IO 上实现超时 但我不确定哪个 更好 从分配 性能 的角度来看 创建 EventWaitHandle 并使用 RegisterWaitForSingleObject 或者只是创建一个 Timer并使用它的Tic
  • 为什么没有 cv2.waitkey() 则 cv2.imshow() 不会渲染?

    如果没有 cv2 waitkey 方法 cv2 imshow 将显示黑色窗口 为什么不等待就无法正常渲染 cap cv2 VideoCapture video path while cap isOpened ret frame cap re
  • 如何在 Swing 应用程序中隐藏光标?

    有没有办法隐藏光标 除了使用透明图像作为光标之外 当用户将鼠标指向 JFrame 中的 JPanel 之外时 我想隐藏光标 看来Cursor类一开始就没有 空白 光标 因此可以使用以下命令定义一个新的 空白 光标Toolkit create
  • 在SQS队列中使用许多消费者

    我知道可以使用多个线程来使用 SQS 队列 我想保证每条消息都会被消耗一次 我知道可以更改消息的可见性超时 例如等于我的处理时间 如果我的进程花费的时间超过可见性超时 例如连接速度慢 其他线程可以使用相同的消息 保证消息被处理一次的最佳方法