好的,这里是正在发生的事情的概述:
M <-- Message with unique id of 1234
|
+-Start Queue
|
|
| <-- Exchange
/|\
/ | \
/ | \ <-- bind to multiple queues
Q1 Q2 Q3
\ | / <-- start of the problem is here
\ | /
\ | /
\|/
|
Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
|
C <-- Consumer
所以我有一个推送到多个队列的交换器,每个队列都有一个任务,一旦所有任务完成,只有队列4才能启动。
因此,具有唯一 id 1234 的消息被发送到交换器,交换器将其路由到所有任务队列(Q1、Q2、Q3 等...),当消息 id 1234 的所有任务都已完成时,运行 Q4 来获取消息编号 1234。
我怎样才能实现这个?
使用 Symfony2、RabbitMQBundle 和 RabbitMQ 3.x
资源:
- http://www.rabbitmq.com/tutorials/amqp-concepts.html http://www.rabbitmq.com/tutorials/amqp-concepts.html
- http://www.rabbitmq.com/tutorials/tutorial-6-python.html http://www.rabbitmq.com/tutorials/tutorial-six-python.html
更新#1
好吧,我想这就是我正在寻找的:
- https://github.com/videlalvaro/Thumper/tree/master/examples/parallel_processing https://github.com/videlalvaro/Thumper/tree/master/examples/parallel_processing
具有并行处理功能的 RPC,但是如何将 Correlation Id 设置为我的唯一 ID 来对消息进行分组并识别哪个队列?
你需要实现这个:http://www.eaipatterns.com/Aggregator.html http://www.eaipatterns.com/Aggregator.html但是 Symfony 的 RabbitMQBundle 不支持这一点,因此您必须使用底层的 php-amqplib。
来自捆绑包的普通消费者回调将获得 AMQPMessage。从那里您可以访问通道并手动发布到“管道和过滤器”实施中接下来出现的任何交换
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)