RocketMQ(三) broker启动

2023-11-08

RocketMQ源码版本V5.0.0,可兼容之前的版本,因为整理资料的时候,之前的版本,和V5版本有所出入,核心流程基本还是大同小异的。

此前已经总结了NameServer的启动流程源码:现在来了解Broker的启动流程。在RocketMQ启动的时候,首先要启动NameServer,然后再启动Broker。

Broker模块主要复制消息的存储、投递、查询,以及服务高可用的保证。

为了实现这些功能,Broker主要有以下几个比较重要的模块:

1.Remoting Module:整个Broker的实体,复制处理来自Client端的请求。

2.Client Manager:复制管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息。

3.Store Service:提供方便简单的API接口处理消息存储到物理硬盘的查询功能。

4.HA Service:高可用服务,提供Master Broker 和 Salve Broker之间的数据同步功能。

5.Index Service:根据特定的Message Key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

1. Broker启动入口

Broker的启动入口就是broker模块的BrokerStartup类的main方法。我们先来看BrokerStartup的main方法,其实和NamesrvStartup几乎是一个模子,都是先创建一个Controller,然后再启动它;

 2. createBrokerController函数创建BrokerController实例对象

创建BrokerController时,核心就做了2件事情

        1.解析各种配置(命令行等)、创建BrokerController需要的各种配置对象:BrokerConfig、NettyServerConig、NettyClientConfig、MessageStoreConfig;

        2.BrokerController相当于Broker的一个中央控制类。创建了BrokerController的对象后,再调用BrokerController对象的initialize方法,进行初始化操作。

 

从创建BrokerController的代码中可以看出,BrokerController的依赖的四个核心配置如下图所示:

 这些配置类实际上就是一些普通的POJO类。所以此时Broker的整个组件结构应该是这样的:

 Broker整个JVM进行运行期间,都是由BrokerController这个管控组件去管理Broker的请求处理、后台线程以及磁盘数据的。

 2.1 创建各种配置类

 creatBrokerController方法中,首先就会创建各种配置类,重要的broker相关的配置类有如下几种:

        1. BrokerConfig:Broker的配置类,包含Broker的各种配置信息,比如ROCKET_HOME、namesrvAddr、brokerName、brokerId属性等。

        2. NettyServiceConfig:NettyServer的配置类,包含Broker作为服务器端时的各种属性,比如客户端进行交互的时候。就设置NettyServer的监听端口为10911。即客户端与Broker通信时使用10911端口。

        3. NettyClientConfig:NettyClient的配置类,包含Broker最为客户端时的各种属性,Broker与NameServer进行交互的时候,就会作为客户端存在。

        4. MessageStoreConfig:Broker消息存储的配置类,包含了消息存储的相关配置。比如各种文件的目录,大小等信息。

        配置完成以上几个核心配置文件后,还会进行外部文件的解析,将-c指令指定的外部配置文件的属性设置给这些配置类。

        设置了对应的配置信息后,还会进行一系列的校验,例如:

1. ROCKETMQ_HOME校验:如果在启动参数中没有指定ROCKETMQ_HOME属性,那么打印异常并退出程序,并提示让我们自己进行配置该属性。ROCKETMQ_HOME就是指定RocketMQ的配置文件路径。

2. namesrvAddr校验:我们可以配置多个NameServer地址,以“;”分割,Broker会通过将各个NameServer的字符串地址转换为InetSocketAddress(实现 IP 套接字地址(IP 地址 + 端口号))来校验各个地址的合法性。

3. 设置、校验brokerId,如果broker是同步master或者异步master角色,则设置brokerId为0,如果是slave角色,则校验设置的brokerId,如果不大于0,则打印异常。

根据检查broker的角色配置brokerId:默认角色是ASYNC_MASTER 通过此配置可知brokerId为0表示master,非0表示slave

broker的角色分为:

ASYNC_MASTER(异步主机):异步同步消息到slave

SYNC_MASTER(同步主机):同步同步消息到slave

SLAVE(从机)

2.2 BrokerController类

我们在继续宁BrokerController如何初始化之前,先来看一下BrokerController内部到底是怎么样的。我们说过BrokerController相当于broker的一个中央控制器,各种组件角色的交互都是通过BrokerController来完成的,而不是组件的直接互相调用。

从下面的源码中不难看出,实例化BrokerController的时候,会一块实例化很多的配置类和线程池队列。

  

 2.3 初始化BrokerController控制器

初始化步骤大致如下:

1.  加载配置文件:topic配置文件、topicQueue相关配置、消费者消费偏移量配置文件、订阅分组配置文件、消费者订单信息管理文件。

2.  实例化和初始化消息存储服务相关类DefaultMessageStore。

3.  通过DefaultMessageStore加载消息存储的相关文件。

比如:commitLog日志文件、consumequeue消费消息队列文件的加载、indexFiles索引文件的构建 messsageStore还会将这些文件的内容加载到内存中,并且完成RocketMQ的数据恢复 这是broker启动是核心步骤之一。

4.  初始化Broker通信层,创建netty远程服务 (remotingServer 和 fastRemotingServer)。

创建broker的netty远程服务,端口为10911,可用于处理客户端的所有请求; 创建broker的快速netty远程服务,端口号为普通端口号-2(默认10909),这就是所谓的快速通道,对应可以处理客户端除了拉取消息之外的所有请求,所谓的VIP端口。

5.  创建各种线程池,主要有两类:

第一类:负责处理别人发过来的请求;

第二类:负责处理自己的一些后台任务。

这一步创建了很多线程池,因为RocketMQ为了性能,对过多的请求进行异步优化处理,因此需要许多线程池。

6.  RocketMQ底层通信基于netty,这里注册netty请求处理器。

        (1) registerProcessor方法将处理器和对应的线程池绑定为一个Pair对象,并且将这个pair对象放入processorTable中, 其值就是pair对象,key就是对应的请求编码RequestCode。

        (2) 每个请求,都会根据自己携带的RequestCode在processorTable中查找对应的处理器以及对应的执行器线程池来处理请求。 RocketMQ通过这样的方式来提升处理请求的性能。

7.  启动一系列定时周期任务(定时调度线程池的后台执行)

在注册了netty消息处理器之后,将会启动一系列的定时任务 这些定时任务由BrokerController中的scheduledExecutorService去执行,该线程池只有一个线程。

8.  初始化事务消息相关服务

初始化服务采用Java SPI的方式进行加载 主要初始化三个服务:

(1)transactionalMessageService(事务消息服务):用于处理、检查事务消息

(2)transactionalMessageCheckListener(事务消息监查监听器):监听回查消息

(3)transactionalMessageCheckService(事务消息检查服务):提供了事务消息回查的逻辑,默认情况下 6s以上没commit/rollback的事务消息才会触发事务回查,而如果回查次数超过了15次则丢弃事务

9.  初始化Acl权限相关服务:加载权限相关校验器

同样是基于Java的SPI机制进行查找,并且会将找到校验器注册到RpcHook中,在请求执行之前会执行权限校验。

10. 初始化RPC调用的钩子函数

RpcHook是RocketMQ提供的钩子类,提供一种类似于类似于AOP的功能。 可以在请求被处理之前和响应被返回之前执行对应的方法。

除了以上步骤以外,还有Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改。

 2.3.1 加载配置文件 

 2.3.2 创建消息存储对象defaultMessageStore

        如果上面的配置文件都加载成功,则创建负责消息存储相关的对象defaultMessageStore。注意,这里所谓的加载成功,是指在加载过程中没有抛出异常,即使是没有对应的文件和临时文件,只要没有抛出异常,也会返回true,表示加载成功。

        DefaultMessageStore是RocketMQ的核心文件存储控制类,是RocketMQ对于消息存储和获取功能的抽象,DefaultMessageStore类位于store模块,通过该类可以直接控制管理commitLog、consumeQueue、indexFile等文件内容的读、写,非常重要。

        在启动Broker的时候,就会创建一个defaultMessagStore对象,随后通过load方法进行磁盘文件的加载和异常数据的恢复。

 

 2.3.2.1 解析延迟级别

 2.3.3 注册消息存储钩子

2.3.4 Load加载恢复消息文件

        DefaultMesageStore实例化之后,将会调用load方法将磁盘中的commitLog、ConsumeQueue、IndexFile文件的数据加载到内存,还会进行数据恢复操作。

主要步骤为:

        1. 调用isTempFileExit方法判断上次broker是否是正常退出。如果是正常退出不会保留absort文件,异常退出则会;broker在启动时会创建abort文件,并且注册钩子函数,在JVM退出时删除abort文件,如果下一次启动时存在abort文件,说明broker是异常退出的,文件数据可能存在不一致的情况,需要进行数据修复。

        2. 加载CommitLog日志文件。CommitLog文件才是真正消息存储的地方(即消息主题以及元数据的存储主题,存储Producer端写入的消息主题内容,消息内容是不定长的)。单个大小默认1G。官方描述如下:单个文件大小默认1G,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息顺序写入日志文件,效率很高,当文件满了,写入下一个文件。

        3. 加载ConusmeQueue文件。 ConsumeQueue文件可以看作是CommitLog是索引文件,其存储了它所属topic的信息在CommitLog中的偏移量。消费者拉取消息的时候,可以从ConsumeQueue中快速的根据偏移量定位消息在CommitLog中的位置。

        4. 加载checkpoint检查点文件。StoreCheckpoint记录这commitLog、ConsumeQueue、Index文件的最后更新时间点。当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件。

        5. 加载index索引文件。index索引文件用以通过时间区间快速查询消息,底层为HashMap结构,实现为hash索引。如果不是正常退出,并且最大更新时间比checkpoint文件中是时间戳大,则删除该index文件。

      6. 恢复ConsumeQueue文件和CommitLog文件,将正确的数据恢复至内存,删除错误数据和文件

2.3.4.1 isTempFileExist 判断是否存在临时文件

2.3.4.2 commitLog#load加载消息日志文件

        通过内部的CommitLog对象的load方法加载Commit Log日志文件。CommitLog的load方法实际上是委托内部的mappedFileQueue的load方法进行加载。

        MappedFileQueue#load方法会就是将commitLog目录路径下的commotlog文件进行全部的加载为MappedFile对象。 

         在物理上commitlog目录下面是一个个的commitlog文件,但是在java中进行了三层映射。CommitLog-MappedFileQueue-MappedFile。CommitLog中包含MappedFileQueue,以及commitlog相关的其他服务,例如刷盘服务;MappedFileQueue中包含MappedFile集合,以及单个commotlog文件大小等属性,而MappedFile才是真正的一个commotlog文件在Java中的映射,包含文件名、大小、mmap对象mappedByteBuffer等属性。

        实际上MappedFileQueue和MappedFile都是通用类,commitlog、comsumequeue、indexfile文件都会使用到。

        创建MappedFile映射文件。MappedFile作为一个RocketMQ的物理文件在Java中的映射类。commitLog consumerQueue、indexFile3种文件磁盘的读写都是通过MappedFile操作的。

