[1100]rocketmq详解

2023-11-16

rocketmq入门

消息队列
含义

	消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,

	高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

	目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

应用场景

	(同下方rocketmq应用场景)
示例图

	a. 电商系统

		(1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)

		(2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。

		(3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。
	
	b. 日志收集系统

		(消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。)

		1.Zookeeper注册中心,提出负载均衡和地址查找服务;

		2.日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列;

		3.Kafka集群:接收,路由,存储,转发等消息处理;

image.png
image.png

rocketmq示例图

image.png

分析

	a.消息队列是一种"先进先出"的数据结构

	b.不使用队列的情况下,生产者与消费者之间是通过RPC交互的
rocketmq应用场景
  • 应用解耦
问题描述

	系统的耦合性越高,容错性就越低,以电商应用为例,用户创建订单后,
	如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障
	或者因为升级等原因暂时不可用,都会造成下单操作异常

解耦含义

	使用消息队列解耦,系统的耦合性就会下降了,比如物流系统发生故障,
	需要几分钟才能修复,在这段时间内,物流系统要处理的数据被缓存到消
	息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在
	消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故
	障

场景

	用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口

	那么会存在以下缺点

	1.假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
	2.订单系统与库存系统耦合

解决方案

	订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户下单成功

	库存系统:订单下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行
						库存操作

	假如:在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息

	队列就不再关心其他的后续操作了。实现了订单系统与库存系统的应用解耦

image.png
image.png

  • 流量削峰
问题描述

	应用系统如果遇到系统请求流量的瞬间猛增,有可能将系统压垮,有了消
	息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大
	大提高系统的稳定性

削峰含义

	一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用
	户请求,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知
	用户下单完毕,这方法虽然会耗时,但出现系统不能下单的情况

场景描述

	秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为了解决这个问题,一般需要

	在应用前端加入消息队列。这样做的好处有

	1.可以控制活动的人数

	2.可以缓解短时间内高流量压垮应用

	3.用户请求,服务器接收后,首先写入消息队列,假如消息队列长度超过最大数量,则直接

	抛弃用户请求或跳转到错误页面

	4.秒杀业务根据消息队列中的请求信息,再做后续处理

image.png

  • 数据分发
数据分发含义

	通过消息队列可以让数据在多个系统之间更加方便流通。只需要将数据发
	送到消息队列,数据使用方直接在消息队列中获取数据即可

图解

	A系统产生数据,发送到MQ
	BCD哪个系统需要,自己去MQ消费即可
	如果某个系统不需要数据,取消对MQ消息的消费即可
	新系统要数据,直接从MQ消费即可

image.png
image.png

  • 异步处理
场景描述

	用户注册后,需要发注册邮件和注册短信。传统的做法有两种
	
	1.串行方式   2. 并行方式
	
	串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务

	完成后,返回给客户端

	并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个

	任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

探究

	假设三个业务节点每个使用50毫秒,不考虑网络等其他开销,则串行方式的时间是150毫秒

	,并行的时间可能是100毫秒

	因为cpu在单位时间内处理的请求数是一定的,假设cpu1秒吞吐量是100次,则串行方式1

	秒内cpu可处理的请求量是7次(1000/150),并行方式处理的请求量是10次(1000/100)。

小结

	根据如上案例,传统的方式系统的性能,如并发量、吞吐量、响应量等会有瓶颈

解决(使用消息队列)

	按照以上约定,用户的响应时间相当于是注册消息写入数据库时间,也就是50毫秒。

	注册邮件,发送短信写入消息队列后,直接返回,因为写入消息队列的速度很快

	基本可以忽略,因此用户响应时间可能是50毫秒。因此架构改变后,系统的吞吐量

	提高到每秒20 QPS,比串行提高了3倍,比并行提高了2倍

image.png
image.png
image.png

  • 日志处理
含义

	日志处理是指将消息队列用在日志处理中,比如kafka的应用,解决大量日志传输的问题,

	架构简化如下

	日志采集客户端,负责日志数据采集,定时写入kafka队列

	kafka消息队列,负责日志数据的接收,存储和转发

	日志处理,订阅并消费kafka队列中的日志数据

image.png

搭建环境

环境安装——Linux
  • rocketmq安装

下载地址:http://rocketmq.apache.org/

安装选择:二进制包安装方式(含有bin的安装包)

安装环境:unzip 安装包名

安装环境:linux64位系统、JDK1.8(64位)、源码安装需要安装maven3.2x

RocketMQ下载及安装

image.png

RocketMQ目录结构
bin					启动脚本,包括shell脚本和cmd脚本

conf				实例配置文件,包括broker配置文件、logback配置文件等
					
lib					依赖jar包,包括netty、commons-lang、FastJSON等

image.png

RocketMQ启动及测试
  • 前提条件
内存修改

	Rocketmq默认的虚拟机内存较大,启动broker如果因为内存不足失败,需要编辑如下两个配置文件,去修改JVM内存大小
	
	vi runbroker.sh
	vi runserver.sh

	#编辑runbroker.sh和runserver.sh修改默认JVM大小
	#参考设置
	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -xx:MetaspaceSize=128m xx:MaxMetaspaceSize=320m"

端口

	开启端口:10911 10912 10909 9876

启动位置

	bin目录

image.png
image.png

  • NameServer启动
位置

	RocketMQ目录下的bin目录

本地部署(linux)

	nohup sh mqnamesrv &								启动NameServer
	tail -f ~/logs/rocketmqlogs/namesrv.log				查看日志

外网部署

	nohup sh mqnamesrv  -n "192.168.33.100:9876" &		启动NameServer
	tail -f ~/logs/rocketmqlogs/namesrv.log				查看日志

image.png

[注]此时NameServer启动成功

  • Broker启动
位置

	RocketMQ目录下的bin目录

本地部署(linux)

	nohup sh mqbroker -n localhost:9876 &		启动Broker
	tail -f ~/logs/rocketmqlogs/broker.log		查看启动日志

外网部署

	echo 'brokerIP1=192.168.33.100' > ../conf/broker.properties
	nohup sh mqbroker -n localhost:9876 -c ../conf/broker.properties autoCreateTopicEnable=true &
	tail -f ~/logs/rocketmqlogs/broker.log		查看启动日志
  • 发送与接受消息测试(linux端)
含义

	在发送或接收消息之前,开发者需要通知客户端name servers 的位置。RocketMQ提供多种

	实现方式。为了简单起见,下方展示环境变量NAMESRV_ADDR的用法

发送消息(bin目录下)

	设置环境变量:export NAMESRV_ADDR=localhost:9876
	使用安装包的Demo发送消息: sh tools.sh org.apache.rocketmq.example.quickstart.Producer

接受消息 (bin目录下)

	设置环境变量:export NAMESRV_ADDR=localhost:9876
	接受消息:sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

	jps查看进程号

RocketMQ由如下几部分构成
Name Server
Broker
Producer
Consumer

  • RocketMQ关闭(linux端)
位置

	RocketMQ的bin目录

命令

	sh mqshutdown namesrv			关闭NameServer
	sh mqshutdown broker     		关闭Broker
  • 日志查看问题(windows端)
1.使用NotePad++工具

2.下载NPPTTP插件

3.使用插件远程连接Linux

image.png

nohup的作用
nohup命令:如果你正在运行一个进程,而且你觉得在退出帐户时或者关闭客户端该进程还不会结束,那么可以使用nohup命令。该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。在缺省情况下该作业的所有输出都被重定向到一个名为nohup.out的文件中。

简单理解:
nohup运行命令可以使命令永久的执行下去,和用户终端没有关系,例如我们断开SSH连接都不会影响他的运行,注意了nohup没有后台运行的意思;

&的作用
&是指在后台运行,但当用户退出(挂起)的时候,命令自动也跟着退出

注意
nohup COMMAND & 这样就能使命令永久的在后台执行
nohup可以使用Ctrl+C结束掉,而&使用Ctrl+C则结束不掉,nohup不受终端关闭,用户退出影响,而&则受终端关闭,用户退出影响

管理工具

mqadmin管理工具
使用方式

	进入RocketMQ安装位置,在bin目录下执行./mqadmin {admin} {args}

命令介绍

	updateTopic[创建Topic]

	类路径[com.alibaba.rocketmq.tools.command.topic.UpdateTopicSubCommand] 
	
	参数			是否必填				说明

	-b			如果-c为空,则必填		broker 地址,表示topic 建在该broker

	-c			如果-b为空,则必填		cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)

	-h			否					打印帮助

	-n			是					nameserve 服务地址列表,格式ip:port;ip:port;...

	-r 			否					可读队列数(默认为8)

	-w			否					可写队列数(默认为8)

注意事项

	几乎所有命令都需要配置-n 表示NameServer地址,格式为ip:port;

	几乎所有命令都可以通过-h获取帮助;

	如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker地址执行

	命令;如果不配置Broker地址,则对集群中所有主机执行命令
集群监控平台搭建
含义

	RocketMQ有一个对其扩展的开源项目,incubator-rocketmq-externals,

	这个项目中有一个子模块叫rocketmq-console。这个便是管理控制台项目。

	步骤是先将incubator-rocketmq-externals从git拉到本地,然后对rocketmq-console

	进行操作(编译打包运行)

git地址

	https://github.com/SummerUnfair/rocketmq-externals.git

使用步骤

	1. 在rocketmq-console中配置namesrc集群地址

		rocketmq.config.namesrvAddr=192.168.33.100:9876

	2. 执行打包命令

		clean package -Dmaven.test.skip=true

	3.启动rokcetmq-console

		java -jar rocketmq-console-ng-1.0.0.jar

image.png

image.png

image.png

image.png

image.png

使用集群监控平台
rocketmq控制台(驾驶舱)

	下图首页即为"驾驶舱"标签下的图标

	-Broker TOP 10 :是指前10个Brokder处理消息的数量。
					比如从下图可以看出来,我只有一个Brokder,并且此Brokder处理了100条消息.
	
	-Broker 5min trend: 此图标可以筛选出某个Topic下5分钟的消息数量,可以切换时间,
						所以就相当于可以查看某个Topic下的消息数量趋势。

切换namesrvAdd(如图)

集群(如图)

 主题(如图)

	-状态,TopicTest是rocketmq系统自带的Topic,默认配置有4个队列

	-发送消息,设定:主体、tag、消息体后,即可在控制台发送消息
	
消息(如图)

image.png
image.png
image.png
image.png
image.png
image.png
image.png

rocketmq基础

1.RocketMQ架构设计图及说明

2.RocketMQ优缺点

3.常见的MQ产品宏观对比

RocketMQ架构设计图及说明
image.png

说明

RocketMQ 整体架构设计主要分为四大部分,分别是:Producer、Consumer、Broker、NameServer。

为了更贴合实际,图画的都是集群部署,像 Broker 还画了主从。

■ Producer:就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息。

■ Consumer:消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接,支持集群消费和广播消费消息。

■ Broker:主要负责消息的存储、查询消费,支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。

■ NameServer:是一个很简单的 Topic 路由注册中心,支持 Broker 的动态注册和发现,保存 Topic 和 Borker 之间的关系。通常也是集群部署,但是各 NameServer 之间不会互相通信, 各 NameServer 都有完整的路由信息,即无状态。

再用一段话来概括它们之间的交互

先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo,NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。

这样每个 NameServer 就知道集群所有 Broker 的相关信息,此时 Producer 上线从 NameServer 就可以得知它要发送的某 Topic 消息在哪个 Broker 上,和对应的 Broker (Master 角色的)建立长连接,发送消息。

Consumer 上线也可以从 NameServer 得知它所要接收的 Topic 是哪个 Broker ,和对应的 Master、Slave 建立连接,接收消息。

简单的工作流程如上所述,相信大家对整体数据流转已经有点印象了,我们再来看看每个部分的详细情况。

RocketMQ优缺点

RocketMQ起到解耦、削峰、数据分发的作用,同时也存在着系统可用性降低、系统复杂度提高、一致性问题 这三个方面缺点。

系统可用性降低 : 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响.

系统复杂度提高 : MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费、怎么处理消息丢失情况、如何保证消息传递的顺序性。

一致性问题 : A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理完成、D处理失败、如何保证消息数据处理的一致性。

常见的MQ产品宏观对比

产品 开发语言 单机吞吐量 时效性 可用性 特性
ActiveMQ java 万级 ms级 高(主从架构) ActiveMQ 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好
RabbitMQ erlang 万级 us级 高(主从架构) RabbitMQ 基于erlang开发,所以并发能力强,性能极其好,延时很低,管理界面较丰富
RocketMQ java 10万级 ms级 非常高(分布式架构) RocketMQ MQ功能比较完备,扩展性佳
Kafka scala 10万级 ms级以内 非常高(分布式架构) Kafka 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广

rocketmq消息管理

环境搭建
  • maven依赖
<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-client</artifactId>
	<version>4.3.0</version>
</dependency>
  • consume messages(消费者)
	public class Consumer {
	
	    public static void main(String[] args) throws InterruptedException, MQClientException {
	
	        // Instantiate with specified consumer group name.
	        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
	
	        // Specify name server addresses.
	        consumer.setNamesrvAddr("192.168.33.100:9876");
	
	        // Subscribe one more more topics to consume.
	        consumer.subscribe("TopicTest", "*");
	        // Register callback to execute on arrival of messages fetched from brokers.
	        consumer.registerMessageListener(new MessageListenerConcurrently() {
	
	            @Override
	            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
	                                                            ConsumeConcurrentlyContext context) {
	                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
	                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
	            }
	        });
	
	        //Launch the consumer instance.
	        consumer.start();
	
	        System.out.printf("Consumer Started.%n");
	    }
	
	}
