RocketMQ源码(十三)—消费者DefaultMQPushConsumer启动主要流程源码

2023-11-07

此前我们学习了Broker和Producer的启动源码,以及Producer发送消息源码和Broker接收存储消息的源码,现在,我们来学习Consumer的启动以及消费消息的源码。Consumer的启动源码和Producer的启动源码还是有很多相似的地方的。

目录

1 创建DefaultMQPushConsumer实例 

2 subscribe订阅

3 start启动消费者

3.1 copySubscription拷贝订阅关系

4 小结


客户端常用的消费者类是DefaultMQPushConsumer,此类的简单消费者案例如下,在RocketMQ源码的example模块下 找到更多快速案例。

消费者代码举例:

public class Consumer {

   public static void main(String[] args) throws InterruptedException, MQClientException {

       // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1-group");

       // 设置NameServer的地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

       //TODO:订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("test_topic", "*");
       // TODO: 注册消息监听器,用来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                try {

                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

                    boolean consume = new Random().nextBoolean();
                    if(!consume) {
                        //TODO:消费失败,等待重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                //TODO: 消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //TODO: 启动消费者实例
        consumer.start();

        System.out.printf("Consumer Started.%n");
   }
}

我们本次分析RocketMQ消费者启动的源码。实际上就是分析DefaultMQPushConsumer的构造器以及start方法的源码。

1 创建DefaultMQPushConsumer实例 

DefaultMQPushConsumer的构造器有很多,但最终都是调用下面四个参数的构造函数:

  /**
     * Constructor specifying namespace, consumer group, RPC hook and message queue allocating algorithm.
     * DefaultMQPushConsumer的构造器有很多,但最终都是调用下面四个参数的构造函数:
     * 这个构造器是指定了命名空间、生产者组、RPC钩子和消费者之间消息分配的策略算法的构造器,其内部创建了一个DefaultMQPushConsumerImpl实例,DefaultMQPushConsumer可以看作是DefaultMQPushConsumerImpl的包装类,开放给开发人员使用;
     * DefaultMQPushConsumer中的几乎所有的方法内部都是由DefaultMQPushConsumerImpl实现的。这是门面模式设计模式。
     * @param namespace Namespace for this MQ Producer instance.                   namespace地址
     * @param consumerGroup Consume queue.                                         消费者组
     * @param rpcHook RPC hook to execute before each remoting command.            在每个远程处理命令之前执行的RPC勾子
     * @param allocateMessageQueueStrategy Message queue allocating algorithm.     消费者之间消息分配的策略算法
     */
    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.consumerGroup = consumerGroup;
        this.namespace = namespace;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        //创建DefaultMQPushConsumerImpl实例
        defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    }

这个构造器是指定了命名空间、生产者组、RPC钩子和消费者之间消费分配的策略算法的构造器,其内部创建了一个DefaultMQPushConsumerImpl的实例,DefaultMQPushConsumer可以看作是

DefaultMQPushConsumerImpl的包装类,开放给开发人员使用,DefaultMQPushConsumer中的几乎所有的方法内部都是由DefaultMQPushConsumerImpl实现的。这个是典型的门面模式设计模式。

下面是DefaultMQPushConsumerImpl的构造器,也很简单。

   /**
     * DefaultMQPushConsumerImpl
     * @param defaultMQPushConsumer
     * @param rpcHook
     */
    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
        this.defaultMQPushConsumer = defaultMQPushConsumer;
        this.rpcHook = rpcHook;
        //consumer  状态错误时采用定时任务定时执行拉取请求的时间间隔
        this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
    }

创建了DefaultMQPushConsumer实例之后,会设置一些属性,包括namesrvAddr、consumeFromWhere、注册messageListener消息监听器等等,这些都是简单的属性赋值操作,除了subscribe方法。

2 subscribe订阅

subscribe方法表示Consumer订阅的自己感兴趣的Topic,并且支持对消息进行过滤,过滤表达式支持TAG 和SQL92两种类型,他们都会被解析成SubscriptionData对象,最终将topic与SubscriptionData的关系维护到RebalanceImpl内部的subscriptionInner这个map集合中。

  /**
     * Subscribe a topic to consuming subscription.
     * DefaultMQPushConsumer的方法
     * 订阅topic、支持消息过滤表达式
     * @param topic topic to subscribe.(订阅的topic)
     * @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
     *                      订阅表达式,它仅支持或操作,如"tag1 || tag2 || tag3",如果为null 或*,则表示订阅全部。
     * if null or * expression,meaning subscribe all
     * @throws MQClientException if there is any client error.
     */
    @Override
    public void subscribe(String topic, String subExpression) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
    }

