RocketMQ 用法详解,你学会了吗?

2023-11-14

大家好,我是指北君。

消息中间件是我们工作中使用最频繁的一类中间件,它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。今天,指北君就来详细讲讲RocketMQ生产者和消费者在使用时的一些注意事项。

一. 生产者

1.1 发送消息注意事项

1)消息大小

建议消息大小不要超过512K。

2)异步发送

默认的发送为同步发送,send方法会一直阻塞,等待broker端的响应。如果你关注性能问题,可以通过send(msg, callback)来发起异步调用。

3)生产者组

正常情况下生产者组是没有作用的,但是在发送事务消息时,如果producer中途意外宕机,broker会主动回调producer group 内的任意一台机器来确认事务的状态。(目前开源版本还不支持事务消息)。

4)线程安全问题

生产者实例是线程安全的,在应用中只需要实例化一次即可。

5)性能问题

如果你希望在一个jvm进程内使用多个producer实例来提高发送能,我们建议:

使用异步发送,并且producer实例只需要3 ~ 5个即可 对每一个producer 调用 setInstanceName,区别不同的生产者。

6)发送超时时间

当客户端向broker发送请求超时时,客户端会抛出 RemotingTimeoutException,默认的超时时间是3秒。通过调用send(msg, timeout) 可以设置超时时间。超时时间建议不要设置过小,因为 broker 可能需要时间刷盘或向 slave 同步数据。

7)对于同一个应用最好只使用一个Topic,消息的子类型可以使用 tags 来标识,tags 可以由应用自由设置。当发送的消息设置了 tags 时,消费方在订阅消息时可以使用 tags 在 broker 做消息过滤。注意这里的命名虽然是复数,但是一条消息只能有一个tag。

8)消息在业务层面的唯一标识可以设置到 keys 字段,方便根据 keys 来定位消息。broker 会为每个消息创建索引(哈希索引),应用可以通过topic 、key 查询这条消息的内容(MessageExt),以及消息被谁消费(MessageTrack,精确到consumer group)。由于是哈希索引,请尽量保证key 的唯一,这样可以避免潜在的哈希冲突。

9)消息发送不管是成功还是失败都要打印消息日志,日志内容务必包含 sendResult 和 key 字段。

10)对于消息不可丢失的应用,务必要有消息重发机制。例如如果消息发送失败,可以将消息存储到数据库,然后通过定时程序或者人工的方式触发重发。

11)调用send 同步发送消息时,假定此时设置了 isWaitStoreMsgOK=true(default is true),只要不抛出异常就代表发送成功,但当 isWaitStoreMsgOK = false 时,发送永远返回 SEND_OK。但是对于发送“成功”会有多个状态,在 SendStatus 中定义如下:

FLUSH_DISK_TIMEOUT

如果 broker 设置的 FlushDiskType = SYNC_FLUSH,当 broker 的在刷盘超时时(MessageStoreConfig.syncFlushTimeout,默认5秒)会返回该状态。此时消息任然保存在内存中,只有broker 宕机时消息才会丢失。

FLUSH_SLAVE_TIMEOU

如果 broker 的 role 是 SYNC_MASTER,当 slave 同步数据的时间超过了 MessageStoreConfig.syncFlushTimeout (默认5秒) 时会返回此状态。此时只有主从都宕机,并且主也没有刷盘时,消息才会丢失。

SLAVE_NOT_AVAILABLE

如果 broker 的 role 是 SYNC_MASTER,并且此时 slave 不可用时会返回该状态。

SEND_OK

发送成功。为了保证消息不丢失还需要配置 SYNC_MASTER or SYNC_FLUSH。

12)消息重复

当发送消息时返回 FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT,若非常不幸的 broker 也宕机了,消息将会丢失。此时如果什么都不做,消息可能会丢失,如果重发消息,消息可能会出现重复。

通常我们建议发送端重发消息,由消费方来保证消息消费的幂等性。

1.2 消息发送失败如何处理

Producer 的 send 方法本生支持内部重试,重试逻辑如下:

