RocketMQ Bug修复记录

2023-10-31

1、Bug详情及解决

1.1. Bug 来龙去脉

这是RocketMQ核心流程里面,BrokerServerNameServer发送心跳时的一个BUG。

首先呢?我们先看一段源代码。

public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        // 初始化一个List,存放每个NameServer注册结果的
        // 多线程 会有并发问题吧
        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
        /**
         * 之所以使用 Vector:
         * 线程安全的集和容器无非就3个:
         * 1、Collections.synchronizedList()方式:
         * 2、Vector()方式:适用于写多的场景
         * 3、CopyOnWriteArrayList()方式:适用于读多写少的场景
         *
         * 由于此处的场景只是用到了写,所以我们选择了 Vector()方式
         */
        // 获取 NameServer 地址列表
        List<String> nameServerAddressList =
                this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            // 构建请求头,在请求头里面放很多的信息,比如说 BrokerId 和 BrokerName
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            // 构建请求体,包含一些配置
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            // 使用CountDownLatch同步计数器,保证注册完全部的 NameServer之后才往下走,
            // 执行其他逻辑
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // 遍历NameServer 地址列表,使用线程池去注册
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 调用 registerBroker 真正执行注册
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                // 等待所有 NameServer 都注册完,才返回注册结果
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

Bug就是出现在这段代码里面。那到底是什么问题呢?其实就是 多线程并发修改 ArrayList可能会出现并发安全问题。首先我们看下面这段代码。

// 初始化一个List,存放每个NameServer注册结果的
// 多线程 会有并发问题吧
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();

他的底层是

  @GwtCompatible(serializable = true)
  public static <E> ArrayList<E> newArrayList() {
    return new ArrayList<E>();
  }

本质上就是一个ArrayList。ok下面我们看看他是如何被赋值的。

// 遍历NameServer 地址列表,使用线程池去注册
for (final String namesrvAddr : nameServerAddressList) {
    brokerOuterExecutor.execute(new Runnable() {
        @Override
        public void run() {
            try {
                // 调用 registerBroker 真正执行注册
                RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                if (result != null) {
                    registerBrokerResultList.add(result);
                }
                log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
            } catch (Exception e) {
                log.warn("registerBroker Exception, {}", namesrvAddr, e);
            } finally {
                // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                countDownLatch.countDown();
            }
        }
    });
}

上述代码的逻辑很简单,就是遍历NameServer地址列表,使用线程池去NameServer注册。

所以

                if (result != null) {
                    registerBrokerResultList.add(result);
                }

并发修改ArrayList就会出现线程安全问题。这里我们可以搞一个简单的测试。

1.2. 验证这真的是一个BUG

首先我们按照上面的代码新建一个类。代码结构如下所示。

在这里插入图片描述

1.2.1 BrokerFixedThreadPoolExecutor
package com.example.test.arraylistrocketmq;

import java.util.concurrent.*;

/**
 * @author Chen
 * @version 1.0
 * @date 2020/7/20 9:29
 * @description:
 */

public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
                                         final TimeUnit unit,
                                         final BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
                                         final TimeUnit unit,
                                         final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
                                         final TimeUnit unit,
                                         final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime,
                                         final TimeUnit unit,
                                         final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory,
                                         final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
        return new FutureTaskExt<T>(runnable, value);
    }
}

1.2.2 FutureTaskExt
package com.example.test.arraylistrocketmq;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
 * @author Chen
 * @version 1.0
 * @date 2020/7/20 9:30
 * @description:
 */

public class FutureTaskExt<V> extends FutureTask<V> {
    private final Runnable runnable;

    public FutureTaskExt(final Callable<V> callable) {
        super(callable);
        this.runnable = null;
    }

    public FutureTaskExt(final Runnable runnable, final V result) {
        super(runnable, result);
        this.runnable = runnable;
    }

    public Runnable getRunnable() {
        return runnable;
    }
}

1.2.3 RegisterBrokerResult
package com.example.test.arraylistrocketmq;

import lombok.Data;

/**
 * @author Chen
 * @version 1.0
 * @date 2020/7/20 9:21
 * @description:
 */
