消息中间件如何保证消息不丢失

2023-05-16

一、消息队列MQ的三个阶段

1、生产者发送消息到MQ

2、MQ存储消息到内存或者硬盘

3、消费者消费消息

由于网络的原因、服务器的原因、程序的原因等等,在每个阶段都有可能引起消息的丢失:

1、生产者发送消息到MQ:这个阶段可能由于网络延迟导致mq消息丢失

2、MQ存储消息到内存或者硬盘:Broker将消息先放到内存,然后根据刷盘策略持久化到硬盘上,但是刚收到消息,还没持久化到硬盘服务器宕机了,消息就会丢失

3、消费者消费消息:MQ由于网络原因在传输过程中把消息传丢了,同时MQ也从队列中把消息删除了,或者消费者消费失败消息丢失了

接下来详细描述一下三大主流队列:rocketmq、rabbitmq、kafka是如何保证消息不丢失的

二、RocketMQ

1、第一阶段:消息发送到MQ

选择不同的消息模式发送消息

RocketMQ发送消息有三种模式,即同步发送,异步发送、单向发送。

  • 同步发送消息:会同步阻塞等待Broker返回发送结果,发送失败不会收到发送结果SendResult,这种是最可靠的发送方式。
  • 异步发送消息:可以在回调方法中得知发送结果。
  • 单向发送消息:发送完之后就不管了,不管发送成功没成功,是最不可靠的一种方式。

同步和异步都是可靠的消息发送模式,应用可以根据消息的重要性和不同场景,选择不同的模式。

   /**
     * @description: 单向发送 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
     */
    public void sendMq() {
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.convertAndSend("RocketMQ-test", "测试-单向消息发送-" + i);
        }
    }
 
/***********************************************************************************/
  /**
     * @description: 同步发送 可靠性同步地发送 使用的比较广泛,比如:重要的消息通知,短信通知。

     */
    public void sync() {
        SendResult sendResult = rocketMQTemplate.syncSend("RocketMQ-test", "sync同步发送消息。");
        log.info("发送结果:{}", sendResult);
    }
 
/***********************************************************************************/
 /**
     * @description: 异步发送
     * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
     */
    public void async() {
        String msg = "async异步发送消息。";
        log.info(">msg:<<" + msg);
        rocketMQTemplate.asyncSend("RocketMQ-test", msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                log.info("异步发送成功{}", var1);
            }
 
            @Override
            public void onException(Throwable var1) {
                //发送失败可以执行重试
                log.info("异步发送失败{}", var1);
            }
        });
    }

消息生产者发送消息失败的重试机制

RocketMQ为生产者提供了失败重试机制,同步发送和异步发送默认都是失败重试两次,当然可以修改重试次数,如果多次还是失败,那么可以采取记录这条信息,然后人工采取补偿机制。 

 可以通过配置来灵活的设置消息发送失败重试次数

 2、第二阶段:MQ存储消息到内存或者硬盘

刷盘策略

RocketMq持久化消息有两种策略,即同步刷盘和异步刷盘。通过修改配置文件来完成:ASYNC_FLUSH=异步刷盘(默认方式),SYNC_FLUSH=同步刷盘。

异步刷盘:此模式下当生产者把消息发送到broker,消息存到内存之后就认为消息发送成功了,就会返回给生产者消息发送成功的结果。但是如果消息还没持久化到硬盘,服务器宕机了,那么消息就会丢失。

同步刷盘:当Broker接收到消息并且持久化到硬盘之后才会返回消息发送成功的结果,这样就会保证消息不会丢失,但是同步刷盘相对于异步刷盘来说效率上有所降低,大概降低10%,具体情况根据业务需求设定吧。

3、第三阶段:消费者消费消息

手动ack

/**
 * @description: 消费端确认消息消费成功的消费者
 */
@Component
@Slf4j
public class consumerAck implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg:msgs){
           log.info("接收到的消息是-----{}",new String(msg.getBody()));
        }
        //MQ收到SUCCESS即认为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

消费者消费失败重试机制