2.3.4.3 consumeQueueStore#load加载消费队列文件

        ConsumeQueue对象建立之后,会对自己管理的队列id目录下面的ConsumeQueue文件进行加载。内部就是调用mappedFileQueue的load方法,该方法我们前面讲过了,会对每个ConsumeQueue文件创建一个MappedFile对象并且进行内存映射mmap操作。

2.3.4.4 创建StoreCheckpoint检查点对象

        在commitlog和consumequeue文件都加载成功之后,加载checkpoint 检查点文件,创建storeCheckpoint对象。

        StoreCheckpoint记录着commitLog、consumeQueue、index文件的最后更新时间点,当上一次broker是异常结束时,会根据StoreCheckpoint的数据进行恢复,这决定着文件从哪里开始恢复,甚至是删除文件。

StoreCheckpoint记录了四个关键属性(之前版本是前三个,现在这个版本又加了第4个):

1. physicMsgTimestamp:最新commitLog文件的刷盘时间戳,单位毫秒;

2. logicsMsgTimestamp:最新consumeQueue文件的刷盘时间戳,单位毫秒;

3. indexMsgTimestamp:创建最新indexfile文件的时间戳,单位毫秒;

4. masterFlushedOffset:获取master刷新偏移。

2.3.4.5 加载indexFile文件

index 索引文件用于通过时间区间来快速查询消息,底层为HashMap结构,实现为hash索引。

最终一个index文件对应着一个IndexFile实例,并且会加到indexFileList集合中。还会判断如果上次broker不是正常退出,并且并且当前index文件中最后一个消息的落盘时间戳大于StoreCheckpoint中的最后一个index索引文件创建时间,则该索引文件被删除。

2.3.4.6 恢复ConsumeQueue文件和commitLog文件以及TopicQueueTable,将正确的数据恢复至内存,删除错误数据和文件


ConusmeQueue

步骤一:


    /**
     * 进行ConsumeQueue的恢复
     * 默认采用的是正常恢复,不是并发恢复
     */
    private void recoverConsumeQueue() {
        if (!this.brokerConfig.isRecoverConcurrently()) {
            this.consumeQueueStore.recover();
        } else {
            this.consumeQueueStore.recoverConcurrently();
        }
    }

 步骤二:

    /**
     * 正常恢复
     */
    public void recover() {
        //遍历consumeQueueTable的value集合,即queueId到ConsumeQueue的map映射
        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
            //遍历所有的ConsumeQueueInterface
            for (ConsumeQueueInterface logic : maps.values()) {
                //恢复ConsumeQueue,删除无效的ConsumeQueue文件
                this.recover(logic);
            }
        }
    }

步骤三: 

    public void recover(ConsumeQueueInterface consumeQueue) {
        //获取文件队列生命周期
        FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
        fileQueueLifeCycle.recover();
    }

步骤四:

 /**
     * ConsumeQueue的recover()方法
     */
    @Override
    public void recover() {
        //获取所有的ConsumeQueue文件映射的mappedFiles集合
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        //如果存在commitlog文件集合
        if (!mappedFiles.isEmpty()) {

            //从倒数第三个文件开始恢复
            int index = mappedFiles.size() - 3;
            //不足3个文件,则直接从第1个文件开始进行恢复
            if (index < 0) {
                index = 0;
            }

            //consumequeue映射文件的大小
            int mappedFileSizeLogics = this.mappedFileSize;
            //获取文件对应的映射对象
            MappedFile mappedFile = mappedFiles.get(index);
            //文件映射对应的DriectByteBuffer
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            //获取文件映射的初始物理偏移量,其实和文件名相同
            long processOffset = mappedFile.getFileFromOffset();
            //consumequeue映射文件的有效offset
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                //校验每一个索引条目的有效性,CQ_STORE_UNIT_SIZE是每隔条目的大小,默认20
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    //获取该条目对应的消息在commitlog文件中的物理偏移量
                    long offset = byteBuffer.getLong();
                    //获取该条目对应的消息在commitlog文件中的总长度
                    int size = byteBuffer.getInt();
                    //获取该条目对应的消息的tag哈希值
                    long tagsCode = byteBuffer.getLong();

                    //如果offset 和 size 都大于0,则表示当前条目有效
                    if (offset >= 0 && size > 0) {
                        //更新当前ConsumeQueue文件中的有效数据偏移量
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        //跟新当前queueId目录下所有的ConsumeQueue文件中的最大有效物理偏移量
                        this.maxPhysicOffset = offset + size;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        //否则,表示当前条目无效了,后续的条目不会遍历
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }

                //如果当前ConsumeQueue文件中的有效数据偏移量和文件大小一样,则表示该ConsumeQueue文件中的所有条目都是有效的
                if (mappedFileOffset == mappedFileSizeLogics) {
                    //遍历下一个文件
                    index++;
                    //遍历到了最后一个文件,则结束遍历
                    if (index >= mappedFiles.size()) {

                        log.info("recover last consume queue file over, last mapped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        //获取下一个文件的数据
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    //如果不相等,则表示当前ConsumeQueue有部分无效数据,恢复结束
                    log.info("recover current consume queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }

            //该文件映射的已恢复的物理偏移量
            processOffset += mappedFileOffset;
            //设置当前queueId下面的所有的ConsumeQueue文件的最新数据
            //设置刷盘最新位置,提交的最新位置
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            /**
             * 删除文件中最大有效数据偏移量processOffset以后的所有数据
             * 该方法会校验,如果文件最大数据偏移量大于最大有效数据偏移量
             * 最大数据偏移量:文件的最大数据偏移
             * 最大有效数据偏移量:最后一个有效条目在自身文件中的偏移量
             *      1. 那么将文件起始偏移量大于最大有效数据偏移量的文件进行整个删除
             *      2. 否则设置该文件的有效数据位置为最大有效数据偏移量
             */
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            //是否启用外部读取,默认不启用
            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }

步骤五:

    /**
     * MappedFileQueue的方法
     * @param offset 文件的最大有效数据偏移量
     */
    public void truncateDirtyFiles(long offset) {
        //待移除的文件集合
        List<MappedFile> willRemoveFiles = new ArrayList<>();

        //遍历内部所有的MappedFile文件
        for (MappedFile file : this.mappedFiles) {
            //获取当前文件自身的最大数据偏移
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            //如果最大数据偏移量大于最大有效数据偏移量
            if (fileTailOffset > offset) {
                //如果最大有效数据偏移量大于等于该文件的起始偏移量,那么说明文件当中有一部分数据是有效的,那么设置该文件的的有效属性
                if (offset >= file.getFileFromOffset()) {
                    //设置当前文件的刷盘、提交、写入指针为当前最大有效数据偏移量
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    //如果最大有效数据偏移量小于该文件的起始偏移量,那么删除该文件
                    file.destroy(1000);
                    //记录到待删除的文件集合中
                    willRemoveFiles.add(file);
                }
            }
        }

        //将等待移除的文件整体从mappedFiles中移除
        this.deleteExpiredFile(willRemoveFiles);
    }

 commitLog.recoverNormally(正常恢复)

步骤一:

 /**
     * When the normal exit, data recovery, all memory data have been flush
     * 该方法用于broker上次正常关闭的时候恢复commitlog,其逻辑与recoverConsumeQueue恢复ConsumeQueue文件的方法差不多
     * 当正常退出、数据恢复时,所有内存数据都已刷新到磁盘
     * @param maxPhyOffsetOfConsumeQueue consumequeue文件中记录的最大有效commitlog文件偏移量
     */
    public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
        /**
         * 是否检查消耗记录的CRC32。默认为true
         * 目的是:
         *      这样可以确保消息不会发生在线或磁盘损坏。
         *      该检查增加了一些开销,因此在寻求极端性能的情况下可能会禁用该检查。
         */
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        //检查重复信息
        boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
        //获取commitlog文件集合
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        //如果存在commitlog文件
        if (!mappedFiles.isEmpty()) {
            // Began to recover from the last third file
            //从倒数第三个文件开始恢复
            int index = mappedFiles.size() - 3;
            //不足3个文件,则直接从第1个开始恢复
            if (index < 0) {
                index = 0;
            }

            //获取文件对应的映射对象
            MappedFile mappedFile = mappedFiles.get(index);
            //文件映射对应的DiectByteBuffer
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            //获取文件映射的初始物理偏移量,其实和文件名相同
            long processOffset = mappedFile.getFileFromOffset();
            //当前commitlog映射文件的有效offset
            long mappedFileOffset = 0;
            long lastValidMsgPhyOffset = this.getConfirmOffset();
            // normal recover doesn't require dispatching
            //正常恢复不需要调度
            boolean doDispatch = false;
            while (true) {
                //生成DispatchRequest,验证本条消息是否合法
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
                //获取消息大小
                int size = dispatchRequest.getMsgSize();
                // Normal data
                //如果数据是正常的
                if (dispatchRequest.isSuccess() && size > 0) {
                    //最后有效的物理地址偏移 = 文件的初始物理偏移量 + commitlog文件有效的偏移量
                    lastValidMsgPhyOffset = processOffset + mappedFileOffset;
                    //更新mappedFilOffset的值加上本条消息长度
                    mappedFileOffset += size;
                    /**
                     * 将在新的调度请求发送到消息存储时触发。
                     */
                    this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
                }
                // Come the end of the file, switch to the next file Since the
                // return 0 representatives met last hole,
                // this can not be included in truncate offset
                //如果消息的请求是正常的但是size为0,表示到了文件的末尾,则尝试跳到下一个commitlog文件进行检测
                else if (dispatchRequest.isSuccess() && size == 0) {
                    /**
                     * 将在新的调度请求发送到消息存储时触发。
                     */
                    this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, true);
                    index++;
                    //如果最后一个文件查找完毕,结束循环
                    if (index >= mappedFiles.size()) {
                        // Current branch can not happen
                        log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        //如果最后一个文件没有查找完毕,那么跳转到下一个文件
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
                // Intermediate file read error
                //如果当前消息异常,那么不继续校验
                else if (!dispatchRequest.isSuccess()) {
                    if (size > 0) {
                        log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
                    }
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }

            //commitlog文件的最大有效区域的偏移量
            processOffset += mappedFileOffset;
            // Set a candidate confirm offset.
            // In most cases, this value will be overwritten by confirmLog.init.
            // It works if some confirmed messages are lost.
            this.setConfirmOffset(lastValidMsgPhyOffset);
            //设置当前commitlog下面的所有的commitlog文件的最新数据
            //设置刷盘最新位置,提交的最新位置
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            /**
             * 删除文件最大有效数据偏移量processOffset之后的所有数据
             */
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data
            //如果最consmequeue文件记录的最大有效commitlog文件偏移量,大于等于commitlog文件本身记录的最大有效区域的偏移量
            //那么以consumequeue文件的有效数据为准,再次清楚consumequeue文件中的脏数据
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        } else {
            // Commitlog case files are deleted
            //如果不存在commitlog文件
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            //那么重置刷盘最新位置,提交的最新位置,并且清除所有的consumequeue索引文件
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }

步骤二:


    /**
     * MappedFileQueue的方法
     * @param offset 文件的最大有效数据偏移量
     */
    public void truncateDirtyFiles(long offset) {
        //待移除的文件集合
        List<MappedFile> willRemoveFiles = new ArrayList<>();

        //遍历内部所有的MappedFile文件
        for (MappedFile file : this.mappedFiles) {
            //获取当前文件自身的最大数据偏移
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            //如果最大数据偏移量大于最大有效数据偏移量
            if (fileTailOffset > offset) {
                //如果最大有效数据偏移量大于等于该文件的起始偏移量,那么说明文件当中有一部分数据是有效的,那么设置该文件的的有效属性
                if (offset >= file.getFileFromOffset()) {
                    //设置当前文件的刷盘、提交、写入指针为当前最大有效数据偏移量
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    //如果最大有效数据偏移量小于该文件的起始偏移量,那么删除该文件
                    file.destroy(1000);
                    //记录到待删除的文件集合中
                    willRemoveFiles.add(file);
                }
            }
        }

        //将等待移除的文件整体从mappedFiles中移除
        this.deleteExpiredFile(willRemoveFiles);
    }

commitLog.recoverNormally(异常恢复 - 当前版本已经不采用,简单说一下) 

 步骤一:

  @Deprecated
    public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        boolean checkDupInfo = this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            //倒叙遍历所有的commitlog文件执行检查恢复
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                //首先校验当前commitlog文件是否是一个正确的文件
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this mapped file " + mappedFile.getFileName());
                    break;
                }
            }

            //从第一个正确的commitlog文件开始遍历恢复
            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }

            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            long lastValidMsgPhyOffset = this.getConfirmOffset();
            // abnormal recover require dispatching
            boolean doDispatch = true;
            while (true) {
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
                int size = dispatchRequest.getMsgSize();

                if (dispatchRequest.isSuccess()) {
                    // Normal data
                    if (size > 0) {
                        lastValidMsgPhyOffset = processOffset + mappedFileOffset;
                        mappedFileOffset += size;

                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                                this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
                            }
                        } else {
                            this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, false);
                        }
                    }
                    // Come the end of the file, switch to the next file
                    // Since the return 0 representatives met last hole, this can
                    // not be included in truncate offset
                    else if (size == 0) {
                        this.getMessageStore().onCommitLogDispatch(dispatchRequest, doDispatch, mappedFile, true, true);
                        index++;
                        if (index >= mappedFiles.size()) {
                            // The current branch under normal circumstances should
                            // not happen
                            log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                            break;
                        } else {
                            mappedFile = mappedFiles.get(index);
                            byteBuffer = mappedFile.sliceByteBuffer();
                            processOffset = mappedFile.getFileFromOffset();
                            mappedFileOffset = 0;
                            log.info("recover next physics file, " + mappedFile.getFileName());
                        }
                    }
                } else {

                    if (size > 0) {
                        log.warn("found a half message at {}, it will be truncated.", processOffset + mappedFileOffset);
                    }

                    log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                    break;
                }
            }

            processOffset += mappedFileOffset;
            // Set a candidate confirm offset.
            // In most cases, this value will be overwritten by confirmLog.init.
            // It works if some confirmed messages are lost.
            this.setConfirmOffset(lastValidMsgPhyOffset);
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            // Clear ConsumeQueue redundant data
            if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
            }
        }
        // Commitlog case files are deleted
        else {
            log.warn("The commitlog files are deleted, and delete the consume queue files");
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
    }

 步骤二:

 /**
     * 校验当前commitlog文件是否是一个正确的文件
     * @param mappedFile 需要判断的commitlog文件
     * @return
     */
    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();

        //获取文件开头的魔数
        int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
        //如果文件的魔数与commitlog文件的正确的魔数不一致,则直接返回false,表示不是正确的commitlog文件
        if (magicCode != MESSAGE_MAGIC_CODE) {
            return false;
        }

        int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
        int bornhostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 8 : 20;
        int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + bornhostLength;
        long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
        //如果消息存盘时间为 0,则直接返回false,表示为存储任何消息
        if (0 == storeTimestamp) {
            return false;
        }

        //如果messafeIndexEnable为true, 并且使用安全的消息索引功能,即可靠模式,那么Index文件进行校验
        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
            && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
            //如果StoreCheckpoint的最小时间戳索引大于等于当前文件的存储时间,那么返回true,表示当前文件至少有部分是可靠的
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
                log.info("find check timestamp, {} {}",
                    storeTimestamp,
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        } else {
            //如果StoreCheckpoint的最小时间戳大于等于当前文件的存储时间,那么返回true,表示当前文件至少有部分是可靠的
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
                log.info("find check timestamp, {} {}",
                    storeTimestamp,
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        }

        return false;
    }

