RocketMQ(一)—— 基本使用

2023-10-29

目录

1、RocketMQ基本使用

1、启动

2、测试

3、关闭 

2、集群简介

 特点:

集群模式:

工作流程

3、双主双从集群搭建

关闭防火墙

环境变量配置

创建消息存储路径

broker配置文件

启动

集群监控平台搭建

4、消息发送

1、基本样例

1、消息发送

2、消费信息

2、顺序消息

3、延时消息

4、批量消息发送

5、过滤消息

6、事物消息

(1)事务消息发送及提交

(2)事务补偿

(3)事务消息状态

 使用限制

5、springboot整合


1、RocketMQ基本使用

1、启动

1.启动NameServer

# 1.启动NameServer
nohup sh bin/mqnamesrv &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
或者为
nohup sh bin/mqnamesrv > /dev/null 2>&1 &

2.启动Broker

# 1.启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 2.查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log 
nohup sh mqbroker -n localhost:9876 -c /root/rocketMQ/rocketmq-all-4.4.0-bin-release/conf/broker.conf autoCreateTopicEnable=true > /dev/null 2>&1 &

启动失败:RocketMQ默认的虚拟机内存较大,启动Broker如果因为内存不足失败,需要编辑如下两个配置文件(在bin中),修改JVM内存大小

# 编辑runbroker.sh和runserver.sh修改默认JVM大小
vi runbroker.sh
vi runserver.sh

2、测试

1.发送消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.接收消息

# 1.设置环境变量
export NAMESRV_ADDR=localhost:9876
# 2.接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

3、关闭 

# 1.关闭NameServer
sh bin/mqshutdown namesrv
# 2.关闭Broker
sh bin/mqshutdown broker

2、集群简介

  • Producer:消息的发送者;举例:发信者

  • Consumer:消息接收者;举例:收信者

  • Broker:暂存和传输消息;举例:邮局

  • NameServer:管理Broker;举例:各个邮局的管理机构

  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息

  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息

 特点:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步

  • Broker部署相对复杂,Broker分为Master与Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

集群模式:

(1)单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

(2)多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

(3)多Master多Slave模式(异步)

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;

  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

(4)多Master多Slave模式(同步)

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

工作流程

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

3、双主双从集群搭建

准备两个虚拟机(都要进行配置)

编辑 /etc/hosts:

# nameserver
192.168.126.131 rocketmq-nameserver1
192.168.126.132 rocketmq-nameserver2
# broker
192.168.126.131 rocketmq-master1
192.168.126.131 rocketmq-slave2
192.168.126.132 rocketmq-master2
192.168.126.132 rocketmq-slave1

 配置完成后, 重启网卡

systemctl restart network

关闭防火墙

# 关闭防火墙
systemctl stop firewalld.service 
# 查看防火墙的状态
firewall-cmd --state 
# 禁止firewall开机启动
systemctl disable firewalld.service

或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876 、10911 、11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:

  • nameserver 默认使用 9876 端口

  • master 默认使用 10911 端口

  • slave 默认使用11011 端口

执行以下命令:

# 开放name server默认端口
firewall-cmd --remove-port=9876/tcp --permanent
# 开放master默认端口
firewall-cmd --remove-port=10911/tcp --permanent
# 开放slave默认端口 (当前集群模式可不开启)
firewall-cmd --remove-port=11011/tcp --permanent 
# 重启防火墙
firewall-cmd --reload

环境变量配置

添加到 /etc / profile

#set rocketmq
ROCKETMQ_HOME=/root/rocketMQ/rocketmq-all-4.4.0-bin-release
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
source /etc/profile

创建消息存储路径

mkdir /root/rocketMQ/store
mkdir /root/rocketMQ/store/commitlog
mkdir /root/rocketMQ/store/consumequeue
mkdir /root/rocketMQ/store/index

slave: 

mkdir /root/rocketMQ/store2
mkdir /root/rocketMQ/store2/commitlog
mkdir /root/rocketMQ/store2/consumequeue
mkdir /root/rocketMQ/store2/index