下面是DefaultMQPushConsumerImpl的方法实现:

   /**
     * DefaultMQpushConsumerImpl的方法
     * 订阅topic
     * @param topic
     * @param subExpression
     * @throws MQClientException
     */
    public void subscribe(String topic, String subExpression) throws MQClientException {
        try {
            //解析订阅表达式,构建SubscriptionData
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression);
            //将topic与SubscriptionData的关系维护到RebalanceImpl内部的subscriptionInner
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            if (this.mQClientFactory != null) {
                //如果mQClientFactory不为null,则将心跳信息发送给所有broker
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

3 start启动消费者

DefaultMQPushConsumer的构造器实际上没做什么太多的操作,主要是start方法内部会执行很多初始化操作,因此使用时,我们需要在消费或者查询消息之前调用该方法。

   /**
     * This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
     * DefaultMQPushConsumer的方法
     * 启动消费者
     * @throws MQClientException if there is any client error.
     */
    @Override
    public void start() throws MQClientException {
        //根据namespace和consumerGroup设置消费者组
        setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
        //默认消费者实现启动
        this.defaultMQPushConsumerImpl.start();
        //消费者轨迹跟踪服务,默认null
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

暴露给开发者的DefaultMQPushConsumer是一个外观类,真正工作的是其内部的DefaultMQPushConsumerImpl,所以我们看下DefaultMQPushConsumerImpl#start()的逻辑。

主要是DefaultMQPushConsumerImpl#start方法,该方法实现生产者的启动,主要步骤有如下几步:

1. 调用checkConfig方法检查消费者的配置信息是否合法

        1.1 如果consumerGroup为空,或者长度大于255,或者包含非法字符(正常的匹配模式为 ^[%|a-zA-Z0-9_-]+$),或者消费者组名为默认组名DEFAULT_CONSUMER,直接抛出异常;

        1.2 校验消费模式:集群/广播

        1.3 校验ConsumeFromWhere(指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费);

        1.4 校验开始消费的指定时间;

        1.5 校验AllocateMessageQueueStratrgy;

        1.6 校验订阅关系;

        1.7 校验是否注册消息监听;

        1.8 校验消费线程数,consumeThreadMin 和 consumeThreadMax 默认值都是20,取值区间都是[1, 1000];

        1.9 校验本地队列缓存消息的最大数,默认的1000,取值范围是[1, 1024],主要是做流控用的。

        1.10 校验拉取消息的时间间隔,pullInterval参数,默认是不存在间隔,取值范围是[0, 65535]。当消费速度比生产速度快,可以设置这个参数,避免花费大概率从broker拉取空消息;

        1.11 校验单词拉取的最大消息数,consumeMessageBatchMaxSize参数,默认是1,取值范围是[1, 1024];

        1.12 校验单词消费的最大消息数,pullBatchSize参数,默认是32,取值范围是[1, 1024].

2. 调用copySubscription方法,拷贝订阅关系,然后为集群消费模式的消费者,配置其对应的重试主题retryTopic = %RETRY% + consumerGroup并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试。

3. 调用getOrCreateMQClientInstance方法,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量。该方法我们在生产者启动源码部分已经讲过了。

4. 设置负载均衡服务rebalanceImpl的相关属性。

5. 创建消息拉取核心对象PullAPIWrapper,封装了消息拉取机结果解析逻辑的API。

6. 创建消息模式设置不同的OffstStore,用于实现消费者的消费偏移量offset的管理。如果是广播消费模式,则是LocalFileOffsetStore,消息消费进度即offset存储在本次磁盘中。如果是集群广播消费模式,则是RemoteBrokerOffsetStore,消息消费进度即offset存储在远程broker中。

7. 调用offsetStore.load加载消费偏移量,LocalFileOffsetStore会加载本地磁盘中的数据,RemoteBrokerOffsetStore则是一个空实现。

8. 根据消息监听器MessageListener的类型创建不同的消息消费服务ConsumeMessageService。如果是MessageListenerOrderly类型,则表示顺序消费,创建CosnumeMessageOrderlyService。如果是MessageListenerConcurrently类型,则表示并发消费,创建ConsumeMessageOrderlyService。

9. 调用consumeMessageService.start 启动消息消费服务。消息拉取服务PullMessageService拉取到消息后,会构建ConsumeRequest对象交给consumeMessageService去消费。

10. 注册消费者组和消费者到MQClientInstance中的consumerTable中,如果没注册成功,那么可能是因为同一个程序中存在同名消费者组的不同消费者,则抛出异常。

11. 调用mQClientFactory#start方法启动CreateMQClientInstance客户端通信实例,初始化netty服务,各种定时任务,拉取消息服务,rebalabceService服务等等。CreateMQClientInstance仅会被初始化一次,其源码我们在生产者启动源码部分已经讲过了。

12. 进行后续处理:

        12.1 调用updateTopicSubscribeInfoWhenSubscriptionChanged方法,向NameServer拉取并更新当前消费者订阅的topic路由信息。

        12.2 调用checkClientInBroker方法,随机选择一个Broker,发送检查客户端tag配置的请求,主要是检测Broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确。

        12.3 调用sendHeartbeatToAllBrokerWithLock方法,主动发送心跳信息给所有broker。Broker接收到心跳后,会发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给Group下其它消费者,要求它们重新进行负载均衡。

        12.4 调用rebalanceImmediately方法,唤醒负载均衡服务rebalanceService,主动进行一次MessageQueue的重平衡。

 /**
     * DefaultMQPushConsumerImpl的方法
     * 启动默认消费者实现
     * 主要步骤:
     * 1. 调用checkConfig方法检查消费者的配置信息,
     * 如果consumerGroup为空,或者长度大于255个字符,或者包含非法字符(正常的匹配模式为 ^[%|a-zA-Z0-9_-]+$),或者消费者组名为默认组名DEFAULT_CONSUMER,
     * 或者messageModel为空,或者consumeFromWhere为空,或者consumeTimestamp为空,或者allocateMessageQueueStrategy为空……等等属性的空校验,满足以上任意条件都校验不通过抛出异常。
     * 2. 调用copySubscription方法,拷贝拷贝订阅关系,然后为集群消费模式的消费者,配置其对应的重试主题 retryTopic = %RETRY% + consumerGroup并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试。
     * 3. 调用getOrCreateMQClientInstance方法,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量。该方法我们在生产者启动源码部分已经讲过了
     * 4. 设置负载均衡服务rebalanceImpl的相关属性。
     * 5. 创建消息拉取核心对象PullAPIWrapper,封装了消息拉取及结果解析逻辑的API。
     * 6. 根据消息模式设置不同的OffsetStore,用于实现消费者的消息消费偏移量offset的管理:
     *      如果是广播消费模式,则是LocalFileOffsetStore,消息消费进度即offset存储在本地磁盘中。
     *      如果是集群消费模式,则是RemoteBrokerOffsetStore,消息消费进度即offset存储在远程broker中。
     * 7. 调用offsetStore.load加载消费偏移量:
     *      LocalFileOffsetStore会加载本地磁盘中的数据;
     *      RemoteBrokerOffsetStore则是一个空实现。
     * 8. 根据消息监听器MessageListener的类型创建不同的消息消费服务ConsumeMessageService:
     *      如果是MessageListenerOrderly类型,则表示顺序消费,创建ConsumeMessageOrderlyService。
     *      如果是MessageListenerConcurrently类型,则表示并发消费,创建ConsumeMessageOrderlyService。
     * 9. 调用consumeMessageService.start启动消息消费服务。消息拉取服务PullMessageService拉取到消息后,会构建ConsumeRequest对象交给consumeMessageService去消费。
     * 10. 注册消费者组和消费者到MQClientInstance中的consumerTable中,如果没注册成功,那么可能是因为同一个程序中存在同名消费者组的不同消费者,抛出异常。
     * 11. 调用mQClientFactory#start方法启动CreateMQClientInstance客户端通信实例,初始化netty服务、各种定时任务、拉取消息服务、rebalanceService服务等等。
     *      CreateMQClientInstance仅会被初始化一次,其源码我们在生产者启动源码部分已经讲过了。
     * 12. 进行后续的处理:
     *      1.调用updateTopicSubscribeInfoWhenSubscriptionChanged方法,向NameServer拉取并更新当前消费者订阅的topic路由信息。
     *      2.调用checkClientInBroker方法,随机选择一个Broker,发送检查客户端tag配置的请求,主要是检测Broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确。
     *      3.调用sendHeartbeatToAllBrokerWithLock方法,主动发送心跳信息给所有broker。Broker接收到心跳后,会发送Code为NOTIFY_CONSUMER_IDS_CHANGED的请求给Group下其它消费者,要求它们重新进行负载均衡。
     *      4.调用rebalanceImmediately方法,唤醒负载均衡服务rebalanceService,主动进行一次MessageQueue的重平衡。
     *
     * @throws MQClientException
     */
    public synchronized void start() throws MQClientException {
        //根据服务状态选择走不同的代码分支
        switch (this.serviceState) {
            /**
             * 服务仅仅创建,而不是启动状态,那么启动服务
             */
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                //首先修改服务状态为服务启动失败,如果最终启动成功则再修改为RUNNING
                this.serviceState = ServiceState.START_FAILED;

                /**
                 * 1. 检查消费者的配置信息是否合法
                 * (1)如果consumerGroup为空,或者长度大于255,或者包含非法字符(正常的匹配模式为 ^[%|a-zA-Z0-9_-]+$),或者消费者组名为默认组名DEFAULT_CONSUMER,直接抛出异常
                 * (2)校验消费模式:集群/广播
                 * (3)校验ConsumeFromWhere(指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费)
                 * (4)校验开始消费的指定时间
                 * (5)校验AllocateMessageQueueStrategy(集群模式下的消息队列负载策略)
                 * (6)校验订阅关系
                 * (7)校验是否注册消息监听
                 * (8)校验消费线性数,consumeThreadMin 和 consumeThreadMax 默认值都是20,取值区间都是[1,1000]
                 * (9)校验本地队列缓存消息的最大数,默认1000,取值范围是[1,1024]
                 * (10)校验拉取消息的时间间隔,pullInterval参数,默认是不存在时间间隔,取范围是[0,65535]。
                 * 当消费速度比生产速度快,可以设置这个参数,避免花费大概率从broker拉取空消息
                 * (11)校验单次拉取的最大消息数,consumerMessageBatchMaxSize参数,默认是1,取值范围是[1,1024]
                 * (12) 校验单次消费的最大消息数,pullBatchSize参数,默认是32,取值范围是[1,1024]
                 */
                this.checkConfig();

                /**
                 * 2. 拷贝订阅关系,将订阅关系设置到重平衡服务类RebalanceImpl中
                 * 为集群消费模式的消费者,配置其对应的重试主题 retryTopic = %RETRY% + consumerGroup
                 * 并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试
                 */
                this.copySubscription();

                //设置消费者客户端实例名称为进程ID
                //如果是集群消费模式,如果instanceName为默认值 "DEFAULT",那么改成 UtilAll.getPid() + "#" + System.nanoTime()
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
            
                /**
                 * 3. 获取MQClient实例,然后根据clientId获取或者创建CreateMQClientInstance实例,并赋给mQClientFactory变量
                 *  MQClientInstance封装了RocketMQ底层网络处理API,Producer、Consumer都会使用到这个类,是Producer、Consumer与NameServer、Broker打交道的网络通道。
                 *  因此,同一个clientId对应同一个MQClientInstance实例就可以了,即同一个应用中的多个producer和consumer使用同一个MQClientInstance实例即可。
                 */
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                /**
                 * 4. 设置负载均衡服务的相关属性
                 */
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                /**
                 * 5. 如果pullAPIWrapper为null,创建消息拉取核心对象PullAPIWrapper,它封装了消息拉取及结果解析逻辑的API
                 */
                if (this.pullAPIWrapper == null) {
                    this.pullAPIWrapper = new PullAPIWrapper(
                        mQClientFactory,
                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                }
                //为pullAPIWrapper注册过滤消息的钩子函数
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                /**
                 * 6. 根据消息模式设置不同的OffsetStore,用于实现消费者的消息消费偏移量offset的管理
                 * 如果是广播模式(BROADCASTING),则创建LocalFileOffsetStore对象,将消费者的offset存储到本地;
                 * 默认文件路径为当前用户主目录下的 .rocketmq_offsets/clientId/{clientId}/clientId/{group}/Offsets.json。其中clientId为当前消费者id,默认为ip@default,{clientId}为当前消费者id,默认为ip@default, clientId为当前消费者id,默认为ip@default,{group}为消费者组名称
                 * 如果是集群模式(CLUSTERING),则创建RemoteBrokerOffsetStore对象,将消费者的offset存储到broker,文件路径为当前用户主目录下的store/config/consumerOffset.json
                 */
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    //根据不同的消费模式选择不同的OffsetStore实现
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            //如果是广播消费模式,则是LocalFileOffsetStore,消息消费进度即offset存储在本地磁盘中。
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            //如果是集群模式,则是RemoteBrokerOffsetStore,,消息消费进度即offset存储在远程broker中。
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                /**
                 * 7. 加载消费偏移量,LocalFileOffsetStore会记载本地磁盘的数据,RemoteBrokerOffsetStore则是一个空实现
                 */
                this.offsetStore.load();

                /**
                 * 8. 根据消息监听器的类型创建不同的消息消费服务
                 * 如果是顺序消费,则创建ConsumeMessageOrderlyService对象
                 * 如果是其他消费,则创建ConsumeMessageConcurrentlyService对象,同时内部也会创建一个ThreadPoolExecutor线程池,这个线程池非常的重要,拉取到消息后会将消息提交到这个线程池中给消费者消费。
                 */
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    //如果是MessageListenerOrderly类型,则表示顺序消费,创建顺序消息消费服务ConsumeMessageOrderlyService
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                    //POPTODO reuse Executor ?
                    this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    //如果是MessageListenerConcurrently类型,则表示并发消费,创建并发消息消费服务ConsumeMessageConcurrentlyService
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                    //POPTODO reuse Executor ?
                    this.consumeMessagePopService =
                        new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                //启动消息消费服务
                this.consumeMessageService.start();
                // POPTODO
                //启动消息弹出服务
                this.consumeMessagePopService.start();

                /**
                 * 9. 将consumer注册到本地。
                 *    注册消费者组和消费者到MQClientInstance中的consumerTable中
                 *    Map中,key=groupName; value=DefaultMQPushConsumerImpl,就是消费者对象
                 */
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    //如果没有注册成功,那么可能是因为同一个程序中存在同名消费者组的不同消费者
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                /**
                 * 10. 启动CreateMQClientInstance客户端通信实例
                 * netty服务,各种定时任务,拉取消息服务,rebalanceService服务。
                 */
                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            /**
             * 服务状态是其他的,那么抛出异常,及start方法仅能调用一次
              */
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        /**
         * 11. 后续处理
         */
        //TODO 向NameServer拉取并更新当前消费者订阅的topic路由信息
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        //TODO 随机选择一个broker,发送检查客户端tag配置的请求,主要是检查broker是否支持SQL92类型的tag过滤以及SQL92的tag语法是否正确
        this.mQClientFactory.checkClientInBroker();
        //TODO 发送心跳信息给所有broker
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        //TODO 前面的时候,启动了重平衡服务,但是因为 CountDownLatch 导致阻塞了,这里就是唤醒负载均衡服务rebalanceService,进行重平衡
        this.mQClientFactory.rebalanceImmediately();
    }

3.1 copySubscription拷贝订阅关系

该方法将defaulyMQPushConsumer中的订阅关系Map集合subscription的数据拷贝到RebalanceImpl的subscriptionInner中。

然后还有很重要的一步,就是为集群消费模式的消费者,配置其对应的重试主题retry Topic = %RETRY% + consumerGroup,并且设置当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试,而如果是广播消费模式,那么不订阅重试topic,所以说,从Consumer启动的的时候开始,就注定了广播消费模式的消费者,消费失败消息会丢失,无法重试。

/**
 * DefaultMQPushConsumerImpl的方法
 * <p>
 * 拷贝订阅关系
 *
 * @throws MQClientException
 */
private void copySubscription() throws MQClientException {
    try {
        //将订阅关系拷贝到RebalanceImpl的subscriptionInner中
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        //如果messageListenerInner为null,那么将defaultMQPushConsumer的messageListener赋给DefaultMQPushConsumerImpl的messageListenerInner
        //在defaultMQPushConsumer的registerMessageListener方法中就已经赋值了
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }
        //消息消费模式
        switch (this.defaultMQPushConsumer.getMessageModel()) {
            //广播消费模式,消费失败消息会丢弃
            case BROADCASTING:
                break;
            //集群消费模式,支持消费失败重试
            //自动订阅该消费者组对应的重试topic,默认就是这个模式
            case CLUSTERING:
                //获取当前消费者对应的重试主题 retryTopic = %RETRY% + consumerGroup
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                //当前消费者自动订阅该消费者组对应的重试topic,用于实现消费重试
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

4 小结

本次我们仅仅介绍了Consumer消费者启动的主要流程,后面我们单独分析这些服务的工作原理。其中几个关键服务如下:

1. rebalanceService:消费者负载均衡服务,用于确定消费者的消息队列以及负载均衡,同时也是触发pullMessageService拉取消息的入口。由MQClientInstance启动,同一个服务器的所有Consumer使用同一个实例。

2. pullMessageService:消息拉取服务,用于拉取消息。由MQClientInstance启动,同一个服务器的所有Consumer使用同一个实例。

3. consumeMessageService:消息消费服务,消息拉取服务拉取到消息后,交给此服务消费消息。由DefaultMQPushConsumerImpl启动,每个Consumer持有一个实例。

4. OffsetStore:用于管理消费点位的上报持久化。由DefaultMQPushConsumerImpl启动,每个Consumer持有一个实例。

参考文献:

RocketMQ源码(15)—消费者DefaultMQPushConsumer启动主要流程源码_刘Java的博客-CSDN博客

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ源码(十三)—消费者DefaultMQPushConsumer启动主要流程源码 的相关文章

  • 配置 OPC UA 服务器 (Milo)?

    我刚刚查看了 Eclipse Milo 项目 https projects eclipse org proposals milo https projects eclipse org proposals milo 这对于 开放 OPC UA
  • 将嵌套的 ArrayList 转换为 Java List

    我有这个方法 public List
  • 什么是“非阻塞”并发?它与普通并发有何不同?

    什么是 非阻塞 并发 它与使用线程的普通并发有何不同 为什么不在所有需要并发的场景中都使用非阻塞并发呢 使用非阻塞并发有开销吗 我听说Java中可以实现非阻塞并发 我们是否应该在特定场景下使用此功能 将这些方法之一与集合一起使用是否有区别或
  • JCombobox 字符串项(可见)和整数键(固有)

    我有一个数据库模式 它将作为 JTable 列显示在 JCombobox 中以选择名称 但我希望将 ID 字段插入 作为外键 到另一个表中 通常 在下拉列表中选择一个项目 将所选项目带到组合框的显示区域 我想要做的是 当选择组合框中的任何项
  • 如何将 Struts 2 与 Velocity 和 Tiles 结合使用

    有人能够获得与 struts 2 一起使用的速度和图块吗 我在网上查找示例或教程时遇到一些问题 从我从邮件列表中收集到的信息来看 这似乎根本不可能 但邮件已经很旧了 https struts apache org docs tiles pl
  • 将 Java 3D 坐标转换为 2D 屏幕坐标

    我正在使用一个名为 Walrus 的 Java 3D 应用程序 该应用程序用于显示有向图 该代码已经具有突出显示节点并在给定其屏幕坐标的情况下在图形中相邻绘制标签的功能 旋转屏幕后 该节点不再突出显示 我所拥有的是 3D 中的节点坐标 我需
  • Java Sound可以用来控制系统音量吗?

    Java 声音优惠FloatControl各种声音线路功能的实例 以及MASTER GAIN http docs oracle com javase 7 docs api javax sound sampled FloatControl T
  • 使android listview布局可滚动

    我有一个 xml 文件 其布局为 ASCII 形式 ImageView TextView List
  • 使用 Oracle Wallet 身份验证从 Spring-jdbc 连接到 Oracle DB

    我将 Spring jdbc 与 org apache commons dbcp BasicDataSource 结合使用 使用用户名和密码进行连接 我想使用BasicDataSource 因为我只有一个连接 我有这个代码
  • 如何从 Jackson 中的自定义解串器调用默认解串器

    我在杰克逊的自定义解串器有问题 我想访问默认序列化器来填充我要反序列化的对象 在填充之后 我将做一些自定义的事情 但首先我想使用默认的 Jackson 行为反序列化对象 这是我目前拥有的代码 public class UserEventDe
  • 从多个地方绘制 JPanel

    我目前正在为学校开发一款 Java 2D 游戏 我们必须使用抽象工厂设计模式 对于 2D 实现 我使用工厂如下 public class Java2DFact extends AbstractFactory public Display d
  • 在类路径中使用通配符调用 java 失败

    我当前目录中有一些 jar 它们都需要位于类路径中 因此我想对类路径使用通配符约定 命令行是 java exe classpath org python util jython args 但是我收到这个错误 Exception in thr
  • 字节流和字符流

    请解释一下什么是字节流和字符流 这些究竟意味着什么 Microsoft Word 文档是面向字节的还是面向字符的 Thanks 流是一种顺序访问文件的方式 字节流逐字节访问文件 字节流适用于任何类型的文件 但不太适合文本文件 例如 如果文件
  • Java文本输出中的UTF-8编码问题

    我一直致力于测试高棉语 Unicode Wordbreaker 的各种解决方案 高棉语单词之间没有空格 这使得拼写检查和语法检查变得困难 以及从旧高棉语转换为高棉语 Unicode 我得到了一些源代码 现在在线 http www white
  • Hibernate SET 元素 order-by 子句

    我想知道 我可以平静地接受以下事实 当 fetch select 时 您可以在映射文件中的 SET 元素上设置 order by 属性 但如果您在创建查询时获取所有内容 这样安全吗 我的意思是 他们将结果放入 HashSet 中 我不认为这
  • 如何在不打开浏览器的情况下查看 Android 应用程序中的网页?

    嘿 我正在开发一个 Android 应用程序 我想连接到该应用程序内的网络 不过 我在某种程度上尝试过 WebView 但它在我的目录中显示的文件很好 但当连接到 google com 时 它显示错误 然后我添加了这个文件
  • 码头无故停止

    我需要经验丰富的码头用户的建议 我在负载均衡器 亚马逊云 后面维护着 2 台 Linux 机器 使用 Jetty 9 0 3 有时我的 Jetty 容器会被 Thread 2 无故关闭 同时地 显示以下日志并且容器无故停止 没有错误 没有例
  • 正确的单元测试技术

    在使用 TDD 时 我发现自己需要测试一个包含查找值的常量 最终 哈希图 请查看更新中出现这种情况的原因 见下文 private static final Map
  • Web 服务返回 java.lang.reflect.InitationTargetException

    我在向 java web 服务发出请求时收到上述消息 我们最初创建了一个 Java 控制台应用程序并手动提交了一个 xml 文件 当将其作为 Java 应用程序运行时 将使用 System out println 成功创建并显示响应 我们通
  • Swing:如何创建事件并将其分派给组件?

    我需要将一些事件发送到 Swing 中的组件 因此它的处理方式就像任何用户生成的标准 Swing 事件一样 基本上 类似于宏记录器 然后是 JEditorPane 的执行器 但我需要对生成的事件有更多的控制 所以 假设我有一个编辑 我想 捕

随机推荐

  • win10电脑任务栏右侧小图标消失解决方法

    WIN10系统任务栏 左边是窗口键和快捷图标 右边是时钟 系统喇叭 网线连接图标 任务栏左边没问题 窗口键和快捷图标都良好 右侧的系统图标无显示 只显示任务栏的底色 尝试操作隐藏任务栏再开启任务栏后 图标恢复正常了 再点击右侧任务栏任意图标
  • 网络环路导致公司网络瘫痪问题排查

    问题 公司网络突然很不稳定 跟踪发现大量丢包 问题排查 1 怀疑电信网络 设备有问题 联系电信经理 安排工程人员过来排查 排查发现入户网络正常 更换电信入户光猫后网络还是不稳定 还是大量丢包 2 机房排查 2 1 关闭所有交换机 然后再一台
  • Anaconda换国内源(清华源、中科大源)

    命令行执行 Windows下 Anaconda 清华源 conda config add channels https mirrors tuna tsinghua edu cn anaconda pkgs free conda config
  • PPP协议实现透明传输的2种方法以及工作状态

    文章目录 1 PPP协议帧格式 2 字节填充 2 1 零比特填充方法 不使用序号和确认机制 PPP协议的工作状态 1 PPP协议帧格式 7E 十六进制数0x7E 在PPP协议里代表帧头和帧尾 二进制表示为0111 1110 占一个子节 FF
  • 机器学习SVM函数

    目录 1 SVM的损失函数 2 SVM的核方法 2 1 什么是核函数 2 1 1 核函数概念 2 1 2 核函数举例 2 1 2 1 核方法举例1 2 1 2 2 核方法举例2 2 2 常见核函数 2 3 小结 3 SVM回归 1 SVM的
  • springboot的负载均衡

    springboot的负载均衡 eueka作为注册中心 负载均衡使用的是Ribbon Ribbon负载均衡的策略有轮询 重试 权重 默认轮询 这是它独特的算法去调用具体的服务 在消费者启动动类中加上 Bean LoadBalanced pu
  • C++ 机房预约系统(七):老师模块——老师登录和注销、查看所有预约功能、审核预约功能的具体实现

    9 教师模块 在这个模块中 登录和注销和管理员与学生的实现一样 查看所有预约也和学生的查看所有预约实现一样 审核预约基本上和学生的取消预约一样 不同的是 学生模块 是通过学号和预约状态找到可以取消的预约记录 在老师模块 是通过预约状态找到可
  • Ik分词器(自定义分词-mysql)

    引言 ik分词器的分词范围不够广泛 某些特定行业的专业用语分词能力就不够了 此时就需要自定义分词 与停顿词 1 下载ik分词器源码 git地址 https github com medcl elasticsearch analysis ik
  • Lombok 的正确使用姿势

    文章目录 1 Lombok 是什么 2 安装 Lombok 3 Spring Boot 集成 Lombok 4 使用 Lombok 4 1 注解一览表 4 2 部分使用介绍 Getter lazy true Value Builder Su
  • R语言基本统计分析——抽样

    R语言基本统计分析 抽样 简单随机抽样 简单随机抽样是指从数据总体中任意抽取指定数量的数据作为样本 其中每个可能被抽取中的样本概率相等 可以用R语言中的sample 函数进行随机抽样 抽取方法分为 重置抽样 不重置抽样 R语言命令为 sam
  • Ubuntu 18.04 配置ibus中文拼音输入法(超简单)

    Ubuntu 18 04系统想安装中文输入法 利用ibus输入法配置 只要三步 注意 你的Ubuntu需要可以上网 因为要下载一系列安装包 第一步 首先需要给Ubuntu18 04安装Chinese语言包支持 先打开Settings窗口 g
  • playwright连接已有浏览器操作

    文章目录 playwright连接已有浏览器操作 前置准备 打开本地已有缓存的Chrome 理解 指定端口打开浏览器 连接指定端口已启动浏览器 推荐 playwright连接已有浏览器操作 前置准备 pip install playwrig
  • Linux和windows下setsockopt用法

    Linux和windows下setsockopt用法 linux struct timeval timeout 3 0 3s int ret setsockopt sock fd SOL SOCKET SO SNDTIMEO timeout
  • xml 文档树

    xml 文档树 XML documents form a tree structure that starts at the root and branches to the leaves XML 文档树起始于 根元素 并以此为基础扩展文档
  • 优质网址收集

    1 免费PPT模板下载网址都是免费且免登录 网址为 http www ypppt com http 51pptmoban com 2 在线工具网站 包括格式转换 文字识别 图片压缩 视频压缩等 网址为 http www nicetool n
  • python课后作业总结

    课后作业1 一个列表中有多种字符型的元素 要求一将非字符型的全部改成字符型 要求二将所有字符型中的大写字母改成小写 需要用到的知识有 1 lower 函数 功能 将大写字母改成小写 2 列表生成式 s lower for s in L 其中
  • layui生成菜单

    layui生成菜单 thymeleaf渲染 1 ul class layui nav layui nav tree li class layui nav item a href a li ul
  • ARM 64 协程切换上下文的汇编代码解读

    ARM 64协程切换上下文的汇编代码解读 贺志国 2023 8 11 在ARM 64位架构中 有一组通用寄存器 General Purpose Registers 一组浮点寄存器 Floating point Registers 和一组特殊
  • windows10 快捷方式右键失灵问题解决

    问题现象 windows10版本系统 鼠标右键桌面快捷方式的时候 无法展开正常界面 在卡顿2秒后 刷新了整个电脑屏幕 并关闭桌面打开的文件夹 右键无效果 放到文件夹里面尝试也不行 同时选中右键我的电脑展开点击管理 无反应 对应非快捷方式鼠标
  • RocketMQ源码(十三)—消费者DefaultMQPushConsumer启动主要流程源码

    此前我们学习了Broker和Producer的启动源码 以及Producer发送消息源码和Broker接收存储消息的源码 现在 我们来学习Consumer的启动以及消费消息的源码 Consumer的启动源码和Producer的启动源码还是有