恢复TopicQueueTable

 步骤一:

    public void recoverTopicQueueTable() {
        //获取commitlog的最小偏移量
        long minPhyOffset = this.commitLog.getMinOffset();
        this.consumeQueueStore.recoverOffsetTable(minPhyOffset);
    }

 步骤二:

    /**
     * DefaultMessageStore的方法
     *
     * @param minPhyOffset
     */
    public void recoverOffsetTable(long minPhyOffset) {
        ConcurrentMap<String, Long> cqOffsetTable = new ConcurrentHashMap<>(1024);
        ConcurrentMap<String, Long> bcqOffsetTable = new ConcurrentHashMap<>(1024);

        //遍历consumequeueQueueTable。即consumequeue文件的集合
        for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueueInterface logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();

                long maxOffsetInQueue = logic.getMaxOffsetInQueue();
                if (Objects.equals(CQType.BatchCQ, logic.getCQType())) {
                    //将"topicName-queueId"作为key,将当前queueId下面最大的相对偏移量作为value存入table
                    bcqOffsetTable.put(key, maxOffsetInQueue);
                } else {
                    //将"topicName-queueId"作为key,将当前queueId下面最大的相对偏移量作为value存入table
                    cqOffsetTable.put(key, maxOffsetInQueue);
                }

                this.correctMinOffset(logic, minPhyOffset);
            }
        }

        //Correct unSubmit consumeOffset
        if (messageStoreConfig.isDuplicationEnable()) {
            SelectMappedBufferResult lastBuffer = null;
            long startReadOffset = messageStore.getCommitLog().getConfirmOffset() == -1 ? 0 : messageStore.getCommitLog().getConfirmOffset();
            while ((lastBuffer = messageStore.selectOneMessageByOffset(startReadOffset)) != null) {
                try {
                    if (lastBuffer.getStartOffset() > startReadOffset) {
                        startReadOffset = lastBuffer.getStartOffset();
                        continue;
                    }

                    ByteBuffer bb = lastBuffer.getByteBuffer();
                    int magicCode = bb.getInt(bb.position() + 4);
                    if (magicCode == CommitLog.BLANK_MAGIC_CODE) {
                        startReadOffset += bb.getInt(bb.position());
                        continue;
                    } else if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE) {
                        throw new RuntimeException("Unknown magicCode: " + magicCode);
                    }

                    lastBuffer.getByteBuffer().mark();
                    DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(lastBuffer.getByteBuffer(), true, true, true);
                    if (!dispatchRequest.isSuccess())
                        break;
                    lastBuffer.getByteBuffer().reset();

                    MessageExt msg = MessageDecoder.decode(lastBuffer.getByteBuffer(), true, false, false, false, true);
                    if (msg == null)
                        break;

                    String key = msg.getTopic() + "-" + msg.getQueueId();
                    cqOffsetTable.put(key, msg.getQueueOffset() + 1);
                    startReadOffset += msg.getStoreSize();
                } finally {
                    if (lastBuffer != null)
                        lastBuffer.release();
                }

            }
        }

        //设置为topicQueueTable
        this.setTopicQueueTable(cqOffsetTable);
        this.setBatchTopicQueueTable(bcqOffsetTable);
    }

2.3.5  计划消息服务在消息存储服务之后加载 加载RocketMQ延迟消息的服务,包括延时等级,配置文件等等

步骤一:

  @Override
    public boolean load() {
        boolean result = super.load();
        //解析延迟级别
        result = result && this.parseDelayLevel();
        //校正延迟偏移
        result = result && this.correctDelayOffset();
        return result;
    }

步骤二:

    public boolean load() {
        String fileName = null;
        try {
            //获取配置文件路径
            fileName = this.configFilePath();
            //加载配置文件得到内部的json字符串数据
            String jsonString = MixAll.file2String(fileName);

            //如果加载的json字符串为null 或者长度为0,那么就加载bak备份文件
            if (null == jsonString || jsonString.length() == 0) {
                return this.loadBak();
            } else {
                //如果加载的json字符串不为空,那么将json字符串反序列化为对象属性
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " failed, and try to load backup file", e);
            return this.loadBak();
        }
    }

    public abstract String configFilePath();

    private boolean loadBak() {
        String fileName = null;
        try {
            //获取配置文件路径
            fileName = this.configFilePath();
            //加载配置文件得到内部的json字符串数据
            String jsonString = MixAll.file2String(fileName + ".bak");
            //如果加载的json字符串不为null 并且 长度 > 0,就进行反序列化
            if (jsonString != null && jsonString.length() > 0) {
                this.decode(jsonString);
                log.info("load " + fileName + " OK");
                return true;
            }
        } catch (Exception e) {
            log.error("load " + fileName + " Failed", e);
            return false;
        }

        return true;
    }

步骤三(解析延迟级别):

