rocketMq介绍和安装

2023-11-17

rocketMq介绍和安装

Mq介绍

MQ:MessageQueue,消息队列。 队列,是一种FIFO 先进先出的数据结构。消息由生产者发送到MQ进行排队,然后按原来的顺序交由消息的消费者进行处理。

QQ和微信就是典型的MQ。

MQ的作用

主要有以下三个方面:

异步

例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只需

要把快递放到菜鸟驿站,就可以继续发其他快递去了。客户再按自己的时间安排

去菜鸟驿站取快递。

作用:异步能提高系统的响应速度、吞吐量。

解耦

例子:《Thinking in JAVA》很经典,但是都是英文,我们看不懂,所以需要编

辑社,将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。作用:

1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以

及可扩展性。

2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者

多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

削峰

例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以会涨水。

引入三峡大坝后,可以把水储存起来,下游慢慢排水。

作用:以稳定的系统资源应对突发的流量冲击。

MQ的缺点

上面MQ的所用也就是使用MQ的优点。 但是引入MQ也是有他的缺点的:

系统可用性降低

系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影

响。这就需要考虑如何保证MQ的高可用。

系统复杂度提高

引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入

MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问

题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问

题。

消息一致性问题

A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统

处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。

几大MQ产品特点比较

常用的MQ产品包括Kafka、RabbitMQ和RocketMQ。我们对这三个产品做下简单的比较,重点需要理解他们的适用场景。

这里有rocketMq官网的对比链接:https://rocketmq.apache.org/zh/docs/#rocketmq-vs-activemq-vs-kafka

优点 缺点 使用场景
kafka kafka吞吐量非常大,性能非常好。集群高可用。 会丢数据,功能比较单一。 日志分析。
rabbitMq 大数据采集RabbitMQ消息可靠性高,功能全面。 吞吐量比较低,消息积累会影响性能,erlang语言不好定制。 小规模场景
rocketMq RocketMQ高吞吐,高性能,高可用。功能全面。 开源版功能不如云上版,官方文档比较简单 几乎全场景

rocketMq介绍

官网地址:https://rocketmq.apache.org/zh/

github : https://github.com/apache/rocketmq https://github.com/apache/rocketmq-externals

NameServer:http://www.tianshouzhi.com/api/tutorials/rocketmq/408

RocketMQ是阿里巴巴开源的一个消息中间件,基于java开发,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。目前RocketMQ在阿里云上有一个购买即可用的商业版本,商业版本集成了阿里内部一些更深层次的功能及运维定制。我们这里学习的是Apache的开源版本。开源版本相对于阿里云上的商业版本,功能上略有缺失,但是大体上功能是一样的。

1.1、RocketMQ的发展历程
早期阿里使用ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的,他的并发性能并不是很理想。尤其当他的Topic过多时,由于Partition文件也会过多,会严重影响IO性能。于是阿里才决定自研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进,RocketMQ开始体现出一些不一样的优势。

1.2、RocketMQ产品特点比较
RocketMQ的消息吞吐量虽然依然不如Kafka,但是却比RabbitMQ高很多。在阿里内部,RocketMQ集群每天处理的请求数超过5万亿次,支持的核心应用超过3000个。RocketMQ天生就为金融互联网而生,因此他的消息可靠性相比Kafka也有了很大的提升,而消息吞吐量相比RabbitMQ也有很大的提升。另外,RocketMQ的高级功能也越来越全面,广播消费、延迟队列、死信队列等等高级功能一应俱全,甚至某些业务功能比如事务消息,已经呈现出领先潮流的趋势。RocketMQ的源码是用Java开发的,这也使得很多互联网公司可以根据自己的业务需求做深度定制。而RocketMQ经过阿里双十一多次考验,源码的稳定性是值得信赖的,这使得功能定制有一个非常高的起点。 传统意义上,RocketMQ有一个比较大的局限,就是他的客户端只支持Java语言。但RocketMQ作为一个开源软件,自身产品不断成熟的同时,周边的技术生态也需要不断演进。RocketMQ成为Apache顶级项目后,又继续通过社区开发出了很多与主流技术生态融合的周边产品。例如在RocketMQ的社区,也正在开发GO,Python,Nodejs等语言的客户端。\

rocketMq重要变化

官方文档:https://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy

RocketMQ 5.0 引入了全新的弹性无状态代理模式,将当前的Broker职责进行拆分,对于客户端协议适配、权限管理、消费管理等计算逻辑进行抽离,独立无状态的代理角色提供服务,Broker则继续专注于存储能力的持续优化。这套模式可以更好地实现在云环境的资源弹性调度。 值得注意的是RocketMQ 5.0的全新模式是和4.0的极简架构模式相容相通的,5.0的代理架构完全可以以Local模式运行,实现与4.0架构完全一致的效果。开发者可以根据自身的业务场景自由选择架构部署。

部署的架构介绍

