RocketMQ-高级原理

2023-11-17

本节讲解下当MQ消息消费失败,或者发送不成功时如何处理消息,消息发送不成功一般存在于几种情况,网络原因,服务宕机,或者broker配置

消息发送失败

 如果是由于broker配置原因,可以通过报错提示排查原因:

无法查到路由信息,一般考虑到rocketMQ读取路由信息过程:

  • 如果Broker开启了自动创建Topic,在启动的时候会默认创建主题:并会随着Broker发送到Nameserver的心跳包汇报给Nameserver,继而从Nameserver查询路由信息时能返回路由信息。
  • 消息发送者在消息发送时首先会查本地缓存,如果本地缓存中存在,直接返回路由信息。
  • 如果缓存不存在,则向Nameserver查询路由信息,如果Nameserver存在该路由信息,就直接返回。
  • 如果Nameserver不存在该topic的路由信息,如果没有开启自动创建主题,则抛出 No route info of this topic。
  • 如果开启了自动创建主题,则使用默认主题向Nameserver查询路由信息,并使用默认Topic的路由信息为自己的路由信息,将不会抛出 No route info of this topic。
     

消息发送超时

客户端报消息发送超时,通常第一怀疑的对象是RocketMQ服务器,是不是Broker性能出现了抖动,无法抗住当前的量。那我们如何来排查RocketMQ当前是否有性能瓶颈呢?

查看rocketMQ日志:

cd ~/logs/rocketmqlogs/
grep -n 'PAGECACHERT' store.log | more

因为网络抖动原因出现的消息超时,通过减少消息发送的超时时间,增加重试次数,并增加快速失败的最大等待时长。具体措施如下:

  • 增加Broker端快速失败的时长,建议为1000,在broker的配置文件中增加如下配置:

    maxWaitTimeMillsInQueue=1000
DefaultMQProducer producer = new DefaultMQProducer("dw_test_producer_group");
producer.setRetryTimesWhenSendFailed(5);// 同步发送模式:重试次数
producer.setRetryTimesWhenSendAsyncFailed(5);// 异步发送模式:重试次数
producer.start();
producer.send(msg,500);//消息发送超时时间

 消息重试

首先对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。而对于普通的消息,当消费者消费消息失败后,你可以通过设置返回状态达到消息重试的结果。

如何让消息进行重试 ,集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:

  • 返回Action.ReconsumeLater-推荐
  • 返回null
  • 抛出异常
public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //处理消息
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}

如果希望消费失败后不重试,可以直接返回Action.CommitMessage。

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        //消息处理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

生产者消息重试策略:

如果由于网络抖动等原因,Producer程序向Broker发送消息时没有成功,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试

DefaultMQProducer可以设置消息发送失败的最大重试次数,并可以结合发送的超时时间来进行重试的处理,具体API如下:


//设置消息发送失败时的最大重试次数
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
   this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
 
//同步发送消息,并指定超时时间
public SendResult send(Message msg,
                      long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
   return this.defaultMQProducerImpl.send(msg, timeout);
}

消费者消费异常重试

异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,那么Broker就会在一段时间后尝试重试。
超时重试:如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。
因此,如果Consumer端正常消费成功,一定要返回ConsumeConcurrentlyStatus.ConsumeConcurrentlyStatus状态

异常重试

RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
1

但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。

        defaultMQPushConsumer.setMaxReconsumeTimes(5);

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

死信队列的名称是%DLQ%+ConsumGroup

死信队列的特征:

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
  • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。

消息幂等

幂等的概念

在MQ系统中,对于消息幂等有三种实现语义:

  • at most once 最多一次:每条消息最多只会被消费一次
  • at least once 至少一次:每条消息至少会被消费一次
  • exactly once 刚刚好一次:每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。

  • 其中,at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
  • 而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
  • 而这个exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ只能保证at least once,保证不了exactly once。所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性。

关于这个问题,官网上有明确的回答:4. Are messages delivered exactly once?RocketMQ ensures that all messages are delivered at least once. In most cases, the messages are not repeated.

消息幂等的必要性

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