public boolean parseDelayLevel() {
        HashMap<String, Long> timeUnitTable = new HashMap<>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        /**
         * 获取延迟级别的字符串形式
         * 默认的延迟级别:
         * messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
         */
        String levelString = this.brokerController.getMessageStoreConfig().getMessageDelayLevel();
        try {
            //以空格划分为延迟级别字符数组
            String[] levelArray = levelString.split(" ");
            //遍历延迟级别数组
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                //截取value字符串的最后一个字符,作为字符串ch,ch为延迟级别的单位
                String ch = value.substring(value.length() - 1);
                //根据延迟级别的单位从timeUnitTable集合中获取对应的value值
                Long tu = timeUnitTable.get(ch);

                int level = i + 1;
                //更新最大延迟级别
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                //获取messageDelayLevel中去掉单位的延迟事件的大小(只包含数字,不包含单位)
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                /**
                 * 获取延迟时间,以s为单位:
                 * 例如:
                 *      tu = 1000L * 60
                 *      num = 3
                 *      delayTimeMillis = tu * num
                 */
                long delayTimeMillis = tu * num;
                /**
                 * 将延迟级别以及对应的延迟时间以键值对的形式放入delayLevelTable中,
                 * delayLevelTable为ConcurrentMap集合,线程安全的集合
                 */
                this.delayLevelTable.put(level, delayTimeMillis);
                /**
                 * 是否启用异步传递,默认不启用
                 * enableAsyncDeliver = false
                 */
                if (this.enableAsyncDeliver) {
                    //deliverPendingTable:将延迟等级,及 阻塞队列以键值对的形式放入传递待定表中
                    this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
                }
            }
        } catch (Exception e) {
            log.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
            return false;
        }

        return true;
    }

步骤四(校正延迟偏移):

 /**
     * 校正延迟偏移量
     * @return
     */
    public boolean correctDelayOffset() {
        try {
            /**
             * 遍历delayLevelTable:
             * delayLevelTable键存储的是:延迟级别
             * delayLevelTable值存储的是:延迟级别对应的时间长短(以s为单位)
             */
            for (int delayLevel : delayLevelTable.keySet()) {
                //获得一个消费队列接口
                ConsumeQueueInterface cq =
                    brokerController.getMessageStore().getQueueStore().findOrCreateConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                        delayLevel2QueueId(delayLevel));
                /**
                 * 偏移量表offsetTable是一个ConcurrentMap集合,线程安全的集合
                 * offsetTable键存储的是:延迟级别
                 * offsetTable值存储的是:延迟级别对应的偏移量
                 */
                //根据延迟级别获取当前的延迟偏移量
                Long currentDelayOffset = offsetTable.get(delayLevel);
                //延迟偏移量 == null 或者 消费队列接口 == null,终止
                if (currentDelayOffset == null || cq == null) {
                    continue;
                }
                //正确的延迟偏移量 = 当前的延迟偏移量
                long correctDelayOffset = currentDelayOffset;
                //获取最小偏移量
                long cqMinOffset = cq.getMinOffsetInQueue();
                //获取最大偏移量
                long cqMaxOffset = cq.getMaxOffsetInQueue();

                if (currentDelayOffset < cqMinOffset) {
                    correctDelayOffset = cqMinOffset;
                    //调度CQ偏移无效。偏移量={},cqMinOffset={},cqMaxOffset=},queueId=}
                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                        currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
                }

                if (currentDelayOffset > cqMaxOffset) {
                    correctDelayOffset = cqMaxOffset;
                    //调度CQ偏移无效。偏移量={},cqMinOffset={},cqMaxOffset=},queueId=}
                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                        currentDelayOffset, cqMinOffset, cqMaxOffset, cq.getQueueId());
                }
                if (correctDelayOffset != currentDelayOffset) {
                    //从{}到{}的正确延迟偏移量[delayLevel{}]
                    log.error("correct delay offset [ delayLevel {} ] from {} to {}", delayLevel, currentDelayOffset, correctDelayOffset);
                    offsetTable.put(delayLevel, correctDelayOffset);
                }
            }
        } catch (Exception e) {
            log.error("correctDelayOffset exception", e);
            return false;
        }
        return true;
    }

 2.3.6  初始化Broker通信层,创建netty远程服务

 步骤一:

protected void initializeRemotingServer() throws CloneNotSupportedException {
        //创建broker的netty远程服务,端口号为10911, 可以用来处理客户端的所有请求
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        //创建一个broker的netty快速远程服务的配置
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        //设置快递netty远程服务的配置监听端口号为普通服务端口号 - 2, 默认10909
        int listeningPort = nettyServerConfig.getListenPort() - 2;
        if (listeningPort < 0) {
            listeningPort = 0;
        }
        //设置broker的netty快速远程服务的端口号
        fastConfig.setListenPort(listeningPort);

        //创建broker的netty快读远程服务,这就是所谓的快速通道,对应可以处理客户端除了拉取消息之外的所有请求,所谓的VIP端口
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
    }

2.3.7 创建各种线程池

主要有两类:

 第一类:负责处理别人发过来的请求;

 第二类:负责处理自己的一些后台任务 *

这一步创建了很多线程池,因为RocketMQ为了性能,对过多的请求进行异步优化处理,因此需要许多线程池

 步骤一:

/**
     * Initialize resources including remoting server and thread executors.
     * 初始化资源,包括远程处理服务器和线程执行器。
     */
    protected void initializeResources() {
        //创建Broker控制器计划线程池
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
            new ThreadFactoryImpl("BrokerControllerScheduledThread", true, getBrokerIdentity()));

        //创建处理发送消息请求的线程池
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));

        //创建处理拉取消息请求的线程池
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_", getBrokerIdentity()));

        //创建精简拉取消息执行器
        this.litePullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getLitePullMessageThreadPoolNums(),
            this.brokerConfig.getLitePullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.litePullThreadPoolQueue,
            new ThreadFactoryImpl("LitePullMessageThread_", getBrokerIdentity()));

        this.putMessageFutureExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPutMessageFutureThreadPoolNums(),
            this.brokerConfig.getPutMessageFutureThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.putThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_", getBrokerIdentity()));

        //创建ack消息执行器
        this.ackMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getAckMessageThreadPoolNums(),
            this.brokerConfig.getAckMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.ackThreadPoolQueue,
            new ThreadFactoryImpl("AckMessageThread_", getBrokerIdentity()));

        //创建查询请求的线程池
        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_", getBrokerIdentity()));

        //broker 管理线程池,作为默认处理器的线程池
        this.adminBrokerExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getAdminBrokerThreadPoolNums(),
            this.brokerConfig.getAdminBrokerThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.adminBrokerThreadPoolQueue,
            new ThreadFactoryImpl("AdminBrokerThread_", getBrokerIdentity()));

        //创建客户端管理的线程池
        this.clientManageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_", getBrokerIdentity()));

        //创建broker心跳处理的线程池
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true, getBrokerIdentity()));

        //创建消费者管理的线程池
        this.consumerManageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getConsumerManageThreadPoolNums(),
            this.brokerConfig.getConsumerManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumerManagerThreadPoolQueue,
            new ThreadFactoryImpl("ConsumerManageThread_", true, getBrokerIdentity()));

        //创建reply消息请求的线程池
        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_", getBrokerIdentity()));

        //创建事务消息相关处理的线程池
        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_", getBrokerIdentity()));

        //创建负载均衡的线程池
        this.loadBalanceExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
            this.brokerConfig.getLoadBalanceProcessorThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.loadBalanceThreadPoolQueue,
            new ThreadFactoryImpl("LoadBalanceProcessorThread_", getBrokerIdentity()));

        //创建同步broker成员组执行器服务
        this.syncBrokerMemberGroupExecutorService = new ScheduledThreadPoolExecutor(1,
            new ThreadFactoryImpl("BrokerControllerSyncBrokerScheduledThread", getBrokerIdentity()));
        //创建broker心跳执行器服务
        this.brokerHeartbeatExecutorService = new ScheduledThreadPoolExecutor(1,
            new ThreadFactoryImpl("rokerControllerHeartbeatScheduledThread", getBrokerIdentity()));

        //创建topic队列映射清理服务
        this.topicQueueMappingCleanService = new TopicQueueMappingCleanService(this);
    }

2.3.8 RocketMQ底层通信基于netty,这里注册netty请求处理器。

1. registerProcessor方法将处理器和对应的线程池绑定为一个Pair对象,并且将这个pair对象放入processorTable中, 其值就是pair对象,key就是对应的请求编码RequestCode。

2. 每个请求,都会根据自己携带的RequestCode在processorTable中查找对应的处理器以及对应的执行器线程池来处理请求。 RocketMQ通过这样的方式来提升处理请求的性能。

  /**
     * 从这里的源码能够看出来,除了pullMessageProcessor处理器只会被注册到remotingServer之外,
     * 其他处理器会被注册到remotingServer和fastRemotingServer这两个netty服务中。
     *
     * 所以Vip通道服务不能够处理拉取消息的请求
     */
    public void registerProcessor() {
        /*
         * SendMessageProcessor
         */
        sendMessageProcessor.registerSendMessageHook(sendMessageHookList);
        sendMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendMessageProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
        /**
         * PullMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.LITE_PULL_MESSAGE, this.pullMessageProcessor, this.litePullMessageExecutor);
        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
        /**
         * PeekMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.PEEK_MESSAGE, this.peekMessageProcessor, this.pullMessageExecutor);
        /**
         * PopMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.POP_MESSAGE, this.popMessageProcessor, this.pullMessageExecutor);

        /**
         * AckMessageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, this.ackMessageProcessor, this.ackMessageExecutor);
        /**
         * ChangeInvisibleTimeProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHANGE_MESSAGE_INVISIBLETIME, this.changeInvisibleTimeProcessor, this.ackMessageExecutor);
        /**
         * notificationProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.NOTIFICATION, this.notificationProcessor, this.pullMessageExecutor);

        /**
         * pollingInfoProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.POLLING_INFO, this.pollingInfoProcessor, this.pullMessageExecutor);

        /**
         * ReplyMessageProcessor
         */

        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);

        /**
         * QueryMessageProcessor
         */
        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);

        /**
         * ClientManageProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientManageProcessor, this.clientManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientManageProcessor, this.clientManageExecutor);

        /**
         * ConsumerManageProcessor
         */
        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

        /**
         * QueryAssignmentProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_ASSIGNMENT, queryAssignmentProcessor, loadBalanceExecutor);
        this.remotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SET_MESSAGE_REQUEST_MODE, queryAssignmentProcessor, loadBalanceExecutor);

        /**
         * EndTransactionProcessor
         */
        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, endTransactionProcessor, this.endTransactionExecutor);

        /*
         * Default
         */
        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
    }

