我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法,但要求是侦听器不应被阻止(因此可以自由地处理其他消息)。我在这里看到了类似的问题,但它不包括“后退”的解决方案:
RabbitMQ 和 Spring amqp 重试而不阻塞消费者 https://stackoverflow.com/questions/24460651/rabbitmq-spring-amqp-retry-without-blocking-consumers
我的问题是:
-
默认的 spring-retry 实现在重试时是否会阻塞线程?这在github上实现 https://github.com/spring-projects/spring-retry/blob/c2c3acbf6bb216dc1278fa7561daf352be12ecf6/src/main/java/org/springframework/retry/backoff/ExponentialBackOffPolicy.java#L181表明确实如此。
-
如果上述假设成立,那么唯一的方法是实现一个单独的重试队列(DLQ?),并为每个消息设置一个 TTL(假设我们不想在退避间隔内阻塞线程)。
-
如果我们采用上面的方法(DLQ 或单独的队列),是否每次重试都需要单独的队列?如果我们只使用 1 个队列进行重试,则同一个队列将包含 TTL 范围从最小重试间隔到最大重试间隔的消息,如果队列前面的消息具有最大 TTL,则后面的消息将不会被重试。即使它有最小 TTL,也会被拾取。这是根据 Rabbit MQ TTL 文档here https://www.rabbitmq.com/ttl.html(参见注意事项):
-
有没有另一种方法来实现非阻塞的退避重试机制?
添加一些配置信息以帮助解决 @garyrussel 问题:
队列配置:
<rabbit:queue name="regular_requests_queue"/>
<rabbit:queue name="retry_requests_queue">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="regular_exchange" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:direct-exchange name="regular_exchange">
<rabbit:bindings>
<rabbit:binding queue="regular_requests_queue" key="regular-request-key"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:direct-exchange name="retry_exchange">
<rabbit:bindings>
<rabbit:binding queue="retry_requests_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="retryRecoverer" class="com.testretry.RetryRecoverer">
<constructor-arg ref="retryTemplate"/>
<constructor-arg value="retry_exchange"/>
</bean>
<rabbit:template id="templateWithOneRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>
<rabbit:template id="retryTemplate" connection-factory="connectionFactory" exchange="retry_exchange"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="retryPolicy">
<bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<property name="maxAttempts" value="1"/>
</bean>
</property>
</bean>
- Yes
- 通过 4 ...
您可以使用 retry max attempts = 1 和子类RepublishMessageRecoverer
并实施additionalHeaders
要添加,请说重试计数标头。
然后,您可以为每次尝试重新发布到不同的队列。
恢复器的结构并不是真正可以发布到不同的队列(我们应该更改它),因此您可能需要编写自己的恢复器并委托给多个队列之一RepublishMessageRecoverer
.
考虑贡献 https://github.com/spring-projects/spring-amqp#contributing-to-spring-amqp您对框架的解决方案。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)