至多重试3次 如果发送失败,则轮转到下一个broker 这个方法的总耗时时间不超过 sendMsgTimeout,默认3秒 所以发送消息已经产生超时异常的话就不会再重试。以上策略仍不能保证消息发送一定成功,为保证消息发送一定成功,建议应用这么做:如果调用 send 同步发送失败,则尝试将消息存储到db,由后台线程定时重试,保证消息一定到达 Broker。

1.3 oneway 的发送形式

对于可靠性要求不高的应用,可以采用 oneway 的发送形式,oneway 形式不等待应答。

1.4 发送顺序消息

顺序消息分为分区有序和全局有序。

分区有序要求 producer 在send 时传入 MessageQueueSelector 的实现类,最终将某一类消息发送到同一队列。但是一旦发生通信异常、broker 重启等,由于队列总数发生变化,哈希取模后定位的队列会变化,会产生短暂的顺序不一致。如果业务能容忍在集群异常情况下(如某个 broker 宕机或者重启)消息短暂的乱序,使用分区有序比较合适。

全局严格有序的消息即便在异常情况下也能保证消息的有序性,但是却牺牲了分布式的 failover 特性,即 broker 集群中只有要一台机器不可用,则整个集群都不可用,服务可用性会大大降低。

顺序消息的缺点:

发送顺序消息无法利用集群的 FailOver 特性 消费顺序消息的并行度依赖于队列数量 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消费堆积问题 遇到消费失败的消息,无法跳过,当前队列需要暂停 5.发送事务消息 目前暂不支持。

二. 消费者

2.1 消费者组和订阅

不同的消费者组可以独立消费相同的topic,这点类似于ActiveMQ的虚拟 topic. 另外对于相同的消费者组,需要确保组内的消费者订阅消息的规则是一致的!

MQ 里的一个Consumer Group 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个 Consumer Group 下通常会挂载多个 Consumer 实例。订阅关系一致指的是同一个 Consumer Group 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

由于 MQ 的订阅关系主要由 Topic+Tag 共同组成,因此,保持订阅关系一致意味着同一个 Consumer Group 下所有的实例需在以下两方面均保持一致:

订阅的 Topic 必须一致;订阅的 Topic 中的 Tag 必须一致。

技术架构 > Consumer 最佳实践 > image2017-11-15 15:50:13.png

2.2 MessageListener

1)顺序消费 MessageListenerOrderly

顺序消费时消费者会锁定队列,以确保消息被顺序消费,但是这样也会造成一定的性能损耗。当消费出现异常的时候,建议不要抛出异常,而是返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,让消费暂停一会,暂停时间由 context.setSuspendCurrentQueueTimeMillis 方法指定。

2)并发消费

并发消费是推荐的消费方式,在此种模式下,消息将被并发的消费。消费出现异常时不建议抛出异常,只需要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 即可。为了保证消息肯定被至少消费一次,消息将会被重发回 broker (topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置,通过 delayLevelWhenNextConsume 和 MessageStoreConfig.messageDelayLevel 设置)后,再次投递到这个 ConsumerGroup,而如果一直这样重复消费都持续失败到一定次数(默认是16次,DefaultMQPushConsumer.maxReconsumeTimes),就会投递到DLQ队列。应用可以监控死信队列来做人工干预。

3)返回状态

在并行消费时可以通过返回 RECONSUME_LATER 来告诉 Consumer 当前无法消费该消息,等延时一段时间再重新消费,但是此时消费不会停止,你可以继续消费其他消息。但在顺序消费时,因为要保证消费的顺序性,所以你不能跳过失败的消息,此时你可以通过返回 SUSPEND_CURRENT_QUEUE_A_MOMENT 来告诉 Consumer 先暂停一会。

4)阻塞

不建议阻塞Listener,因为这会阻塞住线程池,同时也有可能造成消费者线程终止。

2.3 线程数

consumer 内部通过一个 ThreadPoolExecutor 来消费消息,可以通过 setConsumeThreadMin 和 setConsumeThreadMax 来改变线程池的大小。

2.4 ConsumeFromWhere

当新实例启动的时候,PushConsumer会拿到本消费组broker已经记录好的消费进度(consumer offset),按照这个进度发起自己的第一次Pull请求。

如果这个消费进度在Broker并没有存储起来,证明这个是一个全新的消费组,这时候客户端有几个策略可以选择:

CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息。

CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍。

CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 注意:这些配置只对全新的消费组有效,老的消费组都是按已经存储过的消费进度继续消费。

对于老消费组想跳过历史消息可以采用以下几种方法:

1)判断消息的发送时间,太老的消息直接返回 CONSUME_SUCCESS。

2)判断消息的 offset 和 MAX_OFFSET 的差距,如果落后太多,可以直接。返回 CONSUME_SUCCESS。

3)消费者启动前,先调整该消费组的消费进度,再开始消费。可以人工使用命令 resetOffsetByTimeStamp,详见 ResetOffsetByTimeCommand.java。

2.5 消息幂等

由于 RocketMQ 无法避免消费重复,所以如果业务对消息重复非常敏感,务必在业务层面去重。

2.6 消费速度慢处理方式

1)提高消费并行度

大部分消息消费行为都属于 IO 密集型业务,适当的提高并发度可以显著的改善消费的吞吐量。

2)批量方式消费

默认情况下 consumer 的 consumeMessageBatchMaxSize 为1,即一次只消费一个消息,如果应用可以批量消费消息,则可以很大程度上提高消费吞吐量。

3)跳过非重要消息

当消堆积严重时可以丢弃不重要的消息。

4)优化消息消费过程

2.7 打印消费日志

建议在消费入口方法打印消息,方便后续排查问题,消费失败时也打印失败日志。

2.8 利用broker过滤消息,避免多余的消息传输

三. 小结