消费者消费失败会自动重试,如果消费失败并且没有手动ack则会自动重试15次。

三、RabbitMQ

1、第一阶段:消息发送到MQ

事务机制:类似RocketMQ的同步消息发送模式,会降低系统的性能,一般不采用

    public static void main(String[] args) {
        try {
            System.out.println("生产者启动成功..");
            // 1.创建连接
            connection = MyConnection.getConnection();
            // 2.创建通道
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "测试事务机制保证消息发送可靠性。。。。";
            channel.txSelect(); //开启事务
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            //发生异常时,mq中并没有新的消息入队列
            //int i=1/0;
            //没有发生异常,提交事务
            channel.txCommit();
            System.out.println("生产者发送消息成功:" + msg);
        } catch (Exception e) {
            e.printStackTrace();
            //发生异常则回滚事务
            try {
                if (channel != null) {
                    channel.txRollback();
                }
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
 
        }
    }

确认机制:当mq收到生产者发送的消息时,会返回一个ack告知生产者,收到了这条消息,如果没有收到,那就采取重试机制后者其他方式补偿。

 #开启生产者确认模式
 publisher-confirm-type: correlated
 # 打开消息返回,如果投递失败,会返回消息
 publisher-returns: true  
 
 #publisher-confirm-type有3种取值
 #NONE值是禁用发布确认模式,是默认值
 #CORRELATED值是发布消息成功到交换器后会触发回调方法
 #SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法 
@Component
@Slf4j
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    @PostConstruct
    public void init() {
        //指定 ConfirmCallback
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
 
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("correlation--{},ack--{},cause--{}", correlationData, ack, cause);
        if (ack) {
            //确认收到消息
        } else {
            //收到消息失败,可以自定义重试机制,或者将失败的存起来,进行补偿
        }
    }
 
    /*
     *
     * @param returnedMessage
     * 消息是否从Exchange路由到Queue, 只有消息从Exchange路由到Queue失败才会回调这个方法
     * @return void
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("被退回信息是》》》》》》{}", returnedMessage.getMessage());
        log.info("replyCode》》》》》》{}", returnedMessage.getReplyCode());
        log.info("replyText》》》》》》{}", returnedMessage.getReplyText());
        log.info("exchange》》》》》》{}", returnedMessage.getExchange());
        log.info("routingKey>>>>>>>{}", returnedMessage.getRoutingKey());
    }
}

重试机制:默认3次,可以修改重试次数,超过了最大重试次数限制采取人工补偿机制。

2、第二阶段:MQ存储消息到内存或者硬盘

开启消息持久化机制

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息。要想做到消息持久化,必须满足以下三个条件,缺一不可。

  • Exchange 设置持久化
  • Queue 设置持久化
  • Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

消息补偿机制

  • 生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。
  • 根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

死信队列:如果队列满了,多余的消息发送到Broker时可以使用死信队列保证消息不会被丢弃 

3、第三阶段:消费者消费消息

开启消费端的手动ack

@Component
@Slf4j
public class SnailConsumer {
 
    @RabbitListener(queues = "snail_direct_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        // 获取消息Id
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        log.info("获取到的消息>>>>>>>{},消息id>>>>>>{}", msg, messageId);
        try {
            int result = 1 / 0;
            System.out.println("result" + result);
            // // 手动ack
            Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
            // 手动签收
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            //拒绝消费消息(丢失消息) 给死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

 四、kafka

1、第一阶段:消息发送到MQ

producer的ack机制

kafka的生产者确认机制有三种取值即三个模式:

acks = 0 :生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障)。
acks = 1 :leader会将记录写入其本地日志,但无需等待所有follwer服务器的完全确认即可做出回应,在这种情况下,当leader还没有将数据同步到Follwer宕机,存在丢失数据的可能性。
acks = -1:代表所有的所有的分区副本备份完成,不会丢失数据这是最强有力的保证。但是这种模式往往效率相对较低。

producer重试机制 

2、第二阶段:MQ存储消息到内存或者硬盘

kafka的broker使用副本机制保证数据的可靠性。每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。

3、第三阶段:消费者消费消息

手动ack

    /*
     *
     * @param message
     * @param ack
     * @手动提交ack
     * containerFactory  手动提交消息ack
     * errorHandler 消费端异常处理器
     * @return void
     */
    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "test-topic", errorHandler = "consumerAwareListenerErrorHandler")
    public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
        for (int i=0;i<record.size();i++){
            System.out.println(record.get(i).value());
        }
        ack.acknowledge();//直接提交offset
    }

