RocketMQ系列之顺序消费

2023-11-03

前言

上节我们介绍了RMQ的两大亮点,重试和重复消费的问题,其实重试才能算亮点,重复消费最终还是要由我们自己来解决这个问题,RMQ自身并没有提供很好的机制,至少目前是没有,不知道将来会不会有,OK扯远了,今天呢,我们再来介绍RMQ一个不错的地方,那就是顺序消费,RMQ是可以保证同一个queue中的消息被顺序的消费。

 

RMQ实现如何实现顺序消费?

生产者Producer在生产消息时将需要顺序消费的消息发送到同一个queue下,每个topic默认是有4个queue所以Producer需要一个队列选择器来进行queue的选择;

消费者Consumer端在进行消息的消费时,消费者注册的消息监听器就不是之前的MessageListenerConcurrently,而是换成MessageListenerOrderly,这样就可以保证消费者只有一个线程去处理该消息;

 

Producer端如何操作?

生产端保证将消息发送到topic下同一个队列中即可:我们发送了8条消息到坐标为0的队列中:

public class Producer {
   public static void main(String[] args) throws MQClientException, InterruptedException {
       // 声明一个生产者,需要一个自定义生产者组(后面我们会介绍这个组的概念和作用)
       DefaultMQProducer producer = new DefaultMQProducer("myTestGroup");
       // 设置集群的NameServer地址,多个地址之间以分号分隔
       producer.setNamesrvAddr("");
       // 如果消息发送失败就进行5次重试
       producer.setRetryTimesWhenSendFailed(5);
       // 启动生产者实例
       producer.start();

       for (int i = 0; i < 8; i++) {
           Message msg = new Message("TopicTest", "order_1", "key" + i, ("order_1" + i).getBytes());
           // 调用Produce的send方法发送消息
           try {
               // 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
               SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                   // 重写了MessageQueueSelector 的select方法
                   @Override
                   public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {
                       Integer id = (Integer) arg;
                       return list.get(id);
                   }
               }, 0);// 队列的下标
               System.out.println(sendResult);
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
       // 关闭
       producer.shutdown();
   }
}

 

Consumer端

Consumer注册MessageListenerOrderly监听即可:

public class Consumer {
   public static void main(String[] args) throws InterruptedException, MQClientException {
       // 声明一个消费者consumer,需要传入一个组
       DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerTest");
       // 设置集群的NameServer地址,多个地址之间以分号分隔
       consumer.setNamesrvAddr("");
       // 设置consumer的消费策略
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
       // 集群模式消费,广播消费不会重试
        consumer.setMessageModel(MessageModel.CLUSTERING);
       // 设置最大重试次数,默认是16次
       //consumer.setMaxReconsumeTimes(5);
       // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
       consumer.subscribe("TopicTest", "*");
       // Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
       consumer.registerMessageListener(new MessageListenerOrderly() {
           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext consumeOrderlyContext) {
               try {
                   MessageExt messageExt = msgs.get(0);
                   String msgBody = new String(messageExt.getBody(),"utf-8");
                   System.out.println(" 接收新的消息:消息内容为:"+msgBody);
               } catch (Exception e) {
                   e.printStackTrace();
                   System.out.println(e);
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });
       // 调用start()方法启动consumer
       consumer.start();
       System.out.printf("Consumer1 启动.%n");
   }
}

 

OK,我们先看下目前MQ上消息情况如下图:

 

我们依次启动消费者和生产者:

我们在看下控制台消息情况:8条消息出入记录

 

到这里可能有的小伙伴就会问了,你消息都发送到同一个队列,那如果我发2个队列,会是什么情况呢?我们把生产者改造下:生产者往下标0和3的队列分别发送4条消息:

for (int i = 0; i < 4; i++) {
           Message msg = new Message("TopicTest", "order_1", "key" + i, ("order_1" + i).getBytes());
           // 调用Produce的send方法发送消息
           try {
               // 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
               SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                   // 重写了MessageQueueSelector 的select方法
                   @Override
                   public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {
                       Integer id = (Integer) arg;
                       return list.get(id);
                   }
               }, 0);// 队列的下标
               System.out.println(sendResult);
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
       for (int i = 0; i < 4; i++) {
           Message msg = new Message("TopicTest", "order_1", "key2" + i, ("order_2" + i).getBytes());
           // 调用Produce的send方法发送消息
           try {
               // 发送消息并构建一个queue选择器,保证消息都进入到同一个队列中
               SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                   // 重写了MessageQueueSelector 的select方法
                   @Override
                   public MessageQueue select(List<MessageQueue> list, Message msg, Object arg) {
                       Integer id = (Integer) arg;
                       return list.get(id);
                   }
               }, 3);// 队列的下标
               System.out.println(sendResult);
           } catch (Exception e) {
               e.printStackTrace();
           }
       }

 