broker配置文件

131配置 m1、s2(还要加上 brokerIP1=192.168.126.131),132配置m2、s1(还要加上 brokerIP1=192.168.126.132

添加:↓ 开启 sql过滤

enablePropertyFilter=true

 m1

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketMQ/store
#commitLog 存储路径
storePathCommitLog=/root/rocketMQ/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketMQ/store/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketMQ/store/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketMQ/store/checkpoint
#abort 文件存储路径
abortFile=/root/rocketMQ/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

 s2

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketMQ/store2
#commitLog 存储路径
storePathCommitLog=/root/rocketMQ/store2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketMQ/store2/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketMQ/store2/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketMQ/store2/checkpoint
#abort 文件存储路径
abortFile=/root/rocketMQ/store2/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

m2

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketMQ/store
#commitLog 存储路径
storePathCommitLog=/root/rocketMQ/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketMQ/store/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketMQ/store/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketMQ/store/checkpoint
#abort 文件存储路径
abortFile=/root/rocketMQ/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

s1

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/root/rocketMQ/store2
#commitLog 存储路径
storePathCommitLog=/root/rocketMQ/store2/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/root/rocketMQ/store2/consumequeue
#消息索引存储路径
storePathIndex=/root/rocketMQ/store2/index
#checkpoint 文件存储路径
storeCheckpoint=/root/rocketMQ/store2/checkpoint
#abort 文件存储路径
abortFile=/root/rocketMQ/store2/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

启动

分别启动 nameserver

nohup sh mqnamesrv -n "192.168.126.131:9876" &

nohup sh mqnamesrv -n "192.168.126.132:9876" &

启动broker

m1

nohup sh mqbroker -c /root/rocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &

s2

nohup sh mqbroker -c /root/rocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &

m2

nohup sh mqbroker -c /root/rocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &

s1

nohup sh mqbroker -c /root/rocketMQ/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &

查看日志信息

# 查看nameServer日志
tail -500f ~/logs/rocketmqlogs/namesrv.log
# 查看broker日志
tail -500f ~/logs/rocketmqlogs/broker.log

集群监控平台搭建

需要自己对rocketmq-console进行编译打包运行。

D:\学习总结\微服务\资料-全面解剖RocketMQ和项目实战\资料\rocketmq源码\rocketmq-externals-master\rocketmq-console\src\main\resources。

rocketmq.config.namesrvAddr=192.168.126.131:9876;192.168.126.132:9876

进入pom所在目录,进行打包

mvn clean package -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar

4、消息发送

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

1、基本样例

1、消息发送

1、发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

/**
 * 发送同步消息
 */
public class SyncProducer {

    public static void main(String[] args) throws Exception {

        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.126.131:9876;192.168.126.132:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "Tag1", ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();
            //消息id
            String msgId = result.getMsgId();
            //消息接收队列的id
            int queueId = result.getMessageQueue().getQueueId();

            System.out.printf("发送状态:%n%s,消息id:%s,队列id:%d\n",result,msgId,queueId);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }

        System.out.println("关闭producer");
        //6.关闭生产者producer
        producer.shutdown();
    }
}

2、发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

/**
 * 发送异步消息
 */
public class AsyncProducer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.126.131:9876;192.168.126.132:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "Tag2", ("异步消息:" + i).getBytes());
            //5.发送异步消息
            producer.send(msg, new SendCallback() {
                /**
                 * 发送成功回调函数
                 * @param sendResult
                 */
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送结果:" + sendResult);
                }

                /**
                 * 发送失败回调函数
                 * @param e
                 */
                public void onException(Throwable e) {
                    System.out.println("发送异常:" + e);
                }
            });

            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

3、单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

/**
 * 发送单向消息
 */
public class OneWayProducer {

    public static void main(String[] args) throws Exception, MQBrokerException {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.126.131:9876;192.168.126.132:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("base", "Tag3", ("Hello World,单向消息" + i).getBytes());
            //5.发送单向消息
            producer.sendOneway(msg);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(5);
        }

