我正在测试Spring-AMQP
with Spring-Integration
支持,我有以下配置和测试:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
<int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>
<int-amqp:inbound-channel-adapter
channel="consumingChannel"
queue-names="durableQ"
connection-factory="connectionFactory"
concurrent-consumers="1"
acknowledge-mode="AUTO"
/>
public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");
PollableChannel consumingChannel = context.getBean("consumingChannel",
PollableChannel.class);
int count = 0;
while (true) {
Message<?> msg = consumingChannel.receive(1000);
System.out.println((count++) + " \t -> " + msg);
try { //sleep to check number of messages in queue
Thread.sleep(50000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
在此配置中,很明显,一旦消息到达consumingChannel
它们被确认并因此从队列中删除。我通过放置一个高值来验证这一点sleep
after receive
并检查queue-size
。对此没有进一步的控制。
现在如果我设置acknowledge-mode=MANUAL
,似乎没有办法手动操作ack
通过弹簧集成。
我的需要是处理消息并在处理后执行manual-ack
所以直到ack
消息仍然保留在durableQ
.
有什么办法可以处理吗MANUAL
确认spring-amqp-integration
?我想避免通过ChannelAwareMessageListener
to inbound-channel-adapter
因为我想控制消费者receive
.
Update:
使用自己的时甚至似乎不可能listener-container
with inbound-channel-adapter
:
// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" />
<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueNames" value="durableQ" />
<property name="acknowledgeMode" value="MANUAL" />
// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
<property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>
上面的配置会引发错误messageListener
不允许使用属性,请参阅标签上的内联注释。所以使用目的listner-container
被击败(因为暴露channel
via ChannelAwareMessageListener
).
To me spring-integration
不能用于manual-acknowledgement
(我知道,这是一个很难说的说法!),任何人都可以帮助我验证这一点,或者是否有我所缺少的特定方法/配置?