2.3.9 启动一系列定时周期任务(定时调度线程池的后台执行)

在注册了netty消息处理器之后,将会启动一系列的定时任务 这些定时任务由BrokerController中的scheduledExecutorService去执行,该线程池只有一个线程。

 步骤一:

protected void initializeScheduledTasks() {

        /**
         * 初始化broker计划任务
         */
        initializeBrokerScheduledTasks();

        //每隔5s进行ScheduledTask元数据的刷新
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.brokerOuterAPI.refreshMetadata();
                } catch (Exception e) {
                    LOG.error("ScheduledTask refresh metadata exception", e);
                }
            }
        }, 10, 5, TimeUnit.SECONDS);

        //如果namesrvAddr不为null,每隔120s,更新一次nameServer地址列表
        if (this.brokerConfig.getNamesrvAddr() != null) {
            this.updateNamesrvAddr();
            LOG.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            // also auto update namesrv if specify
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.updateNamesrvAddr();
                    } catch (Throwable e) {
                        LOG.error("Failed to update nameServer address list", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
        //每隔120s,获取nameServer的地址
        else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        LOG.error("Failed to fetch nameServer address", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
    }

步骤二:

protected void initializeBrokerScheduledTasks() {
        //延迟启动的时间
        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = TimeUnit.DAYS.toMillis(1);
        //每隔24小时执行一次,打印昨天生产和消费的消息数量
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    LOG.error("BrokerController: failed to record broker stats", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        //每隔5s将消费者offset进行持久化操作,存入consumerOffset.json文件中
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    LOG.error(
                        "BrokerController: failed to persist config file of consumerOffset", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        //每隔10s将消费过滤信息 和 消费者订单信息 进行持久化,存入consumerFilter.json文件 和 consumerOrderInfo.json中
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                    BrokerController.this.consumerOrderInfoManager.persist();
                } catch (Throwable e) {
                    LOG.error(
                        "BrokerController: failed to persist config file of consumerFilter or consumerOrderInfo",
                        e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        /**
         * 每隔3m将检查消费者的消费进度
         * 每当消费者 isDisableConsumeIfConsumerReadSlowly (消费者消费缓慢)= true(默认false) 并且 进度落后阈值的时候 ,就停止消费者消费,保护broker,避免消息积压
         */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    LOG.error("BrokerController: failed to protectBroker", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);

        /**
         *  每隔1s将打印
         *  发送消息线程池队列,
         *  拉取消息线程池队列,
         *  查询消息线程池队列,
         *  精简拉取线程池队列、
         *  结束事务线程池队列
         *  客户端管理器线程池队列、
         *  心跳线程池队列、
         *  ack线程池队列的大小
         */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    LOG.error("BrokerController: failed to print broker watermark", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);

        //每隔1m将打印已经存储在commitlog提交日志文件中,但尚未分派到consume queue(消费队列)的字节数
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //调度任务落后于提交日志{}字节
                    LOG.info("Dispatch task fall behind commit log {}bytes",
                        BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    LOG.error("Failed to print dispatchBehindBytes", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        //如果没有开启DLeger服务(DLeger开启后表示支持高可用的主从自动切换)并且 不允许重复复制 并且 没有启动控制器模式,不支持自动切换代理的角色。
        if (!messageStoreConfig.isEnableDLegerCommitLog() && !messageStoreConfig.isDuplicationEnable() && !brokerConfig.isEnableControllerMode()) {
            //如果当前的broker是从节点
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                //根据是否配置了HA地址 并且 HA地址的长度是否大于等于设置的默认最小长度(6),来更新HA地址
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= HA_ADDRESS_MIN_LENGTH) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                //每隔3s同步一次slave从节点信息
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            //如果当前时间 - 上一次的同步时间 > 6s
                            if (System.currentTimeMillis() - lastSyncTimeMs > 60 * 1000) {
                                /**
                                 *  更新从机配置信息,包括:
                                 *  同步topic配置信息
                                 *  同步消费者Offset信息
                                 *  同步延迟Offset信息
                                 *  同步订阅组配置信息
                                 *  同步消息请求模式信息
                                 *  如果启用了计时器车轮,还需要同步计时器指标
                                 */
                                BrokerController.this.getSlaveSynchronize().syncAll();
                                //更新上一次同步的时间
                                lastSyncTimeMs = System.currentTimeMillis();
                            }
                            //timer checkpoint, latency-sensitive, so sync it more frequently
                            BrokerController.this.getSlaveSynchronize().syncTimerCheckPoint();
                        } catch (Throwable e) {
                            //未能同步从属设备的所有配置
                            LOG.error("Failed to sync all config for slave.", e);
                        }
                    }
                }, 1000 * 10, 3 * 1000, TimeUnit.MILLISECONDS);

            } else {
                //如果是主节点,每隔60s将打印主从节点的差异
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            LOG.error("Failed to print diff of master and slave.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        //如果启动控制器模式,支持自动切换代理的角色。
        if (this.brokerConfig.isEnableControllerMode()) {
            //
            this.updateMasterHAServerAddrPeriodically = true;
        }
    }

2.3.10 初始化事务消息相关服务

初始化服务采用Java SPI的方式进行加载 主要初始化三个服务:

1.transactionalMessageService(事务消息服务):用于处理、检查事务消息 2.transactionalMessageCheckListener(事务消息监查监听器):监听回查消息 3.transactionalMessageCheckService(事务消息检查服务):提供了事务消息回查的逻辑,默认情况下 6s以上没commit/rollback的事务消息才会触发事务回查,而如果回查次数超过了15次则丢弃事务

private void initialTransaction() {
        /**
         * 基于Java的SPI机制,查找"META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService"文件里面的SPI实现
         * 事务消息服务:用于处理、检查事务消息。
         */
        this.transactionalMessageService = ServiceProvider.loadClass(TransactionalMessageService.class);
        if (null == this.transactionalMessageService) {
            //如果没有通过SPI指定具体的实现,那么就使用默认实现,TransactionalMessageServiceImpl
            this.transactionalMessageService = new TransactionalMessageServiceImpl(
                    new TransactionalMessageBridge(this, this.getMessageStore()));
            //加载默认事务消息挂钩服务:{}
            LOG.warn("Load default transaction message hook service: {}",
                    TransactionalMessageServiceImpl.class.getSimpleName());
        }

        /**
         * 基于Java的SPI机制,查找"META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener"文件里面的SPI实现
         * 事务消息回查服务监听器:监听回查消息
         */
        this.transactionalMessageCheckListener = ServiceProvider.loadClass(
                AbstractTransactionalMessageCheckListener.class);
        if (null == this.transactionalMessageCheckListener) {
            //如果没有通过SPI制定具体的实现,那么使用默认实现,DefaultTransactionalMessageCheckListener
            this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
            //加载默认丢弃消息挂钩服务:{}
            LOG.warn("Load default discard message hook service: {}",
                    DefaultTransactionalMessageCheckListener.class.getSimpleName());
        }
        this.transactionalMessageCheckListener.setBrokerController(this);
        /**
         * 事务消息检查服务:提供了事务消息回查的逻辑。
         *
         * 创建TransactionMessageCheckService服务,该服务内部有一个线程
         * 默认情况下,6秒以上没commit/rollback的事务消息才会触发事务回查,而如果回查次数超过15次则丢弃事务。
         */
        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
    }

2.3.11 初始化Acl权限相关服务:加载权限相关校验器

同样是基于Java的SPI机制进行查找,并且会将找到校验器注册到RpcHook中,在请求执行之前会执行权限校验。

private void initialAcl() {
        //校验是都开启了Acl,默认为false,所以直接返回了
        if (!this.brokerConfig.isAclEnable()) {
            //代理未启用acl
            LOG.info("The broker dose not enable acl");
            return;
        }

        //如果开启了Acl,则首先通过SPI机制获取AccessValidator
        List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
        if (accessValidators.isEmpty()) {
            //ServiceProvider加载了没有AccessValidator,使用default org.apache.rocketmq.acl.plain.plainAccessValidator(默认的访问验证器)
            LOG.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
            accessValidators.add(new PlainAccessValidator());
        }

        //将校验器存入accessValidatorMap,并且注册到RpcHook中,在请求之前会执行校验
        for (AccessValidator accessValidator : accessValidators) {
            final AccessValidator validator = accessValidator;
            accessValidatorMap.put(validator.getClass(), validator);
            //注册RPC钩子函数
            this.registerServerRPCHook(new RPCHook() {
            
                @Override
                public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
                    //Do not catch the exception
                    //在执行之前会进行校验
                    validator.validate(validator.parse(request, remoteAddr));
                }

                @Override
                public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {
                }

            });
        }
    }

2.3.12 初始化RPC调用的钩子函数

RpcHook是RocketMQ提供的钩子类,提供一种类似于类似于AOP的功能。 可以在请求被处理之前和响应被返回之前执行对应的方法。

    private void initialRpcHooks() {

        //通过SPI机制获取RPCHook
        List<RPCHook> rpcHooks = ServiceProvider.load(RPCHook.class);
        //如果没有配置的RpcHook,那么直接返回
        if (rpcHooks == null || rpcHooks.isEmpty()) {
            return;
        }
        //遍历并且注册所有的RpcHook
        for (RPCHook rpcHook : rpcHooks) {
            this.registerServerRPCHook(rpcHook);
        }
    }

2.3.13 Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改

//Tls传输相关配置,通信安全的文件监听模块,用来观察网络加密配置文件的更改
            //默认是PERMISSIVE,因此会进入代码块
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    //实例化文件监听服务,并且初始化事务消息服务
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;

                            @Override
                            public void onChanged(String path) {
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    //信任证书已更改,请重新加载ssl上下文
                                    LOG.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
                                    //证书和私钥已更改,请重新加载ssl上下文
                                    LOG.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }

                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    result = false;
                    LOG.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
        }

        return result;
    }

2.4 添加钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作


            /**
             * 9. 添加钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作
             * 关闭定时器,向所有nameServer注销注册信息
             */
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            /**
                             * 关闭定时器,向nameServer注销注册信息
                             * 并且还会在messageStore#shutdown方法中将abort临时文件删除
                             */
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));

2.5 调用controller.start();