生产者发送消息的三种方式
  • 轮廓图示
    image.png

  • 可靠同步发送

含义

	同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

示例

	(调用DefaultMQProducer的send方法)

应用

	重要的通知消息、短信通知、短信营销系统

生产者代码(如下)

	public class SyncProducer {
	
	    public static void main(String[] args) throws Exception {
	        // Instantiate with a producer group name.
	        DefaultMQProducer producer = new
	                DefaultMQProducer("please_rename_unique_group_name");
	        // Specify name server addresses.
	        producer.setNamesrvAddr("192.168.33.100:9876");
	        // Launch the instance.
	        producer.start();
	        for (int i = 0; i < 100; i++) {
	            // Create a message instance, specifying topic, tag and message body.
	            Message msg = new Message("TopicTest" /* Topic */,
	                    "TagA" /* Tag */,
	                    ("ferao 帅").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
	            );
	            // Call send message to deliver message to one of brokers.
	            SendResult sendResult = producer.send(msg);
	            System.out.printf("%s%n", sendResult);
	        }
	        // Shut down once the producer instance is not longer in use.
	        producer.shutdown();
	    }
	}
	SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB59C0060, offsetMsgId=C0A8216400002A9F00000000000C0782, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=0], queueOffset=974]
	SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB59F0061, offsetMsgId=C0A8216400002A9F00000000000C0844, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=1], queueOffset=974]
	SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB5A00062, offsetMsgId=C0A8216400002A9F00000000000C0906, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=2], queueOffset=974]
	SendResult [sendStatus=SEND_OK, msgId=C0A8006C22E018B4AAC22DDEB5A20063, offsetMsgId=C0A8216400002A9F00000000000C09C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=3], queueOffset=974]

