RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)

2023-11-20

目录

一.RabbitMQ消息丢失的三种情况

二.RabbitMQ消息丢失解决方案

1.针对生产者

方案1 :开启RabbitMQ事务

方案2: 使用confirm机制 

2.针对RabbitMQ

(1)消息持久化

(2)设置集群镜像模式

(3)消息补偿机制

3.针对消费者

方案一:ACK确认机制

三.总结

一.RabbitMQ消息丢失的三种情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了
第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

二.RabbitMQ消息丢失解决方案

1.针对生产者

方案1 :开启RabbitMQ事务

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。

// 开启事务
channel.txSelect
try {
      // 这里发送消息
} catch (Exception e) {
      channel.txRollback

// 这里再次重发这条消息

}

// 提交事务
channel.txCommit

缺点:
RabbitMQ 事务机制是同步的,你提交一个事务之后会阻塞在那儿,采用这种方式基本上吞吐量会下来,因为太耗性能。

方案2: 使用confirm机制 

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的

在生产者开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

    //开启confirm
    channel.confirm();
    //发送成功回调
    public void ack(String messageId){
      
    }

    // 发送失败回调
    public void nack(String messageId){
        //重发该消息
    }

2.针对RabbitMQ

说三点:

(1)要保证rabbitMQ不丢失消息,那么就需要开启rabbitMQ的持久化机制,即把消息持久化到硬盘上,这样即使rabbitMQ挂掉在重启后仍然可以从硬盘读取消息;

(2)如果rabbitMQ单点故障怎么办,这种情况倒不会造成消息丢失,这里就要提到rabbitMQ的3种安装模式,单机模式、普通集群模式、镜像集群模式,这里要保证rabbitMQ的高可用就要配合HAPROXY做镜像集群模式

(3)如果硬盘坏掉怎么保证消息不丢失

(1)消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

(2)设置集群镜像模式

我们先来介绍下RabbitMQ三种部署模式:

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
2)普通模式:消息只会存在与当前节点中,并不会同步到其他节点,当前节点宕机,有影响的业务会瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
3)镜像模式:消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。属于RabbitMQ的HA方案

为什么设置镜像模式集群,因为队列的内容仅仅存在某一个节点上面,不会存在所有节点上面,所有节点仅仅存放消息结构和元数据。下面自己画了一张图介绍普通集群丢失消息情况:

如果想解决上面途中问题,保证消息不丢失,需要采用HA 镜像模式队列。

下面介绍下三种HA策略模式:

1)同步至所有的
2)同步最多N个机器
3)只同步至符合指定名称的nodes

命令处理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

1)为每个以“rock.wechat”开头的队列设置所有节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

2)为每个以“rock.wechat.”开头的队列设置两个节点的镜像,并且设置为自动同步模式
rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat" \
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

3)为每个以“node.”开头的队列分配指定的节点做镜像
rabbitmqctl set_policy ha-nodes "^nodes\." \
'{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 镜像队列有一个很大的缺点就是:   系统的吞吐量会有所下降

(3)消息补偿机制

为什么还要消息补偿机制呢?难道消息还会丢失,没错,系统是在一个复杂的环境,不要想的太简单了,虽然以上的三种方案,基本可以保证消息的高可用不丢失的问题,

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

1)生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。

2)根据消息表中消息状态,失败则进行消息补偿措施,重新发送消息处理。

3.针对消费者

方案一:ACK确认机制

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

使用rabbitmq提供的ack机制,服务端首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。才把消息从内存删除。

这样就解决了,即使一个消费者出了问题,但不会同步消息给服务端,会有其他的消费端去消费,保证了消息不丢的case。 

