RabbitMQ消息队列基础点

2023-11-19

为什么使用消息队列

1.解耦

看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…
在这里插入图片描述

在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。
在这里插入图片描述
总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。

思考:你需要去考虑一下你负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用MQ 作解耦。

2.异步

再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3+ 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器
发起请求,等待个 1s,这几乎是不可接受的。

在这里插入图片描述
一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。如果 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了,爽!网站做得真好,真快!

在这里插入图片描述

3.削峰

每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。
在这里插入图片描述
如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。
在这里插入图片描述
这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。

消息队列的缺点

缺点有以下几个:

  • 系统可用性降低
    系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,ABCD 四个系统还好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整?MQ 一挂,整套系统崩溃,你不就完了?
  • 系统复杂度提高
    硬生生加个 MQ 进来,你怎么怎么保证消息没有重新消费?怎么保证消息不丢失消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
  • 一致性问题
    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个
    数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

各种消息中间件的区别

在这里插入图片描述

消息丢失

生产者弄丢数据
在这里插入图片描述
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都
有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit 。
但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来 消耗性能

channel txSelect
e
channel txRollback
channel txCommit
// 开启事务
.
try {
// 这里发送消息
} catch (Exception ) {
.
// 这里再次重发这条消息
}
// 提交事务

所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产
者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入
了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果
RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可
以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还
没接收到这个消息的回调,那么你可以重发。事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会在那儿,但是 confirm 机制是 的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。
所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

RabbitMQ弄丢了数据
就是 RabbitMQ 自己弄丢了数据,这个你必须 RabbitMQ的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤:

  • 创建 queue 的时候将其设置为持久化
    这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2
    就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个 queue 里的数据。注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。所以,持久化可以跟生产者那边的 confirm 机制配合来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的。

消费端弄丢数据
RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到还没处理完,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
在这里插入图片描述

如何保证消息的顺序性?

举个例子,我们以前做过一个 mysql binlog 同步的系统,压力还是非常大的,日同步数据
要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql ->
mysql)。常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司的业务
系统的数据做各种复杂的操作。
你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog 日志,接着这三条 binlog
发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧?不然本来是:增
加、修改、删除;你愣是换了顺序给执行成删除、修改、增加,不全错了么。
本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据
保留下来了,数据同步就出错了。

Rabbitmq:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,
顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从
MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后
是 data1/data3。这不明显乱了。
在这里插入图片描述
解决方案:
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者
就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发
给底层不同的 worker 来处理。
在这里插入图片描述

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

大量消息在mq里积压了几个小时了还没解决

几千万条数据在MQ里积压了七八个小时,从下午4点多,积压到了晚上11点多。这个是我们
真实遇到过的一个场景,确实是线上故障了,这个时候要不然就是修复consumer的问题,让
它恢复消费速度,然后傻傻的等待几个小时消费完毕。这个肯定不能在面试的时候说吧。
一个消费者一秒是1000条,一秒3个消费者是3000条,一.分钟就是18万条。所以如果你积压
了几百万到上千万的数据,即使消费者恢复了,也需要大概1小时的时间才能恢复过来。
一般这个时候,只能临时紧急扩容了,具体操作步骤和思路如下:

  • 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉。
  • 新建一个topic, partition 是原来的10倍,临时建立好原先10倍的queue数量。
  • 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之
    后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的
    queue。
  • 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数
    据。这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度
    来消费数据。
  • 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的consumer机器来消
    费消息。

mq中的消息过期失效了

假设你用的是RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是TTL。如果消息在queue
中积压超过–定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了。
这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
这个情况下,就不是说要增加consumer消费积压的消息,因为实际上没啥积压,而是丢了大
量的消息。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。
就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家- -起喝
咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,
写个临时程序,- -点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。
也只能是这样了。
假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那
1000个订单给查出来,手动发到mq里去再补一-次。

mq都快写满了

如果消息积压在mq里,你很长时间都没有处理掉,此时导致mq都快写满了,咋办?这个还
有别的办法吗?没有,谁让你第一一个方 案执行的太慢了,你临时写程序,接入数据来消费,消
费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数
据吧。