image.png

	ConsumeMessageThread_19 Receive New Messages: [MessageExt [queueId=2, storeSize=194, queueOffset=973, sysFlag=0, bornTimestamp=1594302370200, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369250, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C05FE, commitLogOffset=787966, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=974, CONSUME_START_TIME=1594302370203, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB598005E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] 
	ConsumeMessageThread_17 Receive New Messages: [MessageExt [queueId=3, storeSize=194, queueOffset=973, sysFlag=0, bornTimestamp=1594302370202, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369252, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C06C0, commitLogOffset=788160, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=974, CONSUME_START_TIME=1594302370206, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB59A005F, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] 
	ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=0, storeSize=194, queueOffset=974, sysFlag=0, bornTimestamp=1594302370204, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369254, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C0782, commitLogOffset=788354, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=975, CONSUME_START_TIME=1594302370207, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB59C0060, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] 
	ConsumeMessageThread_12 Receive New Messages: [MessageExt [queueId=1, storeSize=194, queueOffset=974, sysFlag=0, bornTimestamp=1594302370207, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369257, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C0844, commitLogOffset=788548, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=975, CONSUME_START_TIME=1594302370209, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB59F0061, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] 
	ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=2, storeSize=194, queueOffset=974, sysFlag=0, bornTimestamp=1594302370208, bornHost=/192.168.33.2:57952, storeTimestamp=1594302369258, storeHost=/192.168.33.100:10911, msgId=C0A8216400002A9F00000000000C0906, commitLogOffset=788742, bodyCRC=1284723028, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=975, CONSUME_START_TIME=1594302370211, UNIQ_KEY=C0A8006C22E018B4AAC22DDEB5A00062, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[102, 101, 114, 97, 111, 32, -27, -72, -123], transactionId='null'}]] 
  • 可靠异步发送
含义

	异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。
	MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。
	消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,
	进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。

应用

	异步发送通常被用于对响应时间敏感的业务场景