        //6.关闭生产者producer
        producer.shutdown();
    }
}

2、消费信息

负载均衡模式(默认):多个消费者处理不同的消息

广播模式:多个消费者处理相同的消息

/**
 * 消息的接受者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.126.131:9876;192.168.126.132:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("base", "Tag1");

        //设定消费模式:负载均衡(默认)|广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        //4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    //System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        consumer.start();
    }
}

2、顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

public class Producer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.126.131:9876;192.168.126.132:9876");
        //3.启动producer
        producer.start();
        //构建消息集合
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        //发送消息
        for (int i = 0; i < orderSteps.size(); i++) {
            String body = orderSteps.get(i) + "";
            Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());
            /**
             * 参数一:消息对象
             * 参数二:消息队列的选择器
             * 参数三:选择队列的业务标识(订单ID)
             */
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 *
                 * @param mqs:队列集合
                 * @param msg:消息对象
                 * @param arg:业务标识的参数
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    long orderId = (long) arg;
                    long index = orderId % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderSteps.get(i).getOrderId());

            System.out.println("发送结果:" + sendResult);
        }
        producer.shutdown();
    }

}
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        consumer.setNamesrvAddr("192.168.126.131:9876;192.168.126.132:9876");
        //3.订阅主题Topic和Tag
        consumer.subscribe("OrderTopic", "*");

        //4.注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("线程名称:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        //5.启动消费者
        consumer.start();

        System.out.println("消费者启动");

    }
}

3、延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("DelayTopic", "Tag1", ("Hello World" + i).getBytes());
            //设定延迟时间
            msg.setDelayTimeLevel(2);
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus status = result.getSendStatus();

            System.out.println("发送结果:" + result);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(1);
        }
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

4、批量消息发送

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小<=4MB

        List<Message> msgs = new ArrayList<Message>();
        //4.创建消息对象,指定主题Topic、Tag和消息体
        /**
         * 参数一:消息主题Topic
         * 参数二:消息Tag
         * 参数三:消息内容
         */
        Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());
        Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());
        Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());

        msgs.add(msg1);
        msgs.add(msg2);
        msgs.add(msg3);

        //5.发送消息
        SendResult result = producer.send(msgs);

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
   	@Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

5、过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

可以使用SQL表达式筛选消息

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;

  • 字符比较,比如:=,<>,IN;

  • IS NULL 或者 IS NOT NULL;

  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;

  • 字符,比如:'abc',必须用单引号包裹起来;

  • NULL,特殊的常量

  • 布尔值,TRUEFALSE

            Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes());
            //通过`putUserProperty`来设置消息的属性
            msg.putUserProperty("i", String.valueOf(i));
consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));

6、事物消息

正常事务消息的发送及提交、事务消息的补偿流程。

(1)事务消息发送及提交

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

(2)事务补偿

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

(3)事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

