使用spring,我是rabbitmq的新手,我想知道我错在哪里。
我编写了一个rabbitmq连接工厂和一个包含侦听器的侦听器容器。我还为侦听器容器提供了错误处理程序,但它似乎不起作用。
我的春豆:
<rabbit:connection-factory id="RabbitMQConnectionFactory" virtual-host="${rabbitmq.vhost}" host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}" password="${rabbitmq.password}"/>
<rabbit:listener-container missing-queues-fatal="false" declaration-retries="0" error-handler="errorHandlinginRabbitMQ" recovery-interval="10000" auto-startup="${rabbitmq.apc.autostartup}" max-concurrency="1" prefetch="1" concurrency="1" connection-factory="RabbitMQConnectionFactory" acknowledge="manual">
<rabbit:listener ref="apcRabbitMQListener" queue-names="${queue.tpg.rabbitmq.destination.apc}" exclusive="true" />
</rabbit:listener-container>
<bean id="errorHandlinginRabbitMQ" class="RabbitMQErrorHandler"/>
这是我的 RabbitMQ ErrorHandler 类:
public class RabbitMQErrorHandler implements ErrorHandler
{
@Override
public void handleError(final Throwable exception)
{
System.out.println("error occurred in message listener and handled in error handler" + exception.toString());
}
}
我假设的是,如果我向连接工厂提供无效凭据,则应该执行 RabbitMQErrorHandler 类的 handleError 方法,并且服务器应该正确启动,但是,当我尝试运行服务器时,该方法不会执行(例外是抛出在控制台中)并且服务器无法启动。我在哪里遗漏了一些东西,那可能是什么?
错误处理程序用于处理消息传递过程中的错误;由于您尚未连接,因此没有可处理错误的消息。
要获取连接异常,您应该实现ApplicationListener<ListenerContainerConsumerFailedEvent>
如果将失败作为 bean 添加到应用程序上下文中,您将收到该失败事件。
如果您实施,您将收到其他事件(消费者启动、消费者停止等)ApplicationListener<AmqpEvent>
.
EDIT
<rabbit:listener-container auto-startup="false">
<rabbit:listener id="fooContainer" ref="foo" method="handleMessage"
queue-names="si.test.queue" />
</rabbit:listener-container>
<bean id="foo" class="com.example.Foo" />
Foo:
public class Foo {
public final CountDownLatch latch = new CountDownLatch(1);
public void handleMessage(String foo) {
System.out.println(foo);
this.latch.countDown();
}
}
App:
@SpringBootApplication
@ImportResource("context.xml")
public class So43208940Application implements CommandLineRunner {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(So43208940Application.class, args);
context.close();
}
@Autowired
private SimpleMessageListenerContainer fooContainer;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private RabbitTemplate template;
@Autowired
private Foo foo;
@Override
public void run(String... args) throws Exception {
this.connectionFactory.setUsername("junk");
try {
this.fooContainer.start();
}
catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(5000);
this.connectionFactory.setUsername("guest");
this.fooContainer.start();
System.out.println("Container started");
this.template.convertAndSend("si.test.queue", "foo");
foo.latch.await(10, TimeUnit.SECONDS);
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)