@Data
public class RegisterBrokerResult {
    private String haServerAddr;
    private String masterAddr;
}

1.2.4 TestChen2
package com.example.test.arraylistrocketmq;

import com.google.common.collect.Lists;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Chen
 * @version 1.0
 * @date 2020/7/20 9:20
 * @description:
 */
public class TestChen2 {


    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    deal(finalI);
                }
            });
            thread.start();

            thread.join();
        }
    }

    public synchronized void deal(int index) {
        // 初始化一个List,存放每个NameServer注册结果的
        final List<RegisterBrokerResult> registerBrokerResultList =
                Lists.newArrayList();
        // 获取 NameServer 地址列表
        List<String> nameServerAddressList = new ArrayList<>();
        for (int j = 0; j < 10; j++) {
            nameServerAddressList.add("192.168.0." + j);
        }
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // 执行其他逻辑
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());

        BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));

        // 遍历NameServer 地址列表,使用线程池去注册
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 调用 registerBroker 真正执行注册
                        RegisterBrokerResult result = new RegisterBrokerResult();
                        result.setHaServerAddr("chen" + atomicInteger.getAndIncrement());
                        Thread.sleep(100);
                        if (result != null) {
                            // 注册成功结果放到一个List里去
                            registerBrokerResultList.add(result);
                        }

                    } catch (Exception e) {
                        System.out.println("-----------wei---------------> " + index);
                        System.out.println(e);
                        System.out.println("-----------wei---------------> " + index);
                    } finally {
                        // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            // 等待所有 NameServer 都注册完,才返回注册结果
            countDownLatch.await(1000000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }

        System.out.println(registerBrokerResultList.size());
        System.out.println("-----------chen---------------------------------------> " + index);
    }
}

1.2.5 ThreadFactoryImpl
package com.example.test.arraylistrocketmq;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Chen
 * @version 1.0
 * @date 2020/7/20 9:31
 * @description:
 */
public class ThreadFactoryImpl implements ThreadFactory {
    private final AtomicLong threadIndex = new AtomicLong(0);
    private final String threadNamePrefix;
    private final boolean daemon;

    public ThreadFactoryImpl(final String threadNamePrefix) {
        this(threadNamePrefix, false);
    }

    public ThreadFactoryImpl(final String threadNamePrefix, boolean daemon) {
        this.threadNamePrefix = threadNamePrefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
        thread.setDaemon(daemon);
        return thread;
    }
}

这里我将无关的代码都删除了,只保留了上述代码。我们运行TestChen2

运行结果如下:

com.example.test.arraylistrocketmq.TestChen2
10
-----------chen---------------------------------------> 0
10
-----------chen---------------------------------------> 1
10
-----------chen---------------------------------------> 2
10
-----------chen---------------------------------------> 3
10
-----------chen---------------------------------------> 4
10
-----------chen---------------------------------------> 5
10
-----------chen---------------------------------------> 6
10
-----------chen---------------------------------------> 7
10
-----------chen---------------------------------------> 8
10
-----------chen---------------------------------------> 9
10
-----------chen---------------------------------------> 10
9
-----------chen---------------------------------------> 11
10
-----------chen---------------------------------------> 12
9
-----------chen---------------------------------------> 13
10
-----------chen---------------------------------------> 14
10
-----------chen---------------------------------------> 15
10
-----------chen---------------------------------------> 16
10
-----------chen---------------------------------------> 17
10
-----------chen---------------------------------------> 18
10
-----------chen---------------------------------------> 19
10
-----------chen---------------------------------------> 20
9
-----------chen---------------------------------------> 21
10
-----------chen---------------------------------------> 22
10
-----------chen---------------------------------------> 23
10
-----------chen---------------------------------------> 24
10
-----------chen---------------------------------------> 25
10
-----------chen---------------------------------------> 26
9
-----------chen---------------------------------------> 27
10
-----------chen---------------------------------------> 28
10
-----------chen---------------------------------------> 29
10
-----------chen---------------------------------------> 30
10
-----------chen---------------------------------------> 31
10
-----------chen---------------------------------------> 32
10
-----------chen---------------------------------------> 33
10
-----------chen---------------------------------------> 34
10
-----------chen---------------------------------------> 35
10
-----------chen---------------------------------------> 36
10
-----------chen---------------------------------------> 37
10
-----------chen---------------------------------------> 38
10
-----------chen---------------------------------------> 39
10
-----------chen---------------------------------------> 40
10
-----------chen---------------------------------------> 41
10
-----------chen---------------------------------------> 42
6
-----------chen---------------------------------------> 43
10
-----------chen---------------------------------------> 44
10
-----------chen---------------------------------------> 45
9
-----------chen---------------------------------------> 46
8
-----------chen---------------------------------------> 47
10
-----------chen---------------------------------------> 48
10
-----------chen---------------------------------------> 49
10
-----------chen---------------------------------------> 50
10
-----------chen---------------------------------------> 51
9
-----------chen---------------------------------------> 52
10
-----------chen---------------------------------------> 53
10
-----------chen---------------------------------------> 54
10
-----------chen---------------------------------------> 55
10
-----------chen---------------------------------------> 56
10
-----------chen---------------------------------------> 57
10
-----------chen---------------------------------------> 58
10
-----------chen---------------------------------------> 59
10
-----------chen---------------------------------------> 60
10
-----------chen---------------------------------------> 61
10
-----------chen---------------------------------------> 62
10
-----------chen---------------------------------------> 63
10
-----------chen---------------------------------------> 64
10
-----------chen---------------------------------------> 65
10
-----------chen---------------------------------------> 66
8
-----------chen---------------------------------------> 67
10
-----------chen---------------------------------------> 68
10
-----------chen---------------------------------------> 69
10
-----------chen---------------------------------------> 70
8
-----------chen---------------------------------------> 71
10
-----------chen---------------------------------------> 72
10
-----------chen---------------------------------------> 73
10
-----------chen---------------------------------------> 74
10
-----------chen---------------------------------------> 75
10
-----------chen---------------------------------------> 76
10
-----------chen---------------------------------------> 77
10
-----------chen---------------------------------------> 78
10
-----------chen---------------------------------------> 79
10
-----------chen---------------------------------------> 80
10
-----------chen---------------------------------------> 81
10
-----------chen---------------------------------------> 82
10
-----------chen---------------------------------------> 83
10
-----------chen---------------------------------------> 84
10
-----------chen---------------------------------------> 85
9
-----------chen---------------------------------------> 86
10
-----------chen---------------------------------------> 87
10
-----------chen---------------------------------------> 88
10
-----------chen---------------------------------------> 89
10
-----------chen---------------------------------------> 90
9
-----------chen---------------------------------------> 91
10
-----------chen---------------------------------------> 92
10
-----------chen---------------------------------------> 93
10
-----------chen---------------------------------------> 94
10
-----------chen---------------------------------------> 95
10
-----------chen---------------------------------------> 96
9
-----------chen---------------------------------------> 97
9
-----------chen---------------------------------------> 98
10
-----------chen---------------------------------------> 99

Process finished with exit code 0

我们发现并不是每次 registerBrokerResultList的大小都是 10,也就是出现了并发安全问题。这里之所以在测试的时候在deal方法前面加上synchronized,是因为调用registerBrokerAll方法的函数是synchronized。但是public synchronized void registerBrokerAll() 这段代码只是让下面这段代码

 // 启动了一个定时调度的任务,让他去给 NameServer 进行注册
        // 默认 30s 发送一次
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    // 将自己注册到NameServer
                    BrokerController.this.
                            registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

线程池里面的线程安全。也就是说 ,他只是保证了定时任务不同批次任务的线程安全。但是仍然无法保证定时任务同一个批次内的线程安全。比如说:某次发送心跳。Broker需要向 10个NameServer 发送心跳。public List<RegisterBrokerResult> registerBrokerAll方法是开启10个线程去发送心跳,如果此时同时返回结果,同时修改registerBrokerResultList这个ArrayList仍然是有并发安全问题的。

public synchronized void registerBrokerAll() 这段代码只是保证 比如说 时刻 0 和时刻 30 。他们发送的心跳没有并发安全问题。

1.3. 修复BUG

其实解决并发安全问题我们也是有几种解决方案的。比如用线程安全的集和容器,而线程安全的集和容器无非就3个:

1、Collections.synchronizedList()方式:
2、Vector()方式:适用于写多的场景
3、CopyOnWriteArrayList()方式:适用于读多写少的场景

但是直接使用上述方案,锁太粗,不利于高并发,所以我们只要在list.add()方法上加个锁就行了。即修改TestChen2代码如下:

1.3.1 TestChen2
package com.example.test.arraylistrocketmq;

import com.google.common.collect.Lists;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Chen
 * @version 1.0
 * @date 2020/7/20 9:20
 * @description:
 */
public class TestChen2 {


    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    deal(finalI);
                }
            });
            thread.start();

            thread.join();
        }
    }

    public synchronized void deal(int index) {
        // 初始化一个List,存放每个NameServer注册结果的
        final List<RegisterBrokerResult> registerBrokerResultList =
                Lists.newArrayList();
        // 获取 NameServer 地址列表
        List<String> nameServerAddressList = new ArrayList<>();
        for (int j = 0; j < 10; j++) {
            nameServerAddressList.add("192.168.0." + j);
        }
        AtomicInteger atomicInteger = new AtomicInteger(0);
        // 执行其他逻辑
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());

        BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));

        // 遍历NameServer 地址列表,使用线程池去注册
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 调用 registerBroker 真正执行注册
                        RegisterBrokerResult result = new RegisterBrokerResult();
                        result.setHaServerAddr("chen" + atomicInteger.getAndIncrement());
                        Thread.sleep(10);
                        if (result != null) {
                            // 注册成功结果放到一个List里去
                            // 锁细化
                            // make ArrayList thread safe, here we don't need to
                            // use Vector or Collections.synchronizedList() or CopyOnWriteArrayList
                            synchronized (registerBrokerResultList) {
                                registerBrokerResultList.add(result);
                            }
                        }

                    } catch (Exception e) {
                        System.out.println("-----------wei---------------> " + index);
                        System.out.println(e);
                        System.out.println("-----------wei---------------> " + index);
                    } finally {
                        // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            // 等待所有 NameServer 都注册完,才返回注册结果
            countDownLatch.await(1000000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }

        System.out.println(registerBrokerResultList.size());
        System.out.println("-----------chen---------------------------------------> " + index);
    }
}

核心的就是修改:

if (result != null) {
    // 注册成功结果放到一个List里去
    // 锁细化
    // make ArrayList thread safe, here we don't need to
    // use Vector or Collections.synchronizedList() or CopyOnWriteArrayList
    synchronized (registerBrokerResultList) {
        registerBrokerResultList.add(result);
    }
}

ok ,保存之后再运行一遍。运行结果如下:

10
-----------chen---------------------------------------> 0
10
-----------chen---------------------------------------> 1
10
-----------chen---------------------------------------> 2
10
-----------chen---------------------------------------> 3
10
-----------chen---------------------------------------> 4
10
-----------chen---------------------------------------> 5
10
-----------chen---------------------------------------> 6
10
-----------chen---------------------------------------> 7
10
-----------chen---------------------------------------> 8
10
-----------chen---------------------------------------> 9
10
-----------chen---------------------------------------> 10
10
-----------chen---------------------------------------> 11
10
-----------chen---------------------------------------> 12
10
-----------chen---------------------------------------> 13
10
-----------chen---------------------------------------> 14
10
-----------chen---------------------------------------> 15
10
-----------chen---------------------------------------> 16
10
-----------chen---------------------------------------> 17
10
-----------chen---------------------------------------> 18
10
-----------chen---------------------------------------> 19
10
-----------chen---------------------------------------> 20
10
-----------chen---------------------------------------> 21
10
-----------chen---------------------------------------> 22
10
-----------chen---------------------------------------> 23
10
-----------chen---------------------------------------> 24
10
-----------chen---------------------------------------> 25
10
-----------chen---------------------------------------> 26
10
-----------chen---------------------------------------> 27
10
-----------chen---------------------------------------> 28
10
-----------chen---------------------------------------> 29
10
-----------chen---------------------------------------> 30
10
-----------chen---------------------------------------> 31
10
-----------chen---------------------------------------> 32
10
-----------chen---------------------------------------> 33
10
-----------chen---------------------------------------> 34
10
-----------chen---------------------------------------> 35
10
-----------chen---------------------------------------> 36
10
-----------chen---------------------------------------> 37
10
-----------chen---------------------------------------> 38
10
-----------chen---------------------------------------> 39
10
-----------chen---------------------------------------> 40
10
-----------chen---------------------------------------> 41
10
-----------chen---------------------------------------> 42
10
-----------chen---------------------------------------> 43
10
-----------chen---------------------------------------> 44
10
-----------chen---------------------------------------> 45
10
-----------chen---------------------------------------> 46
10
-----------chen---------------------------------------> 47
10
-----------chen---------------------------------------> 48
10
-----------chen---------------------------------------> 49
10
-----------chen---------------------------------------> 50
10
-----------chen---------------------------------------> 51
10
-----------chen---------------------------------------> 52
10
-----------chen---------------------------------------> 53
10
-----------chen---------------------------------------> 54
10
-----------chen---------------------------------------> 55
10
-----------chen---------------------------------------> 56
10
-----------chen---------------------------------------> 57
10
-----------chen---------------------------------------> 58
10
-----------chen---------------------------------------> 59
10
-----------chen---------------------------------------> 60
10
-----------chen---------------------------------------> 61
10
-----------chen---------------------------------------> 62
10
-----------chen---------------------------------------> 63
10
-----------chen---------------------------------------> 64
10
-----------chen---------------------------------------> 65
10
-----------chen---------------------------------------> 66
10
-----------chen---------------------------------------> 67
10
-----------chen---------------------------------------> 68
10
-----------chen---------------------------------------> 69
10
-----------chen---------------------------------------> 70
10
-----------chen---------------------------------------> 71
10
-----------chen---------------------------------------> 72
10
-----------chen---------------------------------------> 73
10
-----------chen---------------------------------------> 74
10
-----------chen---------------------------------------> 75
10
-----------chen---------------------------------------> 76
10
-----------chen---------------------------------------> 77
10
-----------chen---------------------------------------> 78
10
-----------chen---------------------------------------> 79
10
-----------chen---------------------------------------> 80
10
-----------chen---------------------------------------> 81
10
-----------chen---------------------------------------> 82
10
-----------chen---------------------------------------> 83
10
-----------chen---------------------------------------> 84
10
-----------chen---------------------------------------> 85
10
-----------chen---------------------------------------> 86
10
-----------chen---------------------------------------> 87
10
-----------chen---------------------------------------> 88
10
-----------chen---------------------------------------> 89
10
-----------chen---------------------------------------> 90
10
-----------chen---------------------------------------> 91
10
-----------chen---------------------------------------> 92
10
-----------chen---------------------------------------> 93
10
-----------chen---------------------------------------> 94
10
-----------chen---------------------------------------> 95
10
-----------chen---------------------------------------> 96
10
-----------chen---------------------------------------> 97
10
-----------chen---------------------------------------> 98
10
-----------chen---------------------------------------> 99

Process finished with exit code 0

从结果我们发现我们的方法是线程安全的。ok,我们修改RocketMQ中的源码

public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        // 初始化一个List,存放每个NameServer注册结果的
        // 多线程 会有并发问题吧
        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
        /**
         * 之所以使用 Vector:
         * 线程安全的集和容器无非就3个:
         * 1、Collections.synchronizedList()方式:
         * 2、Vector()方式:适用于写多的场景
         * 3、CopyOnWriteArrayList()方式:适用于读多写少的场景
         *
         * 由于此处的场景只是用到了写,所以我们选择了 Vector()方式
         */
        // 获取 NameServer 地址列表
        List<String> nameServerAddressList =
                this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            // 构建请求头,在请求头里面放很多的信息,比如说 BrokerId 和 BrokerName
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            // 构建请求体,包含一些配置
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            // 使用CountDownLatch同步计数器,保证注册完全部的 NameServer之后才往下走,
            // 执行其他逻辑
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // 遍历NameServer 地址列表,使用线程池去注册
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 调用 registerBroker 真正执行注册
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                // 注册成功结果放到一个List里去
                                // 锁细化
                                // make ArrayList thread safe, here we don't need to
                                // use Vector or Collections.synchronizedList() or CopyOnWriteArrayList
                                synchronized (registerBrokerResultList) {
                                    registerBrokerResultList.add(result);
                                }

                            }

                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                // 等待所有 NameServer 都注册完,才返回注册结果
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

