《消息队列高手课》 消息积压了该如何处理?

2023-10-26

据我了解,在使用消息队列遇到的问题中,消息积压这个问题,应该是最常遇到的问题了,并且,这个问题还不太好解决。

我们都知道,消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。

所以,我们先来分析下,在使用消息队列时,如何来优化代码的性能,避免出现消息积压。然后再来看看,如果你的线上系统出现了消息积压,该如何进行紧急处理,最大程度地避免消息积压对业务的影响。

优化性能来避免消息积压

在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中。对于消息队列本身的性能,你作为使用者,不需要太关注。为什么这么说呢?

主要原因是,对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通过水平扩展 Broker 的实例数成倍地提升处理能力。

而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了。所以,对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。

1. 发送端性能优化

发送端业务代码的处理性能,实际上和消息队列的关系不大,因为一般发送端都是先执行自己的业务逻辑,最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。

对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。为什么这么说呢?

我们之前的课程中讲过 Producer 发送消息的过程,Producer 发消息给 Broker,Broker 收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是 1ms,我们把这 1ms 的时间分解开,它包括了下面这些步骤的耗时:

  • 发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;
  • 发送消息和返回响应在网络传输中的耗时;
  • Broker 处理消息的时延。

如果是单线程发送,每次只发送 1 条消息,那么每秒只能发送 1000ms / 1ms * 1 条 /ms = 1000 条 消息,这种情况下并不能发挥出消息队列的全部实力。

无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。至于到底是选择批量发送还是增加并发,主要取决于发送端程序的业务性质。简单来说,只要能够满足你的性能要求,怎么实现方便就怎么实现。

比如说,你的消息发送端是一个微服务,主要接受 RPC 请求处理在线业务。很自然的,微服务在处理每次请求的时候,就在当前线程直接发送消息就可以了,因为所有 RPC 框架都是多线程支持多并发的,自然也就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响 RPC 服务的时延。这种情况,比较明智的方式就是通过并发来提升发送性能。

如果你的系统是一个离线分析系统,离线系统在性能上的需求是什么呢?它不关心时延,更注重整个系统的吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,你可以批量从数据库读取数据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量。

2. 消费端性能优化

使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的。

要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。

所以,我们在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,**在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。**如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

我见到过很多消费程序,他们是这样来解决消费慢的问题的:

它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。

这个方法是不是很完美地实现了并发消费?请注意,这是一个非常常见的错误方法! 为什么错误?因为会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。关于“消息丢失”问题,你可以回顾一下我们的专栏文章《05 | 如何确保消息不会丢失?》。

消息积压了该如何处理?

还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题才不至于影响业务。

导致突然积压的原因肯定是多种多样的,不同的系统、不同的情况有不同的原因,不能一概而论。但是,我们排查消息积压原因,是有一些相对固定而且比较有效的方法的。

能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了。

大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。

如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

还有一种不太常见的情况,你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度。

如果监控到消费变慢了,你需要检查你的消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下你的消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了。

小结

这节课我们主要讨论了 2 个问题,一个是如何在消息队列的收发两端优化系统性能,提前预防消息积压。另外一个问题是,当系统发生消息积压了之后,该如何处理。

优化消息收发性能,预防消息积压的方法有两种,增加批量或者是增加并发,在发送端这两种方法都可以使用,在消费端需要注意的是,增加并发需要同步扩容分区数量,否则是起不到效果的。

对于系统发生消息积压的情况,需要先解决积压,再分析原因,毕竟保证系统的可用性是首先要解决的问题。快速解决积压的方法就是通过水平扩容增加 Consumer 的实例数量。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