处理方式

      从上面的分析中,我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ-高级原理 的相关文章

  • centos安装rocketmq

    centos安装rocketmq 1 下载rocketmq二进制包 2 解压二进制包 3 修改broker conf 4 修改runbroker sh和runserver sh的JVM参数 5 启动NameServer和Broker 6 安
  • OSAL

    OSAL为 Operating System Abstraction Layer 即操作系统抽象层 支持多任务运行 它并不是一个传统意义上的操作系统 但是实现了部分类似操作系统的功能 OSAL概念是由TI公司在ZIGBEE协议栈引入 他的意
  • (一)调试RocketMq源码

    文章目录 一 启动NameServer 1 1namesrv模块找到NamesrvStartup java 1 2修改配置文件目录 1 3启动NamesrvStartup java 二 配置Broker 2 1配置BrokerStartup
  • 7 SpringBoot整合RocketMQ发送单向消息

    发送单向消息是指producer向 broker 发送消息 执行 API 时直接返回 不等待broker 服务器的结果 这种方式主要用在不特别关心发送结果的场景 举例 日志发送 RocketMQTemplate给我们提供了sendOneWa
  • 【RocketMQ】NameServer总结

    NameServer是一个注册中心 提供服务注册和服务发现的功能 NameServer可以集群部署 集群中每个节点都是对等的关系 没有像ZooKeeper那样在集群中选举出一个Master节点 节点之间互不通信 服务注册 Broker启动的
  • RabbitMQ理论+实战

    1 引出 1 1 中间件应用场景 1 跨系统数据传输 2 高并发的流量削峰 3 数据的分发与异步处理 4 大数据分析与传递 5 分布式事务 1 2 中间件常用协议 01 什么是协议 所谓协议是指 1 计算机底层操作系统和应用程序通讯时共同遵
  • RocketMq存储设计——Index file

    RocketMq存储设计 Index file index file设计 rocket mq存储设计
  • 分析rocketmq-client产生大量rocketmq_client.log日志文件的原因处理方案

    源码 public static final String CLIENT LOG USESLF4J rocketmq client logUseSlf4j public static final String CLIENT LOG ROOT
  • Rocketmq Filter 消息过滤(TAGS、SQL92)原理详解 & 源码解析

    1 背景 1 1 Rocketmq 支持的过滤方式 Rocketmq 作为金融级的业务消息中间件 拥有强大的消息过滤能力 其支持多种消息过滤方式 表达式过滤 通过设置过滤表达式的方式进行过滤 TAG 根据消息的 tag 进行过滤 SQL92
  • 32 Consumer消息零丢失方案:手动提交offset + 自动故障转移

    1 消费者 红包系统 丢失消息的问题 前面两章中 阐述了如何确保订单系统发送出去的消息一定会到达MQ中 而且也能确保了如果消息到达了MQ如何确保一定不会丢失 在整个消息的生产消费中 就剩下消费者这一端的问题了 红包系统 消费者 拿到消息后
  • spring整合RocketMQ

    1 看官方javademo https www apache org dyn closer cgi path rocketmq 4 2 0 rocketmq all 4 2 0 source release zip 下载下来 spring
  • RocketMQ占用内存过大的解决方法

    目录 一 问题描述 二 解决方法 1 runserver sh 修改 2 runbroker sh 修改 一 问题描述 RocketMQ 启动后 一下子把内存撑爆了 二 解决方法 修改启动参数 分别对 bin 目录下的 runserver
  • rocketMq启动broker报错找不到或无法加载主类 Files\Java\jdk1.8.0_171\lib\dt.jar;C:\Program]

    假如弹出提示框提示 错误 找不到或无法加载主类 xxxxxx 1 打开runbroker cmd 将 CLASSPATH 加上英文双引号 切勿别加中文双引号 2 打开runserver cmd 同理 将 CLASSPATH 加上英文双引号
  • 1 RocketMQ简介

    简介 RocketMQ是由阿里捐赠给Apache的一款低延迟 高并发 高可用 高可靠的分布式消息中间件 经历了淘宝双十一的洗礼 RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力 同时也具备互联网应用所需的海量消息堆积 高吞吐
  • 关于rocketmq 中日志文件路径的配置

    前些天发现了一个巨牛的人工智能学习网站 通俗易懂 风趣幽默 忍不住分享一下给大家 点击跳转到网站 rocketmq 中的数据和日志文件默认都是存储在user home路径下面的 往往我们都需要修改这些路径到指定文件夹以便管理 服务端日志 网
  • RocketMQ系列之架构浅谈

    RMQ的架构设计 下面我从GitHub上截取了一张RMQ的源码结构图 图中我框框出来的9大模块 基本就构成了整个RMQ的内部结构 上面9大模块的依赖层次主要如下 依赖越强的越处于底层 下面介绍下最上层的4个模块 这4个模块中工具命令行就不讲
  • RocketMQ的消息优先级

    有些场景 需要应用程序处理几种类型的消息 不同消息的优先级不同 RocketMQ是个先入先出的队列 不支持消息级别或者Topic级别的优先级 业务中简单的优先级需求 可以通过间接的方式解决 下面列举三种优先级相关需求的具体处理方法 第一种
  • RabbitMQ(四):RabbitMQ高级特性

    消息队列在使用过程中 面临着很多实际问题需要思考 消息可靠性问题 如何确保发送的消息至少被消费 次 延迟消息问题 如何实现消息的延迟投递 消息堆积问题 如何解决数百万消息堆积 无法及时消费的问题 高可用问题 如何避免单点的MQ故障而导致的不
  • Apache RocketMQ 远程代码执行漏洞(CVE-2023-33246)

    漏洞简介 RocketMQ 5 1 0及以下版本 在一定条件下 存在远程命令执行风险 RocketMQ的NameServer Broker Controller等多个组件外网泄露 缺乏权限验证 攻击者可以利用该漏洞利用更新配置功能以Rock
  • 基于Jmeter实现Rocketmq消息发送

    在互联网企业技术架构中 MQ占据了越来越重要的地位 系统解耦 异步通信 削峰填谷 数据顺序保证等场景中 到处都能看到MQ的身影 而测试工程师在工作中 也经常需要和mq打交道 比如构造测试数据 触发某些业务场景 以及针对mq的性能测试等 目前

