我是新手,但我会尽量简洁。
{INPUT QUEUE}->[INBOUND-GATEWAY-1]-->[ROUTER]----------->(ACTIVATOR)<---------------
\ /
\-->{HOLD QUEUE}--->[INBOUND-GATEWAY-2]--^
我遇到的情况是,我必须像前者一样动态更改流程中的路由条件。来自队列的消息被发送到激活器进行处理,或者发送到另一个队列进行搁置。在某个时间,我必须关闭 INBOUND-GATEWAY-1,以便没有新消息进入流中,并打开 INBOUND-GATEWAY-2 以让来自 HOLD QUEUE 的所有消息得到处理。一旦 HOLD QUEUE 中的所有消息都被使用,两个网关都必须像以前一样关闭/打开。这里的问题是我如何知道 HOLD QUEUE 何时为空,以便我可以触发一个可以启动 gateway-1 的方法?
如果有人能帮助我,我将不胜感激。
提前致谢
经过一番调试和阅读,终于找到了解决这个问题的方法。入站网关是一个 JmsMessageDrivenEndpoint,基于两个内部组件:MessageListenerContainer 和 MessageListener。 MessageListenerContainer 是负责调度 MessageListener 行为的人,因此,重写 noMessageReceived 和 messageReceived,并添加一些属性来控制所需的行为,我可以实现“魔法”。
我的 MessageListenerContainer 实现是这样的。
public class ControlMessageListenerContainer extends DefaultMessageListenerContainer{
private JmsMessageDrivenEndpoint mainInputGateway;
private long timeOut;
private long lastTimeReceived;
public PassControlMessageListenerContainer() {
this.setAutoStartup(false);
}
@Override
public void start() throws JmsException {
/*When the container is started the lastTimeReceived is set to actial time*/
lastTimeReceived = (new Date()).getTime();
super.start();
}
@Override
protected void noMessageReceived(Object invoker, Session session) {
long actualTime = (new Date()).getTime();
if((actualTime - lastTimeReceived) >= timeOut
&& mainInputGateway != null && !mainInputGateway.isRunning()){
mainInputGateway.start();
}
super.noMessageReceived(invoker, session);
}
@Override
protected void messageReceived(Object invoker, Session session) {
/*lastTimeReceived is set again to actual time at new message arrive*/
lastTimeReceived = (new Date()).getTime();
super.messageReceived(invoker, session);
}
}
最后,spring bean 配置如下:
<bean id="listenerContainer"
class="org.merol.ControlMessageListenerContainer">
<property name="mainInputGateway" ref="mainGateway" />
<property name="destination" ref="onHoldQueue" />
<property name="timeOut" value="10000"/>
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="messageListener"
class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
<property name="requestChannel" ref="outputChannel" />
</bean>
<bean id="inboundGateway"
class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
<constructor-arg name="listenerContainer" ref="listenerContainer" />
<constructor-arg name="listener" ref="messageListener" />
</bean>
希望这对其他人有帮助。
感谢@Nicholas 提供的线索。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)