《消息队列高手课》 消息积压了该如何处理? 的相关文章

  • 从字符串编译Java源代码? [复制]

    这个问题在这里已经有答案了 有没有办法让正在运行的Java程序编译Java源代码 作为字符串传递 Class newClass Compiler compile class ABC void xyz etc etc 理想情况下 传入源代码引
  • 如何在 spring 中将模型作为重定向属性传递

    redirectModel addAttribute Model model return REDIRECT PREFIX my company organization management manage users 当我通过这个时 我得
  • 如何阻止 Apache CXF 发送响应消息?

    如果给定的 SOAP 标头元素具有给定值 例如 如果标头标记 response 的值为 0 我根本不希望 Apache CXF 返回响应 我怎样才能做到这一点 CXF 似乎假设所有呼叫都会收到响应 我知道这在 Web 服务上下文中看起来很奇
  • Jackson @JsonRawValue 获取地图的值

    我有以下 Java bean 类 使用 Jackson 将其转换为 JSON public class Thing public String name JsonRawValue public Map content new HashMap
  • 将密钥添加到选定的密钥集中

    我正在编写一个 NIO 服务器 并希望响应用户请求 即将一些数据写入通道 Selector selector if selector selectNow 0 if key isReadable SocketChannel channel k
  • Wicket setResponsePage() 方法如何工作?

    在学习 JSP 和 servlet 时 我听说了重定向和调度 他们中的哪一个做 Wicket 的setResponsePage 履行 What setResponsePage确实取决于几个因素 您调用 setResponsePage 的次数
  • 如何将 JMX 绑定到特定接口?

    我目前正在启动我的 Java VMcom sun management jmxremote 属性 以便我可以通过 JConsole 连接到它进行管理和监控 不幸的是 它监听机器上的所有接口 IP 地址 在我们的环境中 经常会出现多个 Jav
  • 在 IntelliJ IDEA 中哪里添加像 -ea 这样的编译器选项?

    我想添加 ea选项 我把它设置在Project Settings gt Compiler gt Java Compiler Additional command line parameters 但它导致了 make 错误 invalid f
  • Dao 和服务接口的需求

    我是Spring Mvc的新手 在很多教程中 我发现有一个像这样的Dao接口 public interface StudentDAO public List
  • JUnit 5 中的参数化 beforeEach/beforeAll

    我想为一个小型的类似数据库的应用程序编写一个测试 此应用程序使用查询 查询应返回正确的结果 这在 JUnit 5 中很容易实现 比如 BeforeEach void before database prepareDatabase Test
  • Java中使用流的byte[]到byte[]的ArrayList

    我有一个 byte 的 ArrayList 我想知道是否可以使用 Java 8 中的流将其转换为 byte ArrayList 内的所有数组都具有相同的大小 ArrayList
  • 编写无 BOM 的 UTF-8

    这段代码 OutputStream out new FileOutputStream new File C file test txt out write A getBytes 和这个 OutputStream out new FileOu
  • 面向 Clojure 用户的 Java

    我一直在断断续续地使用 Lisp 并且正在赶上 clojure clojure的好处是我可以自然地使用所有的java函数 而clojure的坏处也是我必须自然地了解java函数 例如 我不得不花一些时间 谷歌搜索 来查找 Java 中的平方
  • Java 泛型和数字

    为了看看我是否可以清理一些数学代码 主要是矩阵代码 我尝试使用一些 Java 泛型 我有以下方法 private
  • 在idea ide中出现钻石运算符的编译错误

    我在尝试在idea ide中编译一些简单的源代码时收到此错误 java diamond operator is not supported in source 1 6 use source 7 or higher to enable dia
  • JSP:已为此响应调用 getOutputStream()

    我正在使用此代码从 FTP 下载文件 我在 tomcat 日志中遇到异常 如下所示 我能够执行我的任务 但我的日志大小增加了很多 Code if file exists if file canRead IE6 SSL PDF Bug htt
  • 了解 Collection.isEmpty() 和 Collection.size() == 0 之间的区别? [复制]

    这个问题在这里已经有答案了 我读过很多关于两者之间差异的文章isEmpty and size gt 0 用于检查collection是否为空并发现isEmpty 表现超过size 但我无法轻易理解为什么性能isEmpty 即使 isEmpt
  • 逐列读取 CSV 文件

    我想从多列 csv 文件中读取特定列 并使用 Java 在其他 csv 文件中打印这些列 有什么帮助吗 以下是我逐行打印每个标记的代码 但我希望只打印多列 csv 中的几列 import java io BufferedReader imp
  • Eclipse 中的预构建事件

    我有一个使用 jaxb 进行一些 xml 处理的项目 如何在 eclipse 中设置预构建事件以在构建项目之前执行 xjc 转到项目 gt 属性 gt 构建器 创建您自己的构建器并启用它 并在构建器的配置中启用 自动构建期间 等 如下所示
  • List 和 List 之间的区别[重复]

    这个问题在这里已经有答案了 我读过很多这方面的内容 我知道 List listOfObject new ArrayList

