我知道可以使用多个线程来使用 SQS 队列。我想保证每条消息都会被消耗一次。我知道可以更改消息的可见性超时,例如等于我的处理时间。如果我的进程花费的时间超过可见性超时(例如连接速度慢),其他线程可以使用相同的消息。
保证消息被处理一次的最佳方法是什么?
保证消息被处理一次的最佳方法是什么?
你要求一个保证 - 你不会得到一个。您可以将一条消息被多次处理的概率降低到非常少量,但你不会得到保证.
我将解释原因以及减少重复的策略。
重复从哪里来
- When you put a message in SQS, SQS might actually receive that message more than once
- 例如:发送消息时出现轻微的网络故障,导致暂时性错误,并自动重试 - 从消息发送者的角度来看,它失败了一次,发送成功了一次,但 SQS 收到了这两条消息。
-
SQS can internally generate duplicates
- 与第一个示例类似 - 有很多计算机在幕后处理消息,SQS 需要确保不会丢失任何内容 - 消息存储在多个服务器上,这可能会导致重复。
在大多数情况下,通过利用SQS消息可见性超时,来自这些来源的重复的可能性已经非常小了——小到百分之零点几。
如果处理重复确实不是that bad (努力让你的消息消费幂等!),我认为这已经足够好了 - 进一步减少重复的机会很复杂并且可能很昂贵......
您的应用程序可以采取哪些措施来进一步减少重复?
好吧,我们在这里深入探讨......在较高的层次上,您将需要为您的消息分配唯一的 id,并在开始处理之前检查正在进行或已完成的 id 的原子缓存:
- Make sure your messages have unique identifiers provided at insertion time
- Handle duplication at the 'end of the line' for messages.
- 如果您的消息接收者需要将消息发送到机外进行进一步处理,那么它可能是另一个重复源(出于与上述类似的原因)
- You'll need somewhere to atomically store and check these unique ids (and flush them after some timeout). There are two important states: "InProgress" and "Completed"
- InProgress 条目应该有一个超时,具体取决于处理失败时需要恢复的速度。
- 已完成的条目应该有一个超时,具体取决于您希望重复数据删除窗口的时长
- 最简单的可能是番石榴缓存,但只适用于单个处理应用程序。如果您有大量消息或分布式消费,请考虑使用数据库来完成此作业(使用后台进程来扫描过期条目)
- 在处理消息之前,尝试将 messageId 存储在“InProgress”中。如果它已经存在,请停止 - 您刚刚处理了一个重复项。
- 检查消息是否“已完成”(如果存在则停止)
- 您的线程现在对该 messageId 具有独占锁 - 处理您的消息
- Mark the messageId as "Completed" - As long as this messageId stays here, you won't process any duplicates for that messageId.
- 从“InProgress”中删除 messageId(或者只是让它从这里过期)
一些笔记
- Keep in mind that chances of duplicate without all of that is already pretty low. Depending on how much time and money deduplication of messages is worth to you, feel free to skip or modify any of the steps
- 例如,您可以省略“InProgress”,但这会导致两个线程同时处理重复消息的可能性很小(第二个线程在第一个线程“完成”之前启动)
- 您的重复数据删除窗口只要您可以将 messageId 保留在“已完成”中即可。由于您可能无法承受无限存储,因此请使其持续时间至少为 SQS 消息可见性超时的 2 倍;此后重复的机会就会减少(除了已经非常低的机会之外,但仍然not保证)。
- Even with all this, there is still a chance of duplication - all the precautions and SQS message visibility timeouts help reduce this chance to very small, but the chance is still there:
- 您的应用程序可能会在处理消息后、messageId“完成”之前崩溃/挂起/执行很长的GC(也许您正在使用数据库进行此存储,并且与它的连接已关闭)
- 在这种情况下,“处理”最终将过期,另一个线程可以处理此消息(在 SQS 可见性超时也过期之后,或者因为 SQS 中有重复消息)。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)