我们再来看下消费者端是怎么消息的,是否保持顺序消费?

可能会出现上面2种结果:不管是第一种还是第二种结果,虽然第二种结果整体上不是有序的,但是仔细看每个每列中的消息,发现都是有序的,这也证明是有序消费指的是在同一个queue下而不是topic,针对的是队列;

其实MessageListenerOrderly设计就是不允许你在消费消息时启动多个线程去消费,这是设计上就不允许的;

 

还有一种情况就是启动多个consumer,同时消费,网上流传的版本是多个consumer会分别处理多个不同queue下的数据,我本地是没有测试出来,我试了N次的结果都是启动多个consumer时,只有一个consumer会去消费掉所有的消息,不知道是不是我使用的是新版本RMQ的原因还是别的原因,按道理来说一个组下的consumer是会负载均衡的去消费的,这点我后面再看看。

这是我执行的结果:我分别向4个queue发送了消息,都只会被一个consumer处理:

 

 

好了,关于顺序消费的问题就先到这了,这个问题后面我再去查阅相关资源看看到底是什么原因?今天先到这,感谢你的关注,感谢你的阅读!!!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ系列之顺序消费 的相关文章

  • Docker安装Kafka消息队列

    文章目录 1 安装zookeeper2 安装kafka3 安装kafka map xff08 可选 xff09 1 安装zookeeper span class token function docker span run span cla
  • RabbitMQ(四)消息Ack确认机制

    RabbitMQ 四 消息Ack确认机制 确认种类 RabbitMQ的消息确认有两种 消息发送确认 这种是用来确认生产者将消息发送给交换器 交换器传递给队列的过程中 消息是否成功投递 发送确认分为两步 一是确认是否到达交换器 二是确认是否到
  • rocketMq中文文档

    title 用户指引 date 2017 12 29 categories 文档翻译 为什么是RocketMQ 动机 在早期阶段 我们在ActiveMQ 5 x 早于5 3 的基础上构建我们的分布式消息中间件 我们的跨国业务使用它来实现异步
  • RocketMQ第二篇 单机版安装操作步骤

    MQ下载地址 下载RocketMQ 4 7 1版本 RocketMQ运行版本下载地址 https archive apache org dist rocketmq 4 7 1 rocketmq all 4 7 1 bin release z
  • 2 RocketMQ Server安装

    RocketMQ依赖Java环境 要求有JDK 1 8以上版本 支持Windows和Linux平台 支持源码方式安装和使用已经编译好的安装包安装 我们用windows平台安装RocketMQ Server编译好的安装包 来讲解RocketM
  • 分布式秒杀案例讲解教程文档

    程序员ken 一 准备工作 1 1 vmware软件安装 虚拟机 相关教程 http c biancheng net view 714 html 网络配置这块 1 进入网络配置文件目录 cd etc sysconfig network sc
  • RocketMQ系列之集群搭建

    前言 上节我们对RocketMQ 以下简称RMQ 有了一些基本的认识 大致知道了 什么是RMQ以及他能做什么 今天我们来讲讲如何搭建RMQ 与其说搭建RMQ不如说是搭建RMQ集群 为什么这么说呢 看完这篇文章自然就懂了 RMQ几个重要角色
  • RocketMQ学习笔记 - 顺序消息

    文章目录 1 定义 2 代码示例 2 1 消息实体 2 2 生产者 2 3 消费者 2 3 测试结果 1 定义 顺序消息 FIFO 消息 是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型 顺序消息由两个部分组成 顺序发布和顺序消费
  • RocketMQ系列之入门

    前言 之前我们把RMQ的多Master集群搭建起来了 我们今天就来看看如何向这个集群生产消息以及消费消息 集群搭建回顾 回顾上节的内容 我总结下以下几步 第一 最新版RMQ4 2 0要求最低JDK8版本 第二 修改虚拟机的host 配置na
  • Kafka常见面试题

    1 什么是消息中间件 2 kafka 是什么 有什么作用 3 kafka 的架构是怎么样的 4 Kafka Replicas是怎么管理的 5 如何确定当前能读到哪一条消息 6 生产者发送消息有哪些模式 7 发送消息的分区策略有哪些 8 Ka
  • Rocketmq原理&最佳实践

    一 MQ背景 选型 消息队列作为高并发系统的核心组件之一 能够帮助业务系统解构提升开发效率和系统稳定性 主要具有以下优势 削峰填谷 主要解决瞬时写压力大于应用服务能力导致消息丢失 系统奔溃等问题 系统解耦 解决不同重要程度 不同能力级别系统
  • 06-分布式消息队列Kafka

    目录 一 简介 1 什么是kafka 1 1 概念 1 2 特性 2 应用场景 二 原理 1 基本概念 1 1 Broker 代理 1 2 Topic 主题 1 3 Partition 分区 1 4 Replication 副本 1 5 P
  • RabbitMQ的安装

    一 安装erlang环境 官网下载 http www erlang org downloads 这个文件其实不是gz格式的 使用file otp src 20 1 tar gz可以查看它的真实数据格式 解压 tar xvf otp src
  • 7 SpringBoot整合RocketMQ发送单向消息

    发送单向消息是指producer向 broker 发送消息 执行 API 时直接返回 不等待broker 服务器的结果 这种方式主要用在不特别关心发送结果的场景 举例 日志发送 RocketMQTemplate给我们提供了sendOneWa
  • Kafka实战——简单易懂的生产者消费者demo

    单线程版本适合本地调试 多线程版本适合做压测 1 引入maven依赖
  • RocketMQ 简介

    本文根据阿里云 RocketMQ产品文档整理 地址 https help aliyun com document detail 29532 html userCode qtldtin2 简介 RocketMQ是由阿里捐赠给Apache的一款
  • 1 RocketMQ简介

    简介 RocketMQ是由阿里捐赠给Apache的一款低延迟 高并发 高可用 高可靠的分布式消息中间件 经历了淘宝双十一的洗礼 RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力 同时也具备互联网应用所需的海量消息堆积 高吞吐
  • kafka详解及集群环境搭建

    一 kafka详解 安装包下载地址 https download csdn net download weixin 45894220 87020758 1 1Kafka是什么 1 Kafka是一个开源消息系统 由Scala写成 是由Apac
  • RocketMQ系列之架构浅谈

    RMQ的架构设计 下面我从GitHub上截取了一张RMQ的源码结构图 图中我框框出来的9大模块 基本就构成了整个RMQ的内部结构 上面9大模块的依赖层次主要如下 依赖越强的越处于底层 下面介绍下最上层的4个模块 这4个模块中工具命令行就不讲
  • 5分钟学会RocketMQ

    RocketMQ 简介 RocketMQ 是一个队列模型的消息中间件 具有高性能 高可用 高实时等特性 它并不支持JMS java消息服务 规范 但参考了JMS规范和kafak等的思想 Producer Consumer 队列都可以分布式