RocketMQ由以下这几个组件组成

  • NameServer : 提供轻量级的Broker路由服务,可以理解为为rocketMq单独部署的注册中心,每个NameServer 之间无数据交互,相互独立,都存放有全部的路由信息(broker信息等)。
  • Broker:实际处理消息存储、转发等服务的核心组件。
  • Producer:消息生产者集群。通常是业务系统中的一个功能模块。
  • Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。

所以需要在服务器上部署NameServer 和Broker两个服务

master/slave

同步数据说明 :https://developer.aliyun.com/article/666458

slave->master同步:主要是指slave会定期向master发起同步数据请求,master向slave返回数据。

master->slave同步:在设置为同步双写的时候,master每写入一条消息都会同步到slave当中。

rocketMq安装(单组节点)

单组节点单副本模式:https://rocketmq.apache.org/zh/docs/quickStart/02quickstart/

下载界面地址 :https://rocketmq.apache.org/zh/download#rocketmq

官方介绍:https://rocketmq.apache.org/zh/docs/4.x/introduction/03whatis#%E5%90%8D%E5%AD%97%E6%9C%8D%E5%8A%A1%E5%99%A8-nameserver

环境准备和安装包下载

前置

请安装jdk1.8+版本

1、上传安装包、解压

unzip rocketmq-all-5.0.0-bin-release
mv rocketmq-all-5.0.0-bin-release rocketmq

2、

我们需要创建一个操作用户用来运行自己的程序,与root用户区分开。使用root用户创建一个oper用户,并给他创建一个工作目录。(仅做记录)

[root@worker1 ~]# useradd oper
[root@worker1 ~]# passwd oper
设置用户密码
[root@worker1 ~]# chown oper:oper /usr/local/app
# 切换用户
su oper

3、添加相关环境变量

系统环境变量修改:https://blog.csdn.net/fj_changing/article/details/116407529

jdk和rocketmq的全部环境变量配置,

# 编辑
vim /etc/profile

export JAVA_HOME=/usr/local/app/java
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export  PATH=${JAVA_HOME}/bin:$PATH

export ROCKETMQ_HOME=/usr/local/app/rocketmq
PATH=$ROCKETMQ_HOME/bin:$PATH
export PATH

# 生效配置
source  /etc/profile
# 其中多次执行号造成path有重复,可以 echo $PATH ,
export PATH=/usr/local/app/rocketmq/bin:/usr/local/app/java/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin


这个ROCKETMQ_HOME的环境变量是必须要单独配置的,如果不配置的话,启动NameSever和Broker都会报错。
这个环境变量的作用是用来加载$ROCKETMQ_HOME/conf下的除broker.conf以外的几个配置文件。所以实际情况中,可以不按这个配置,但是一定要能找到配置文件。

NameServer启动

启动NameServer非常简单, 在$ROCKETMQ_HOME/bin目录下有个mqadminsrv。直接执行这个脚本就可以启动RocketMQ的NameServer服务。

但是要注意,RocketMQ默认预设的JVM内存是4G,这是RocketMQ给我们的最
佳配置。但是通常我们用虚拟机的话都是不够4G内存的,所以需要调整下JVM内存大小。修改的方式是直接修改runserver.sh。 用vi runserver.sh编辑这个脚本,在脚本中找到这一行调整内存大小为512M,当然如果是java8以上版本,修改下图中else部分。

-server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m

然后我们用静默启动的方式启动NameServer服务:启动完成后,在nohup.out里看到这一条关键日志就是启动成功了。并且使用jps
指令可以看到有一个NamesrvStartup进程。

### 启动namesrv 建议增加>/dev/null 2>&1 
$ nohup sh bin/mqnamesrv &
 
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

​ 关键日志标识启动成功

 INFO main - The Name Server boot success. serializeType=JSON

Broker服务搭建

启动Broker的脚本是runbroker.sh。Broker的默认预设内存是8G,启动前,如果内存不够,同样需要调整下JVM内存。vi runbroker.sh,找到这一行,进行内存调整

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"

然后我们需要找到$ROCKETMQ_HOME/conf/broker.conf, vi指令进行编辑,
在最下面加入一个配置:

autoCreateTopicEnable=true

然后也以静默启动的方式启动runbroker.sh

### 先启动broker 建议增加>/dev/null 2>&1 
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/broker_default.log 
The broker[broker-a,192.169.1.2:10911] boot success...

启动完成后,同样是检查nohup.out日志,有这一条关键日志就标识启动成功了。 并且jps指令可以看到一个BrokerStartup进程。在观察runserver.sh和runbroker.sh时,我们还可以查看到其他的JVM执行参数,这些参数都可以进行定制。例如我们观察到一个比较有意思的地方,nameServer使用的是CMS垃圾回收器,而Broker使用的是G1
垃圾回收器。

测试消息收发

在进行消息收发之前,我们需要告诉客户端NameServer的地址,RocketMQ有多种方式在客户端中设置NameServer地址,这里我们利用环境变量NAMESRV_ADDR

$ export NAMESRV_ADDR=localhost:9876
# 集群需如下设置
  export NAMESRV_ADDR='k8s-master:9876;k8s-node1:9876;k8s-node2:9876'

