使用spring-amqp和rabbitmq实现带退避的非阻塞重试

2024-03-01

我正在寻找一种使用 spring amqp 和 Rabbit MQ 的退避策略来实现重试的好方法,但要求是侦听器不应被阻止(因此可以自由地处理其他消息)。我在这里看到了类似的问题,但它不包括“后退”的解决方案:

RabbitMQ 和 Spring amqp 重试而不阻塞消费者 https://stackoverflow.com/questions/24460651/rabbitmq-spring-amqp-retry-without-blocking-consumers

我的问题是:

  1. 默认的 spring-retry 实现在重试时是否会阻塞线程?这在github上实现 https://github.com/spring-projects/spring-retry/blob/c2c3acbf6bb216dc1278fa7561daf352be12ecf6/src/main/java/org/springframework/retry/backoff/ExponentialBackOffPolicy.java#L181表明确实如此。

  2. 如果上述假设成立,那么唯一的方法是实现一个单独的重试队列(DLQ?),并为每个消息设置一个 TTL(假设我们不想在退避间隔内阻塞线程)。

  3. 如果我们采用上面的方法(DLQ 或单独的队列),是否每次重试都需要单独的队列?如果我们只使用 1 个队列进行重试,则同一个队列将包含 TTL 范围从最小重试间隔到最大重试间隔的消息,如果队列前面的消息具有最大 TTL,则后面的消息将不会被重试。即使它有最小 TTL,也会被拾取。这是根据 Rabbit MQ TTL 文档here https://www.rabbitmq.com/ttl.html(参见注意事项):

  4. 有没有另一种方法来实现非阻塞的退避重试机制?

添加一些配置信息以帮助解决 @garyrussel 问题:

队列配置:

    <rabbit:queue name="regular_requests_queue"/>
    <rabbit:queue name="retry_requests_queue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="regular_exchange" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <rabbit:direct-exchange name="regular_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="regular_requests_queue" key="regular-request-key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:direct-exchange name="retry_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="retry_requests_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <bean id="retryRecoverer" class="com.testretry.RetryRecoverer">
         <constructor-arg ref="retryTemplate"/>
         <constructor-arg value="retry_exchange"/>
    </bean>

    <rabbit:template id="templateWithOneRetry" connection-factory="connectionFactory" exchange="regular_exchange" retry-template="retryTemplate"/>
    <rabbit:template id="retryTemplate" connection-factory="connectionFactory" exchange="retry_exchange"/>

    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="1"/>
            </bean>
        </property>
    </bean>

  1. Yes
  2. 通过 4 ...

您可以使用 retry max attempts = 1 和子类RepublishMessageRecoverer并实施additionalHeaders要添加,请说重试计数标头。

然后,您可以为每次尝试重新发布到不同的队列。

恢复器的结构并不是真正可以发布到不同的队列(我们应该更改它),因此您可能需要编写自己的恢复器并委托给多个队列之一RepublishMessageRecoverer.

考虑贡献 https://github.com/spring-projects/spring-amqp#contributing-to-spring-amqp您对框架的解决方案。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用spring-amqp和rabbitmq实现带退避的非阻塞重试 的相关文章

随机推荐