offset commit

消费者通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。kafka并不像其他消息队列,消费完消息之后,会将数据从队列中删除,而是维护了一个日志文件,通过时间和储存大小进行日志删除策略。如果offset没有提交,程序启动之后,会从上次消费的位置继续消费,有可能存在重复消费的情况。

Offset Reset 三种模式

earliest(最早):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。
latest(最新的):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。
none(没有):topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

消息中间件如何保证消息不丢失 的相关文章

  • 【C++】类与结构体的区别

    C 43 43 中结构体 xff08 struct xff09 我们知道C 43 43 中的 struct 对C中的 struct 进行了扩充 xff0c 它不再是只能用来封装不同类型数据的数据结构了 xff0c 而是拥有了更多的功能 xf
  • C++模板类成员函数最好和模板类声明一起放在同一个.h头文件里

    一个完整的C 43 43 程序应包括三部分 头文件 包含结构声明和使用这些结构的函数原型 头文件常包含的内容如下所示 xff1a 1 xff09 函数原型 2 xff09 使用 define或const定义的符号常量 3 xff09 结构声
  • 【VSCode】Visual Studio Code软件使用入门

    说明 这是一篇VS Code IDE软件使用入门文章 xff0c 工欲善其事 xff0c 必先利其器 xff0c 在使用新IDE之前 xff0c 不妨先对其进行一个全面了解 如果你做WPF等桌面端开发 xff0c 目前VSCode还没有好用
  • Elasticsearch 按字段进行分组 aggs 聚合 分组

    ES 按 userName 字段进行分组 统计 34 query 34 34 bool 34 34 must 34 34 range 34 34 operateTime 34 34 gt 34 34 2020 05 18 00 00 00
  • Hutool 操做excel 导出大数据 到excel

    1 hutool的版本 xff1a hutool all 4 5 15 2 POI 的版本 xff1a 3 17 lt dependency gt lt groupId gt org apache poi lt groupId gt lt
  • 使用vue-router携带不同参数多次push到一个页面时请求 不重新触发问题 ,只有第一次触发

    1 vue跳转 this router push path 39 user userDetils 39 query id JSON stringify val id name JSON stringify this searchData n
  • 惯性导航坐标系介绍

    常用坐标系定义 运载体中三维空间运动包含六个自由度 xff0c 既有角运动也有线运动 在地球表面附近 xff0c 运载体的角运动描述一般以当地水平面和地理北向为参考基准 xff1b 线运动的描述通常采用地理经度 纬度和高度表示 xff0c
  • 达梦 DM管理工具

    DM 管理工具是数据库自带的图形化工具 xff0c 可以方便快捷的对数据进行管理 在网络允许的条件下 xff0c 可通过单个管理工具 xff0c 对多个数据实例进行管理 xff0c 方便简化 DBA 对数据库的日常运维操作要求 打开DM管理
  • Windows 下 DM 的安装 和 数据库配置工具使用说明

    步骤 1 xff1a 运行安装程序 用户将 DM 安装光盘放入光驱中 xff0c 插入光盘后安装程序自动运行或直接双击 setup exe 安装程序后 xff0c 程序将检测当前计算机系统是否已经安装其他版本 DM 如 果存在其他版本 DM
  • window 下 达梦数据库的备份和还原

    DM 提供的各种工具进行备份还原与恢复的操作 xff0c 包括 DIsql工具 DMRMAN 工具 图形化客户端管理工具 MANAGER 和 CONSOLE DIsql 工具用于执 行联机的数据备份与数据还原 xff0c 包括数 归档备份据
  • Linux下与Windows的文件共享

    有三种方法 安装VMware Tools xff08 在虚拟机 gt 重新安装VMware Tools xff09 通过Winscp软件 xff08 前提Windows能ping通linux xff0c 和关防火墙 xff09 本文介绍 x
  • 关于大小端的经典问题

    源代码如下 xff1a span class hljs preprocessor include lt stdio h gt span span class hljs keyword int span main span class hlj
  • 关于#define宏的生命周期

    我们一起来看一段代码 xff1a span class hljs preprocessor include lt stdio h gt span span class hljs preprocessor define X 3 span sp
  • 关于char的溢出问题

    现在看下面的问题 span class hljs keyword int span main span class hljs keyword char span number 61 span class hljs number 129 sp
  • exit()和_exit()的区别

    exit c源代码 xff1a span class hljs preprocessor include span span class hljs preprocessor include span span class hljs keyw
  • ubuntu16.04下u盘的自动挂载(脚本)

    一般固定的u盘在 dev sdxx 的形式 先在 mnt下建一个usb目录用于挂载 1 在 etc udev rules d下创建10 usb rules文件 xff0c 内容如下 xff1a SUBSYSTEM 61 61 34 bloc
  • arp欺骗

    ARP工作的过程 原理及现象 ARP全称是地址解析协议 xff08 address resolution potocol xff09 xff0c 是在仅仅知道主机的IP地址时确定其物理的地址的一种协议 ARP协议的工作过程 场景 xff1a
  • LeetCode中stdout结果是正确的,输出没有

    没有返回值 xff0c 加一行return
  • gstreamer学习(一) gstreamer-rtsp-server环境安装

    gstreamer rtsp server环境安装 Linux环境下 两种方式 xff1a 第一种方式 xff0c 通过官网安装 xff08 如果是Linux环境 xff0c 可以直接通过软件包工具进行安装 xff09 xff0c 点击进入

