rabbitmq回退消息
mandatory参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这件事情的。
可以通过设置mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。
案例演示
添加application.properties配置文件
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
添加配置类
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
添加回调接口
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId():"";
if (ack){
log.info("交换机已经收到id为:{}的消息",id);
}else {
log.info("交换机还未收到id为:{}的消息,由于原因:{}",id ,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息:{}被服务器退回,退回原因:{},交换机是:{},路由key:{}",new String(returned.getMessage().getBody()),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());
}
}
添加消费者
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
log.info("接收到队列confirm.queue消息:{}",msg);
}
}
添加生产者
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message+routingKey,correlationData2);
log.info("发送消息内容:{}",message);
}
}
结果
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)