# 启动生产者发送消息,默认发送1000条消息,会打印消息发送日志
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...
# 会打印消息机接收日志,这个Consume指令并不会结束,他会继续挂起,等待消费其他的消息。我们可以
使用CTRL+C停止该进程。
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

日志中MessageExt后的整个内容就是一条完整的RocketMQ消息。我们要对这个消息的结构有个大概的了解,后面会对这个消息进行深入的理解。其中比较关键的属性有:brokerName,queueId,msgId,topic,cluster,tags,body,transactionId。先找下这些属性在哪里。

ConsumeMessageThread_please_rename_unique_group_name_4_1 Receive New Messages: [MessageExt [brokerName=k8s-node1, queueId=3, storeSize=241, queueOffset=235, sysFlag=0, bornTimestamp=1666608764903, bornHost=/150.133.36.64:46330, storeTimestamp=1666608764904, storeHost=/150.133.36.64:10911, msgId=9685244000002A9F000000000003756F, commitLogOffset=226671, bodyCRC=494684516, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, TRACE_ON=true, MAX_OFFSET=250, MSG_REGION=DefaultRegion, CONSUME_START_TIME=1666608785055, UNIQ_KEY=96852440724710F87F487A7F53E703AD, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 52, 49], transactionId='null'}]]

关闭服务

$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

# 记录一下
$ sh bin/mqshutdown proxy

RocketMq 集群部署

请先查看rocketMq 安装部分,有部分前置安装说明

根据rocketMq的部署图,这里我部署三台NameServer服务,和两主两从的Broker服务,具体如下

NameServer部署 broker部署
47.100.56.197 NameServer
43.143.136.203 NameServer broker-a,broker-b-s
61.171.5.6 NameServer broker-b,broker-a-s

复制安装包到对应的服务器

多种集群模式介绍

官方的多种部署模式:https://rocketmq.apache.org/zh/docs/deploymentOperations/15deploy

1、单组节点单副本模式 :NameServer、Broker、Proxy 等都部署一个节点,部署在同一台机器上

2、多组节点(集群)单副本模式

一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

3、多节点(集群)多副本模式-异步复制

每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

4、多节点(集群)多副本模式-同步双写

每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

部署三台NameServer

如果内存不够,注意修改启动bin/runserver.sh启动参数,注意执行的目录

### 启动namesrv 建议增加>/dev/null 2>&1 
$ nohup sh bin/mqnamesrv &
 
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

Broker 配置

服务端配置:https://rocketmq.apache.org/zh/docs/4.x/parameterConfiguration/24server

安装包提供了多种集群部署的配置文件,对应不同的集群模式,在conf目录下2m-开头的相关文件,配置文件的只对应一个nameServer,我们稍作修改

以下文件部分内容全部注释,没有在官方文档上找到对应配置,使用时注意

broker-a.properties

# 集群名称
brokerClusterName=DefaultCluster
# broker名字,名字一样的节点就是一组主从节点。
brokerName=broker-a
# broker id, 0 表示 master, 其他的正整数表示 slave
brokerId=0
# 当前 broker 监听的 IP,避免选择不正确的网卡
brokerIP1=43.143.136.203

# 在每天的什么时间删除已经超过文件保留时间的 commit log
deleteWhen=04
# 以小时计算的文件保留时间
fileReservedTime=48
# SYNC_MASTER/ASYNC_MASTER/SLAVE 前两个对应两种集群模式,其中SLAVE对应从节点
brokerRole=ASYNC_MASTER
# flushDiskType表示刷盘策略,分为SYNC_FLUSH和ASYNC_FLUSH两种,分别代表同步刷盘和异步刷盘。同步刷盘情况下,消息真正写入磁盘后再返回成功状态;异步刷盘情况下,消息写入page_cache后就返回成功状态。
flushDiskType=ASYNC_FLUSH

# 以下是新增配置

#Broker 对外服务的监听端口(默认10911)
listenPort=10911

# nameServer地址,分号分割,此处需要在三台机器的hosts中配置映射
namesrvAddr=k8s-master:9876;k8s-node1:9876;k8s-node2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

#commitlog存储路径
storePathCommitLog=/usr/local/app/rocketmq/store/commitlog/
#consumequeue存储路径
storePathConsumerQueue=/usr/local/app/rocketmq/store/consumequeue/
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# 以下配置仅做记录

#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
#diskMaxUsedSpaceRatio=88


#存储路径
storePathRootDir=/usr/local/app/rocketmq/store/
#消息索引存储路径
storePathIndex=/usr/local/app/rocketmq/store/index/
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/app/rocketmq/store/checkpoint/
# abort 文件存储路径
abortFile=/usr/local/app/rocketmq/store/abort/


#限制的消息大小
maxMessageSize=65536

#一次刷盘至少需要脏页的数量,针对commitlog文件
#flushCommitLogLeastPages=4
# 一次刷盘至少需要脏页的数量,默认2页,针对 Consume文件
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000