主要就是修改:

在这里插入图片描述

这里是我在GitHub上提交的Issues https://github.com/apache/rocketmq/issues/2170

2、总结

  开源的代码直接运行绝大部分是对的,但是总有一些软件的更新使得作者无能为力。之前的API是对的,但是之后就废弃了或修改了是常有的事。所以我们需要跟踪源代码。这只是一个小小的问题,如果没有前辈的无私奉献,很难想象我们自己一天能学到多少内容。感谢各位前辈的辛勤付出,让我们少走了很多的弯路!

点个赞再走呗!欢迎留言哦!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RocketMQ Bug修复记录 的相关文章

  • Android:如何暂停和恢复可运行线程?

    我正在使用 postDelayed 可运行线程 当我按下按钮时 我需要暂停并恢复该线程 请任何人帮助我 这是我的主题 protected void animation music6 music4 postDelayed new Runnab
  • “源兼容性”和“目标兼容性”有什么区别?

    之间有什么关系 区别sourceCompatibility and targetCompatibility 当它们设置为不同的值时会发生什么 根据工具链和兼容性 https docs gradle org current userguide
  • 在Windows Server 2003下如何在本地系统帐户下运行jvisualvm.exe?

    我在带有 Java 1 6 u 20 的 Windows Server 2003 下将 GlassFish 3 0 1 作为 Windows 服务运行 总体上我很满意 我希望能够在这个 JVM 上使用 VisualVM 并使用无法在 Tom
  • 如何以编程方式使用包含多列的 where-in 子句执行 PostgreSQL 查询?

    我的查询是这样的 select from plat customs complex where code t code s in 01013090 10 01029010 90 它在 psql 控制台中运行良好 我的问题是如何在客户端代码中
  • 以相反的顺序打印任何集合中的项目?

    我在 使用 Java 进行数据结构和问题解决 一书中遇到以下问题 编写一个例程 使用 Collections API 以相反的顺序打印任何 Collection 中的项目 不要使用 ListIterator 我不会把它放在这里 因为我想让有
  • Java LostFocus 和 InputVerifier,按反向制表符顺序移动

    我有一个 GUI 应用程序 它使用 InputVerifier 在产生焦点之前检查文本字段的内容 这都是很正常的 然而 昨天发现了一个问题 这似乎是一个错误 但我在任何地方都找不到任何提及它的地方 在我将其报告为错误之前 我想我应该问 我在
  • 查看Java Agent修改的Java类的源代码

    我需要了解 Java 代理如何修改我的初始类 以便我能够理解代码的作用 build gradle configurations jar archiveName agent2 jar jar manifest attributes Prema
  • 通过Zuul上传大文件

    我在通过 zuul 上传大文件时遇到问题 我正在使用 apache commons 文件上传 https commons apache org proper commons fileupload https commons apache o
  • JOOQ 忽略具有默认值的数据库列

    看来JOOQ完全忽略了数据库列的默认值 既不会更新 ActiveRecord 对象 也不会在 INSERT 时跳过此列 相反 它尝试将其设置为 NULL 这在 NOT NULL 列上失败 Example CREATE TABLE bug f
  • 为什么 jar 执行的通配符在 docker CMD 中不起作用?

    我有一个Dockerfile与以下CMD启动我的 Spring Boot 应用程序 FROM java 8 jre CMD java jar app file jar 当我尝试从创建的图像启动容器时 我得到 Error Unable to
  • 在光标所在行强制关闭!

    嘿 我正在尝试创建一个应用程序来查找存储在 SQlite 数据库中的 GPS 数据 但我面临一个问题 我构建了一个 DbAdapter 类来创建数据库 现在我尝试使用以下函数从另一个类获取所有数据上的光标 public Cursor fet
  • Mockito 和 Hamcrest:如何验证 Collection 参数的调用?

    我遇到了 Mockito 和 Hamcrest 的泛型问题 请假设以下界面 public interface Service void perform Collection
  • 为什么解析这个 JSON 会抛出错误?

    我正在尝试解析这个 JSONObject query yahoo count 1 results rate Name USD INR id USDINR Time 12 19pm Date 10 31 2015 Bid 65 405 Ask
  • 如何在 IntelliJ IDEA 中运行 akka actor

    来自 Akka 网站文档 然后 这个主要方法将创建所需的基础设施 运行演员 启动给定的主要演员并安排 一旦主要参与者终止 整个应用程序就会关闭 因此 您将能够使用类似于以下的命令运行上面的代码 下列的 java classpath akka
  • 文本视图不显示全文

    我正在使用 TableLayout 和 TableRow 创建一个简单的布局 其中包含两个 TextView 这是代码的一部分
  • Android ScrollView,检查当前是否滚动

    有没有办法检查标准 ScrollView 当前是否正在滚动 方向是向上还是向下并不重要 我只需要检查它当前是否正在滚动 ScrollView当前形式不提供用于检测滚动事件的回调 有两种解决方法可用 1 Use a ListView并实施On
  • 确定 JavaFX 中是否消耗了事件

    我正在尝试使用 JavaFX 中的事件处理来做一些非滑雪道的事情 我需要能够确定手动触发事件后是否已消耗该事件 在以下示例中 正确接收了合成鼠标事件 但调用 Consumer 不会更新该事件 我对此进行了调试 发现 JavaFX 实际上创建
  • 使用 DBCP 配置 Tomcat

    在闲置一段时间 几个小时 后 我们收到了 CommunicationsException 来自 DBCP 错误消息 在异常中 位于这个问题的末尾 但我没有看到任何配置文件中定义的 wait timeout 我们应该看哪里 在 tomcat
  • Java 的“&&”与“&”运算符

    我使用的示例来自 Java Herbert Schildt 的完整参考文献 第 12 版 Java 是 14 他给出了以下 2 个示例 如果阻止 第一个是好的 第二个是错误的 因此发表评论 public class PatternMatch
  • Spring 作为 JNDI 提供者?

    我想使用 Spring 作为 JNDI 提供程序 这意味着我想在 Spring 上下文中配置一个 bean 可以通过 JNDI 访问该 bean 这看起来像这样

