10.回退消息

2023-05-16

rabbitmq回退消息

mandatory参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这件事情的。

可以通过设置mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。

案例演示

添加application.properties配置文件

server.port=8080

spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

spring.rabbitmq.publisher-confirm-type=correlated

添加配置类

/**
 * 回退消息配置类
 */
@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";


    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }

}

添加回调接口

/**
 * 消息回退回调接口
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {


    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId():"";
        if (ack){
            log.info("交换机已经收到id为:{}的消息",id);
        }else {
            log.info("交换机还未收到id为:{}的消息,由于原因:{}",id ,cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息:{}被服务器退回,退回原因:{},交换机是:{},路由key:{}",new String(returned.getMessage().getBody()),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());
    }
}

添加消费者

/**
 * 消息消费者
 */
@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody());
        log.info("接收到队列confirm.queue消息:{}",msg);
    }
}

添加生产者

/**
 * 消息生产者
 */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MyCallBack myCallBack;

    /**
     * 初始化rabbitTemplate
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(myCallBack);
        /**
         *  true:交换机无法将消息进行路由时,会讲该消息返回给生产者
         *  false:如果发现消息无法进行路由,则直接丢弃
         */
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setReturnsCallback(myCallBack);
    }



    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        String routingKey = "key1";

        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData);

        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message+routingKey,correlationData2);

        log.info("发送消息内容:{}",message);
    }
}

结果

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

10.回退消息 的相关文章

随机推荐

  • ROS tf工具与消息查看命令

    TF工具坐标系统是一个基础理论 xff0c 但是涉及到多个空间的变换 xff0c 不容易进行想象所以TF工具给开发者调试提供很多方便 1 tf monitor xff1a 将当前的坐标系转换关系打印到终端控制台 rosrun tf tf m
  • melodic 打开gazebo出现[Err] [REST.cc:205] Error in REST request错误解决方法

    ROS melodic版本下打开gazebo出现 Err REST cc 205 Error in REST request错误解决方法 输入以下命令打开文件 sudo gedit ignition fuel config yaml 然后将
  • 技术资源汇总(一)

    1 Ubuntu技术论坛 xff1a https askubuntu com 2 树莓派资源 https www yahboom com study raspberry3B 密码 xff1a cf0p 汇总资料提取码 xff1a hdy7
  • docker常用命令

    1 配置docker阿里云镜像 1 打开daemon json文件 xff08 若没有此文件 xff0c 则创建 etc docker daemon json xff09 xff1a vi etc docker daemon json 2
  • 网络调试助手UDP广播问题

    用直接广播地址 xff08 192 168 xxx 255 端口 xff09 可以进行广播 xff1b 用受限广播地址 xff08 255 255 255 255 端口 xff09 显示没有指定有效的远程主机端口 xff0c 搞了好久发现是
  • “平衡小车之家”家的STM32F103最小系统源代码分享

    在网上寻找了好久 xff0c 因为他家的开发板自带有mpu6050模块 故想测试其精准度以及z轴漂移程度 发现也有很大的漂移 代码如下 main c部分 xff1a span class token macro property span
  • 使用PMW3901和VL53L1X 实现室内定点悬停

    使用PMW3901和VL53L1X 实现室内定点悬停 使用PMW3901 光流传感器进行水平方向定位Pixhawk连接PMW3901传感器PX4源代码加入PMW3901驱动后重新编译QGroundControl中的配置 使用气压计和VL53
  • 使用 QGroundControl 地面站更新 PixHawk飞控的Bootloader

    安装最新版本的PX4固件 启动QGroundControl并且使用USB连接到Pixhawk飞控 选择 Q icon gt Vehicle Setup gt Firmware sidebar 打开固件设置 安装最新版本的PX4固件 更新Bo
  • 自制DIY 机器狗 完全教程 - MIT猎豹Cheetah

    自制DIY 机器狗 完全教程 MIT猎豹Cheetah 背景结构设计模块化关节电机性能考虑关节结构 四足平台设计腿部设计身体设计脚部设计 硬件设计关节驱动器通信总线板供电系统 控制系统人工智能 背景 3年前 xff0c MIT开源了世界上跑
  • centos安装wxWidgets,erlang,RabbitMq

    centos安装wxWidgets erlang RabbitMq 默认已经安装了java环境 而安装RabbitMq需要安装erlang xff0c 安装erlang又需要安装wxWidgets 安装wxWidgets 更新系统 yum
  • 2.rabbitmq概述和helloworld

    rabbitmq概述 rabbitmq中的几个概念 BROKER 接收和分发消息的应用 xff0c RabbitMQ Server 就是 Message Broker Virtual Host 出于多租户和安全因素设计的 xff0c 把 A
  • 3.rabbitmq轮询和不公平分发

    rabbitmq轮询和不公平分发 rabbitmq轮询分发 rabbitmq默认是使用轮询来分发消息的 测试代码如下所示 生产者代码 span class token comment 生产者 task rabbitmq 轮询演示 span
  • 4.rabbitmq消息应答

    rabbitmq消息应答 概述 消息应答就是消费者在收到消息的时候 xff0c 在它接收到消息并处理完毕之后 xff0c 告诉rabbitmq它已经处理完了 xff0c rabbitmq可以删除这个消息了 消息应答的方式 channel b
  • 5.rabbitmq持久化

    rabbitmq持久化 队列的持久化 队列的持久化需要我们在声明的时候指定其持久化 使用durable 61 true来持久化队列 span class token comment 队列的持久化 span span class token
  • 关于双控阵列的实现原理的讨论

    xfeff xfeff http bbs chinaunix net forum viewthread tid 4140392 html 对于一个支持FC SAN的双控存储阵列 xff0c 对外号称active active xff0c 实
  • 6.rabbitmq中exchange的几种形式

    rabbitmq中exchange的几种形式 RabbitMQ 消息传递模型的核心思想是 生产者生产的消息从不会直接发送到队列 实际上 xff0c 通常生产 者甚至都不知道这些消息传递传递到了哪些队列中 相反 xff0c 生产者只能将消息发
  • 7.rabbitmq死信和死信队列

    rabbitmq死信和死信队列 概述 先从概念解释上搞清楚这个定义 xff0c 死信 xff0c 顾名思义就是无法被消费的消息 xff0c 字面意思可以这样理 解 xff0c 一般来说 xff0c producer 将消息投递到 broke
  • 8.rabbitmq发布确认

    rabbitmq发布确认 生产者将信道设置成 confirm 模式 xff0c 一旦信道进入 confirm 模式 xff0c 所有在该信道上面发布的 消息都将会被指派一个唯一的 ID 从 1 开始 xff0c 一旦消息被投递到所有匹配的队
  • 9.延迟队列

    延迟队列 延迟队列的概念 延时队列 队列内部是有序的 xff0c 最重要的特性就体现在它的延时属性上 xff0c 延时队列中的元素是希望 在指定时间到了以后或之前取出和处理 xff0c 简单来说 xff0c 延时队列就是用来存放需要在指定时
  • 10.回退消息

    rabbitmq回退消息 mandatory参数 在仅开启了生产者确认机制的情况下 xff0c 交换机接收到消息后 xff0c 会直接给消息生产者发送确认消息 xff0c 如果发现该消息不可路由 xff0c 那么消息会被直接丢弃 xff0c