启动broker, 发送心跳包的入口

    public void start() throws Exception {

        this.shouldStartTime = System.currentTimeMillis() + messageStoreConfig.getDisappearTimeAfterStart();

        //副本数 > 1 && 是否启用从机代理主机 || 是启动控制器模式,支持自动切换代理的角色。
        if (messageStoreConfig.getTotalReplicas() > 1 && this.brokerConfig.isEnableSlaveActingMaster() || this.brokerConfig.isEnableControllerMode()) {
            isIsolated = true;
        }
        //代理外部API 不为空
        if (this.brokerOuterAPI != null) {
            //启动代理外部API
            this.brokerOuterAPI.start();
        }

        //启动基础服务
        startBasicService();

        //如果 isIsolated 为false && 没有开启DLeger的相关配置 && 没有开启重复复制的功能
        if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) {
            //更改特殊服务的状态 将brokerId更改为0
            changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID);
            /**
             * 在broker首次启动时,强制进行broker的注册
             * 强制注册当前broker信息到所有的nameServer中
             */
            this.registerBrokerAll(true, false, true);
        }

        /**
         * 设置一个定时任务,默认情况下每隔30s调用registorBrokerAll方法向所有的nameServer进行一次注册broker信息,时间间隔可以配置registorNameServerPeriod属性,允许的值是在1万到6万毫秒之间,也就是(10s到60s的范围)
         * 这个定时任务就是broker向nameServer发送的心跳包的定时任务,包括topic名,读、写队列个数,队列权限,是否有序等信息。
         */
        scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
            @Override
            public void run0() {
                try {
                    if (System.currentTimeMillis() < shouldStartTime) {
                        BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime);
                        return;
                    }
                    if (isIsolated) {
                        BrokerController.LOG.info("Skip register for broker is isolated");
                        return;
                    }
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    BrokerController.LOG.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

        /**
         * isEnableSlaveActingMaster(),默认为false。不启用slave代理master
         * 故障切换时,从设备将充当主设备,例如,如果这设备关闭,计时器或事务消息在从属设备中过期
         */
        if (this.brokerConfig.isEnableSlaveActingMaster()) {
            scheduleSendHeartbeat();

            scheduledFutures.add(this.syncBrokerMemberGroupExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) {
                @Override
                public void run0() {
                    try {
                        BrokerController.this.syncBrokerMemberGroup();
                    } catch (Throwable e) {
                        BrokerController.LOG.error("sync BrokerMemberGroup error. ", e);
                    }
                }
            }, 1000, this.brokerConfig.getSyncBrokerMemberGroupPeriod(), TimeUnit.MILLISECONDS));
        }

        /**
         * isEnableControllerMode():是否启动控制器模式,支持自动切换代理的角色。
         * 默认为false
         */
        if (this.brokerConfig.isEnableControllerMode()) {
            scheduleSendHeartbeat();
        }

        /**
         * isSkipPreOnline():默认为false
         */
        if (brokerConfig.isSkipPreOnline()) {
            //无条件启动broker
            startServiceWithoutCondition();
        }
    }

2.5.1 启动基础服务 startBasicService();

protected void startBasicService() throws Exception {

        /**
         * 启动消息存储服务
         * 处理消息存储相关的日志,比如CommitLog、ConsumeQueue等
         */
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        //启动计时器消息存储
        if (this.timerMessageStore != null) {
            this.timerMessageStore.start();
        }

        //启动复制副本管理器功能
        if (this.replicasManager != null) {
            this.replicasManager.start();
        }

        if (remotingServerStartLatch != null) {
            remotingServerStartLatch.await();
        }

        /**
         * 启动netty路由服务
         * broker的服务端,处理消费者和生产者的请求
         *
         */
        if (this.remotingServer != null) {
            this.remotingServer.start();

            // In test scenarios where it is up to OS to pick up an available port, set the listening port back to config
            if (null != nettyServerConfig && 0 == nettyServerConfig.getListenPort()) {
                nettyServerConfig.setListenPort(remotingServer.localListenPort());
            }
        }

        /**
         * 启动快速netty快速路由服务
         * 只给消息生产者的服务端
         */
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        this.storeHost = new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort());

        for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
            if (brokerAttachedPlugin != null) {
                brokerAttachedPlugin.start();
            }
        }

        if (this.popMessageProcessor != null) {
            this.popMessageProcessor.getPopLongPollingService().start();
            this.popMessageProcessor.getPopBufferMergeService().start();
            this.popMessageProcessor.getQueueLockManager().start();
        }

        if (this.ackMessageProcessor != null) {
            this.ackMessageProcessor.startPopReviveService();
        }

        if (this.topicQueueMappingCleanService != null) {
            this.topicQueueMappingCleanService.start();
        }

        /**
         * 文件监听器启动,关注文件变更的服务,及时加载最新的ssl证书
         * 通过对文件进行hash,判断新的hash和当前hash是否一致
         * 如果不一致,表示文件变更了
         */
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }

        /**
         * 长轮询拉取消息挂起服务启动
         * 处理push模式消费,或者延迟消费的服务
         */
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        /**
         * 客户端连接心跳服务启动
         * 启动定时器 每10s清理没用的链接
         */
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        /**
         * 过滤服务管理器启动
         * 启动定时器,每30s 通过shell脚本启动startfsrv.sh
         * 自定义消息过滤服务,如果用系统的tag或者是sql  不需要开启该服务
         */
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }

        if (this.broadcastOffsetManager != null) {
            this.broadcastOffsetManager.start();
        }

        if (this.escapeBridge != null) {
            this.escapeBridge.start();
        }

        if (this.topicRouteInfoManager != null) {
            this.topicRouteInfoManager.start();
        }

        if (this.brokerPreOnlineService != null) {
            this.brokerPreOnlineService.start();
        }

        //Init state version after messageStore initialized.
        //初始化状态版本
        this.topicConfigManager.initStateVersion();
    }

2.5.2 在broker首次启动时,强制进行broker的注册registerBrokerAll

    /**
     * 把broker注册到所有的nameServer中,发送心跳包
     * @param checkOrderConfig  是否校验 顺序消息配置
     * @param oneway            是否是单向发送, 单向发送不接收返回值
     * @param forceRegister     是否是强制注册
     */
    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {

        /**
         * 根据topicConfigManager中的topic信息构建topic信息的传输协议对象 topicConfigWrapper
         * 在此前topicConfigManager.load()方法中已经加载了所有的topic信息
         * topicConfigWrapper 中封装了该broker上的topic信息和dataVersion
         */
        TopicConfigAndMappingSerializeWrapper topicConfigWrapper = new TopicConfigAndMappingSerializeWrapper();

        //设置版本号
        topicConfigWrapper.setDataVersion(this.getTopicConfigManager().getDataVersion());
        //设置TopicConfigTable表
        topicConfigWrapper.setTopicConfigTable(this.getTopicConfigManager().getTopicConfigTable());

        //设置TopicQueueMappingInfoMap
        topicConfigWrapper.setTopicQueueMappingInfoMap(this.getTopicQueueMappingManager().getTopicQueueMappingTable().entrySet().stream().map(
            entry -> new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), TopicQueueMappingDetail.cloneAsMappingInfo(entry.getValue()))
        ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));

        /**
         * 这块代码的作用是将topicConfigWrapper中的值取出来重新封装一遍,又再塞回topicConfigWrapper
         * 理解:这昂做的目的主要是为了将this.brokerConfig.getBrokerPermission()的属性值set进去
         * 不过这并不是重要的细节,我们只需要知道topicConfigWrapper中至少包含了该broker上的topic信息以及dataVersion即可
         */
        //如果当前broker权限不支持读或者写
        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                //那么重新配置topic权限
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        topicConfig.getPerm() & this.brokerConfig.getBrokerPermission(), topicConfig.getTopicSysFlag());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }

        /**
         * 这一块内容是重点
         *
         * needRegister()方法用于判断当前broker是否需要向nameServer进行注册,当forceRegistor参数为true时,表示强制注册
         * 那么该方法的结果是无所谓的,如果forceRegistor为false,那么broker是否需要向nameServer注册就得看这个方法的结果了
         */
        //如果forceRegistor为true,表示强制注册, 或者如果当前broker应该注册,那么向nameServer进行注册
        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isInBrokerContainer())) {
            //执行注册
            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
        }
    }

2.5.3 needRegister

 /**
     * broker是否需要向nameServer中注册
     * @param clusterName  集群名
     * @param brokerAddr   broker地址
     * @param brokerName   broker名字
     * @param brokerId     brokerId
     * @param timeoutMills 超时时间
     * @param isInBrokerContainer 是否在broker容器中
     * @return broker是否需要向nameServer中注册
     */
    private boolean needRegister(final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final int timeoutMills,
        final boolean isInBrokerContainer) {

        /**
         * 根据topicConfigManager中的topic信息构建topic信息的传输协议对象 topicConfigWrapper
         * 在此前topicConfigManager.load()方法中已经加载了所有topic信息
         * topicConfigWrapper 中封装了该broker上的topic信息和dataVersion
         */
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        /**
         * 获取nameServer的DataVerison数据,一一对比自身数据是否一致
         * 如果有一个nameServer的DataVersion数据版本不一致则重新注册
         */
        List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills, isInBrokerContainer);
        boolean needRegister = false;
        //如果有一个和nameServer的数据版本不一致,则需要重新注册
        for (Boolean changed : changeList) {
            if (changed) {
                needRegister = true;
                break;
            }
        }
        return needRegister;
    }

2.5.4 doRegisterBrokerAll

  /**
     * 1.调用brokerOuterAPI.registerBrokerAll进行注册
     * 2.处理注册结果,registerBrokerResultList:进行master地址的更新、顺序消息Topic的配置更新
     * @param checkOrderConfig   是否检测顺序topic
     * @param oneway             是否是单向
     * @param topicConfigWrapper topic信息的传输协议包装对象
     */
    protected void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {

        if (shutdown) {
            //broker已经关闭,无需再进行注册
            BrokerController.LOG.info("BrokerController#doResterBrokerAll: broker has shutdown, no need to register any more.");
            return;
        }
        /**
         *  调用brokerOuterAPI.registerBrokerAll发送请求到NameServer进行注册,返回注册结果
         *
         *  执行注册,broker作为客户端向所有nameServer进行注册
         */
        List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            //包含了携带topic信息的topicConfigTable,以及版本信息的dataVersion
            //这两个信息保存在持久化文件topics.json中
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isEnableSlaveActingMaster(),
            this.brokerConfig.isCompressedRegister(),
            this.brokerConfig.isEnableSlaveActingMaster() ? this.brokerConfig.getBrokerNotActiveTimeoutMillis() : null,
            this.getBrokerIdentity());

        /**
         * 对注册结果进行处理
         */
        handleRegisterBrokerResult(registerBrokerResultList, checkOrderConfig);
    }

