RocketMQ学习笔记 - 顺序消息

2023-11-01

1、定义

顺序消息(FIFO 消息):是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。
顺序消息由两个部分组成:顺序发布和顺序消费。

顺序消息包含两种类型:

  • 分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
  • 全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

2、代码示例

2.1、消息实体

package com.wzl.rocketmq.order;

import java.util.ArrayList;
import java.util.List;

/**
 * 消息体
 */
public class OrderStep {

    private long orderId;
    private String desc;

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    public static List<OrderStep> buildOrders() {
        List<OrderStep> list = new ArrayList<>();

        OrderStep orderStep = new OrderStep();
        orderStep.setOrderId(101L);
        orderStep.setDesc("创建");
        list.add(orderStep);


        orderStep = new OrderStep();
        orderStep.setOrderId(101L);
        orderStep.setDesc("付款");
        list.add(orderStep);


        orderStep = new OrderStep();
        orderStep.setOrderId(102L);
        orderStep.setDesc("创建");
        list.add(orderStep);


        orderStep = new OrderStep();
        orderStep.setOrderId(101L);
        orderStep.setDesc("发货");
        list.add(orderStep);

        orderStep = new OrderStep();
        orderStep.setOrderId(102L);
        orderStep.setDesc("发货");
        list.add(orderStep);

        orderStep = new OrderStep();
        orderStep.setOrderId(101L);
        orderStep.setDesc("结束");
        list.add(orderStep);

        orderStep = new OrderStep();
        orderStep.setOrderId(102L);
        orderStep.setDesc("结束");
        list.add(orderStep);

        return list;
    }

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }
}

2.2、生产者

package com.wzl.rocketmq.order;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * 顺序消息--生产者
 */
public class OrderProducer {

    public static void sendTagAMessage() throws MQClientException {
        //定义producer
        DefaultMQProducer producer = new DefaultMQProducer("order_group101");

        //要设置NameServer地址,多个;分割
        producer.setNamesrvAddr("192.168.12.121:9876");

        //启动
        producer.start();

        //发送消息
        for (int i = 0; i < OrderStep.buildOrders().size(); i++) {
            try {
                OrderStep orderStep = OrderStep.buildOrders().get(i);
                String body = orderStep + "";

                Message msg = new Message("OrderTopic", "TagOrder", "i" + i, body.getBytes());

                SendResult sendResult = producer.send(msg, (mqs, message, args) -> {
                    long orderId = (long) args;
                    long index = orderId % mqs.size();//订单号和queue数量进行取模
                    return mqs.get((int) index);
                }, orderStep.getOrderId());

                System.out.println("OrderProducer发送消息Order:" + sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }

        }

        //关闭生产者
        producer.shutdown();
    }

    public static void main(String[] args) throws MQClientException {
        //发送tagA消息
        sendTagAMessage();
    }

}

2.3、消费者

package com.wzl.rocketmq.order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;


/**
 * 顺序消息--消费者
 */
public class OrderConsumer {

    public static void main(String[] args) throws MQClientException {
        //声明并初始化一个consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer101");

        //要设置NameServer地址,多个;分割
        consumer.setNamesrvAddr("192.168.12.121:9876");

        //设置consumer所订阅的Topic和Tag,*代表全部的Tag
        consumer.subscribe("OrderTopic", "TagOrder");

        //集群订阅
        consumer.setMessageModel(MessageModel.CLUSTERING);

        //这里设置的是一个consumer的消费策略
        //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //设置一个Listener,主要进行消息的逻辑处理
        consumer.registerMessageListener((MessageListenerOrderly) (list, context) -> {
            for (MessageExt messageExt : list) {
                String messageBody = new String(messageExt.getBody());
                System.out.println("线程名称: " + Thread.currentThread().getName() + " 消费消息: " + messageBody);//输出消息内容
            }

            //返回消费状态
            return ConsumeOrderlyStatus.SUCCESS;
        });

        //调用start()方法启动consumer
        consumer.start();

        System.out.println("OrderConsumer Started.");
    }
}

2.3、测试结果

生产者发送的消息都是刻意被打乱的
为了方便观察,我们启动2个消费者

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

结果发现:由于具有相同的OrderId取模在同一个queue中,消费也是不同订单号被不同的消费者有序消费的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ学习笔记 - 顺序消息 的相关文章

随机推荐

  • 霍夫投票直观理解

    霍夫投票法最典型的示例是二维图像中霍夫直线检测 过二维平面中的定点可以得到无数条直线 如果以顶点参数为做一条直线 即将这些直线变换到参数空间 那么这个定点对应参数空间中的一条直线 如果在参数空间中有两条直线相交于同一个点 那么说明对应的两个
  • Unity排行榜功能(使用MVC框架)

    要求 实现如图所示UI界面 按下某一按键实现排行榜的更新 M 数据模型类 主要负责数据 玩家的数据 public class RankModel 定义三个属性 姓名 分数 排名 定义构造函数 public string Name set g
  • 基于arduino下的巡线小车