随机推荐

  • C语言实现TCP连接

    开发环境 TCP服务端 TCP UDP测试工具 开发环境 Linux 编程语言 C语言 TCP UDP测试工具工具的使用请自行百度 我们用这款软件模拟TCP服务端 效果展示 代码编写 include
  • bootstrap中container类和container-fluid类的区别

    近几天才开始系统的学习bootstrap 但马上就遇到了一个 拦路虎 container和container fluid到底什么区别 查了很多资料 看到很多人和我有同样的疑问 但是下面的回答一般都是一个是响应式一个宽度是百分百 说的好像是那
  • 【华为OD机试】斗地主之顺子(C++ Python Java)2023 B卷

    时间限制 C C 1秒 其他语言 2秒 空间限制 C C 262144K 其他语言524288K 64bit IO Format lld 语言限定 C clang11 C clang 11 Pascal fpc 3 0 2 Java jav
  • firefox 阻止此页面创建其他对话框的解决方法

    用Firefox操作弹出界面时总是遇到 firefox 阻止此页面创建其他对话框 点击确定后 控制台就会报错误 解决方法 1 在firefox里输入about config 2 在列表框里右键 gt 新建 gt 整数 3 输入选项名dom
  • Redis底层数据结构

    Redis简单介绍一下 Redis是一个开源的 使用C语言编写的 支持网络交互的 可基于内存也可持久化的Key Value数据库 有哪些数据结构 说起Redis数据结构 肯定先想到Redis的5 种基本数据结构 String 字符串 Lis
  • 日期类之运算符重载

    date h pragma once include
  • linux tcpdump抓包命令详解,tcpdump(抓包分析命令详解)

    TCPDump可以完全拦截网络上传输的数据包以提供分析 它支持对网络层 协议 主机 网络或端口的过滤 并提供逻辑语句 例如和 或不帮助您删除无用的信息 tcpdump抓包分析命令详解 tcpdump是一个用于截取网络分组 并输出分组内容的东
  • 如何成为一个牛逼的脚本小子日记之0x001-JAVA 代码审计 Top half (2023829-...

    如何成为一个牛逼的脚本小子日记之 0x001 JAVA 代码审计 Top half 2023 8 29 2023 9 1 此记录是在拥有一定的java基础下进行的 java基础类 反射 继承 filter servlet calssLoad
  • Springboot实现热部署

    所谓的热部署 比如项目的热部署 就是在应用程序在不停止的情况下 实现新的部署 而Springboot在我们每次修改完代码之后 可能只是修改下打印的信息 就得重新启动App类 这样太浪费时间 有没有一种修改完代码让程序自动重启的方法呢 答案是
  • DirectShow中的工具GraphEdit使用小结

    一 安装完Windows SDK 7 0或7 1后 在C Program Files Microsoft SDKs Windows v7 0 Bin下有32位的graphedt exe 及x64目录下有64位版本的graphedt exe
  • Python—PEP8规范

    Python PEP8规范 介绍 代码布局 模块导入顺序 空格 注释 注释块 命名风格 应避免的名字 模块名 类名 异常名 全局变量名 函数名 方法名和实例变量名 设计建议 Python思维导图 app siweidaotu com R06
  • MySql如何获取表头字段?实用技巧

    show columns from 表名
  • Tomcat源码分析之getParameter(String)与getQueryString()

    本文有些地方的描述对某些人来说可能比较罗嗦 如果想直接进入正题 可阅读 源码分析 节 但本文是自己一步步分析解决问题思路的记录 虽然有些地方的思考还不是很深入 主要是由于时间不是很充裕 虽然花了三天时间 但感觉还是不够 我会在后续的博文中
  • OpenZeppelin集成Truffle编写健壮安全的合约

    原文 http truffleframework com tutorials robust smart contracts with openzeppelin 因为智能合约往往涉及金钱 保证Soldity代码没有错误 以及足够的安全是非常根
  • RTP/RTCP/RTSP负载H264的一些问题小结

    以下内容都是基于rfc3984 RTP负载H264时的参数配置 1 在TCP传输时 Transport头中的interleaved参数必须设置 比如0 1 或者2 3 海康的流中出现了4 但是没有配置 所以wireshark也无法解析cha
  • Maven实战(五)使用maven开发的项目,如何更方便地提取第三方包

    如果你的项目使用maven构建的话 当项目要上线 部署到服务器上去的时候或许会碰见这样的问题 问题就是 服务器上没有maven的环境 也就是说 项目所依赖到的那些仓库 repository 中的jar包你需要单独提取出来上传到服务器中去 解
  • 软件测试题答案

    搜索答案的一个方法 点Ctrl F 在页面上查找 输入题目上的部分字 软件测试题及答案 1 单选题 通常 是在编码阶段进行的测试 它是整个测试工作的基础 A 系统测试 B 确认测试 C 集成测试 D 单元测试 正确答案 D 2 单选题 如果
  • 华三ap设置无线服务器,H3C无线控制器V5版本配置AP上线的方法

    H3C无线控制器上没有开启自动上线 需要先手动在控制器上输入AP信息 然后新建vlan 新建vlan接口 新建wlan ess接口 新建无线服务 然后在AP上配置射频参数 交换机上需要开启DHCP 为手机终端分配IP地址 并且交换机上与AP
  • vscode相关问题处理

    1 跳转缓慢 跳转函数 一直转圈圈 比较慢 关闭vscode 删除索引文件 cd config Code User workspaceStorage rm rf 2 重新打开vscode即可 2 波浪线报错 在确认c cpp propert
  • RocketMQ-高级原理

    本节讲解下当MQ消息消费失败 或者发送不成功时如何处理消息 消息发送不成功一般存在于几种情况 网络原因 服务宕机 或者broker配置 消息发送失败 如果是由于broker配置原因 可以通过报错提示排查原因 无法查到路由信息 一般考虑到ro