如何保证消息不被重复消费?

正常情况下,消费者在消费消息后,会给消息队列发送一个确认,消息队列接收后就知道消息已经被成功消费了,然后就从队列中删除该消息,也就不会将该消息再发送给其他消费者了。不同消息队列发出的确认消息形式不同,RabbitMQ是通过发送一个ACK确认消息。但是因为网络故障,消费者发出的确认并没有传到消息队列,导致消息队列不知道该消息已经被消费,然后就再次消息发送给了其他消费者,从而造成重复消费的情况。

重复消费问题的解决思路是:保证消息的唯一性,即使多次传输,也不让消息的多次消费带来影响,也就是保证消息等幂性;幂等性指一个操作执行任意多次所产生的影响均与一次执行的影响相同。具体解决方案如下:

(1)改造业务逻辑,使得在重复消费时也不影响最终的结果。例如对SQL语句: update t1 set money = 150 where id = 1 and money = 100; 做了个前置条件判断,即 money = 100 的情况下才会做更新,更通用的是做个 version 即版本号控制,对比消息中的版本号和数据库中的版本号。

(2)基于数据库的的唯一主键进行约束。消费完消息之后,到数据库中做一个 insert 操作,如果出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

(3)通过记录关键的key,当重复消息过来时,先判断下这个key是否已经被处理过了,如果没处理再进行下一步。

① 通过数据库:比如处理订单时,记录订单ID,在消费前,去数据库中进行查询该记录是否存在,如果存在则直接返回。
② 使用全局唯一ID,再配合第三组主键做消费记录,比如使用 redis 的 set 结构,生产者发送消息时给消息分配一个全局ID,在每次消费者开始消费前,先去redis中查询有没有消费记录,如果消费过则不进行处理,如果没消费过,则进行处理,消费完之后,就将这个ID以k-v的形式存入redis中(过期时间根据具体情况设置)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ消息队列基础点 的相关文章