示例

	public class AsyncProducer {
	    public static void main(String[] args) throws Exception {
	        //Instantiate with a producer group name.
	        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
	        //Launch the instance.
	        producer.start();
	        producer.setRetryTimesWhenSendAsyncFailed(0);
	        for (int i = 0; i < 100; i++) {
	            final int index = i;
	            //Create a message instance, specifying topic, tag and message body.
	            Message msg = new Message("TopicTest",
	                    "TagA",
	                    "OrderID188",
	                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
	            producer.send(msg, new SendCallback() {
	
	                public void onSuccess(SendResult sendResult) {
	                    System.out.printf("%-10d OK %s %n", index,
	                            sendResult.getMsgId());
	                }
	
	                public void onException(Throwable e) {
	                    System.out.printf("%-10d Exception %s %n", index, e);
	                    e.printStackTrace();
	                }
	            });
	        }
	        //Shut down once the producer instance is not longer in use.
	        producer.shutdown();
	    }
	}

image.png

  • 单向(Oneway)发送
含义

	单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,
	即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。

应用

	单向发送用于要求一定可靠性的场景,例如日志收集

示例

(调用DefaultMQProducer的sendOneway方法)

	public class OnewayProducer {
	    public static void main(String[] args) throws Exception{
	        //Instantiate with a producer group name.
	        DefaultMQProducer producer = new DefaultMQProducer("example_group_name");
	        //Launch the instance.
	        producer.start();
	        for (int i = 0; i < 100; i++) {
	            //Create a message instance, specifying topic, tag and message body.
	            Message msg = new Message("TopicTest" /* Topic */,
	                    "TagA" /* Tag */,
	                    ("Hello RocketMQ " +
	                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
	            );
	            //Call send message to deliver message to one of brokers.
	            producer.sendOneway(msg);
	
	        }
	        //Shut down once the producer instance is not longer in use.
	        producer.shutdown();
	    }
	}

image.png

msgId生成算法
含义

	当开发者用rocketmq发送消息的时候通常都会返回如下消息

	SendResult [sendStatus=SEND_OK, msgId=00000000000000000000000000000001000118B4AAC2088BEB070000, offsetMsgId=C0A8216400002A9F00000000001A3D34, messageQueue=MessageQueue [topic=TopicTest, brokerName=192.168.33.100, queueId=1], queueOffset=2025]

	对于客户端来说msgId是由客户端producer自己生成的,offsetMsgId是由broker生成的,

	其中offsetMsgId就是我们在rocketMQ控制台直接输入查询的那个messageId。

概念

	msgId

		该ID 是消息发送者在消息发送时会首先在客户端生成,全局唯一,
		在 RocketMQ 中该 ID 还有另外的一个叫法:uniqId,无不体现其全局唯一性。

	offsetMsgId

		消息偏移ID,该 ID 记录了消息所在集群的物理地址,
		主要包含所存储 Broker 服务器的地址( IP 与端口号)以及所在commitlog 文件的物理偏移量。
两者ID的生成算法

	msgId

		生成步骤

			1.初始化参数LEN,FIX_STRING,COUNTER
			2.初始化buffer
			3.设置开始时间
			4.字节转string工具方法
			5.最终生成msgId

		细节
		
			 其中createUniqId就是最终生成msgId方法。除些之外的方法者是createUniqId调
			 用或者被间接调用的方法,这些方法实现也比较简单。

			StringBuilder sb = new StringBuilder(LEN * 2);
			由此可知msgId的长度是LEN * 2 = 16 * 2 = 32;

			设time = 当前时间 - 本月开始时间(ms);
			从代码得到 FIX_STRING = ip + 进程pid + MessageClientIDSetter.class.getClassLoader().hashCode();
			createUniqIDBuffer 加入time 和 counter 因子。
			最终得到msgId的生成因子是:   ip + 进程pid + MessageClientIDSetter.class.getClassLoader().hashCode() + time + counter(AtomicInteger自增变量)
			最后调用bytes2string进行十六进制的移位和编码就产生了我们的msgId。

		分析算法

			对于每个producer实例来说ip都是唯一的,所以不同producer生成的msgId是不会重复的。
			对于producer单个实例来说的区分因子是:time + counter。
			首先应用不重启的情况下msgId是保证唯一性的,
			应用重启了只要系统的时钟不变msgId也是唯一的。
			所以只要系统的时钟不回拨我们就可以保证msgId的全局唯一。

image.png
image.png
image.png
image.png
image.png

	offsetMsgId

		生成步骤

			broker端生成的offsetMsgId就比较简单了,直接就是主机ip + 物理分区的offset,
			再调用UtilAll.bytes2string进行移位转码就完成了

image.png

rocketmq之Java Class

1.DefaultMQProducer类

2.DefaultMQPushConsumer类

DefaultMQProducer类

概述

DefaultMQProducer类是应用发送消息使用的基类,封装一些通用的方法方便开发者在更多场景中使用。属于线程安全类,在配置并启动后可在多个线程间共享此对象。
其可以通过无参构造方法快速创建一个生产者,通过getter/setter方法,调整发送者的参数。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。

方法

属性 内容
DefaultMQProducerImpl defaultMQProducerImpl; 生产者内部默认实现类
String producerGroup; Producer组名, 默认值为DEFAULT_PRODUCER。多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组。
String createTopicKey; 自动创建测试的topic名称, 默认值为TBW102;在发送消息时,自动创建服务器不存在的topic,需要指定Key。broker必须开启isAutoCreateTopicEnable
int defaultTopicQueueNums; 创建默认topic的queue数量。默认4
int sendMsgTimeout; 发送消息超时时间,默认值10000,单位毫秒
int compressMsgBodyOverHowmuch; 消息体压缩阈值,默认为4k(Consumer收到消息会自动解压缩)
int retryTimesWhenSendFailed; 同步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
int retryTimesWhenSendAsyncFailed; 异步模式,返回发送消息失败前内部重试发送的最大次数。可能导致消息重复。默认2
boolean retryAnotherBrokerWhenNotStoreOK; 声明发送失败时,下次是否投递给其他Broker,默认false
int maxMessageSize; 最大消息大小。默认4M; 客户端限制的消息大小,超过报错,同时服务端也会限制
TraceDispatcher traceDispatcher 消息追踪器,定义了异步传输数据接口。使用rcpHook来追踪消息
DefaultMQPushConsumer类

概述

DefaultMQPushConsumer类是rocketmq客户端消费者的实现,从名字上已经可以看出其消息获取方式为broker往消费端推送数据,其内部实现了流控,消费位置上报等等。DefaultMQPushConsumer是Push消费模式下的默认配置。

方法

字段 内容
DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; 消费者实现类,所有的功能都委托给DefaultMQPushConsumerImpl来实现
String consumerGroup; 消费者组名,必须设置,参数默认值是:DEFAULT_CONSUMER (需要注意的是,多个消费者如果具有同样的组名,那么这些消费者必须只消费同一个topic)
MessageModel messageModel; 消费的方式,支持以下两种 1、集群消费 2、广播消费。BROADCASTING 广播模式,即所有的消费者可以消费同样的消息CLUSTERING 集群模式,即所有的消费者平均来消费一组消息
ConsumeFromWhere consumeFromWhere; 消费者从那个位置消费,分别为:CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 ;CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费;CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费(以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始)
AllocateMessageQueueStrategy allocateMessageQueueStrategy; 消息分配策略,用于集群模式下,消息平均分配给所有客户端;默认实现为AllocateMessageQueueAveragely
Map<String, String> subscription; topic对应的订阅tag
MessageListener messageListener; 消息监听器 ,处理消息的业务就在监听里面。目前支持的监听模式包括:MessageListenerConcurrently,对应的处理逻辑类是MessageListener messageListener ;ConsumeMessageConcurrentlyService MessageListenerOrderly 对应的处理逻辑类是ConsumeMessageOrderlyService;两者使用不同的ACK机制。RocketMQ提供了ack机制,以保证消息能够被正常消费。发送者为了保证消息肯定消费成功,只有使用方明确表示消费成功,RocketMQ才会认为消息消费成功。中途断电,抛出异常等都不会认为成功——即都会重新投递。上面两个不同的监听模式使用的ACK机制是不一样的。
OffsetStore offsetStore; offset存储实现,分为本地存储或远程存储 。集群消费:从远程Broker获取。广播消费:从本地文件获取。
DefaultMQPushConsumer类
重要字段
							
	int consumeThreadMin = 20		线程池自动调整

	int consumeThreadMax = 64		线程池自动调整

	long adjustThreadPoolNumsThreshold = 100000 

	int consumeConcurrentlyMaxSpan = 2000
									单队列并行消费最大跨度,用于流控 

	int pullThresholdForQueue = 1000
									一个queue最大消费的消息个数,用于流控 

	long pullInterval = 0			检查拉取消息的间隔时间,由于是长轮询,所以为 0,但是如果应用为了流控,
									也可以设置大于 0 的值,单位毫秒,取值范围: [0, 65535]

	consumeMessageBatchMaxSize = 1	并发消费时,一次消费消息的数量,默认为1,
									假如修改为50,此时若有100条消息,那么会创建两个线程,每个线程分配50条消息。
									换句话说,批量消费最大消息条数,取值范围: [1, 1024]。默认是1
									
	pullBatchSize = 32				消费者去broker拉取消息时,一次拉取多少条。取值范围: [1, 1024]。默认是32 。可选配置

	boolean postSubscriptionWhenPull = false 

	boolean unitMode = false 

重要方法

	 subscribe(String topic, String subExpression) 
									订阅某个topic,subExpression传*为订阅该topic所有消息 

	registerMessageListener(MessageListenerConcurrently messageListener) 
									注册消息回调,如果需要顺序消费,需要注册MessageListenerOrderly的实现 
	
	start 							启动消息消费	
Message类
含义

	Producer发送的消息定义为Message类

位置

	org.apache.rocketmq.common.message

字段定义

	如图

字段详解

	topic

		Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的
		消息。
		通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。

	flag

		网络通信层标记。
		
	body

		Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。

	transactionId
	
		RocketMQ 4.3.0引入的事务消息相关的事务编号。

	properties

		该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。
		RocketMQ预定义了一组内置属性,除了内置属性之外,
		还可以设置任意自定义属性。当然属性的数量也是有限的,
		消息序列化之后的大小不能超过预设的最大消息大小。
		系统内置属性定义于org.apache.rocketmq.common.message.MessageConst	(如图)

		对于一些关键属性,Message类提供了一组set接口来进行设置,

		class Message {
		    public void setTags(String tags) {...}
		    public void setKeys(Collection<String> keys) {...}
		    public void setDelayTimeLevel(int level) {...}
		    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
		    public void setBuyerId(String buyerId) {...}
		}

		这几个set接口对应的作用分别为为,

		属性								接口				用途
		MessageConst.PROPERTY_TAGS		setTags			在消费消息时可以通过tag进行消
														息过滤判定
														
		MessageConst.PROPERTY_KEYS		setKeys			可以设置业务相关标识,用于消费
														处理判定,或消息追踪查询
														
		MessageConst.PROPERTY_DELAY_TIME_LEVEL	setDelayTimeLevel	
														消息延迟处理级别,不同级别对应不同延迟时间
														
		MessageConst.PROPERTY_WAIT_STORE_MSG_OK	setWaitStoreMsgOK	
														在同步刷盘情况下是否需要等待数
														据落地才认为消息发送成功
		
		`MessageConst.PROPERTY_BUYER_ID	setBuyerId		没有在代码中找到使用的地方,所
														以暂不明白其用处	

		这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存
		盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个
		动态字段更为方便,自然兼容。

image.png
image.png

MessageExt类讲解
含义

	对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,
	还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等
	等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,

字段

	字段							用途
	
	queueId						记录MessageQueue编号,消息会被发送到Topic下的MessageQueue

	storeSize					记录消息在Broker存盘大小

	queueOffset					记录在ConsumeQueue中的偏移

	sysFlag						记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识

	bornTimestamp				消息创建时间,在Producer发送消息时设置

	storeHost					记录存储该消息的Broker地址

	msgId						消息Id

	commitLogOffset				记录在Broker中存储便宜

	bodyCRC						消息内容CRC校验值

	reconsumeTimes				消息重试消费次数

	preparedTransactionOffset	事务详细相关字段	

注意

	Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,
	在消息发送时由Producer生成创建。
	上面的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成
	的,其构成为(如图)

	这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能
	获取到该值。RocketMQ也提供了相关命令,

	命令				实现类					描述
	
	queryMsgById	QueryMsgByIdSubCommand	根据MsgId查询消息		

image.png
image.png

rocketmq角色介绍

Producer
含义

	消息生产者,负责创建消息发送给Broker,一般由业务系统负责产生消息

	RocketMQ默认提供了DefaultMQProducer、TransactionMQProducer用于发送消息。
	
内含

	a.Producer Group:
	
		是一组Producer的名称。通常来说一个业务系统可能会奉陪一个Producer Group。

		Producer Group后续可以用于消息发送相关的各项管理监控功能。
		            			               
	b.Message:

		RocketMQ中的消息。Message必须设置Topic以及消息体,除此之外还可以配

		置一些自定义属性。只要不超过预定义的消息大小,自定义属性可以任意添加。
		
	c.Message Model:

		消息投递存在两种不同类别,
	
	d.Tag:

		Message可以设置Tag,Tag是系统预定义的属性。Message设置了Tag之后,在消费的时候

		可以根据Tag进行过滤。RocketMQ提供了几种过滤方式。可以认为Tag是Message的二级类别
Consumer
含义

	消息消费者,用于从消息队列获取消息。常用的Consumer类

内含

	a.DefaultMQPushConsumer,

		收到消息自动调用传入的处理方法来处理,实时性高。

	b.DefaultMQPullConsumer			

		用户自主控制,灵活度更高。
	
	c.Push Consumer:

		服务端向消费端推送消息

	d.Pull Consumer:
	
		消费端向服务端定时拉取消息

	e.Consumer Group :

		是一组Consumer的名称.相同Group下的Consumer需要有同样的订阅关系,且消费逻辑一致

		否则消息投递的时候可能会出现一些难以排查的问题。

		Consumer Group同样用于分配给不同的业务系统。通过管理工具可以控制Group的消费范围
						 
Broker
含义

	Broker是具体提供业务的服务器,

	解释一:RocketMQ的核心逻辑是Broker。Broker是实际用于手法消息的功能单元。从RocketMQ

		  使用者的角度来看,生产者通过接口将消息投递到Broker,消费者从Broker获取消息进行消费。

		  RocketMQ提供了推拉结合的方式用于获取消息。

	解释二:单个Broker节点与所有的NameServer节点保持长连接及心跳,

		  并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。

		  Broker中分master和slave两种角色,每个master可以对应多个slave,

		  但一个slave只能对应一个master,master和slave通过指定相同的Brokername,

		  不同的BrokerId (master为0)成为一个组。

		  master和slave之间的同步方式分为同步双写和异步复制,

		  异步复制方式master和slave之间虽然会存在少量的延迟,

		  但性能较同步双写方式要高出10%左右

		  举例:邮局 。它是RocketMQ的核心,用于接收Producer发过来的			

		  消息、以及处理Consumer的消费消息请求、消息的持久化存储、服务端过滤功能等

		  另外,Broker中还存在一些非常重要的名词需要说明:

内含

	a.Topic:

		区分消息的种类,一个发送者可以发送消息给一个或者多个Topic,一个消息的接受者可以

	     订阅一个或者多个Topic消息。对于RokectMQ而言,Topic只是一个逻辑上的概念,
	     
	     真正的消息存储其实是在Topic中的Queue中。这要设计是为了消息的顺序消费,
	     		  
	b.Message Queue:

		相当于是Topic的分区,用于并发发送和接受消息
NameServer
含义
		
	   解释一:RocketMQ没有引入第三方服务依赖,消息队列内部的服务发现以及配置更新等,都

			 借由Name Server来完成。从功能上来说,Name Server相当于一个轻量级简化版

			 的Zookeeper,或者说提供了类似ZK的功能。

			 Name Server的定位是维护RocketMQ全局相关配置,提供消息路由信息,除此之外

			 并不包含过多复杂逻辑。因为其相对轻量级,一般一组Name Server集群可以服务多

			 组Broker集群。
		
			 Name Server Cluster是多个Name Server实例的统称,Name Server之间并无关

			 联,互相也不同步信息。多个Name Server的存在是为了提供高可用服务,不同实例之

			 间的数据信息同步则实际是在数据写入的时候保证的。一份配置或消息路由信息会写入

			 所有Name Server实例中。
			
	   解释二:相当于配置中心,维护Broker集群、Broker信息、Broker存活信息、主题与

			 队列信息等。NameServer彼此之间不通信,每个Broker与集群内所有NameServer

			 保持长连接
	
通信机制

	1.Broker启动后需要完成一次将自己注册到NameServer的操作;随后每隔30秒时间定时向

		NameServer更新Topic路由信息

	2.Producer发送消息时,需要根据消息的Topic从本地缓存的获取路由信息。如果没有则

		更新路由信息,会从NameServer重新拉取,同时Producer会默认每隔30秒向NameServer

		拉取一次路由信息

	3.Consumer消费消息时,从NameServer获取的路由信息,并再完成客户端的负载均衡后,监听

		指定消息队列获取消息并进行消费

集群

集群模式
单Master模式

	这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上

	环境使用,可以用于本地测试

多Master模式

	一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点是

	优点:配置简单,单个Master宕机或重启维护对应无影响,在磁盘配置为PAID10时,即时

		机器宕机不可恢复情况下,由于PAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量


		消息,同步刷盘一条不丢),性能最高。

	缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时

		性会受到影响。

多Master多Slave模式(异步)

		每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主

		备都写成功,才向应用返回成功,这种模式优缺点如下:

		优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据

			可用性都非常高
		缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本

			在主节点宕机后,备份不能自动切换为主机
集群特点
NameServer

	是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker

	部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个slave,但是一个

Slave

	只能对应一个Master,Master与Slave的对应关系通过指定相同BrokerName,不同的

	BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个

	Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer

	Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定时从NameServer

	取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。

	Producer完全无状态,可集群部署。

	Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer

	取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、

	Slave发送心跳。Consumer既可以从Master订阅信息,也可以从Slave订阅消息,订阅规则

	由Broker配置决定。
各集群间关系

Producer集群

与nameserver的关系

	单个Producer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,

	生产者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。

	与nameserver之间没有心跳。

与broker的关系

	单个Producer和与其关联的所有broker保持长连接,并维持心跳。

	默认情况下消息发送采用轮询方式,会均匀发到对应Topic的所有queue中。

最佳实践

	1.一个应用尽可能只使用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。

	只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。

	2.每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。

	服务器会为每个消息创建索引(哈希索引),应用可以通过 Topic,key 来查询返条消息内容,

	以及消息被谁消费。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

	3.消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

	4.对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定

	时程序尝试重发或者人工触发重发。

	5.某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

Producer集群

与nameserver的关系

	单个Consumer和一台nameserver保持长连接,定时查询topic配置信息,如果该nameserver挂掉,

	消费者会自动连接下一个nameserver,直到有可用连接为止,并能自动重连。与nameserver之间没有心跳。

与broker的关系

	单个Consumer和与其关联的所有broker保持长连接,并维持心跳,失去心跳后,

	则关闭连接,并向该消费者分组的所有消费者发出通知,分组内消费者重新分配队列继续消费。

最佳实践

	1.Consumer 数量要小于等于queue的总数量,由于Topic下的queue会被相对均匀的分配给
	
	Consumer,如果 Consumer 超过queue的数量,那多余的 Consumer 将没有queue可以消费消息。

	2.消费过程要做到幂等(即消费端去重),RocketMQ为了保证性能并不支持严格的消息去重。

	3.尽量使用批量方式消费,RocketMQ消费端采用pull方式拉取消息,通过

	consumeMessageBatchMaxSize参数可以增加单次拉取的消息数量,可以很大程度上提高消费吞吐量。

	另外,提高消费并行度也可以通过增加Consumer处理线程的方式,对应参数

	consumeThreadMin和consumeThreadMax。

	4.消息发送成功或者失败,要打印消息日志。
补充

线上建议关闭autoCreateTopicEnable配置

该配置用于在Topic不存在时自动创建,会造成的问题是自动新建的Topic只会存在于一台broker上,

后续所有对该Topic的请求都会局限在单台broker上,造成单点压力。

broker master宕机情况是否会丢消息

broker master宕机,虽然理论上来说不能向该broker写入但slave仍然能支持消费,

但受限于rocketmq的网络连接机制,默认情况下最多需要30秒,消费者才会发现该情况,

这个时间可通过修改参数pollNameServerInteval来缩短。这个时间段内,发往该broker的请求都是失败的,

而且该broker的消息无法消费,因为此时消费者不知道该broker已经挂掉。

直到消费者得到master宕机通知后,才会转向slave进行消费,但是slave不能保证master的消息100%都同步过来了,

因此会有少量的消息丢失。但是消息最终不会丢,一旦master恢复,未同步过去的消息仍然会被消费掉。

总结

掌握rocketmq流程
基础部分

	mq的介绍
	
		作用
		注意事项
		各mq产品比较

	rocketMQ环境搭建

		安装rocketmq
		启动rocketmq
		测试rocketmq
		关闭rocketmq

	rocketmq高可用集群搭建

		集群各个角色介绍
		集群搭建方式
		双主双从集群搭建
		集群监控平台

	各个消息发送样例

		同步消息
		异步消息
		单向消息
		顺序消息
		批量消息
		过滤消息
		事务消息

项目实战

	项目背景
	功能分析
	项目环境搭建
	下单功能,保证各服务的数据一致性
	确认订单功能,通过消息进行数据分发
	整体联调

高级功能和源码分析

	高级功能
	
		消息的存储和发送
		消息存储结构
		刷盘机制
		消息的同步复制和异步复制

		负载均衡
	
			Producer负载均衡
			Consumer负载均衡

	源码分析

		路由中心NameServer

			NameServer架构设计
			NameServer启动流程
			NameServer路由注册和故障剔除

		消息生产者Producer

			生产者启动流程
			生产者发送消息流程
			批量发送

		消息存储

			消息存储流程
			存储文件与内存映射
			存储文件
			实时更新消息消费队列和存储文件
			消息队列与索引文件恢复
			刷盘机制
			过期文件删除机制

		消息消费Consumer

			消费者启动流程
			消息拉取
			消息队列负载均衡和重新分布机制
			消息消费过程
			定时消息机制
			顺序消费

python实现的消息队列RocketMQ客户端使用

rocketmq-python 是一个基于 rocketmq-client-cpp 封装的 RocketMQ Python 客户端。

一、Producer

#coding:utf-8import json

from rocketmq.client import Producer, Message

producer = Producer('PID-001')  # 实例化Producer对象,指定group-id(可任意取名)
producer.set_namesrv_addr('xxxxxx:xx') # rocketmq队列接口地址(服务器ip:port)
producer.start() # 开启

# 实例化消息对象,需要指定应用名:topic_name
msg = Message('your_topic_name') # 实例化消息对象时,传入topic名称,一个应用尽可能用一个Topic

# 指定消息的keys
msg.set_keys('your_keys') # 业务层面的唯一标识码,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。

# 指定消息tags
msg.set_tags('your_tags') # 消息子类型用tags来标识,tags可以由应用自由设置。

#指定消息体(内容)
msg_body = {'name':'laowang','age':28}
body = json.dumps(msg_body).encode('utf-8')
msg.set_body(body) # 传入消息体(json字节串)

# 向队列发送消息
ret = producer.send_sync(msg)
print(f'status:{ret.status}') # 0表示OK
print(f'msg_id:{ret.msg_id}') # 消息id,同消费者获取到的消息id
print(f'offset:{ret.offset}') # 偏移量,默认从0开始,1,2。。。

producer.shutdown() # 关闭

二、PullConsumer

# coding:utf-8

from rocketmq.client import  PullConsumer

consumer = PullConsumer('CID-001') # 指定group-id
consumer.set_namesrv_addr('xxxxxx:xx') # rocketmq队列接口地址(服务器ip:port)
consumer.start() # 开启

# 可重复性消费
# 指定topic-name
for msg in consumer.pull('your_topic_name'):
    print(f'id:{msg.id}') # 消息id
    print(f'topic:{msg.topic}') # 消息topic_name
    print(f'tags:{msg.tags}') # 消息tags
    print(f'keys:{msg.keys}') # 消息Keys
    print(f'body:{msg.body}') # 消息体
    print('-'*25+'分隔符'+'-'*25)
    
consumer.shutdown() # 关闭

三、PushConsumer

# coding:utf-8
import time

from rocketmq.client import PushConsumer

# 回调函数,参数是消息对象
def callback(msg):
    print(msg.id, msg.body)


consumer = PushConsumer('CID_XXX') # 指定group-id
consumer.set_namesrv_addr('127.0.0.1:9887') # rocketmq队列接口地址(服务器ip:port)
consumer.subscribe('Your_topic', callback) # 订阅
consumer.start() # 开启

while True:
    time.sleep(3600)

consumer.shutdown() # 关闭

rocketmq官网:https://rocketmq.apache.org/
github:https://github.com/apache/rocketmq

来源:
rocketmq详解:https://blog.csdn.net/qq_21561501/article/details/105684989
python实现的消息队列RocketMQ客户端使用:https://www.cnblogs.com/eliwang/p/15698648.html

其它阅读:
Rocketmq原理&复杂分布式事务解法:https://www.jianshu.com/p/2838890f3284
RocketMQ实战:https://www.jianshu.com/p/3afd610a8f7d
RocketMQ入门教程:https://www.cnblogs.com/ryelqy/p/14317528.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

[1100]rocketmq详解 的相关文章

  • Weblogic XMLDecoder 反序列化漏洞原理与漏洞复现(基于vulhub,保姆级的详细教程)

    漏洞原理 本文介绍了Weblogic XMLDecoder反序列化相关的漏洞原理 并以CVE 2017 10271为例在vulhub上进行了复现 有关Weblogic XMLDecoder反序列化漏洞包括CVE 2017 3506 CVE
  • springcloud之seata在微服务模块全局异常捕捉后导致事务不会滚优雅方案解决

    全网独一份 原创第一 更多好文 请关注下方微信公众号 方案一 这个是全局异常在一个方法中进行处理的 下面是大家习惯了的异常处理习惯 代码如下 原理都一样 描述 全局异处理器 br 时间 2020 06 07 16 32 br 作者 IT学习
  • 常用中间件控制台默认登陆

    以下用本地127 0 0 1作为ip前缀方便本地直接点击跳转使用 实际部署时替换该ip即可 中间件名称 访问地址端口 默认账号密码 rabbitmq http 127 0 0 1 15672 guest guest activemq htt
  • 【Docker】docker基础使用

    文章目录 docker概念 什么是docker docker引擎迭代 docker与虚拟机 docker版本发展 docker基础 docker架构 docker Registry 镜像仓库 镜像仓库使用流程 实际研发镜像仓库使用 不同镜像
  • [1228]Python prometheus-client使用方式

    文章目录 安装 prometheus client 基本使用介绍 应用实例 收集 CPU 使用率指标 收集自定义指标 Python封装 调用 github https github com prometheus client python
  • RocketMQ-名词和架构

    RocketMQ rocketMQ是做什么的我就不用解释了吧 以及他的背景 本文主要是为了让大家明白RocketMQ的工作原理 架构图 上图 双箭头代表是双向通信 ProducerGroup和ConsumerGroup以及Broker集群
  • es 搜索推荐:Suggest

    搜索推荐 Suggest 概述 搜索一般都会要求具有 搜索推荐 或者叫 搜索补全 的功能 即在用户输入搜索的过程中 进行自动补全或者纠错 以此来提高搜索文档的匹配精准度 进而提升用户的搜索体验 这就是Suggest 四种Suggester
  • Node利用connect中间件 及bodyParser处理文件上传

    1 html
  • 【使用 flink-cdc 将数据从 mysql 同时同步到 redis, elastisearch, clickhouse】

    要从 MySQL 同时同步到 Redis Elasticsearch 和 Clickhouse 可以使用 Flink CDC 和 Flink Table API 来实现 首先 需要在 Flink 中配置 CDC 数据源 使其能够连接到 My
  • redis安装+主从+哨兵模式和坑。

    一 安装 版本 3 2 11 1 解压 2 进入解压后的目录 执行 make 3 执行 make install 这一步需要root权限的用户执行 注 不使用root用户安装时 在make install 后添加PREFIX usr loc
  • 为什么要使用MQ消息中间件?这几个问题必须拿下!

    V xin ruyuanhadeng获得600 页原创精品文章汇总PDF 这篇文章开始 我们把消息中间件这块高频的面试题给大家说一下 也会涵盖一些MQ中间件常见的技术问题 假如面试官看你简历里写了MQ中间件的使用经验 很可能会有如下问题 你
  • 线上一次JVM FullGC搞得整晚都没睡,彻底崩溃~

    V xin ruyuanhadeng获得600 页原创精品文章汇总PDF 这篇文章给大家聊一次线上生产系统事故的解决经历 其背后代表的是线上生产系统的JVM FullGC可能引发的严重故障 一 业务场景介绍 先简单说说线上生产系统的一个背景
  • [1100]rocketmq详解

    文章目录 rocketmq入门 消息队列 rocketmq示例图 rocketmq应用场景 搭建环境 环境安装 Linux RocketMQ下载及安装 RocketMQ目录结构 RocketMQ启动及测试 管理工具 mqadmin管理工具
  • Nacos启动出现Error creating bean with name ‘memoryMonitor‘ 、‘externalDumpService‘

    目录 问题 解决方法 这里是CS大白话专场 让枯燥的学习变得有趣 没有对象不要怕 我们new一个出来 每天对ta说不尽情话 好记性不如烂键盘 自己总结不如收藏别人 问题 用KubeSphere创建Nacos时出现Error creating
  • Redis基础知识(三):缓存穿透、缓存击穿、缓存雪崩

    文章目录 一 缓存穿透 出现过程 解决方法 二 缓存击穿 出现过程 解决方法 三 缓存雪崩 出现过程 解决方法 我们在项目中大量使用Redis承接海量数据的冲击 但是使用过程中也会遇到一些特殊的情况 这个就是缓存击穿 缓存穿透 缓存雪崩 一
  • rabbitMQ初识

    消息队列 RabbitMQ 认识MQ 同步和异步通讯 微服务间通讯有同步和异步两种方式 同步通讯 就像打电话 需要实时响应 异步通讯 就像发邮件 不需要马上回复 同步通讯 同步调用的优点 时效性较强 可以立即得到结果 同步调用的问题 耦合度
  • C语言static和extern关键字

    1 static static修饰的变量 自始至终只有一块空间 当前文件才能使用 生命周期是从定义开始直到程序结束 全局变量的定义是在 c文件中 但是声明全局变量是在 h中 static 静态型 用它定义的变量自动初始化为0值或空值 常用于
  • 深聊性能测试,从入门到放弃之:我只做了这几点,公司的架构师也对我刮目相看

    1 引言 2 执行步骤 2 1 测试确认 2 2 通过标准 2 3 测试设计 2 4 数据准备 2 5 处理问题 3 总结 1 引言 接着上一篇 深聊性能测试 从入门到放弃之 性能测试如何做 这篇我们看看 到底做到那几点 架构师也对我刮目相
  • thinkphp6 入门(6)--中间件是什么 怎么用

    一 什么是中间件 当客户端发送请求至服务器时 HTTP请求会经过多个中间件 最后返回响应给客户端 中间件可以 在请求到达目标控制器或动作之前对请求进行操作 可以在响应离开目标控制器或动作之前对响应进行操作 二 中间件的作用 我们可以在不修改
  • 分布式 dataX 详细 (落地) 设计

    1 背景 分布式 DataX 基于 datax 打造的语义分分布式 ETL 平台 Datax 提供 reader framework writer 框架 方便开发两种异构数据源数据同步 但开源的 datax 缺少分布式特性 本文介绍基于 e

随机推荐