2.5.5 brokerOuterAPI.registerBrokerAll

    /**
     * Considering compression brings much CPU overhead to name server, stream API will not support compression and
     * compression feature is deprecated.
     * broker向nameServer注册
     * @param clusterName
     * @param brokerAddr
     * @param brokerName
     * @param brokerId
     * @param haServerAddr
     * @param topicConfigWrapper
     * @param filterServerList
     * @param oneway
     * @param timeoutMills
     * @param compressed         default false
     * @return
     */
    public List<RegisterBrokerResult> registerBrokerAll(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills,
            final boolean enableActingMaster,
            final boolean compressed,
            final Long heartbeatTimeoutMillis,
            final BrokerIdentity brokerIdentity) {

        //创建一个CopyOnWriteArrayList类型的集合,用来保存请求的返回结果
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        //获得nameServer的地址信息集合
        List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
        //如果获取到的nameServer地址信息集合 不为 null && 集合长度 > 0
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            /**
             * 封装请求头:
             * 包含了broker地址 ip:port
             * broker的id 也就是角色  id等于0,为master, id > 0 为slave
             * brokerName
             * broker集群名称
             * haServer地址
             * 是否开启压缩
             * 心跳超时毫秒(如果不为 null)
             */
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setEnableActingMaster(enableActingMaster);
            requestHeader.setCompressed(false);
            //如果心跳超时毫秒不为 null ,将其也封装在请求头里面
            if (heartbeatTimeoutMillis != null) {
                requestHeader.setHeartbeatTimeoutMillis(heartbeatTimeoutMillis);
            }

            /**
             * 封装请求体
             * 当前broker所有的topic信息,名称,读写队列数以及版本信息dataVersion
             * 使用门闩依次向各个nameServer注册
             */
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(TopicConfigAndMappingSerializeWrapper.from(topicConfigWrapper));
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            /**
             * CountDownLatch介绍:
             *      CountDownLatch是一个同步工具类,用来协调多个线程之间的同步,或者说起到线程之间的通信(而不是用作互斥的作用)。
             *      CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。
             *      当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。
             *
             * 使用CountDownLatch作为倒数计数器,用于并发控制
             * CountDownLatch 使得只有所有nameServer 的响应结果都返回时才会继续执行后续的逻辑
             *
             */
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            /**
             * 采用线程池的方式,即多线程并发的向所有的nameServer发起注册请求
             * 遍历所有的nameServer,并将注册任务registerBroker 丢进brokerOuterExecutor 线程池中执行
             */
            for (final String namesrvAddr : nameServerAddressList) {
                //并发的执行线程任务
                brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
                    @Override
                    public void run0() {
                        try {
                            /**
                             * 真正执行注册的地方
                             */
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                            if (result != null) {
                                //保存注册结果
                                registerBrokerResultList.add(result);
                            }
                            //已完成当前broker注册到nameServer。目标主机={}
                            LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
                        } catch (Exception e) {
                            LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
                        } finally {
                            //每一个请求执行完毕,无论是正常还是异常,都需要减少一个计数
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                /**
                 * 主线程在此限时等待6000ms,直到上面的任务全部执行完毕之后,计数变为0,会唤醒主线程继续执行后面的逻辑
                 */
                if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
                    LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
                }
            } catch (InterruptedException ignore) {
            }
        }

        return registerBrokerResultList;
    }

2.5.6 registerBroker

  /**
     * registerBroker方法会通过底层的NettyClient,把这个请求发送到NameServer进行注册:
     *
     * @param namesrvAddr
     * @param oneway
     * @param timeoutMills
     * @param requestHeader
     * @param body
     * @return
     * @throws RemotingCommandException
     * @throws MQBrokerException
     * @throws RemotingConnectException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     * @throws InterruptedException
     */
    private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final boolean oneway,
            final int timeoutMills,
            final RegisterBrokerRequestHeader requestHeader,
            final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
        //构建远程调用请求对象,code为REGISTER_BROKER=103
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        /**
         * invokeSync方法:以同步的方式向客户端发送消息
         * invokeAsync方法:以异步的方式向客户端发送消息
         * invokeOneway方法:只向客户端发送消息,而不处理客户端返回的消息
         */
        //如果是单向请求,则broker发起异步请求即可返回,不必关心执行结果,注册请求不是单向请求
        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }

        //利用RemotingClient发送注册请求,这个RemotingClient其实就是Netty客户端
        /**
         *  最核心的就是下面这行
         *  invokeSync()方法在NettyRemotingClient类中
         *
         *  创建连接最终的核心:NettyRomotingClient底层是 - 基于Netty的Bootstrap类的connnect方法,创建了一个连接
         *  发送请求最终的核心:NettyRemotingClient底层是 - 基于Netty的Channel API,把注册的请求给发送到了NameServer就可以了
         */
        //通过remotingClient发起同步调用,非单向请求,即需要同步的获取结果
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

        //下面是处理返回结果,封装成一个RegisterBrokerResult
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                /**
                 * 解析响应数据,封装结果
                 */
                RegisterBrokerResponseHeader responseHeader =
                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
    }

2.5.7 invokeSync

 /**
     * 消息的同步发送,不仅需要将消息发送出去,还要处理消息发送的响应结果,
     *
     * 同步发送与单向发送很像,都是先根据broker地址查找连接,
     * 如果连接正常,在消息发送之前和消息发送之后就执行钩子方法,然后将消息发送出去,将消息响应结果返回
     * 如果消息发送出现异常,就关闭连接,抛出异常
     * @param addr
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingConnectException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
    @Override
    public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
        long beginStartTime = System.currentTimeMillis();

        /**
         *
         *  获取连接Channel,这个Channel可以理解成broker跟nameServer之间建立的一个连接
         */
        final Channel channel = this.getAndCreateChannel(addr);

        if (channel != null && channel.isActive()) {
            try {
                //在执行发送消息之前,执行Rpc钩子
                doBeforeRpcHooks(addr, request);
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTime) {
                    throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
                }
                /**
                 * 这里才是真正的发送请求
                 */
                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);


                //接收到消息之后,执行Rpc钩子
                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                this.updateChannelLastResponseTime(addr);
                return response;
            } catch (RemotingSendRequestException e) {
                LOGGER.warn("invokeSync: send request exception, so close the channel[{}]", addr);
                this.closeChannel(addr, channel);
                throw e;
            } catch (RemotingTimeoutException e) {
                if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                    this.closeChannel(addr, channel);
                    LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
                }
                LOGGER.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
                throw e;
            }
        } else {
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }
    }

2.5.8 invokeSyncImpl

 /**
     * 同步发送请求:
     *
     * 同步发送方法首先创建ResponeFuture,ResponeFuture是保存请求响应结果的,opaque是请求id,将请求id与响应结果的对应关系保存在responeTable(map)中,通过请求id就可以找到对应的响应结果了。
     * 然后利用netty的Channel连接组件,将消息以同步的方式发送出去,添加一个监听器,监听消息是否成功发送,当监听到消息成功发送,就设置发送成功的标志,否则设置发送失败的标志,并且删除请求与响应的对应关系,以及异常原因。
     *
     * 消息通过netty的Channel组件连接发送后,就等待消息的响应结果。
     * 当响应结果为null,但是发送成功,那么就抛出超时的异常,否则就抛出其他异常。
     * 当响应结果不为null,就返回响应结果,最后删除请求与响应的关系。
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        //get the request id
        //请求id,通过该id可以找到该请求的响应结果
        final int opaque = request.getOpaque();

        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            //将请求id与响应结果的对应关系保存在responseTable(map)集合中
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            /**
             * 基于Netty的Channel组件,将请求发出去
             */
            channel.writeAndFlush(request).addListener((ChannelFutureListener) f -> {
                //消息发送成功,设置发送成功的标志
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                }

                //发送失败
                responseFuture.setSendRequestOK(false);
                //删除请求id与响应的对应关系
                responseTable.remove(opaque);
                //发送异常
                responseFuture.setCause(f.cause());
                //发送结果为null
                responseFuture.putResponse(null);
                log.warn("Failed to write a request command to {}, caused by underlying I/O operation failure", addr);
            });

            /**
             * 这里比较重要 等待请求响应的结果
             */
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            //响应结果为空
            if (null == responseCommand) {
                //发送成功,但是没有响应,抛出超时的异常
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }

            return responseCommand;
        } finally {
            //最后删除请求id与响应的对应关系
            this.responseTable.remove(opaque);
        }
    }

2.5.9 handleRegisterBrokerResult

    /**
     * 对注册结果进行处理
     * @param registerBrokerResultList
     * @param checkOrderConfig
     */
    protected void handleRegisterBrokerResult(List<RegisterBrokerResult> registerBrokerResultList,
        boolean checkOrderConfig) {
        for (RegisterBrokerResult registerBrokerResult : registerBrokerResultList) {
            if (registerBrokerResult != null) {
                //涉及master/slave的一些机制
                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
                    this.messageStore.updateMasterAddress(registerBrokerResult.getMasterAddr());
                }

                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
                if (checkOrderConfig) {
                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
                }
                break;
            }
        }
    }

broker的整个启动核心过程基本就是以上的内容

参考文献:

(8条消息) RocketMQ源码(3)—Broker启动流程源码解析【一万字】_刘Java的博客-CSDN博客_rocketmq-broker源码

 (12条消息) RocketMQ源码(4)—Broker启动加载消息文件以及恢复数据源码【一万字】_刘Java的博客-CSDN博客