#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

过期判断

  • fileReservedTime
    文件删除主要是由这个配置属性,文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除。
  • deletePhysicFilesInterval
    删除物理文件的时间间隔(默认是 100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除, 因此删除一个文件后需要间隔 deletePhysicFilesInterval 这个时间再删除另外一个文件,由于删除文件是一个非常耗费 IO 的操作,会引起消息插入消 费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。
  • destroyMapedFileIntervalForcibly
    在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳 destroyMapedFileIntervalForcibly 这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少 1000,直到引用 小于等于 0 为止,即可删除该文件。

broker-a-s.properties

其他配置和broker-a.properties相同

brokerId=1
brokerRole=SLAVE

#Broker 对外服务的监听端口(默认10911)
listenPort=11011
# 当前 broker 监听的 IP,避免选择不正确的网卡
brokerIP1=61.171.5.6

#commitlog存储路径
storePathCommitLog=/usr/local/app/rocketmq/store_slave/commitlog/
#consumequeue存储路径
storePathConsumerQueue=/usr/local/app/rocketmq/store_slave/consumequeue/

#存储路径
storePathRootDir=/usr/local/app/rocketmq/store_slave/
#消息索引存储路径
storePathIndex=/usr/local/app/rocketmq/store_slave/index/
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/app/rocketmq/store_slave/checkpoint/
# abort 文件存储路径
abortFile=/usr/local/app/rocketmq/store_slave/abort/

broker-b.properties

其他配置和broker-a.properties相同

brokerName=broker-b


# 当前 broker 监听的 IP,避免选择不正确的网卡
brokerIP1=61.171.5.6

broker-b-s.properties

其他配置和broker-a.properties相同

brokerName=broker-b
brokerId=1
brokerRole=SLAVE

# 当前 broker 监听的 IP,避免选择不正确的网卡
brokerIP1=43.143.136.203


#Broker 对外服务的监听端口(默认10911)
listenPort=11011

# 当前 broker 监听的 IP,避免选择不正确的网卡
brokerIP1=43.143.136.203

#commitlog存储路径
storePathCommitLog=/usr/local/app/rocketmq/store_slave/commitlog/
#consumequeue存储路径
storePathConsumerQueue=/usr/local/app/rocketmq/store_slave/consumequeue/

#存储路径
storePathRootDir=/usr/local/app/rocketmq/store_slave/
#消息索引存储路径
storePathIndex=/usr/local/app/rocketmq/store_slave/index/
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/app/rocketmq/store_slave/checkpoint/
# abort 文件存储路径
abortFile=/usr/local/app/rocketmq/store_slave/abort/

放通端口

参考 : https://blog.csdn.net/Taiyii/article/details/125526511

#Broker 对外服务的监听端口(默认10911)
listenPort=10911

nameserver端口 : 9876

其他端口待定

10909 : dashboard日志打印报错的端口

11009

11011 : 我配置文件中从broker的监听端口

启动Broker+Proxy集群

由于我这边没有机器,这里只不同的beoker主从节点交叉部署,具体配置的不同在配置文件中有体现

# 如果配置文件中,没有指定nameserver地址,可以添加 -n k8s-master:9876;k8s-node1:9876;k8s-node2:9876 进行指定,多个地址使用逗号分隔

### 在机器A,启动第一个Master, 43.143.136.203
nohup sh bin/mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
 
### 在机器B,启动第二个Master,61.171.5.6
nohup sh bin/mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
 
### 在机器B,启动第一个Slave,61.171.5.6
nohup sh bin/mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
 
### 在机器A,启动第二个Slave,43.143.136.203
nohup sh bin/mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &

# 通过nohup.out查看启动是否成功,jps查看对应的进程是否存在
# 不同机器上执行,对应相应的nameserver 地址, 指定多个地址时报错,这里后续可能有问题,再说
nohup sh bin/mqproxy -n 47.100.56.197:9876 &
nohup sh bin/mqproxy -n 61.171.5.6:9876 &
nohup sh bin/mqproxy -n 43.143.136.203:9876  &

master是负责接收消息的,所以集群中至少有一个master才能写入消息,master挂了之后,对应的slave不会自动切换为master,slave保证了master挂了之后消息还是可以被正常消费

主备自动切换模式部署

Controller 组件

官方:https://rocketmq.apache.org/zh/docs/deploymentOperations/16autoswitchdeploy

该文档主要介绍如何部署支持自动主从切换的 RocketMQ 集群,其架构如上图所示,主要增加支持自动主从切换的 Controller 组件,其可以独立部署也可以内嵌在 NameServer 中。

仅仅做记录

Dledger快速搭建

官方:https://rocketmq.apache.org/zh/docs/bestPractice/16dledger

DLedger是一套基于Raft协议的分布式日志存储组件,部署 RocketMQ 时可以根据需要选择使用DLeger来替换原生的副本存储机制。本文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。

仅仅做记录

从官方文档上看Controller 对应的主备切换似乎更简单一点,就了解而言DLedger似乎会造成主从节点之间大量通讯,浪费带宽资源,似乎使用不多

RocketMQ Promethus Exporter

官网:https://rocketmq.apache.org/zh/docs/deploymentOperations/18Exporter

Rocketmq-exporter 是用于监控 RocketMQ broker 端和客户端所有相关指标的系统,通过 mqAdmin 从 broker 端获取指标值后封装成 87 个 cache。包含前端展示界面。

仅仅做记录

RocketMQ Dashboard

RocketMQ Dashboard 是 RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。

官网地址:https://rocketmq.apache.org/zh/docs/4.x/deployment/17Dashboard

这里采用源码安装的方式

源码地址:apache/rocketmq-dashboard

下载并解压,切换至源码目录 rocketmq-dashboard-master/

① 编译 rocketmq-dashboard

# 执行前安装maven,添加环境变量,注意编译目录不要包含中文
$ mvn clean package -Dmaven.test.skip=true

② 运行 rocketmq-dashboard

有需要的,自行添加jvm参数

$ java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar &

编译完成后,获取target下的jar包,就可以直接执行。但是这个时候要注意,在这个项目的application.yml中需要指定nameserver的地址。默认这个属性是指向本地。如果配置为空,会读取环境变量NAMESRV_ADDR。那我们可以在jar包的当前目录下增加一个application.yml文件,覆盖jar包中默认的一个属性,也可以先修改配置文件再打包,或者直接在java -jar 启动参数中给对应属性赋值 ,这里我直接修改后再打包:

rocketmq:
  config:
    # if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, default localhost:9876
    # configure multiple namesrv addresses to manage multiple different clusters
    namesrvAddrs:
      - k8s-master:9876
      - k8s-node1:9876
      - k8s-node2:9876

提示:Started App in x.xxx seconds (JVM running for x.xxx) 启动成功

浏览器页面访问:namesrv.addr:18080,这里由于8080端口死活访问不通,修改了启动端口18080

关闭 rocketmq-dashboard : ctrl + c

开启登录模式

mvn打包前修改application.yml 中 rocketmq.config下配置,修改为ture

# must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
loginRequired: false

修改users.properties

# Format: a user per line, username=password[,N] #N is optional, 0 (Normal User); 1 (Admin)
如注释所描述的一样
每行定义一个用户, username=password[,N]  #N是可选项,可以为0 (普通用户); 1 (管理员)  
其本身的默认用户,请自行修改
# Define Admin
admin=admin,1

# Define Users
user1=user1
user2=user2

系统参数调优

官方地址;https://rocketmq.apache.org/zh/docs/bestPractice/19JVMOS

到这里,我们的整个RocketMQ的服务就搭建完成了。但是在实际使用时,我们说rocketMQ的吞吐量、性能都很高,那要发挥RocketMQ的高性能,还需要对RocketMQ以及服务器的性能进行定制
1、配置RocketMQ的JVM内存大小:
之前提到过,在runserver.sh中需要定制nameserver的内存大小,在runbroker.sh中需要定制broker的内存大小。这些默认的配置可以认为都是经过检验的最优化配置,但是在实际情况中都还需要根据服务器的实际情况进行调整。这里以
runbroker.sh中对G1GC的配置举例,在runbroker.sh中的关键配置:

JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -
XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -
XX:SoftRefLRUPolicyMSPerMB=0"
JAVA_OPT="${JAVA_OPT} -verbose:gc -
Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -
XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -
XX:+PrintAdaptiveSizePolicy"
JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -
XX:GCLogFileSize=30m"

-XX:+UseG1GC: 使用G1垃圾回收器, -XX:G1HeapRegionSize=16m 将G1的region块大小设为16M,-XX:G1ReservePercent:在G1的老年代中预留25%空闲
内存,这个默认值是10%,RocketMQ把这个参数调大了。-XX:InitiatingHeapOccupancyPercent=30:当堆内存的使用率达到30%之后就会启动G1垃圾回收器尝试回收垃圾,默认值是45%,RocketMQ把这个参数调小了,也就是提高了GC的频率,但是避免了垃圾对象过多,一次垃圾回收时间太长的问题。

然后,后面定制了GC的日志文件,确定GC日志文件的地址、打印的内容以及控制每个日志文件的大小为30M并且只保留5个文件。这些在进行性能检验时,是相当重
要的参考内容。

2、RocketMQ的其他一些核心参数
例如在conf/dleger/broker-n0.conf中有一个参数:
sendMessageThreadPoolNums=16。这一个参数是表明RocketMQ内部用来发送消息的线程池的线程数量是16个,其实这个参数可以根据机器的CPU核心数进行适当调整,例如如果你的机器核心数超过16个,就可以把这个参数适当调大。

3、Linux内核参数定制
我们在部署RocketMQ的时候,还需要对Linux内核参数进行一定的定制。例如

  • ulimit,需要进行大量的网络通信和磁盘IO。
  • vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
  • vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
  • vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。
  • vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
  • File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。

这些参数在CentOS7中的配置文件都在 /proc/sys/vm目录下。另外,RocketMQ的bin目录下有个os.sh里面设置了RocketMQ建议的系统内核参数,可以根据情况进行调整。

rocketMq相关知识

rocketMq相关知识 :https://blog.csdn.net/qq_42792950/article/details/123590855

1、为什么RocketMQ不用Zookeeper而要自己实现一个NameServer来进行注册?

https://www.codenong.com/js456f9759bb97/

从cap的角度,首先zookeeper满足的是cp,当其中一个节点宕机后,需要一定的时间选举出新的leader来对外提供服务,且zookeeper确定数据接收成功需要一半以上的节点都接收成功才行,耗时更长,就rocketmq应用场景,更适合使用满足cp理论的nameServer,至于为什么不用nacos,相比于nacos提供的众多服务,nameServer是轻量的,更适合rocketmq,如果使用nacos,还需要考虑版本的兼容问题

  • 一致性(Consistency):Name Server 集群中的多个实例,彼此之间是不通信的,这意味着某一时刻,不同实例上维护的元数据可能是不同的,客户端获取到的数据也可能是不一致的。
  • 可用性(Availability):只要不是所有NameServer节点都挂掉,且某个节点可以在指定之间内响应客户端即可。
  • 分区容错(Partiton Tolerance):对于分布式架构,网络条件不可控,出现网络分区是不可避免的,只要保证部分NameServer节点网络可达,就可以获取到数据。具体看公司如何实施,例如:为了实现跨机房的容灾,可以将NameServer部署的不同的机房,某个机房出现网络故障,其他机房依然可用,当然Broker集群/Producer集群/Consumer集群也要跨机房部署。

2、Consumer分组有什么用?Producer分组呢?

Consumer分组,一个分组可能包含多个机器,只要一个机器消费成功,就可以认为消息已经被消费,且消息重试机制,对于同一个消费分组,其中一台机器挂了,下次重试可以发送给另外一台机器

Producer分组更多的作用是用于事务类型的消息,消息被消费后告知消息生产者()

3、RocketMQ如何保证集群高可用?

nameserver 是ap架构,任意节点挂了之后,其他节点照样可以对外提供服务

broker集群部署,可以部署多组主备Broker,可以进行水平扩容-增加一组主备,单个master挂了之后,slave可以保证消息正常被consumer消费,不会影响整个集群对外提供服务,如果需要,可以部署contorller和Dledger ,提供主从自动切换的能力。

关联信息

  • 关联的主题:
  • 上一篇:
  • 下一篇:
  • image: 20221021/1
  • 转载自:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rocketMq介绍和安装 的相关文章

  • 存根方法时出现 InvalidUseOfMatchersException

    我有这个 TestNG 测试方法代码 InjectMocks private FilmeService filmeService new FilmeServiceImpl Mock private FilmeDAO filmeDao Bef
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • JVisualVM/JConsole 中的 System.gc() 与 GC 按钮

    我目前正在测试处理 XML 模式的概念验证原型 并围绕一个非常消耗内存的树自动机外部库 我已经获得了源代码 构建 我想绘制 真实峰值 堆 随着模式大小的增加 不同运行的内存消耗 使用的指标符合我的目的并且不会影响问题 或者至少是它的合理近似
  • 使用 GWT 读取非常大的本地 XML 文件

    我正在使用 GWT 构建我的第一个 Java 应用程序 它必须从一个非常大的 XML 文件中读取数据 当我尝试发送对文件中信息的请求时遇到问题 并且我不太确定它是否与文件的大小或我的语义有关 在我的程序中 我有以下内容 static fin
  • 如何在 Antlr4 中为零参数函数编写语法

    我的函数具有参数语法 如下面的词法分析器和解析器 MyFunctionsLexer g4 lexer grammar MyFunctionsLexer FUNCTION FUNCTION NAME A Za z0 9 DOT COMMA L
  • 打印星号的 ASCII 菱形

    我的程序打印出这样的钻石 但只有当参数或菱形的每一面为4 例如如果我输入6 底部三角形的间距是错误的 我一直在试图找出答案 当参数改变时 底部的三角形不会改变 只有顶部的三角形会改变 它只适用于输入4 public static void
  • Java:在 eclipse 中导出到 .jar 文件

    我正在尝试将 Eclipse 中的程序导出到 jar 文件 在我的项目中 我添加了一些图片和 PDF s 当我导出到 jar 文件时 似乎只有main已编译并导出 我的意愿是如果可能的话将所有内容导出到 jar 文件 因为这样我想将其转换为
  • 如何将 Mat (opencv) 转换为 INDArray (DL4J)?

    我希望任何人都可以帮助我解决这个任务 我正在处理一些图像分类并尝试将 OpenCv 3 2 0 和 DL4J 结合起来 我知道DL4J也包含Opencv 但我认为它没什么用 谁能帮我 如何转换成 INDArray 我尝试阅读一些问题here
  • Kotlin 未解决的参考:CLI 上 gradle 的 println

    放一个printlnkotlin 函数返回之前的语句会崩溃 堆栈跟踪 thufir dur NetBeansProjects kotlin thufir dur NetBeansProjects kotlin gradle clean bu
  • 需要使用 joda 进行灵活的日期时间转换

    我想使用 joda 解析电子邮件中的日期时间字符串 不幸的是我得到了各种不同的格式 例如 Wed 19 Jan 2011 12 52 31 0600 Wed 19 Jan 2011 10 15 34 0800 PST Wed 19 Jan
  • 如何检测 Java 字符串中的 unicode 字符?

    假设我有一个包含 的字符串 我如何找到所有这些 un icode 字符 我应该测试他们的代码吗 我该怎么做呢 例如 给定字符串 A X 我想将其转换为 AYXY 我想对其他 unicode 字符做同样的事情 并且我不想将它们存储在某种翻译映
  • 从 GitHub 上托管的 Spring Cloud Config Server 访问存储库的身份验证问题

    我在 GitHub 上的存储库中托管配置 如果我将回购公开 一切都好 但如果我将其设为私有 我将面临 org eclipse jgit errors TransportException https github com my user m
  • 读取电子邮件的文本文件转换为 Javamail MimeMessage

    我有一个电子邮件原始来源的文本文件 直接从 gmail 复制 如果您单击 查看原始文件 您就会看到它 我想读入该文件并将其转换为 MimeMessage 如果您好奇为什么 我设置了 JavaMaildir 并且需要用电子邮件填充它的收件箱以
  • 流中的非终结符 forEach() ?

    有时 在处理 Java Stream 时 我发现自己需要一个非终端 forEach 来触发副作用但不终止处理 我怀疑我可以用 map item gt f item 之类的方法来做到这一点 其中方法 f 执行副作用并将项目返回到流中 但这似乎
  • QuerySyntaxException:无法找到类

    我正在使用 hql 生成 JunctionManagementListDto 类的实际 Java 对象 但我最终在控制台上出现以下异常 org hibernate hql internal ast QuerySyntaxException
  • 返回 Java 8 中的通用函数接口

    我想写一种函数工厂 它应该是一个函数 以不同的策略作为参数调用一次 它应该返回一个函数 该函数根据参数选择其中一种策略 该参数将由谓词实现 嗯 最好看看condition3为了更好的理解 问题是 它没有编译 我认为因为编译器无法弄清楚函数式
  • Java Swing:需要一个高质量的带有复选框的开发 JTree

    我一直在寻找一个 Tree 实现 其中包含复选框 其中 当您选择一个节点时 树中的所有后继节点都会被自动选择 当您取消选择一个节点时 树中其所有后继节点都会自动取消选择 当已经选择了父节点 并且从其后继之一中删除了选择时 节点颜色将发生变化
  • Hamcrest Matchers - 断言列表类型

    问题 我目前正在尝试使用 Hamcrest Matchers 来断言返回的列表类型是特定类型 例如 假设我的服务调用返回以下列表 List
  • org.apache.commons.net.io.CopyStreamException:复制时捕获 IOException

    我正在尝试使用以下方法中的代码将在我的服务器中创建的一些文件复制到 FTP 但奇怪的是我随机地低于错误 我无法弄清楚发生了什么 Exception org apache commons net io CopyStreamException
  • Spring表单ModelAttribute字段验证避免400 Bad Request错误

    我有一个ArticleFormModel包含正常发送的数据html form由 Spring 使用注入 ModelAttribute注释 即 RequestMapping value edit method RequestMethod PO

随机推荐

  • 单链表(数组模拟:静态链表)

    单链表 实现一个单链表 链表初始为空 支持三种操作 向链表头插入一个数 删除第 kk 个插入的数后面的数 在第 kk 个插入的数后插入一个数 现在要对该链表进行 MM 次操作 进行完所有操作后 从头到尾输出整个链表 注意 题目中第 kk 个
  • MySQL 临时表与内存表的区别

    文章目录 1 临时表 2 内存表 3 区别 4 小结 在 MySQL 中 Temporary Table 临时表 和 Memory Table 内存表 是两种不同的表类型 它们有一些重要的区别和用途 1 临时表 临时表 Temporary
  • PAA介绍

    ECCV 2020 的一篇文章 论文地址 https arxiv org abs 2007 08103 目录 一 简介 摘要 整个策略流程为 二 相关背景介绍 三 提出的方法 3 1 概率Anchor分配算法 3 2 测试阶段加入预测IoU
  • 我用ChatGPT写2023高考语文作文(一):全国甲卷

    2023年 全国甲卷 适用地区 广西 贵州 四川 西藏 人们因技术发展得以更好地掌控时间 但也有人因此成了时间的仆人 这句话引发了你怎样的联想与思考 请写一篇文章 要求 选准角度 确定立意 明确文体 自拟标题 不要套作 不得抄袭 不得泄露个
  • 怎么修复老照片?给你推荐这几个修复方法

    相信大家的家里都有老照片吧 那在你们翻看这些老照片的时候 有没有发现有些老照片变得有些破旧 泛黄 模糊等情况呢 看到这些情况 大家是不是会很心疼呢 因为这些老照片都充满了各种各样的回忆 根本拍不出第二张同样的照片 但其实我们可以使用软件来修
  • 设计模式原则-开闭原则

    设计模式原则 开闭原则 1 概述 开闭原则 Open Closed Principle 是编程中最基础 最重要的设计原则 一个软件实体如类 模块和函数应该对扩展开放 对提供方 对修改关闭 对使用方 用抽象构建框架 用实现扩展细节 当软件需要
  • 8个Python实用脚本,赶紧收藏

    脚本写的好 下班下得早 程序员的日常工作除了编写程序代码 还不可避免地需要处理相关的测试和验证工作 例如 访问某个网站一直不通 需要确定此地址是否可访问 服务器返回什么 进而确定问题在于什么 完成这个任务 如果一味希望采用编译型语言来编写这
  • java调用webservice接口 三种方法

    摘自其它 webservice的 发布一般都是使用WSDL web service descriptive language 文件的样式来发布的 在WSDL文件里面 包含这个webservice暴露在外面可供使用的接口 今天搜索到了非常好的
  • python 协程可以嵌套协程吗_Python学习后有哪些方向可以选择?Python有什么好的学习方法吗?(附教程)...

    随着人工智能的发展 Python近两年也是大火 越来越多的人加入到Python学习大军 对于毫无基础的人该如何入门Python呢 这里整理了一些个人经验和Python入门教程供大家参考 如果你是零基础入门 Python 的话 建议初学者至少
  • 3.1 向量的模和单位向量

    向量的长度和单位向量 向量的长度 模 u 3 4 该向量的大小是多少 u 5 二范数 欧拉距离 在二维空间中 可以直接根据勾股定理计算出 u OP 2 3 5 该向量的大小是多少 n维向量 求模 同理 单位向量 在向量上记 为单位向量 长度
  • 股票数据API整理

    最近在做股票分析系统 数据获取源头成了一大问题 经过仔细的研究发现了很多获取办法 这里整理一下 方便后来者使用 获取股票数据的源头主要有 数据超市 雅虎 新浪 Google 和讯 搜狐 ChinaStockWebService 东方财富客户
  • k8s--基础--23.1--认证-授权-准入控制--介绍

    k8s 基础 23 1 认证 授权 准入控制 介绍 1 介绍 k8s对我们整个系统的认证 授权 访问控制做了精密的设置 对于k8s集群来说 apiserver是整个就集群访问控制的唯一入口 我们在k8s集群之上部署应用程序的时候 可以通过宿
  • 数据结构_课程设计——最小生成树:室内布线

    转载请注明出处 http blog csdn net lttree 这道课程设计 费不少时间 太麻烦了 明明是能力不够 最小生成树 室内布线 题目要求 装修新房子是一项颇为复杂的project 如今须要写个程序帮助房主设计室内电线的布局 首
  • 数组练习题(编程题)

    1 从终端 键盘 读 20个数据到数组中 统计其中正数的个数 并计算这些正数之和 int sum 0 int count 0 int input int arr 20 0 初始化处理 arr 0x0000002d1b13f8c0 85899
  • 7.25总结,正则表达式+号的含义

    一 正则表达式 由 括起来的是需要判断的字符 eg a z A Z 0 9 在 加 号表示多次并且连续满足 条件的式子 表示有没有 String s1 123qwe13qwe s1 s1 replaceAll 0 9 替换 System o
  • 用Python爬虫接私活,赚了32K!

    网络爬虫 很多人觉得这是技术控的专属 实际上爬虫是人人都能掌握的技能 爬虫到底能干什么 基本你所能看到的全部信息 它都能抓取 例如 收集并批量下载某音乐软件付费歌曲 某视频软件的付费视频 采集北京所有小区的信息及北京所有小区的所有历史成交记
  • centos7 lvm 创建脚本

    centos7 lvm 创建脚本 Centos7 lvm创建 适用场景只有一块新加的磁盘 且未分区 挂载目录为riva 可自定义 date 2023 bin bash 注意此处变量 Disk dev sd 不同的平台会有差异 比如腾讯云为
  • Attempted to load tokenizers/punkt/PY3/english.pickle

    分明已经把punkt放到服务器相应文件下 但是还是显示没成功 错误原因是解压得时候文件目录有两个punkt
  • VUE之自定义插件

    index js文件 import promptBox from prompt box vue 定义插件对象 const PromptBox vue的install方法 用于定义vue插件 PromptBox install functio
  • rocketMq介绍和安装

    rocketMq介绍和安装 Mq介绍 MQ MessageQueue 消息队列 队列 是一种FIFO 先进先出的数据结构 消息由生产者发送到MQ进行排队 然后按原来的顺序交由消息的消费者进行处理 QQ和微信就是典型的MQ MQ的作用 主要有