/**
 * 发送同步消息
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        TransactionMQProducer producer = new TransactionMQProducer("group5");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.126.131:9876;192.168.126.132:9876");

        //添加事务监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 在该方法中执行本地事务
             * @param msg
             * @param arg
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                if (StringUtils.equals("TAGA", msg.getTags())) {
                    //提交
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if (StringUtils.equals("TAGB", msg.getTags())) {
                    //回滚
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                } else if (StringUtils.equals("TAGC", msg.getTags())) {
                    //不做处理
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 该方法时MQ进行消息事务状态回查——对于不做处理的消息做具体处理
             * @param msg
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                System.out.println("消息的Tag:" + msg.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        //3.启动producer
        producer.start();

        String[] tags = {"TAGA", "TAGB", "TAGC"};

        for (int i = 0; i < 3; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("TransactionTopic", tags[i], ("Hello World" + i).getBytes());
            //5.发送消息
            SendResult result = producer.sendMessageInTransaction(msg, null);
            //发送状态
            SendStatus status = result.getSendStatus();

            System.out.println("发送结果:" + result);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(2);
        }

        //不用关闭生产者producer
        //producer.shutdown();
    }
}

 使用限制

  1. 事务消息不支持延时消息和批量消息。

  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。

  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。

  4. 事务性消息可能不止一次被检查或消费。

  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。

  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

5、springboot整合

生产者

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>${rocketmq-spring-boot-starter-version}</version>
    </dependency>
# application.properties
rocketmq.name-server=192.168.126.131:9876;192.168.126.132:9876
rocketmq.producer.group=my-group
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MQSpringBootApplication.class})
public class ProducerTest {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void test1(){
        rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");
    }
}

消费者:依赖、配置一致

@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Receive message:"+message);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ(一)—— 基本使用 的相关文章

  • Memcach基础使用

    memcache 基础课程 使用场景 memcache 服务器端的安装 推荐使用memcached memcached是memchache的升级版本 sudo su apt get install memcached usr bin mem
  • Zookeeper之ZAB协议

    1 概念 Zookeeper使用 种称为Zookeeper Atomic Broadcast ZAB Zookeeper原 消息 播协议 的协议作为其数据 致性的核 算法 ZAB协议并不像Paxos算法那样 是 种通 的分布式 致性算法 它
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • zookeeper学习草稿纸

    指令重排序 https baijiahao baidu com s id 1701616903992143186 wfr spider for pc JVM JDK JRE 静态方法为什么不能调用非静态成员 重载和重写的区别 可变参数 基本
  • Go内存管理及性能观测工具

    内存管理 TCMalloc Golang内存分配算法主要源自Google的TCMalloc算法 TCMalloc将内存分成三层最外层Thread Cache 中间层Central Cache 最里层Page Heap Thread Cach
  • Redis7之介绍(一)

    一 介绍 1 1 基本了解 Remote Dictionary Server 远程字典服务 是完全开源的 使用ANSIC语言编写遵守BSD协议 是一个高性能的Key Value数据库提供了丰富的数据结构 例如String Hash List
  • Redis常见命令

    命令可以查看的文档 http doc redisfans com https redis io commands 官方文档 英文 http www redis cn commands html 中文 https redis com cn c
  • redis缓存雪崩、穿透、击穿

    这篇文章我们来了解一下一些redis的高并发问题以及他的解决方法 上图是一个简单的数据查询流程 前台请求 后台先从缓存中取数据 取到直接返回结果 取不到时从数据库中取 数据库取到更新缓存 并返回结果 数据库也没取到 那直接返回空结果 一 r
  • Nacos、ZooKeeper和Dubbo的区别

    Nacos ZooKeeper和Dubbo是三个不同的分布式系统组件 它们之间有以下几点区别 功能定位 Nacos主要提供服务发现 配置管理和服务治理等功能 而ZooKeeper主要是分布式协调服务 提供了分布式锁 分布式队列等原语 Dub
  • 【Docker安装部署Kafka+Zookeeper详细教程】

    Docker安装部署Kafka Zookeeper Docker拉取镜像 Docker拉取zookeeper的镜像 docker pull zookeeper Docker拉取kafka的镜像 docker pull wurstmeiste
  • 从zookeeper官方文档系统学习zookeeper

    从zookeeper官方文档系统学习zookeeper 1 zookeeper 2 zookeeper 文档 3 zookeeper 单机版 3 1 配置 3 2 启动 3 3 验证 4 zookeeper 集群版 4 1 配置 4 2 启
  • ZooKeeper踩坑

    一 下载安装包时要下载文件名中带有bin的安装包 否则会报错 找不到或无法加载主类 org apache zookeeper server quorum QuorumPeerMain Error contacting service 这是由
  • Redis热点数据处理

    1 概念 热点数据就是访问量特别大的数据 2 热点数据引起的问题 流量集中 达到物理网卡上限 请求过多 缓存分片服务被打垮 redis作为一个单线程的结构 所有的请求到来后都会去排队 当请求量远大于自身处理能力时 后面的请求会陷入等待 超时
  • SpringCloud使用Zookeeper作为服务注册发现中心

    本篇文章主要记录SpringCloud使用Zookeeper作为服务注册发现中心 通过服务提供者和消费者为例 来真正掌握zk注册中心 目录 一 搭建服务提供者 1 创建cloud provider payment8004项目 2 修改配置
  • redis概述、不同系统安装以及redis配置文件redis.conf各参数详细介绍

    目录 redis简介 简介 特点 优势 redis与其他key value数据库的区别 redis的安装 windows linux ubuntu下安装 redis配置 redis数据类型 String 字符串 Hash 哈希 List 列
  • Redis交互速度慢,频繁处理时经常报错 RedisSystemException: RedisException: Connection closed

    Redis交互速度很慢 达到几十到一百毫秒一次 且压力测试下经常报错 org springframework data redis RedisSystemException Redis exception nested exception
  • SpringBoot中项目启动及定时任务缓存数据库常用数据至内存变量并转换后高频调用

    场景 定时任务中需要获取数据库中数据进行数据转换成需要的格式并进行后续的业务处理 数据库中的数据更新频率不高 可将数据库中数据在项目启动后读取一遍数据 然后再通过定时任务定时查询数据库更新数据 实现数据库缓存的方式有多种 比如以下 Spri
  • Redis——Redis常用命令

    Redis提供了丰富的命令 可以对数据库和各种数据类型进行操作 这些命令可以在Windows和Linux中使用 1 键值相关命令 1 1 KEYS KEYS用于返回满足pattern的所有key pattern支持以下通配符 匹配任意字符
  • 【Redis】Redis 配置文件

    1 概述 相同文件 Redis redis 配置 配置文件 redis conf 自定义目录 myredis redis conf 4 1 Units单位 配置大小单位 开头定义了一些基本的度量单位 只支持bytes 不支持bit 大小写不
  • 淘宝商品类目接口API:获取淘宝商品分类类目信息

    cat get 获得淘宝分类详情 响应参数 名称 类型 必须 示例值 描述 info Mix 0 cid 16 parent cid 0 name 其他女装 is parent true status normal sort order 0

随机推荐

  • SonrLint常见解决方案

    Sonar是什么 Sonar是一个用于代码质量管理的开源平台 用于管理源代码的质量 通过插件形式 可以支持包括java C C C PL SQL Cobol JavaScrip Groovy等等二十几种编程语言的代码质量管理与检测 Sona
  • token不存在问题

    解决 添加一个token鉴权的请求头 注意 要在在缺少的文件添加该代码 const token localStorage getItem TOKEN
  • vector subscript out of range数组下标越界错误

    在使用vector二维数组时 产生 vector subscript out of range 错误 检查之 后并没有发现数组下标越界问题 百度了一下 发现原来是数组并没有初始化 赋值 没有分配空间 所以不能采用下标的方式进行访问 解决方法
  • antd 2.2.8 版本 Form表单使用useForm带星号校验和带星号不校验的写法以及不带星号不校验的写法

    1 带星号且校验
  • 02 Elasticsearch基本常用命令详解

    IK分词器 分词 把一段中文或者词组划分成一个个关键字 我们在搜索的时候会把自己的信息进行拆分 会把数据库中或者索引库中的数据进行分词 然后进行一个个匹配操作 默认的中文分词是将每一个看成一个词 比如 我爱王军 会被拆分成 我 爱 王 军
  • SSM框架下的学生管理系统--序言

    先开个博 占个坑 也是督促自己尽快提上日程 这是为初学者写的一个系列性的文章 我会不断更新 希望看到文章的人能有所收获 简单介绍下SSM 第一个S指的是Spring 第二个S指的是SpringMVC 最后一个M指的是Mybatis 我们这个
  • c++ 入门基础(一)

    第一个c 程序 include
  • MySQL5.6数据库8小时内无请求自动断开连接

    问题 最近的项目中 发现Mysql数据库在8个小时内 没有请求时 会自动断开连接 这是MySQL服务器的问题 The last packet successfully received from the server was 1 836 1
  • 从零开始做一个贪吃蛇游戏(用纯html + css + js实现,主要技术栈:vue + uniCloud)

    今天给大家分享一个贪吃蛇的小游戏 用纯html css js实现 主要技术栈 vue uniCloud 希望能对还未涉及游戏开发的小伙伴有所启发 由于技术栈很浅 希望大佬勿喷 话不多说 直接开撸 效果图如下 初始化项目 本项目是基于uni
  • 网络基础——TCP与UDP的区别

    Web基础 COOKIE与SESSION的区别 如上表格 区别总结如下 1 连接性质不同 TCP是面向有连接 而UDP是面向无连接的 所谓的面向有连接 通俗讲是指传输数据时 是否需要先建立通讯 确认对方在 并且有空接收数据 面向无连接 是不
  • 探索loss.backward() 和optimizer.step()的关系并灵活运用

    loss backward 和optimizer step 的关系及灵活运用 在deep learning的模型训练中 我们经常看到如下的代码片段 loss backward optimizer step 那么 这两个函数到底是怎么联系在一
  • layui数据可视化_利用ggplot2进行数据可视化

    2020 04 25 1 1 first step 意识到ggplot绘制其实是由一层层图层组成 一个命令即可增加一层 ggplot data mpg geom point mapping aes x displ y hwy ggplot
  • TensorFlow 2.0教程05:跨多个节点的分布式培训

    分布式训练允许扩大深度学习任务 因此可以学习更大的models或以更快的速度进行训练 在之前的教程中 我们讨论了如何MirroredStrategy在单个节点 物理机器 内实现多GPU训练 在本教程中 我们将解释如何在多个节点之间进行分布式
  • 永磁同步电机矢量控制(五)——波形记录及其分析

    恰饭一下 已经过了工作的年纪 在这里稍微出一下自己做的一套永磁同步电机的教程 为了解决电机控制入门难的问题 我将自己从一知半解到现在的学习记录整理成十个部分学习教程 从基础的矢量控制 到应用性较强的MTPA 弱磁控制等 最后深入到无速度传感
  • [1024]python sqlalchemy中create_engine用法

    用法 engine create engine dialect driver username password host port database dialect 数据库类型 driver 数据库驱动选择 username 数据库用户名
  • 论文笔记-深度估计(7)-CNN-SLAM Real-time dense monocular SLAM with learned depth prediction

    CVPR2017 CNN SLAM Real time dense monocular SLAM with learned depth prediction 关键词 基于CNN的单张图深度估计 语义SLAM 半稠密的直接法SLAM 作者提出
  • Unity 3D游戏十一:坦克大战

    前言 中山大学数据科学与计算机学院3D游戏课程学习记录博客 游戏代码 gitee 参考师兄的博客 师兄博客 游戏视频 bilibili 游戏要求 从商店下载游戏 Kawaii Tank 或 其他坦克模型 构建 AI 对战坦克 具体要求如下
  • vue替换url中的#为指定字符串

    url符号 处理 替换 号为asqm function replaceUrlJINtoAAB console log 123366 测试全局方法 添加时间 2022 7 21 15 05 43 url符号 处理 替换 号为asqm cons
  • HTML 快速入门

    目录 概念 快速入门 语法 基本标签 1 文件标签 2 文本标签 3 图片标签 4 列表标签 5 链接标签 6 div和span 7 语义化标签 8 表格标签 案例 旅游网站首页 表单标签 form标签 表单项标签 1 input 2 se
  • RocketMQ(一)—— 基本使用

    目录 1 RocketMQ基本使用 1 启动 2 测试 3 关闭 2 集群简介 特点 集群模式 工作流程 3 双主双从集群搭建 关闭防火墙 环境变量配置 创建消息存储路径 broker配置文件 启动 集群监控平台搭建 4 消息发送 1 基本