随机推荐

  • 用C++打开指定网址

    用C 43 43 打开指定网址原理 system 命令 就像这样 xff1a span class token macro property span class token directive hash span span class t
  • 项目遇到的各种异常抛出及解决方法

    项目遇到的各种异常抛出及解决方法 xff1a 1 java lang NumberFormatException xff1a 类型格式异常 第一次遇到的异常抛出原因及解决方法 xff1a 项目运行没有问题 xff0c 各种接口能正常查询出数
  • 【STC8学习笔记】STC8A8K64S4A12精准延时函数设置

    在设置单片机精准的延时函数的时候 xff0c 给大家一个方法 xff0c STC ISP有一个延时函数计算器 xff0c 可以计算出想要的延时 我的例程也是基于这个软件生成的 xff0c 我生成一个1ms和1us出来 xff0c 剩下的我再
  • vc版本与vs版本对应关系

    vc版本与vs版本对应关系 最近在整理之前代码 xff0c 用cmake编译一直报错 xff0c 忘记了opencv3 1 0不支持vs2019 xff0c 所以在这里总结下vc版本与vs版本对应关系 VC版本号 VS对应版本 vc6 VC
  • cmake编译依赖opencv的c++库

    前面一篇主要讲了c 43 43 项目怎么在本地配置opencv过程 xff0c 这种方式缺点就是只能在开发着本地环境编译 xff0c 换台电脑就会出现环境配置问题 接下来主要讲解 xff0c 使用cmake编译 xff0c 生成一个依赖op
  • c++ stl 迭代器iterators(traits编程技法)

    文章目录 1 1 迭代器设计思维 stl关键所在1 2 迭代器是一种smart pointer1 3 迭代器相应型别 xff08 associated types xff09 1 4 traits编程技法 stl源代码门匙1 4 1 val
  • 如何用算法把一个十进制数转为十六进制数-C语言基础

    这一篇文章要探讨的是 如何用算法实现十进制转十六进制 并不涉及什么特别的知识点 属于C语言基础篇 在翻找素材的时候 xff0c 发现一篇以前写的挺有意思的代码 xff0c 这篇代码里面涉及的知识点没有什么好讲的 xff0c 也没有什么特别的
  • 关于 Qt使用QJsonObject解析失败的问题。

    1 问题 在QJsonObject转 toInt toLongLong 等类型时 xff0c 转换失败 但是转toString xff08 xff09 没有任何问题 列如 xff1a 解决方法 xff1a 这样 xff0c 就可以结局问题
  • char 和 string 的相互转换

    一个char字符转为string span class token keyword char span ch span class token operator 61 span span class token char 39 A 39 s
  • C++STL标准库学习总结/索引/学习建议

    前言 xff1a 如果刚刚开始学习STL标准库 xff0c 不知道从哪里入手学习的话 xff0c 建议去中国大学mooc平台 xff0c 先学习北京大学郭炜老师的 程序设计与算法 xff08 一 xff09 C语言程序设计 xff08 ht
  • Python 调用API接口方式,通过http.client调用api接口,远程调用flask接口方式

    一 创建接口 xff08 如果调用别人的接口 xff0c 跳过此条 xff09 如果没有api xff0c 首先自己写一个接口玩一下 xff1a 必备知识 xff1a 一个项目最基本的文件 xff0c 接口run py文件 config文件
  • git tag和branch的区别

    tag 和branch的区别 Git tag是一系列commit的中的一个点 xff0c 只能查看 xff0c 不能移动 branch是一系列串联的commit的线 git tag的用法 我们常常在代码封板时 使用git 创建一个tag 这
  • 结构体对齐计算(超详细讲解,一看就会)

    想要计算结构体大小 xff0c 咱就先要清楚结构体内存对齐的规则 xff1a 1 结构体的第一个成员直接对齐到相对于结构体变量起始位置为0处偏移 2 从第二个成员开始 xff0c 要对齐到某个 对齐数 的整数倍的偏移处 3 结构体的总大小
  • RTK差分编码

    一 概念 DCB xff08 Differential Code Bias 差分码偏差 xff09 是全球卫星导航系统 xff08 GNSS xff09 中 xff0c 通过不同信号得到的观测值之间存在的系统性偏差 DCB是由卫星和接收机硬
  • 详解JAVA的事件监听机制和观察者设计模式

    一 事件监听机制的三要素 事件源 事件监听器 xff0c 事件对象 监听器一般是JAVA接口 xff0c 用来约定可以执行的操作 二 事件监听机制简要说明 事件源注册一个或者多个事件监听器 xff0c 事件源对象状态发生变化或者被操作时 x
  • Nginx控制IP(段)的访问策略配置

    Nginx engine x 是一个高性能的HTTP和反向代理web服务器 xff0c 同时也提供了IMAP POP3 SMTP服务 有着负载均衡 动静分离等强大的功能 xff0c 而且还有众多三方插件来满足应用要求 这里重点介绍nginx
  • 敏捷开发-互联网时代的软件开发方式

    一 什么是敏捷开发 敏捷开发简单的描述为 xff1a 是一种应对需求快速变化的软件开发方式 敏捷开发的核心思想就是小步快跑 不断迭代 xff0c 在一次次的迭代升级中完成 小目标 最终完成那个 大目标 正因为敏捷开发的这种不断迭代升级的开发
  • Window系统查看端口是否启用以及占用程序

    1 打开DOS命令行窗口 开始 gt 运行 gt cmd xff0c 或者是 window 43 R gt cmd xff0c 调出命令窗口 2 查看当前正在使用的所有端口 命令 xff1a netstat ao 包括协议 xff0c 端口
  • ThreadLocal的深度解读

    一 J2SE的原始描述 This class provides thread local variables These variables differ from their normal counterparts in that eac
  • 消息中间件如何保证消息不丢失

    一 消息队列MQ的三个阶段 1 生产者发送消息到MQ 2 MQ存储消息到内存或者硬盘 3 消费者消费消息 由于网络的原因 服务器的原因 程序的原因等等 xff0c 在每个阶段都有可能引起消息的丢失 xff1a 1 生产者发送消息到MQ xf