好了,RocketMQ生产者与消费者的使用事项就总结完毕了,相信大家对RocketMQ的使用应该会更有信心了。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ 用法详解,你学会了吗? 的相关文章

  • Java中printf左对齐

    当我运行该程序时 阶乘值右对齐 有没有办法让它左对齐 同时保持中间 50 个空格 public class Exercise 5 13 public static void main String args int numbers 1 2
  • 将命令行参数传递给可运行的 JAR [重复]

    这个问题在这里已经有答案了 我从 Eclipse 项目构建了一个可运行的 JAR 用于处理给定的 XML 文件并提取纯文本 但是 此版本要求将该文件硬编码在代码中 有没有办法做这样的事情 java jar wiki2txt enwiki 2
  • 具有多字符替换的字符串组合(产生返回Java的替代重写)[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 还有另一篇 Stack Overflow 帖子是为与车辆登记号相关的算法创建的 根据输入的车牌 例如ABC123 和列表 替换值 例如
  • Java switch case 抛出 nullPointer 异常

    我有一个枚举声明如下 public enum Status REQ URL1 NOT URL2 GET URL3 String getURL Status String getURL this getURL getURL 我班上的一个领域
  • 在word文档的标题中添加图片时出现问题

    我正在Word文档的标题中添加图片 它显示图像的框架并显示 当前无法显示图像 如果我将文本添加到标题 它会显示文本 如果我在文档正文中添加图像 它也会显示图像 获取图像也是如此 它在标题上显示文本 但没有图像 我的支票快用完了 有人可以建议
  • 实现一个java UDF并从pyspark调用它

    我需要创建一个在 pyspark python 中使用的 UDF 它使用 java 对象进行内部计算 如果它是一个简单的 python 我会做类似的事情 def f x return 7 fudf pyspark sql functions
  • 从命令行将 clojure 源代码编译为类(AOT)(不使用 lein)

    我正在尝试将 clojure 源代码编译成类文件 并仅使用命令行运行它 没有 lein 也没有 可能 回复 我有 core cljsrc hello目录 src hello core clj 这是源代码 ns hello core defn
  • 如何抑制有关已弃用 api 的 javac 警告?

    当我编译时 javac 输出 Note Some input files use or override a deprecated API Note Recompile with Xlint deprecation for details
  • 尽管设置为 1.7,IntelliJ IDEA 13 仍使用 Java 1.5

    尽管在所有项目设置中指定了 JDK 1 7 包括File gt Project Structure gt Project Project SDK 则产生以下错误IntelliJ 13当尝试编译一些使用菱形运算符的简单 Java 7 代码时
  • 无法向 openfire 服务器发送消息

    我无法使用 SMACK API 向 openfire 服务器上的 XMPP 客户端发送消息 我不确定我哪里出错了 我在 gtalk 上测试了相同的代码 它工作正常 public class SenderTest public static
  • 更新(合并)时缺少 Spring Data JPA 验证

    我正在使用 Spring Boot 1 5 4 RELEASE 和 Spring Data JPA 进行项目 遇到更新实体时未执行 Hibernate 验证器或至少在某些情况下未验证的问题 For Person如下所示 禁止使用空名称 并且
  • 如何在 Spring Boot 中跳过将某些 @Entity 类创建为 h2(内存中)数据库中的表?

    我正在尝试构建一个使用 2 个数据源的 Spring Boot 应用程序 我现在的主要数据库是内存数据库 仅用于测试目的 其中的表是在我创建的 sql 文件的帮助下填充的 另一个数据库 oracledb 具有已填充的表 我想实现什么目标 我
  • 强制预先加载原本延迟加载的属性

    我有一个 Hibernate 对象 它的属性都是惰性加载的 大多数这些属性是其他 Hibernate 对象或 PersistentSet 现在我想强制 Hibernate 一次性加载这些属性 当然 我可以 触摸 这些属性中的每一个objec
  • 如何反序列化数组 google-gson 内的数组

    我有这样的 JSON Answers Locale Ru Name Name1 Locale En Name Name2 Locale Ru Name Name3 Locale En Name Name4 正如你所看到的 我的数组里面有数组
  • 为什么这个 eclipse 错误显示以及它的解决方案应该是什么

    缺少库 xdoclet 1 2 1 jar 选择 XDoclet 的主目录 1 2 1 为什么这个 eclipse 错误显示以及它的解决方案应该是什么alz 这可能是因为该 jar 没有添加到您的项目构建路径中 请按照以下步骤操作 Righ
  • GWT 和身份验证

    保护 GWT Tomcat 应用程序执行身份验证和授权的最佳策略是什么 有两种基本策略 确保入口点安全 确保远程服务的安全 确保入口点安全 最简单的方法是使用常规 Web 应用程序安全工具限制对 GWT 生成的 html js 文件的访问
  • 优化Gson反序列化

    优化反序列化的最佳方法是什么 我目前正在使用标准 Gson toJson 和 Gson fromJson 方法来序列化和反序列化一些复杂对象 我希望尽可能减少反序列化时间 如果重要的话 我的最复杂的对象包含 43 个变量 如果你想使用 Gs
  • 使用java读取行并映射过滤数据[关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions publi
  • 在 WildFly 10 中添加 jar 作为部署

    有没有办法 我们可以将 jar 部署为库 部署WildFly 10就像我们可以做到的那样weblogic服务器 或者我们可以将 jar 放在服务器的任何文件夹中并将这些依赖项定义为provided 我得到了什么部署方式jars on Wil
  • Oracle 的商业 Hotspot JVM 相对于 OpenJDK 有哪些性能优势?

    正如这个问题中所描述的 OpenJDK 与 Java HotspotVM https stackoverflow com q 44335605 1593077 Oracle 的商业 Hotspot JVM 本质上是 OpenJDK 加上一些

随机推荐

  • 线性表的查找算法-C语言

    文章目录 一 实验目的 二 实验内容 三 实验工具 四 实验代码 五 实验结果 六 总结与思考 一 实验目的 了解查找的基本概念 理解顺序查找 折半查找和分块查找的思想 掌握折半查找过程的判定树构造方法 实现线性表的查找算法 二 实验内容
  • mybatis中的if-else语句的使用解答

    1 mybatis中if else语句的语法 使用模板样例
  • 什么是沙箱技术?与容器有什么区别

    沙箱和容器的 隔离 机制 首先 什么是沙箱 它本身就是一种线下生活现象的虚拟化 现实世界里 小孩子们在沙地 沙滩上用木板隔离出一个方盒子 在盒子里堆砌 创造各种东西 城堡 房屋 山丘 这就是一个沙箱 它有两个根本特点 一 它有边界 通过木板
  • aix oracle 11 补丁包,oracle 11g for aix6.1安装基本步骤(含升级11.1.0.7)

    oracle 11g for aix6 1安装基本步骤 含升级11 1 0 7 1 检查物理内存 swap空间以及tmp空间 usr sbin lsattr E l sys0 a realmem 检查内存至少1G usr sbin lsps
  • anaconda创建python环境

    1 前提 系统中安装了anaconda沙箱环境 下载地址 anaconda官网 conda V 检验是否安装以及当前conda的版本 2 conda常用的命令 1 conda list 查看安装了哪些包 2 conda env list 或
  • bcd码和十进制码之间的转换

    BCD码转十进制 static u8 BCDToInt u8 value unsigned char temp 0 temp value gt gt 4 10 temp value 0x0F return temp 十进制转BCD码 sta
  • java awt linux_解决在linux下awt调用错误的问题

    在java中使用awt在服务器上处理图片的时候发现有错 第一遍执行 500 Servlet Exception java lang InternalError Can t connect to X11 window server using
  • MyBatis自动生成实体类

    MyBatis MySQL生成实体类 需要的工具jar包 mybatis generator core 1 3 2 jar mysql connector java 5 0 4 jar 第一步 编写一个MybatisGeneratorUti
  • android 之 如何让app没有图标

    我们有时候需要让我们的app没有图标 不要问我没有图标要干啥 我只是说的一个需求 单讲技术不说别的 首先我们要获得 PackageManager 的对象 PackageManager packageManager getPackageMan
  • Caffeine缓存的使用

    1 springboot集成Caffeine
  • KeePass搭建一个私人密码库

    文章作者 GoodBoyboy 文章链接 https blog goodboyboy top posts 2546190081 html 版权声明 本博客所有文章除特别声明外 均采用 CC BY NC SA 4 0 许可协议 转载请注明来自
  • 线程(进程)的同步与互斥实例

    1 有一个队列 线程1负责从网络接收分组 并将收到的分组放到队列尾 然后再次从网络中接收下一个到达的分组 并进行同样的队列操作 线程2从此队列头中取出一个分组进行处理 处理完毕后 再次从队列中取出一个分组进行处理 处理完毕后再次从队列头取出
  • [转]Python实现多功能音乐播放器

    前言 就是用Python做一个简易的音乐播放器 废话不多说 咱们直接开干 当然 今天做这个肯定不是最简单的 最简单的音乐播放器 9行代码足以 import time import pygame file r 歌曲路径 pygame mixe
  • torch.hub.load()解析,如何加载本地权重

    用yolov5训练了一个权重 项目只能部署在本地 官方文档 torch hub load repo or dir model args source github force reload False verbose True skip v
  • Python 第一阶段

    第一章 安装 1 1 开发环境 官网 https www python org稳定版 Stable Releases检验 cmd 命令 python version 1 2 开发工具 PyCharm官网 https www jetbrain
  • (Struts2学习篇) Struts2配置文件之 struts-default.xml

    对struts default xml的一些注释
  • zmq+protobuf 的坑点难点

    zmq protobuf 的坑点难点 之前项目要用到zmq protobuf的方式传递数据 软件采用前后端分离的方式开发 其中前端是异地同事用python开发的 后端是我们这边用C 开发的 1 中间有遇到问题是前后端传送zmq信息时 发现字
  • DRM驱动(七)之atomic_commit

    上节已经把应用的参数check了一遍 这次就可以把对应的参数配置到硬件里进行刷图操作了 int drm atomic commit struct drm atomic state state struct drm mode config c
  • Qt 控制台运行无法弹出小黑框

    Qt Console Application Qt 主要是GUI界面的设计 但在学习的时候控制台运行显得更加方便一些 小编在第一次新建控制台运行的时候 点击运行没有弹出小黑框 解决方法 主要是因为没有执行qmake 就需要在Qt的pro文件
  • RocketMQ 用法详解,你学会了吗?

    大家好 我是指北君 消息中间件是我们工作中使用最频繁的一类中间件 它具有低耦合 可靠投递 广播 流量控制 最终一致性等一系列功能 成为异步RPC的主要手段之一 当今市面上有很多主流的消息中间件 如老牌的ActiveMQ RabbitMQ 炙