三.总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂) 的相关文章

  • Android:如何暂停和恢复可运行线程?

    我正在使用 postDelayed 可运行线程 当我按下按钮时 我需要暂停并恢复该线程 请任何人帮助我 这是我的主题 protected void animation music6 music4 postDelayed new Runnab
  • “源兼容性”和“目标兼容性”有什么区别?

    之间有什么关系 区别sourceCompatibility and targetCompatibility 当它们设置为不同的值时会发生什么 根据工具链和兼容性 https docs gradle org current userguide
  • java程序有多少种结束方式?

    我知道使用 System exit 0 可以结束一个java程序 例如 如果我有一个JFrame窗口 它会关闭并结束程序 但我想知道还有多少其他方法 可以关闭它并结束程序 包括发生错误时 程序会被关闭 JFrame也会被关闭吗 添加到其他答
  • 使用 proguard 混淆文件名

    我正在使用 proguard 和 Android Studio 混淆我的 apk 当我反编译我的apk时 我可以看到很多文件 例如aaa java aab java ETC 但我项目中的所有文件都有原始名称 有没有办法混淆我的项目的文件名
  • Java LostFocus 和 InputVerifier,按反向制表符顺序移动

    我有一个 GUI 应用程序 它使用 InputVerifier 在产生焦点之前检查文本字段的内容 这都是很正常的 然而 昨天发现了一个问题 这似乎是一个错误 但我在任何地方都找不到任何提及它的地方 在我将其报告为错误之前 我想我应该问 我在
  • 我们可以有条件地声明 spring bean 吗?

    有没有一种方法可以有条件地声明 Spring bean 例如
  • 什么是内部类的合成反向引用

    我正在寻找应用程序中的内存泄漏 我正在使用的探查器告诉我寻找这些类型的引用 但我不知道我在寻找什么 有人可以解释一下吗 Thanks Elliott 您可以对 OUTER 类进行合成反向引用 但不能对内部类实例进行合成 e g class
  • 无法使用 datastax java 驱动程序通过 UDT 密钥从 cassandra 检索

    我正在尝试使用用户定义的类型作为分区键将对象存储在 cassandra 中 我正在使用 datastax java 驱动程序进行对象映射 虽然我能够插入到数据库中 但无法检索该对象 如果我更改分区键以使用非 udt 例如文本 我就能够保存和
  • Java中Gson、JsonElement、String比较

    好吧 我想知道这可能非常简单和愚蠢 但在与这种情况作斗争一段时间后 我不知道发生了什么 我正在使用 Gson 来处理一些 JSON 元素 在我的代码中的某个位置 我将 JsonObject 的 JsonElements 之一作为字符串获取
  • Java AES 256 加密

    我有下面的 java 代码来加密使用 64 个字符密钥的字符串 我的问题是这会是 AES 256 加密吗 String keyString C0BAE23DF8B51807B3E17D21925FADF273A70181E1D81B8EDE
  • 在 Spring 中为 @Pathvariable 添加类级别验证

    在发布这个问题之前 我已经做了很多研究并尝试了很多可用的解决方案 这是我陷入的棘手情况 我有一个 Spring 控制器 它有多个请求映射 它们都有 PathVariables 控制器如下所示 Controller EnableWebMvc
  • 2^31 次方的 Java 指数错误 [重复]

    这个问题在这里已经有答案了 我正在编写一个java程序来输出2的指数幂 顺便说一句 我不能使用Math pow 但是在 2 31 和 2 32 处我得到了其他东西 另外 我不打算接受负整数 My code class PrintPowers
  • 使用 JDBC 连接到 PostgreSql 的本地实例

    我在 Linux 机器上有一个正在运行的 PostgreSql 本地实例 当我使用psql来自 shell 的命令我成功登录 没有任何问题 我需要通过 JDBC 连接到 PostgreSql 但我不知道我到底应该传递什么url参数为Driv
  • 类更改(例如字段添加或删除)是否保持 Serialized 的向后兼容性?

    我有一个关于 Java 序列化的问题 在这种情况下 您可能需要修改可序列化类并保持向后兼容性 我有丰富的 C 经验 所以请允许我将 Java 与 NET 进行比较 在我的Java场景中 我需要使用Java的运行时序列化机制序列化一个对象 并
  • 从三点求圆心的算法是什么?

    我在圆的圆周上有三个点 pt A A x A y pt B B x B y pt C C x C y 如何计算圆心 在Processing Java 中实现它 我找到了答案并实施了一个可行的解决方案 pt circleCenter pt A
  • Azure Java SDK:ServiceException:ForbiddenError:

    尝试了基本位置检索器代码 如下所示 String uri https management core windows net String subscriptionId XXXXXXXX 5fad XXXXXX 9dfa XXXXXX St
  • Spring Data Rest 多对多 POST

    首先 让我解释一下我的用例 这非常简单 有一个用户实体和一个服务实体 我使用 UserService 作为连接实体 连接表 在用户和服务之间建立多对多关联最初 会有一些用户集和一些服务集 用户可以在任何时间点订阅任何服务 在这种情况下 将向
  • 确定 JavaFX 中是否消耗了事件

    我正在尝试使用 JavaFX 中的事件处理来做一些非滑雪道的事情 我需要能够确定手动触发事件后是否已消耗该事件 在以下示例中 正确接收了合成鼠标事件 但调用 Consumer 不会更新该事件 我对此进行了调试 发现 JavaFX 实际上创建
  • Java 中清除嵌套 Map 的好方法

    public class MyCache AbstractMap
  • 如何让 Firebase 与 Java 后端配合使用

    首先 如果这个问题过于抽象或不适合本网站 我想表示歉意 我真的不知道还能去哪里问 目前我已经在 iOS 和 Android 上开发了应用程序 他们将所有状态保存在 Firebase 中 因此所有内容都会立即保存到 Firebase 实时数据