随机推荐

  • 命令行参数设计

    1 目的 众多通用的小功能 制作为一个小工具 然后通过命令行来进行交互 使用非常的简便 本规范是为了统一命令行参数的设计 使得大家在制作或使用命令行工具时 能够更加有共享 进行会更加方便 2 适用范围 所有命令行工具参数的设计 3 基本原则
  • #SATA# SATA 实际管脚接线图

    前言 概述 实际接线管脚图 PATA 接口 M 2 U 2 AHCI NVMe 概述 SATA是Serial ATA的缩写 即串行ATA 它是一种电脑总线 主要功能是用作主板和大量存储设备 如硬盘及光盘驱动器 之间的数据传输 这是一种完全不
  • 迁移学习:他山之石,可以攻玉【VALSE Webinar】Panel实录

    编者按 迁移学习是机器学习与计算机视觉中的重要研究问题之一 旨在研究如何将一个领域的知识迁移到另外的领域 具有重要的研究意义与应用价值 但迁移学习又会存在哪些局限性 在实际应用中的价值是什么 未来的发展方向在哪里 为此 VALSE Webi
  • docker 数据持久化

    文章目录 定制镜像持久化 需求 实现 数据卷持久化 数据卷简介 数据卷的特性 创建读写数据卷 停止容器后的操作 查看数据卷详情 创建只写数据卷 查看数据卷详情 创建共享数据卷 Dockerfile持久化 创建Dockerfile 构建和运行
  • 大二上学期数据结构课程设计

    1 报数问题 问题描述 有n个小朋友围成一圈玩游戏 小朋友从1至n编号 2号小朋友坐在1号小朋友的顺时针方向 3号小朋友坐在2号小朋友的顺时针方向 1号小朋友坐在n号小朋友的顺时针方向 游戏开始 从1号小朋友开始顺时针报数 接下来每个小朋友
  • 安装TensorFlow遇到no module named ‘tensorflow’问题解决方法

    按照这个博客https blog csdn net qq 16633405 article details 79941696里的步骤安装TensorFlow时遇到no module named tensorflow 虽然作者给出了一个解决方
  • 文本多分类之Doc2Vec实战篇

    本文链接 https blog csdn net weixin 42608414 article details 88391760 版权 在我之前的几篇博客中 我介绍了两种文档向量化的表示方法 如Sklearn的CountVectorize
  • 1.3. 分治法—最近点对问题

    1 问题描述 给定平面S上n个点 找其中的一对点 使得在n个点组成的所有点对中 该点对间的距离最小 2 求解过程 划分 将集合S分成两个大小基本相等的子集 S 1 S 1 S1 和 S
  • linux 基础知识考试试题,Linux常识型试题

    Linux常识型试题 发布时间 2011 06 06 18 11 10来源 红联作者 lijiang i s 本帖最后由 lijiang 于 2011 10 22 17 51 编辑 i 一 填空题 1 链接分为 和 2 安装Linux系统对
  • 解决Linux界面显示问号字符?与Failed to set locale, defaulting to C报错

    解决方法 暂时性处理 export LC ALL zh CN UTF 8 一劳永逸 vim etc bashrc 然后在最后一行写入 export LC ALL zh CN UTF 8 问题复现 解析 当输入 locale 会得到如下结果
  • 数据结构----利用栈实现表达式的计算

    利用栈实现表达式的计算 例如 12 5 6 9 7 8 5 6 8 5 6 12 要解决的问题主要有两个 和 的运算顺序的处理问题 括号内的表达式优先运算问题 这里利用栈来解决这两个问题 首先我们设置两个栈 一个符号栈 一个数字栈 下面我们
  • Novell数据备份

    从昨天下午到现在 才搞定 关总不提示的情况下 我一直认为xvRf是更新备份数据 cvRf是全部备份 其实则不然 关总告诉我 xvRf是导入数据 而cvRf才是备份数据 如果网络成功链接的话 那NDS服务器的数据就会被老数据覆盖了 幸好幸好
  • Android Studio day_01 初识线性布局和相对布局还有按钮

    序章 今天学习了线性布局 LinearLayout 和相对布局 RelativeLayout 还有Button按钮 布局是要用和进行结束的 至于Botton按钮嘛 使用 gt 结束就好啦 相对布局 RelativeLayout 相对布局我理
  • 卸载npm和安装npm_使用`npm uninstall`卸载npm软件包

    卸载npm和安装npm To uninstall a package you have previously installed locally using npm install
  • 激光雷达对植被冠层结构和SIF同时探测展望

    前言 陆表植被在全球碳循环中起着不可替代的作用 但现阶段 人们对气候变化与植被生态理化功能的关系的研究还不够完善 为了提高气候预测以及缓解气候恶化的速率 对植被参数比如 叶面积指数 leaf 植被冠层结构 canopy 和生态系统以及区域尺
  • Linux服务器程序规范

    Linux服务器程序规范 Linux服务器程序一般都是以后台进程形式运行 后台进程又称为守护进程 daemon 其没有控制终端 不会意外接收到用户输入 守护进程的父进程通常是init进程 PID为1的进程 Linux服务器程序通常有一套日志
  • Tomcat启动不了报 java.net.BindException “Address already in use: NET_Bind“这个异常

    Tomcat在IDEA运行报以下错误 启动不了Tomcat Error running Tomcat 8 5 57开关 Unable to open debugger port 127 0 0 1 63840 java net BindEx
  • Hive文件格式

    文章目录 1 概述 1 1 行存储 列存储 2 TEXTFILE 3 SEQUENCEFILE 3 RCFILE 4 ORCFILE 5 Parquet 8 区别 8 1 空间对比 磁盘空间占用大小比较 8 2 查询语句运行时间大小比较 9
  • socket链接检测超时时间过短导致的问题

    新增了另外一个区域的代理 跨州 原来的代理可达性检测只有50ms 就不够了 导致大量报错 更换为1000毫秒后 就正常了 需要注意网络中几个连接超时时间的设置问题 1 链接超时时间 一般是1 5秒 全内网服务器 可以设置得更短一些 2 等待
  • 《消息队列高手课》 消息积压了该如何处理?

    据我了解 在使用消息队列遇到的问题中 消息积压这个问题 应该是最常遇到的问题了 并且 这个问题还不太好解决 我们都知道 消息积压的直接原因 一定是系统中的某个部分出现了性能问题 来不及处理上游发送的消息 才会导致消息积压 所以 我们先来分析