随机推荐

  • 【华为OD机试真题2023B卷 JS】生日礼物

    华为OD2023 B卷 机试题库全覆盖 刷题指南点这里 生日礼物 知识点排列组合 时间限制 1s 空间限制 256MB 限定语言 不限 题目描述 小牛的孩子生日快要到了 他打算给孩子买蛋糕和小礼物 蛋糕和小礼物各买一个 他的预算不超过x元
  • 2023年第六届先进控制,自动化与机器人国际会议(ICACAR 2023)

    2023年第六届先进控制 自动化与机器人国际会议 ICACAR 2023 重要信息 会议网址 www icacar org 会议时间 2023年4月7 9日 召开地点 中国广州 截稿时间 2023年3月10日 录用通知 投稿后2周内 收录检
  • 结构体(声明、初始化、内存对齐、如何传参)

    结构基础知识 聚合数据类型能够同时存储超过一个的单独数据 C提供了两种类型的聚合数据类型 分别是数组和结构体 数组是相同元素的集合 它的每个元素是通过下标引用或指针间接访问的 结构体也是一些值的的集合 这些值称为它 的成员 但一个结构的成员
  • 2023年最新前端面试题汇总大全(含答案超详细,HTML,JS,CSS汇总篇)-- 持续更新

    专项练习 持续更新 HTML篇 CSS篇 JS篇 Vue篇 TypeScript篇 React篇 微信小程序篇 前端面试题汇总大全二 含答案超详细 Vue TypeScript React 微信小程序 Webpack 汇总篇 持续更新 前端
  • mybatisPlus 将已有集合进行分页(非plus自带方法)

    最想有一个比较烦的需求 想破脑袋还没有想出来 根据同一搜索字段 不同条件搜索的集合进行合并并分页 看了相关api 想了一个折中的方法 正常情况框架使用mybatisplus分页会使用IPage 但是IPage获取的集合无法合并 已知两个集合
  • Vue脚手架配置代理

    Vue脚手架中配置代理是在vue config js增加配置 1 单代理配置 devServer proxy http localhost 5001 总结 优点 配置简单 缺点 不能配置多个代理 不能灵活控制请求是否走代理 工作方式 如上配
  • 获取时间的方法(四种)

    Java 获取系统时间的四种方法 1 Date day new Date SimpleDateFormat df new SimpleDateFormat yyyy MM dd HH mm ss System out println df
  • JWT技术

    JWT 一 JWT 实现无状态 Web 服务 1 什么是有状态 有状态服务 即服务端需要记录每次会话的客户端信息 从而识别客户端身份 根据用户身份进行请求的处理 典型的设计如tomcat中的session 例如登录 用户登录后 我们把登录者
  • 算法训练 P1103

    算法训练 P1103 时间限制 1 0s 内存限制 256 0MB 编程实现两个复数的运算 设有两个复数 和 则他们的运算公式为 要求 1 定义一个结构体类型来描述复数 2 复数之间的加法 减法 乘法和除法分别用不用的函数来实现 3 必须使
  • QT多线程 信号槽收不到信息 Q_OBJECT关键

    信号和槽是Qt应用开发的基础 它可是将两个毫无关系的对象连接在一起 槽和普通的C 函数是一样的 只是当它和信号连接在一起后 当发送信号的时候 槽会自动被调用 只有加入了Q OBJECT 你才能使用QT中的signal和slot机制 如在新建
  • 一款windows的终端神奇,类似mac的iTem2

    终于找到了一款windows的终端神奇 类似mac的iTem2 来 上神器 cmder cmder是一款windows的命令行工具 就是我们的linux的终端 用起来和linux的命令一样 所以我们今天要做的是安装并配置cmder 在这里插
  • Android约束布局ConstrainLayout

    基本方向约束 layout constraintLeft toLeftOf layout constraintRight toRightOf 这两个基本上用不上因为 layout constraintStart toStartOf就相当于l
  • Linux新手入门

    1 什么是Linux系统 Linux 全称GNU Linux 是一种免费使用和自由传播的类UNIX操作系统 其内核由林纳斯 本纳第克特 托瓦兹于1991年10月5日首次发布 它主要受到Minix和Unix思想的启发 是一个基于POSIX的多
  • Verilog基本语法

    一 模块结构 二 信号类型 首先必须知道该信号的最大值 计算该信号的位宽 wire wire用于结构化器件之间物理连线的建模 在Verilog中 wire永远是wire 就是相当于一条连线 用来连接电路 不能存储数据 无驱动能力 是组合逻辑
  • 只需一个Prompt,ChatGPT秒变万能导师,轻松学习任意领域知识

    AI正在改变我们生活的方方面面 包括我们学习的方式 AI已经证明自己有能力成为我们的助手甚至是老师 帮助我们更有效地获取知识 拿ChatGPT来说 我们平时有什么问题都可以向它提问 不过想让它更高效的帮助我们 还是需要一定的调教方法的 最近
  • 云服务器被攻击了怎么解决

    随着互联网的发展 很多使用云服务器的网络工作者会发现经常会遭到一些来路不明的网络攻击 由于云服务器没有设置任何有力的防火墙 而当遭遇攻击时 常常导致服务器宕机陷入黑洞 造成业务无法开展而损失大量用户 那么使用云服务器被攻击要怎么去解决呢 第
  • QT——QTabWidget自定义背景色

    QTabWidget在自定义背景色上 tab区域和内容区域是不一样的 一 内容区域背景色的设置 这个很简单 直接在UI界面找到下图所示的地方设置background color就行 二 tab区域背景色的设置 在网上看到有的小伙伴说直接在上
  • ---Ubuntu 16.04 server 不能关机问题解决

    https serverfault com questions 712928 systemctl commands timeout when ran as root Failed to start reboot target Connect
  • 游戏引擎架构——【动画系统】阅读记录

    character animation system The big problem with the rigid hierarchy technique is that the behavior of the character s bo
  • RocketMQ系列之顺序消费

    前言 上节我们介绍了RMQ的两大亮点 重试和重复消费的问题 其实重试才能算亮点 重复消费最终还是要由我们自己来解决这个问题 RMQ自身并没有提供很好的机制 至少目前是没有 不知道将来会不会有 OK扯远了 今天呢 我们再来介绍RMQ一个不错的