RocketMQ源码分析(三)——Broker启动流程 | 山海 | 专注分布式系统架构与设计 (tpvlog.com)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ(三) broker启动 的相关文章

  • 将所有 BigDecimal 运算设置为特定精度?

    我的Java程序以高精度计算为中心 需要精确到至少120位小数 因此 程序中所有非整数都将由 BigDecimal 表示 显然 我需要指定 BigDecimal 的舍入精度 以避免无限小数表达式等 目前 我发现必须在 BigDecimal
  • 如何调试使用maven构建的android应用程序

    我目前正在尝试从 Eclipse 调试我的设备上的 Android 应用程序 设备已添加 我可以在控制台和 Eclipse 中看到它 控制台 Windows adb devices List of devices attached 0019
  • 使用 TreeMap 和 Comparator 按值对 HashMap 进行排序

    我使用以下代码创建哈希图 然后使用树形图和比较器对哈希图中的值进行排序 然而 输出结果却出乎意料 所以任何关于我做错了什么的想法都会有帮助 Code public static void main String args System ou
  • 使用 Intellij 2017.2 /out 目录构建会重复 /build 目录中的文件

    更新到 Intellij 2017 2 后 构建我的项目会创建一个 out包含生成的源文件和资源文件的目录 这些文件与已包含的文件重复 build并导致duplicate class生成的类的编译器错误 关于 Gradle 或 Intell
  • 术语“引用”的起源,如“通过引用传递”

    Java C 语言律师喜欢说他们的语言按值传递引用 这意味着 引用 是调用函数时复制的对象指针 同时 在 C 中 以及 Perl 和 PHP 中更动态的形式 引用是其他名称 或动态情况下的运行时值 的别名 我对这里的词源感兴趣 参考 一词的
  • 使用 google-api-java-client 的 2 足 OAuth

    有谁知道如何将 2 legged OAuth 与 google api java client 一起使用 我正在尝试访问 Google Apps 配置 API 以获取特定域的用户列表 以下不起作用 HttpTransport transpo
  • @Cachable 在没有输入参数的方法上?

    我有问题 org springframework cache annotation Cachable注解 Bean public ConcurrentMapCache cache return new ConcurrentMapCache
  • 如何使 ScheduledExecutorService 在计划任务取消时自动终止

    我正在使用一个ScheduledExecutorService如果网络连接已打开超过几个小时 则关闭该连接 然而 在大多数情况下 网络连接在超时之前就关闭了 所以我取消了ScheduledFuture 在这种情况下 我还希望执行程序服务终止
  • 将 RequestBody json 转换为对象 - Spring Boot

    我是 java 开发的初学者 但之前有 PHP 和 Python 等编程语言的经验 对于如何进行 Spring Boot 的开发几乎没有什么困惑 我正在开发一个rest API 它有以下请求 key value key1 value1 pl
  • 维护插入顺序的并发集合[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找一个可以维护插入顺序的并发列表 有人有什么好的推荐吗 我看一些番石榴 例如SetFromMa
  • Java检测鼠标长按

    如果用户按下 JList 组件超过 3 秒 有什么方法可以捕获事件吗 我发现困难的部分是即使在用户松开鼠标左键之前也需要触发事件 这可以通过 mousePressed 和 mouseReleased 组合轻松完成 您可以在 mouseDow
  • 为什么java(>=7版本)不支持运行没有main方法的程序? [关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 class WithoutMain static System out println Without main class Sy
  • Java 泛型:将 Object o 的类与 进行比较

    假设我有以下课程 public class Test
  • 如何在Java中通过反射调用代理(Spring AOP)上的方法?

    一个接口 public interface Manager Object read Long id 实现该接口的类 Transactional Public class ManagerImpl implements Manager Over
  • 如何从Java中的连接获取查询字符串?

    我正在编写一个方法 尝试记录数据库调用 形成连接到它的连接 在查询之后 有很多地方调用方法 connect 来启动并调用 cleanUp 方法来结束 我不能并且不想修改每个地方 所以顺序是这样的 Connection con connect
  • 从数字列表中生成所有唯一对,n 选择 2

    我有一个元素列表 假设是整数 我需要进行所有可能的两对比较 我的方法是 O n 2 我想知道是否有更快的方法 这是我在java中的实现 public class Pair public int x y public Pair int x i
  • 删除Java中重载的方法

    有2个重载方法 这些方法中的每一个都将一种类型的列表转换为不同类型的列表 但第一种方法使用比较器 class SomeClass public static
  • 混合语言源目录布局

    我们正在运行一个使用多种不同语言的大型项目 Java Python PHP SQL 和 Perl 到目前为止 人们一直在自己的私有存储库中工作 但现在我们希望将整个项目合并到一个存储库中 现在的问题是 目录结构应该是什么样的 我们应该为每种
  • 在java中打印阿拉伯字符串

    我试图在 java 中显示阿拉伯语文本 但它显示垃圾字符 示例 或有时在我打印时仅显示问号 我如何才能打印阿拉伯语 我听说它与unicode和UTF 8有关 这是我第一次使用语言 所以不知道 我正在使用 Eclipse Indigo IDE
  • 在没有 ODBC 的情况下从 Java 操作 Access 数据库

    我想从我的 Java 项目操作 Microsoft Access 数据库 accdb 或 mdb 文件 我不想使用 Microsoft 的 JDBC ODBC Bridge 和 Access ODBC 驱动程序 因为 JDBC ODBC 桥

随机推荐

  • GB28181平台如何接入无人机实现智能巡检?

    大家都知道 无人机巡检系统 有效解决了传统巡查工作空间和时间局限问题 降低人力工作成本 有效替代人工巡检工作模式 智能巡检系统通过人工智能技术和机械智能技术完美结合 在工业等场景下 应用非常广泛 本文旨在讲如何实现无人机 如大疆无人机 数据
  • Java泛型机制

    Generics 泛型 一个接口或类可能被声明为一个或者多个类型的参数 该类或接口写在尖括号中 提供的实体类需要属于该接口或类 Generic Programming 泛型编程 泛型类可以是编码更安全以及方便阅读 尤其是在集合类中 Java
  • 【隧道篇 / PPTP&L2TP】(5.2) ❀ 01. PPTP & L2TP 连接 ❀ FortiGate 防火墙

    简介 虽然有SSL 但是因为要安装 FortiClient 客户端软件 很多人还是喜欢用PPTP或L2TP 理由很简单 那就是只要是Windows就可以连 不用安装其它软件 PPTP 与 L2TP 协议的联系与区别 PPTP Point t
  • JAVA三种多数据源配置详解(一)

    在大型项目的开发中 我们可能因为微服务 分布式 集群等架构的影响 而需要到不同的数据库中去查询需要对应的数据 这时 单一的数据库配置就无法满足业务需求 下面我会介绍集中不同场景下的多数据源配置 大家可以根据自身情况进行选择和实现 yml文件
  • 纪念古龙诞辰:论古龙的江湖为何没有一“tong”?

    古龙 nbsp 原名熊耀华 1938年6月7日生于香港 武侠小说家 新派武侠小说泰斗 代表作品 多情剑客无情剑 绝代双骄 楚留香传奇 武林外史 等 有人喜欢金庸 也有很多人喜欢古龙 他们之间的同与不同 以及那些至今仍然令我们备受感动的地方
  • vue中a标签下载本地文件-未找到【已解决】

    首先看一下我的情况 如下 目录如图 代码如下 a href public kjxz pptx 课件下载 a 一切看起来很正常 但是结果如下 然后我搜了一下发现原来href路径的问题 原来使用 public kjxz pptx 文件会找不到
  • 漫步IOS--三目运算符、switch、枚举

    1 三目运算符 三目运算符的定义 表达式1 表达式2 表达式3 例如 a gt b 2 5 三木运算符也是有返回值的 返回值等于对应的表达式的返回值 2 switch 在c语言中 switch只支持整型 但是这里的整型包括 整型 字符 布尔
  • 如何将文档上传到 ChatGPT

    OpenAI 一直在为 ChatGPT 添加几个有趣的功能 包括对网页浏览和插件的支持 但是 仍然没有办法本地上传文档并根据其上下文提出问题 当然 有些用户可以在他们的数据上训练 AI 聊天机器人 但并不是每个人都了解如何设置工具和库 因此
  • 华为OD机试 C++ 去除多余空格

    题目 你需要写一个功能 它能处理一段文本 去除其中不必要的空格 但是如果这些空格被一对单引号包围起来 就保留它们不变 同时 你还要调整一些特定词汇的位置 这些词汇的位置会以坐标的方式给出 坐标要基于新的文本 特别注意 关键词的位置一定不是空
  • Unity 入门 Input 类

    1 获得键盘 Input GetKey KeyCode A Input GetKeyDown KeyCode A Input GetKeyUp KeyCode A 2 获得鼠标信息 Input mousePosition 鼠标位置 Inpu
  • 关系型数据库是如何运作的

    一说到关系型数据库 我总感觉缺了点什么 如果你尝试透过 关系型数据库是如何运作的 的关键词句来进行搜索 其搜索结果是少量的而且内容是简短的 难道说是由于它已经太老旧而已经不再流行吗 作为一名开发者 我讨厌使用我不明白的技术 此外 关系型数据
  • s、x、t -learner

  • DICOM之Transfer Syntax

    Transfer Syntax A Transfer Syntax is a set of encoding rules able to unambiguously represent one or more Abstract Syntax
  • ChatGPT在线个人小助手应用搭建

    ChatGPT在线个人小助手应用搭建 在线体验 点我在线体验 因为openAI账户申请后会默认有18美元的账户 openAI每次调用大概会花掉0 01美元 所以为了防止恶意刷api 无意义聊天 页面做了密码限制 如果密码不对 是不会启用op
  • mysql存储引擎层和服务器层,MySQL底层架构原理,工作流程和存储引擎的数据结构讲解...

    数据库 DataBase 是存放用户数据的地方 当用户访问 操作数据库中的数据时 需要数据库管理系统的帮助 数据管理系统的全称是DataBase Management System 简称DBMS 通常情况下我们会把数据库和数据库管理系统笼统
  • 网页端无法复制粘贴的解决方案

    由于瑞格系统无法复制粘贴 写java代码比较难受 所以就找了一些方法来解决网页端无法复制粘贴的问题 1 打开浏览器的设置界面 并打开拓展程序 2 在拓展程序中选择左上角的拓展程序 并打开Chrome网上应用商店 3 在Chrome网上应用商
  • 多线程JUC并发篇常见面试详解

    文章目录 1 JUC 简介 2 线程和进程 3 并非与并行 4 线程的状态 5 wait sleep的区别 6 Lock 锁 重点 1 Lock锁 2 公平非公平 3 ReentrantLock 构造器 4 Lock 锁实现步骤 7 syn
  • 百炼成钢;JavaScript逆向九大专题详解

    JavaScript是一种脚本语言 通常用于在Web浏览器中编写交互式前端应用程序 它是一种解释性语言 可以在客户端 浏览器 和服务器端 Node js 上运行 JavaScript可以用于创建动态网页 Web应用程序 游戏 移动应用程序等
  • unity 获取鼠标键盘

    unity 获取鼠标键盘 在做项目中我们经常会用到鼠标键盘 那么怎么去获取鼠标键盘呢 接下里我带大家了解一下 首先是获取鼠标 大家记住无论是获取鼠标还是获取键盘都要用到unity中的一个小小的组件首先在unity上方的选项卡中选择edit
  • RocketMQ(三) broker启动

    RocketMQ源码版本V5 0 0 可兼容之前的版本 因为整理资料的时候 之前的版本 和V5版本有所出入 核心流程基本还是大同小异的 此前已经总结了NameServer的启动流程源码 现在来了解Broker的启动流程 在RocketMQ启