随机推荐

  • hql取满足条件最新一条记录_统计学习方法 - 序列最小最优化算法(SMO)解析

    本文包括 支持向量机的低效问题 序列最小最优化算法 SMO 的思路 两个变量二次规划的求解方法 变量的选择方法 精度的概念 其它有关数据分析 机器学习的文章及社群 1 支持向量机的低效问题 我们知道支持向量机的拉格朗日乘数法对偶形式的外部最
  • JavaScript基础篇

    JavaScript基础篇 一 介绍 1 JavaScript是一种专门在浏览器编译并执行的编程语言 2 JavaScript主要处理用户与浏览器之间请求问题 3 JavaScript采用 弱类型编程语言风格 对 面向对象思想 来进行实现的
  • Linux基础之常用操作

    这里介绍的是一些非常基本的命令 在linux管理中经常用到 包括用户创建 文件操作 目录操作 vim文本编辑等等 用户切换与创建 whoami命令 用于显示自身用户名称 root linux00 whoami root su命令 用于切换用
  • GIT的使用以及分支的讲解

    文章目录 前言 一 GIT是什么 二 Git的使用 1 在本地初始化一个本地仓库 2 工作区到暂存区 使用流程 3 暂存区到历史区 使用流程 4 文件夹操作 三 Git的分支 1 概述 2 命名规范 3 分支的操作 总结 前言 掌握GIT的
  • 网站存活,ip反查,权重备案查询(方法)

    常用漏洞库 佩奇漏洞文库 https www yuque com peiqiwiki peiqi poc wiki http wiki peiqi tech 白阁漏洞文库 https wiki bylibrary cn E6 BC 8F E
  • 关于CS模式和P2P模式分发文件速度的思考

    cs模式 看到这里我首先想到是 难道不是NF us F min di 吗 然后我想了一会 分发文件并不是先上传再下载 而是一个报文一个报文的上传再一个报文一个报文的下载 也就是说 这边刚上传第一个报文 另一边就开始下载 所以几乎是同时开始上
  • 排序算法(4)----快速排序

    快速排序由C A R Hoare在1962年提出 它的基本思想是 通过一趟排序将要排序的数据分割成独立的两部分 其中一部分的所有数据都比另外一部分的所有数据都要小 然后再按此方法对这两部分数据分别进行快速排序 整个排序过程可以递归进行 以此
  • 基于Java+SpringBoot+vue的租房网站设计与实现(附源码,使用教程)

    基于Java SpringBoot vue的租房网站设计与实现 文章目录 基于Java SpringBoot vue的租房网站设计与实现 一 前言介绍 二 主要技术 三 系统设计 部分 3 1 主要功能模块设计 3 2 系统登录设计 四 数
  • 【Vscode】远程内存占用大

    查看远程服务器上的扩展 依次删除 重新连接后观察内存占用 此扩展占用较高 约2G 前后端项目 依赖较多导致
  • “三天打鱼,两天晒网“的c语言实现

    中国有句俗话叫 三天大鱼 两天晒网 某渔夫从2000年1月1日开始 三天打鱼 两天晒网 问该渔夫在以后的某一天中是在 打鱼 还是在 晒网 需求 用户输入某年某月某日 判断出该日期是在打鱼还是在晒网 思路 1 接收键盘输入的日期 2 计算从2
  • [靶场] SQLi-Labs Less62-Less69

    66 Less62 请求方式 注入方式 备注 GET 盲注 130次语句以内完成 分析 我们需要指定challenges数据库中表名 表名为10个字符 包含数字和小写字母 还需要知道表中的字段名 字段名为secret XX XX为4个字符
  • LDO的原理以及重要指标

    http t csdn cn YaR0G 本文告诉你三件事 LDO的基本原理 LDO都有哪些参数 有什么意义 选型时的注意事项 1 LDO基本原理 LDO是Low Dropout Regulator的缩写 意思是低压差线性稳压器 低压差 是
  • 深度学习中的常用八种卷积运算简介

    参考资料 https towardsdatascience com a comprehensive introduction to different types of convolutions in deep learning 66928
  • iOS 14 自定义画中画悬浮窗 Custom AVPictureInPictureController 实现方案

    iOS 14 基于 AVPictureInPictureController 实现自定义画中画 涵盖所有功能与难点 市面上的各种悬浮钟和提词器的原理都是基于此 Demo源码在文末 使用 iOS 画中画的要求 真机 不能使用模拟器 iOS 1
  • 重构Webpack系列之二 ---- 入口起点

    重构Webpack系列之二 入口起点 一 概念 入口起点 entry point 指示Webpack应该使用哪个模块 来作为构建其内部依赖图的开始 进入入口起点后 Webpack会找出哪些模块和库是跟入口起点 直接或间接 有依赖的关系 默认
  • 开源云原生与行业应用

    ChinaOSC 2022开源云原生与行业应用论坛将于2022年8月21日13 30 17 15在陕西省西安高新国际会议中心召开 本论坛将邀请多位知名开源云原生领域的贡献者 实践者 分享和探讨开源云原生赋能产品迭代 行业应用创新的最佳实践
  • json格式请求http

    例子 JsonObject response postJsonData url gson toJson applyInfo applyInfo 一个java对象 发送 post 请求 param url 地址 return link Jso
  • Python---数据清洗

    首先导入数据 并读取前5行 然后处理店铺数据 清洗comment字段 先筛选出有 条 字的评论 再用spilt分割数据 提取评论条数后转换数据类型 用的是astype转换 并查看前5行 清洗其他字段也是一样的处理 比如清洗人均价格这个字段
  • el-table选中数据勾选状态不清空的做法&&监听拿到勾选的所有数据

    1 el table结合el pagination 在翻页之后再返回勾选的数据状态会消失 解决办法
  • RocketMQ Bug修复记录

    文章目录 1 Bug详情及解决 1 1 Bug 来龙去脉 1 2 验证这真的是一个BUG 1 2 1 BrokerFixedThreadPoolExecutor 1 2 2 FutureTaskExt 1 2 3 RegisterBroke