我有一个发布-订阅用例,我想在发布端进行阻止,直到每个订阅者确认他们已完成处理发布者发送的消息。
我(错误地?)假设我可以使用 RabbitMQ 及其 Java amqp-client 的 Channel.waitForConfirmsOrDie 方法作为我的解决方案的一部分。问题是我还没有发现 waitForConfirmsOrDie 实际上会阻塞的情况。
根据javadocs http://www.rabbitmq.com/javadoc/com/rabbitmq/client/Channel.html#waitForConfirmsOrDie%28%29, waitForConfirmsOrDie 应该:
等到自上次调用以来发布的所有消息均已被代理确认或拒绝。如果任何消息被拒绝,waitForConfirmsOrDie 将抛出 IOException。当在非Confirm通道上调用时,会立即返回。
为了测试这个方法是否真的有效,我开始了此示例代码来自 RabbitMQ 网站 http://hg.rabbitmq.com/rabbitmq-java-client/raw-file/da241e35951b/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java.
该示例代码创建了一个发布者和一个消费者,每个发布者和消费者都在自己单独的线程上。然后发布者将消息发送到交换器,而消费者则消费消息。看来发布者应该阻塞,直到通过调用 waitForConfirmsOrDie() 确认所有消息。
这个示例代码似乎与我想要做的事情完美匹配。但是,它似乎并不像我想象的那样工作。事实上,如果在消费者线程中,我关闭自动确认消息,那么 waitForConfirmsOrDie() 仍然会立即返回。
我通过将一个 false 更改为 true 来关闭自动确认:ch.queueDeclare(QUEUE_NAME, false, false, false, null);
变成ch.queueDeclare(QUEUE_NAME, true, false, false, null);
(第二个参数为 false 而不是 true)。我相信这意味着消费者不应再发送确认。
那么 waitForConfirmsOrDie() 实际上做了什么?什么时候会阻塞?
如果 waitForConfirmsOrDie 没有执行我想要的操作,有没有办法让发布者等到所有订阅者都确认消息后再继续?