我正在尝试以一种更非常规的方式使用 RabbitMq(尽管此时我可以根据需要选择任何其他消息队列实现)。消费者不会将 Rabbit 推送消息留给我的消费者,而是连接到一个队列并获取一批 N 条消息(在此期间它会消费一些消息,并可能拒绝一些消息),然后跳转到另一个队列,依此类推。这样做是为了冗余。如果某些消费者崩溃,所有消息都保证被其他消费者消费。
问题是我有多个消费者,我不希望他们竞争同一个队列。有没有办法保证队列上的锁?如果不是,我至少可以确保如果两个消费者连接到同一个队列,他们不会读取相同的消息吗?事务可能在某种程度上对我有帮助,但我听说它们将从 RabbitMQ 中删除。
其他架构建议也受到欢迎。
Thanks!
EDIT:正如评论中指出的,我需要如何处理消息有一个特殊性。它们只有在分组时才有意义,并且相关消息很有可能聚集在队列中。例如,如果我提取一批 100 条消息,那么我很有可能能够对消息 1-3、4-5,6-10 等执行某些操作。如果我无法找到某些消息的组,我会会将它们重新提交到队列中。 WorkQueue 不起作用,因为它将消息从同一组传播到多个工人,而这些工人不知道如何处理它们。
您看过这本免费的在线书籍吗?企业集成模式 http://eaipatterns.com/?
听起来您确实需要一个工作流程,在消息到达您的工作人员之前,您需要一个批处理组件。使用 RabbitMQ 有两种方法可以做到这一点。要么使用一种可以为您进行批处理的交换类型(和消息格式),要么拥有一个队列,以及一个对批次进行排序并将每个批次放入其自己的队列中的工作人员。批处理程序可能还应该向控制队列发送“批处理就绪”消息,以便工作人员可以发现新批处理队列的存在。处理完批次后,工作人员可以删除批次队列。
如果您可以控制消息格式,则可以让 RabbitMQ 通过多种方式隐式执行批处理。通过主题交换,您可以确保每条消息上的路由密钥的格式为 work.batchid.something,然后获悉批次 xxyzz 存在的工作人员将使用像 #.xxyzz.# 这样的绑定密钥来仅消费这些消息。无需重新发布。
另一种方法是在标头中包含批次 ID 并使用较新的标头交换类型。当然,如果您愿意编写少量的 Erlang 代码,您也可以实现自己的自定义交换类型。
不过,我确实建议您查看这本书,因为它比大多数人开始使用的典型工作队列概念更好地概述了消息传递架构。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)