随机推荐

  • 仿网易云音乐小程序

    一 程序描述 仿网易云音乐小程序 调用网易云API真实接口 实现了音乐播放 音乐切换 歌词显示 搜索音乐 视频播放 用户登录退出等功能 以下是效果展示 仿网易云小程序 二 代码介绍 1 目录介绍 2 搜索 代码讲解 实现思路 先把搜索的静态
  • 台式电脑切换集成显卡和独立显卡

    台式电脑切换集成显卡和独立显卡 1 背景 2 认识 3 步骤 3 1 确认是否有两个显卡以及当前显示器连接显卡 3 2 更新驱动 3 3 确认显示器连接接口 3 4 显卡设置 4 总结 1 背景 发现电脑在处理画面时有非常卡顿 想着电脑装的
  • html中图片热区链接怎么设置,html图像热区链接做好之后怎么上传到网页上让别人打开?...

    html图像热区链接做好之后怎么上传到网页上让别人打开 以下文字资料是由 历史新知网www lishixinzhi com 小编为大家搜集整理后发布的内容 让我们赶快一起来看一下吧 图像热区链接做好之后怎么上传到网页上让别人打开 做的是静态
  • 使用session实现同一账号只能同时一个人使用

    使用session实现同一账号只能同时一个人使用 今天我们要讲的就是 实现同一个账号只能同一时间让一个人使用 实现起来也是非常的简单 其实我这里讲到的是我前几天做出来的一个大概核心代码和核心思路 我也是查遍了很多网站 看了很多人的源码然后都
  • QT绘图之自动缩放画线和点

    需求 用 作为画板 把纸条缩放到 上 纸条长度不定 宽度固定 纸条上任意位置画点或者线 点距或者线距不固定 点数和线数也不固定 要成比例映射到 上直观显示 话不多说 上代码 ool sprayer Widget eventFilter QO
  • 导入Unity 2D Animation插件没有生效

    导入Unity 2D Animation后 打开Sprite Editor发现没有Skinning Editor选项 这可能是因为导入插件后与项目原有的Plugin下的库产生冲突导致的 这时候点击Packages Editor Unity
  • 线程和线程池

    线程和线程池 并发和并行 并发是指在单个CPU核心上 多个线程占用不同的CPU时间片 但由于每个线程占用的CPU时间片非常短 比如10ms 看起来就像是多个线程在共同执行一样 但在微观物理上看其实还是串行执行的 这样的场景称作并发 并行是指
  • QQ小程序服务器内部错误,网络

    网络 使用说明 在小程序 小游戏中使用网络相关的 API 时 需要注意下列问题 请开发者提前了解 1 服务器域名配置 每个QQ小程序需要事先设置一个通讯域名 小程序只可以跟指定的域名与进行网络通信 包括普通 HTTPS 请求 qq requ
  • 工作队列(workqueue)

    转载于 http blog csdn net angle birds article details 8448070 项目需要 在驱动模块里用内核计时器timer list实现了一个状态机 郁闷的是 运行时总报错 Scheduling wh
  • 仿鸟云IDC模板 最新修复创梦虚拟主机管理系统+主控模板+鸟云模板源码

    介绍 最近仿鸟云的 IDC 模板 好像很火 很早以前做了一个和鸟云早期的模板 但后来服务器没备份 数据丢失了 此为近期找到的一份新的鸟云源码 如果大家有需求 可以进行长期更新维护此套模板 做到更精 在 Global asa 文件的最后一行之
  • SpringBoot配置多个Redis数据源

    一 添加依赖
  • type traits浅析

    1 G2 9的type trait G2 9的type trait用一个泛化类模板来定义各种类型的属性 默认都是假的 然后给每一种类型都添加了偏特化版本 这样的缺点是每写一个类 都要添加一个偏特化的模板类 十分麻烦 2 C 改进的type
  • 渗透:vulnhub DC系列之DC1

    DC系列之DC1 一 靶机配置及说明 下载地址 https www vulnhub com entry dc 1 292 靶机 DC1 VMware IP 192 168 49 152 攻击机 kali2018 IP 192 168 49
  • 姿态估计与人体动作识别的多任务深度学习模型

    最近在做一个人体康复训练的项目 一开始考虑到人体康复训练需要肢体的细微动作 所以先使用人体姿态估计识算法提取骨骼点 再根据人体骨骼点来识别动作 后来发现也不一定这样 并组合成一个端对端的模型 正好找到了最近的一篇论文 2D 3D Pose
  • 使用QFile进行文件操作(QFile可以使用FILE *指针,还必须指定AutoCloseHandle)

    QFile类我我们提供了操作文件的常用功能 它是一种io设备 可以用来读写文本文件和二进制文件 也可以用来读写Qt的资源文件 QFile类可以单独使用 该类本身提供了read write函数 但更方便的方式是 将QFile和QTextStr
  • 毕业设计 2023-2024年最新软件工程专业毕设选题题目推荐汇总

    文章目录 0 简介 1 如何选题 2 最新软件工程毕设选题 0 简介 学长搜集分享最新的软件工程业专业毕设选题 难度适中 适合作为毕业设计 大家参考 学长整理的题目标准 相对容易 工作量达标 题目新颖 1 如何选题 最近非常多的学弟学妹问学
  • dell笔记本怎么开启虚拟化_dell台式电脑bios设置u盘启动的三种操作教程

    最近有小编发现有很多的网友朋友们并不知道dell怎么进入bios 为什么这么说呢 因为每个品牌怎么进入bios是不一样的 现在戴尔用户越来越多 不知道dell怎么进入bios的用户 接下来小编教你dell台式电脑bios设置u盘启动的三种操
  • uni-table多选获取当前行数据

    废话不多说 直接上代码
  • connect()函数

    connect 用于建立与指定socket的连接 头文件 include
  • RabbitMQ消息队列基础点

    消息队列 为什么使用消息队列 1 解耦 2 异步 3 削峰 消息队列的缺点 各种消息中间件的区别 消息丢失 如何保证消息的顺序性 如何解决消息队列的延时以及过期失效问题 消息队列满了以后该怎么处理 有几百万消息持续积压几小时 说说怎么解决