    先发代码再进行解释吧 include
  • 《Head First HTML5 javascript》第10章 自定义对象

    2022 11 23 第10章 自定义对象 面向对象OOP Object Oriented Programming 对象是一个包含相关数据和方法的集合 通常由一些变量和函数组成 我们称之为对象里面的属性和方法 对象是存储在单个分组中的相关功
  • Git统计个人提交代码行数

    1 Git统计个人提交代码行数 git log format aN sort u while read name do echo en name t git log author name pretty tformat numstat aw
  • Windows系统文件快捷链接(软链接/硬链接/符号链接)mklink

    记录学习mklink 将大文件从c盘移出 快捷方式 软链接 硬链接 符号链接 快捷方式 常见文件链接方式 在Windows上以 lnk结尾的文件 这类文件通常用于指定某一个文件或某一个目录的位置 可扩展性很强 桌面快捷方式就是这类了 软链接
  • Ubuntu Openvino(YOLOV5)加载训练好的模型 xml onnx bin

    使用 Openvino 对自己训练好的模型yolov5 pt gt IR 进行推理 环境需求 Ubuntu 系统 20 04 vscode 编辑器 python3 Openvino 环境 priority 关键为 export 导出的修改
  • Libvirt简介

    Libvirt 是一个函数库 包含实现Linux虚拟化功能的linux API 提供了管理虚拟机的稳定的 统一的接口 其主要包括Libvirt API Libvirtd进程和virsh工具集三个部分 架构说明 用户程序 程序 virsh工具
  • Flutter基础(手势检测GestureDetector)-二

    import package flutter material dart void main runApp new MaterialApp title flutter质感设计 home new MyButton class MyButton
  • 核工业物理研究院九院三所

    1 流体物理研究所 一所 高能量密度流体力学过程 爆轰压缩 凝聚态 辐射流体力学等 可以说包括了核武器设计的基础理论部分和核试验 亚临界试验等的试验数据采集分析 2 核物理与化学研究所 二所 核物理 放射化学 核过程等 就是核反应堆技术 核
  • mongo 常用的命令

    一个mongod服务可以有建立多个数据库 每个数据库可以有多张表 这里的表名叫collection 每个collection可以存放多个文档 document 每个文档都以BSON binary json 的形式存放于硬盘中 因此可以存储比
  • 虚拟网络无法连接本机网络(Xshell无法连接虚拟机)

    Xshell无法连接虚拟机 类似于 Could not connect to 192 168 0 128 port 22 Connection failed 在很多博客中也看到过许多解决方法 例如 安装sshd的客户端 服务端 给sshd的
  • 华为mate30老是显示无法连接服务器,华为Mate30 Pro手机微信信息老是发不出,提示无法连接到网络...

    华为Mate30 Pro手机的微信信息老是发不出 并且在接收微信消息的有延迟 信号满格 时常提示无法连接到网络 微信时常半天发不出 发个东西转半天 接收也经常延迟几分钟才收到提醒 这是什么鬼手机啊 别人发来消息 一来就是好多条 还有十来分钟
  • priorityQueue优先级队列 (python、c++)

    优先级队列 优先级队列 python C 最近用优先队列写了一个SNIC超像素分割的工程 有兴趣的可以下载看看 VIP大佬让我赚一点下载积分吧 感激不尽 https download csdn net download koffee f 1
  • 2023天一永安杯部分wp

    web Query 布尔盲注 import requests import string dictionary string digits string ascii letters url http cd5a2660b462c867 nod
  • linux 开启curl命令,如何启用curl命令HTTP2支持

    当我们直接使用 curl 去请求一个 https 页面时 默认可以看到其默认返回的是 HTTP1 1 的 response 现在使用 HTTP2 的网站越来越多 技术也越来越成熟 如何启用 curl 命令 HTTP 2 支持就成为了一个问题
  • 洛谷 B2043 判断能否被 3,5,7 整除 题解

    这一道题很简单 首先 要输入一个数x 之后将判断x分别是否能整除3 5 7 为了进行依次的判断 我们要使用for循环 for int i 3 i lt 7 i 2 i 3表示从3开始循环 之后i lt 7循环到7结束 由于整除的数为3 5
  • 安装tensorflow过程中的报错

    安装tensorflow后 测试安装是否成功 结果报错 SystemError
  • 数组对象与字符串对象

    1 数组对象 1 1什么是数组对象 javaScrip中的数组对象可以使用new Array或字面量 来创建 在创建以后 就可以调用数组对象提供的一些方法来实现对数组额度操作了 数组对象用于在单个变量中存储多个值 JavaScript的数组
  • RocketMQ学习笔记 - 顺序消息

    文章目录 1 定义 2 代码示例 2 1 消息实体 2 2 生产者 2 3 消费者 2 3 测试结果 1 定义 顺序消息 FIFO 消息 是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型 顺序消息由两个部分组成 顺序发布和顺序消费