随机推荐

  • 【Shell牛客刷题系列】SHELL2 打印文件的最后5行:优雅的解决方案~

    该系列是基于牛客Shell题库 针对具体题目进行查漏补缺 学习相应的命令 刷题链接 牛客题霸 Shell篇 该系列文章都放到专栏下 专栏链接为 专栏 Linux 欢迎关注专栏 本文知识预告 首先学习用于查看文件尾部内容的tail命令 然后给
  • iPhone: There is no SDK with the name or path iphoneos XXX

    for ever 2010 10 25 环境 MAC OS 10 6 4 老的iPhone 项目 使用最新的 XCode 3 2进行编译 报错 iPhone There is no SDK with the name or path iph
  • vue中组件的划分(重点)

    vue中组件的划分 重点 组件的职能划分 如果要将 Vue 组件按照职能划分 我们可以将其分为两种类型 容器组件和展示组件 容器组件和展示组件的概念来自于 Redux 文档 那么首先什么是容器组件呢 顾名思义 它是一个容器性质的组件 我们可
  • 如何在ubuntu安装powershell

    在Ubuntu上安装PowerShell可以通过以下步骤来完成 1 安装依赖软件 可以使用以下命令安装所需的依赖软件 sudo apt get install curl libunwind8 gettext apt transport ht
  • 使用postman怎么都访问不到项目,也不报错

    使用postman怎么都访问不到项目 也不报错 可能问题1 本地路径没有对应你访问的访问路径 在你的C盘下面的C Windows System32 drivers etc这个路径下找到host文件 你127 0 0 1 对应的是什么路径 你
  • 设计模式(不懂)

    面试中经常问到设计模式 我才对这个东西了解了一下 才发现他是没有开发的新大陆 是oo设计的更高级别 能把设计模式搞懂 那oo你就搞的差不多了 随便看了还是很有意思的 虽然不怎么懂 百科名片 相关书籍 设计模式 Design pattern
  • 在 spawn 的子进程中保持命令行颜色

    本文首发于我的博客 转载请注明出处 https kohpoll github io blo 最近在用子进程运行 gulpfile js 的时候发现终端上的颜色全部没有了 很是奇怪 经过一些研究 最终解决了问题 同时也总结了一些相关知识 希望
  • 锋利的jQuery(五)--jQuery对表单、表格的操作及更多应用

    5 1表单应用 一个表单有3个基本组成部分 1 表单标签 包含处理表单数据所用的服务器端程序URL以及数据提交到服务器的方法 2 表单域 包含文本框 密码框 隐藏域 多行文本框 复选框 单选框 下拉选择框和文件上传框等 3 表单按钮 包括提
  • 吉林大学超星MOOC学习通高级语言程序设计 C++ 实验02 分支与循环程序设计(2021级)(2)

    5 爱因斯坦阶梯 题目编号 Exp02 Basic10 GJBook3 04 15 题目名称 爱因斯坦阶梯 问题描述 设有阶梯 不知其数 但知 每步跨2阶 最后剩1阶 每步跨3阶 最后剩2阶 每步跨5阶 最后剩4阶 每步跨7阶 正好到楼顶
  • python文件运行路径设置,python获取程序执行文件路径的方法(推荐)

    1 获取当前执行主脚本方法 sys argv 0 和 file 1 sys argv 一个传给python脚本的指令参数列表 sys argv 0 是脚本的名字 一般得到的是相对路径 用os path abspath sys argv 0
  • Dubbo 默认线程池fixed

    SPI fixed public interface ThreadPool 线程池 param url 线程参数 return 线程池 Adaptive Constants THREADPOOL KEY Executor getExecut
  • linux共享内存面试题,linux系统工程师面试题(附答案)

    1 查看Linux系统当前单个共享内存段的最大值 命令 ipcs m ipcs a 2 用什么命令查询指定IP地址的服务器端口 题意应该是 nmap 和nbtscan 命令来扫吧 3 crontab中用什么命令定义某个程序执行的优先级别 n
  • Gurobi:使用Java+Gurobi建立一个小数学模型

    Gurobi 使用Java Gurobi建立一个小数学模型 按变量进行建模 按列进行建模 模型的求解结果 现在基本上都流行python gurobi java cplex进行建模 但是由于java相较于python还是具有显著的速度优势 于
  • 后疫情时代企业云原生成本优化指南

    在本篇文章的末为还有福利 在等着大家哦 前言 近年来 公有云 混合云等技术在全球迅速发展 云的普及度越来越高 Docker Kubernetes DevOps Service Mesh等云原生技术蓬勃发展 但在 上云 之后 企业却往往发现
  • 《数据库系统内幕》笔记 —— LSM树与OceanBase

    本文为 数据库系统内幕 第7章的笔记与心得 因为看到OceanBase底层也使用LSM树的实现作为存储引擎 因此特地记下笔记 详见OceanBase文档 https www oceanbase com docs community obse
  • Opencv学习笔记(三)线性及非线性滤波

    大纲 1 滤波综述 2 方框滤波 3 均值滤波 4 高斯滤波 5 中值滤波 6 双边滤波 一 滤波综述 图像的滤波指的是在尽量保证图像细节特征的的情况下对图像中的噪声进行抑制 又因为图像的能量大部分集中在低频或者中频的区域 图像大部分区域是
  • Scrum

    产品列表梳理会 Backlog Refinement Meeting 会议目的 Refinement 这个词是加工 提炼的意思 在scrum里 其实就是对下阶段的需求做一个讨论 澄清 细化的一个活动 希望通过这个活动 使得团队能对后续阶段的
  • 默认构造函数、拷贝构造函数、析构函数、赋值构造函数

    最近老是有人问我拷贝构造函数和赋值构造函数 说实话 我会用 但这个概念还真是搞不太清楚 真烦 概念问题少问我 学习笔记 1 析构函数 每个类只有一个析构函数 2 构造函数 每个类可以有多个构造函数 包括 默认构造函数 拷贝构造函数 赋值构造
  • Redis 7.0 核心技术、实战应用、面试题

    Redis 7 0 核心技术与实战应用 Redis 入门概述 01 Redis 是什么 Redis REmote Dictionary Server 远程字典服务器 官网介绍 https redis io docs about 官网定义 R
  • RabbitMQ消息丢失的场景,如何保证消息不丢失?(详细讲解,一文看懂)

    目录 一 RabbitMQ消息丢失的三种情况 二 RabbitMQ消息丢失解决方案 1 针对生产者 方案1 开启RabbitMQ事务 方案2 使用confirm机制 2 针对RabbitMQ 1 消息持久化 2 设置集群镜像模式 3 消息补