我正在研究rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。如果我要手动 ACK/NACK 消息,我需要将重试计数保留在内存中(例如,通过使用关联 ID 作为映射中的唯一键),或者通过在消息中设置我自己的标头,然后重新传递它(因此将其放在队列的末尾)
然而,这是 spring 处理的情况。具体来说,我指的是 RetryInterceptorBuilder.stateful().maxAttempts(x)。这个计数是特定于 JVM 的,还是以某种方式操纵消息?
例如,我有一个 Web 应用程序部署到 2 台服务器,maxAttempts 设置为 5。重新交付总数是否有可能在 5 到 9 之间,具体取决于 2 台服务器中重新交付和重新处理的顺序服务器?
Rabbit/AMQP 不允许在基于拒绝重新排队时修改消息。
状态(基于 messageId)维护在RetryContextCache
;默认是一个MapRetryContextCache
。这并不真正适合“集群”,因为正如您所说,尝试可能取决于((maxAttempts - 1) * n + 1)
;另外,它还会导致内存泄漏(状态留在某些服务器上)。您可以配置一个SoftReferenceMapRetryContextCache
in the RetryTemplate
(RetryOperations
在构建器中)以避免内存泄漏,但这只能解决内存泄漏。
您需要使用自定义RetryContextCache
与一些持久共享存储(例如redis)。
我通常建议在这种情况下使用无状态恢复 - 重试完全在容器中完成,根本不涉及兔子(直到重试耗尽,在这种情况下,消息将被丢弃或发送到 DLX/DLQ,具体取决于代理配置) 。
如果您不关心消息顺序(并且我假设您没有竞争的消费者),一种有趣的技术是拒绝消息,将其发送到带有到期设置的 DLQ,并且当 DLQ 消息到期时,将其路由回原始队列的尾部(而不是头部)。在这种情况下,可以检查 x-death 标头以确定它已重试了多少次。
这个答案 https://stackoverflow.com/questions/28357820/rabbitmq-with-spring-amqp-messages-stuck-in-case-of-amqpexception/28358510#28358510 and this one https://stackoverflow.com/questions/27640358/spring-amqp-with-rabbitmq-message-is-not-circled-back-to-live-queue-after-falli/27641433#27641433有更多详细信息。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)