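When a RocketMQ consumer subscribes to a topic, it must follow the "subscription consistency" principle: every consumer instance in the same consumer group must have exactly the same subscription logic. Once subscriptions diverge, consumption becomes chaotic, and messages can even be lost. In most distributed applications a group has several consumer instances attached to it. Because a RocketMQ subscription consists of Topic + Tag, keeping subscriptions consistent means every consumer instance must ensure that:
- the subscribed topics are identical
- the tags subscribed within each topic are identical
Put simply: if consumer group GroupA contains consumerA and consumerB, and consumerA subscribes to topicA with tagA while consumerB subscribes to topicB with tagB, the group's subscriptions are inconsistent.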
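To make the rule concrete, here is a small checker. SubscriptionConsistencyCheck is hypothetical and not part of RocketMQ; it models each instance's subscriptions as a topic-to-tag-expression map and reports whether all instances of a group declare the same map. It is a sketch for illustration only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (not a RocketMQ API): each map is one instance's topic -> tag expression.
class SubscriptionConsistencyCheck {

    // True only when every instance subscribes to the same topics with the same tags
    static boolean isConsistent(List<Map<String, String>> instanceSubscriptions) {
        if (instanceSubscriptions.isEmpty()) {
            return true;
        }
        Map<String, String> expected = instanceSubscriptions.get(0);
        for (Map<String, String> subs : instanceSubscriptions) {
            // Map equality compares both keys (topics) and values (tag expressions)
            if (!Objects.equals(expected, subs)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> consumerA = new HashMap<>();
        consumerA.put("topicA", "tagA");
        Map<String, String> consumerB = new HashMap<>();
        consumerB.put("topicB", "tagB");
        System.out.println(isConsistent(Arrays.asList(consumerA, consumerB)));                   // false
        System.out.println(isConsistent(Arrays.asList(consumerA, new HashMap<>(consumerA))));    // true
    }
}
```

The GroupA example above (consumerA on topicA/tagA, consumerB on topicB/tagB) is reported as inconsistent.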
Why inconsistent subscriptions cause problems
Problem 1: subscriptions overwrite each other
The broker stores consumer information in a map whose key is the group name and whose value is that group's information:
// org.apache.rocketmq.broker.client.ConsumerManager
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
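ConsumerGroupInfo in turn stores the topics the group subscribes to, again in a map keyed by topic. So when consumers in the same group subscribe to different topics, their heartbeats keep overwriting the contents of this single shared map: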
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
    new ConcurrentHashMap<String, SubscriptionData>();
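The key point of the two maps is that the inner table belongs to the group, not to an instance. The sketch below is a simplified model, not the real ConsumerManager/ConsumerGroupInfo: the class ConsumerTableModel is hypothetical and SubscriptionData is reduced to a tag string. It shows that two consumers of GroupA resolve to the same inner map, so a put from one replaces what the other wrote for the same key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the broker's two-level storage: group -> (topic -> tag expression)
class ConsumerTableModel {
    final ConcurrentMap<String, ConcurrentMap<String, String>> consumerTable =
            new ConcurrentHashMap<>();

    // Every instance of a group gets the SAME inner subscription table
    ConcurrentMap<String, String> subscriptionTable(String group) {
        return consumerTable.computeIfAbsent(group, g -> new ConcurrentHashMap<>());
    }

    public static void main(String[] args) {
        ConsumerTableModel broker = new ConsumerTableModel();
        ConcurrentMap<String, String> seenByA = broker.subscriptionTable("GroupA");
        ConcurrentMap<String, String> seenByB = broker.subscriptionTable("GroupA");
        System.out.println(seenByA == seenByB);     // true: one table per group
        seenByA.put("topicA", "tagA");
        seenByB.put("topicA", "tagB");              // same key, so B's value replaces A's
        System.out.println(seenByA.get("topicA"));  // tagB
    }
}
```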
Now let's look at the source. When we start a consumer (by calling DefaultMQPushConsumer.start()), it starts the MQ client instance:
// DefaultMQPushConsumerImpl.start()
// ...
mQClientFactory.start();
The MQ client starts a scheduled heartbeat task that periodically sends heartbeats to the brokers:
// MQClientInstance.startScheduledTask()
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            // Clean up offline brokers
            MQClientInstance.this.cleanOfflineBroker();
            // Send heartbeats to all brokers
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
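The try/catch inside the task matters: per the JDK documentation of scheduleAtFixedRate, if one execution throws, all subsequent executions are suppressed, so a single failed heartbeat would stop heartbeats for good. The sketch below reproduces the pattern with the JDK only; sendHeartbeat() is a hypothetical stand-in that fails on every second call, and the delays are scaled down from the real values for a quick demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSchedulerSketch {
    static final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical stand-in for sendHeartbeatToAllBrokerWithLock(): fails on every second call
    static void sendHeartbeat() {
        if (attempts.incrementAndGet() % 2 == 0) {
            throw new IllegalStateException("broker unreachable");
        }
    }

    // Schedules the task and reports whether `ticks` runs happened within 5 seconds
    static boolean run(int ticks, long initialDelayMillis, long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(ticks);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                sendHeartbeat();
            } catch (Exception e) {
                // Swallow and log: an exception escaping the task would cancel all later runs
                System.out.println("heartbeat failed: " + e.getMessage());
            } finally {
                latch.countDown();
            }
        }, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Same shape as the RocketMQ call above, with much shorter delays
        System.out.println(run(5, 10, 20)); // true: failures did not stop the schedule
    }
}
```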
The key method is sendHeartbeatToAllBrokerWithLock; inside it, the real work is done by sendHeartbeatToAllBroker():
private void sendHeartbeatToAllBroker() {
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    // Neither producer nor consumer data: nothing to send, return immediately
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }
    if (!this.brokerAddrTable.isEmpty()) {
        // Counts how many heartbeat rounds have been sent
        long times = this.sendHeartbeatTimesTotal.getAndIncrement();
        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, HashMap<Long, String>> entry = it.next();
            String brokerName = entry.getKey();
            HashMap<Long, String> oneTable = entry.getValue();
            if (oneTable != null) {
                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                    Long id = entry1.getKey();
                    String addr = entry1.getValue();
                    if (addr != null) {
                        // Without consumer data, only masters receive the heartbeat
                        if (consumerEmpty) {
                            if (id != MixAll.MASTER_ID)
                                continue;
                        }
                        try {
                            // Send the heartbeat to this broker
                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                            if (!this.brokerVersionTable.containsKey(brokerName)) {
                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                            }
                            this.brokerVersionTable.get(brokerName).put(addr, version);
                            if (times % 20 == 0) {
                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                                log.info(heartbeatData.toString());
                            }
                        } catch (Exception e) {
                            if (this.isBrokerInNameServer(addr)) {
                                log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                            } else {
                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                    id, addr);
                            }
                        }
                    }
                }
            }
        }
    }
}
The heartbeat itself is sent by this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000), which takes three arguments: the broker address, the heartbeat data, and a timeout in milliseconds.
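One detail of the loop above is easy to miss: a heartbeat that carries no consumer data (a producer-only client) is sent to masters only, while a consumer's heartbeat goes to every broker address, slaves included. The sketch below isolates that selection rule. HeartbeatTargets is hypothetical; MASTER_ID mirrors MixAll.MASTER_ID (0), and the addresses are placeholders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HeartbeatTargets {
    // Mirrors MixAll.MASTER_ID: the master of a broker set has id 0
    static final long MASTER_ID = 0L;

    // brokerAddrTable: brokerName -> (brokerId -> address)
    static List<String> targets(Map<String, Map<Long, String>> brokerAddrTable, boolean consumerEmpty) {
        List<String> result = new ArrayList<>();
        for (Map<Long, String> oneTable : brokerAddrTable.values()) {
            if (oneTable == null) {
                continue;
            }
            for (Map.Entry<Long, String> entry : oneTable.entrySet()) {
                String addr = entry.getValue();
                if (addr == null) {
                    continue;
                }
                // A heartbeat without consumer data is sent to masters only
                if (consumerEmpty && entry.getKey() != MASTER_ID) {
                    continue;
                }
                result.add(addr);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> brokerA = new LinkedHashMap<>();
        brokerA.put(0L, "master-a"); // placeholder addresses
        brokerA.put(1L, "slave-a");
        Map<String, Map<Long, String>> table = new LinkedHashMap<>();
        table.put("broker-a", brokerA);
        System.out.println(targets(table, true));  // [master-a]
        System.out.println(targets(table, false)); // [master-a, slave-a]
    }
}
```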
Let's see what heartbeatData carries:
public class HeartbeatData extends RemotingSerializable {
    // Client ID
    private String clientID;
    // Producer data: only the group name is stored
    private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
    // Consumer data: group name, consume type (pull/push), message model (clustering/broadcasting),
    // and the subscribed topics with their tags and versions
    private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
    // getters and setters omitted
}
Once the heartbeat reaches the broker, it is handled by ClientManageProcessor.heartBeat in RocketMQ's broker module:
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    // Decode the heartbeat body
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );
    // Iterate over the consumer data carried by the heartbeat
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // Look up the subscription group config by group name
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        if (null != subscriptionGroupConfig) {
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // Make sure the group's retry topic exists
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }
        // Register the consumer; this is where the group's subscriptions are updated
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );
        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }
    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
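Next, registerConsumer: after getting (or creating) the consumer group information by group name, it updates the group's subscriptions via updateSubscription. If the channel or subscription information changed, and notification is enabled for the group, the broker notifies the consumers that the consumer ID list has changed.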
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    // Get the consumer group information, creating it on first registration
    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // Update the group's subscriptions
    boolean r2 = consumerGroupInfo.updateSubscription(subList);
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            // Notify the consumers that the consumer ID list has changed
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }
    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
    return r1 || r2;
}
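The first lines of registerConsumer use a get-then-putIfAbsent idiom: two heartbeats for a brand-new group can arrive concurrently, and putIfAbsent ensures both end up with the same group object instead of one silently replacing the other. A minimal sketch of the idiom follows; GetOrCreateSketch and GroupInfo are hypothetical placeholders for ConsumerManager and ConsumerGroupInfo.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class GetOrCreateSketch {
    // Hypothetical placeholder for ConsumerGroupInfo
    static final class GroupInfo {
        final String group;

        GroupInfo(String group) {
            this.group = group;
        }
    }

    final ConcurrentMap<String, GroupInfo> consumerTable = new ConcurrentHashMap<>();

    GroupInfo getOrCreate(String group) {
        GroupInfo info = consumerTable.get(group);
        if (info == null) {
            GroupInfo tmp = new GroupInfo(group);
            // putIfAbsent returns the existing value if another thread won the race, else null
            GroupInfo prev = consumerTable.putIfAbsent(group, tmp);
            info = prev != null ? prev : tmp;
        }
        return info;
    }

    public static void main(String[] args) throws InterruptedException {
        GetOrCreateSketch broker = new GetOrCreateSketch();
        GroupInfo[] seen = new GroupInfo[2];
        Thread t1 = new Thread(() -> seen[0] = broker.getOrCreate("GroupA"));
        Thread t2 = new Thread(() -> seen[1] = broker.getOrCreate("GroupA"));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(seen[0] == seen[1]); // true
    }
}
```

On Java 8 and later, ConcurrentMap.computeIfAbsent achieves the same in one call; the excerpt above uses the explicit form.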
The subscription update itself happens in ConsumerGroupInfo.updateSubscription:
public boolean updateSubscription(final Set<SubscriptionData> subList) {
    boolean updated = false;
    // Step 1
    for (SubscriptionData sub : subList) {
        // Look up the topic in the subscription table
        SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
        if (old == null) {
            // Not in the table yet: add the new subscription
            SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            if (null == prev) {
                updated = true;
                log.info("subscription changed, add new topic, group: {} {}",
                    this.groupName,
                    sub.toString());
            }
        } else if (sub.getSubVersion() > old.getSubVersion()) {
            // The incoming version is newer, so it replaces the old subscription.
            // The version is a timestamp, so the more recently created subscription wins.
            if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                log.info("subscription changed, group: {} OLD: {} NEW: {}",
                    this.groupName,
                    old.toString(),
                    sub.toString()
                );
            }
            this.subscriptionTable.put(sub.getTopic(), sub);
        }
    }
    // Step 2
    Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, SubscriptionData> next = it.next();
        String oldTopic = next.getKey();
        boolean exist = false;
        // Compare the table with the heartbeat: a topic that is in the table but not in the heartbeat gets removed
        for (SubscriptionData sub : subList) {
            if (sub.getTopic().equals(oldTopic)) {
                exist = true;
                break;
            }
        }
        if (!exist) {
            log.warn("subscription changed, group: {} remove topic {} {}",
                this.groupName,
                oldTopic,
                next.getValue().toString()
            );
            it.remove();
            updated = true;
        }
    }
    this.lastUpdateTimestamp = System.currentTimeMillis();
    return updated;
}
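The method loops over the subscriptions sent in the heartbeat. For each one, it first looks up the topic in the broker's existing (old) subscription table; if the topic is not found, it is new and its subscription is added. If it is found, the two versions are compared and the entry is replaced only when the incoming version is larger. Because the version is a timestamp taken when the subscription is created, the most recently created subscription wins. Step 2 then compares the updated table with the heartbeat and deletes every subscription that is in the table but not in the heartbeat, so after each update the table contains exactly the topics of the heartbeat that was just received. Tags are affected too: although the table is keyed by topic, the value (which includes the tag expression) is replaced whenever a newer version arrives, so for the same topic one instance's tag replaces the other's.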
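The effect of the two steps can be reproduced with a simplified re-implementation. This is a sketch, not RocketMQ code: Sub is a hypothetical stand-in for SubscriptionData holding only topic, tag and version, and the table is not shared across threads here. Feeding it alternating heartbeats from consumerA (topicA) and consumerB (topicB) shows the group's table flipping back and forth.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class UpdateSubscriptionSketch {
    // Hypothetical stand-in for SubscriptionData: topic, tag expression and version only
    static final class Sub {
        final String topic;
        final String tag;
        final long version;

        Sub(String topic, String tag, long version) {
            this.topic = topic;
            this.tag = tag;
            this.version = version;
        }
    }

    // topic -> subscription, shared by every instance of the group
    final Map<String, Sub> subscriptionTable = new ConcurrentHashMap<>();

    boolean update(Set<Sub> subList) {
        boolean updated = false;
        // Step 1: add unknown topics; replace a known topic only if the incoming version is newer
        for (Sub sub : subList) {
            Sub old = subscriptionTable.get(sub.topic);
            if (old == null) {
                subscriptionTable.put(sub.topic, sub);
                updated = true;
            } else if (sub.version > old.version) {
                subscriptionTable.put(sub.topic, sub); // like the excerpt, this does not set `updated`
            }
        }
        // Step 2: drop every topic that this heartbeat did not mention
        Iterator<Map.Entry<String, Sub>> it = subscriptionTable.entrySet().iterator();
        while (it.hasNext()) {
            String oldTopic = it.next().getKey();
            boolean exist = subList.stream().anyMatch(s -> s.topic.equals(oldTopic));
            if (!exist) {
                it.remove();
                updated = true;
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        UpdateSubscriptionSketch groupA = new UpdateSubscriptionSketch();
        Sub fromA = new Sub("topicA", "tagA", 1L);
        Sub fromB = new Sub("topicB", "tagB", 2L);
        // Heartbeats from consumerA and consumerB arrive alternately
        groupA.update(Collections.singleton(fromA));
        System.out.println(groupA.subscriptionTable.keySet()); // [topicA]
        groupA.update(Collections.singleton(fromB));
        System.out.println(groupA.subscriptionTable.keySet()); // [topicB]
        groupA.update(Collections.singleton(fromA));
        System.out.println(groupA.subscriptionTable.keySet()); // [topicA]
    }
}
```

For the same topic with different tags, the order of heartbeats does not matter: the subscription with the larger version keeps its tag.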
Problem 2: after rebalancing, messages are consumed late, or consumption fails with "The consumer's subscription not exist"
Starting the consumer client likewise starts the rebalance thread. Following it down layer by layer:
// MQClientInstance.start()
this.rebalanceService.start();
// RebalanceService extends ServiceThread; its run() method calls:
this.mqClientFactory.doRebalance();
// Following it further leads to RebalanceImpl.doRebalance
public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                // Key method: rebalance one topic at a time
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }
    this.truncateMessageQueueNotMyTopic();
}
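doRebalance first gets the consumer client's own subscription table, then iterates over it and rebalances one topic at a time. Next comes this.rebalanceByTopic(topic, isOrder); we take the clustering message model as the example: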
case CLUSTERING: {
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    if (null == mqSet) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
        }
    }
    if (null == cidAll) {
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
    }
    if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);
        // Sort both lists so every client computes the same allocation
        Collections.sort(mqAll);
        Collections.sort(cidAll);
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
        List<MessageQueue> allocateResult = null;
        try {
            // Allocate queues according to the configured strategy
            allocateResult = strategy.allocate(
                this.consumerGroup,
                this.mQClientFactory.getClientId(),
                mqAll,
                cidAll);
        } catch (Throwable e) {
            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                e);
            return;
        }
        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
            allocateResultSet.addAll(allocateResult);
        }
        // Update processQueueTable with the allocation result
        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
            log.info(
                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                allocateResultSet.size(), allocateResultSet);
            this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
    }
    break;
}
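It first gets the message queues of the topic, then the IDs of all consumers. Look inside this.mQClientFactory.findConsumerIdList(topic, consumerGroup): it uses the topic's route information to find the brokers, picks one broker address at random, and asks that broker for all consumer IDs of the group. As mentioned earlier, every client sends heartbeats to all brokers, so any broker will do. Note that the ID list is per group, not per topic.
Next, strategy.allocate is the queue allocation algorithm. RocketMQ offers several allocation strategies (five in the version examined here) and uses AllocateMessageQueueAveragely by default. The averaging strategy is easy to follow: given all queues of the topic and the number of consumers in the group, it divides the queue count by the consumer count and uses the remainder (modulo) to hand out leftover queues. For example, if topicA has 8 message queues and two consumers, consumerA and consumerB, subscribe to it, each consumer gets four queues after rebalancing.
This is where the problem appears. Suppose groupA has two consumer clients, A and B, where A subscribes to topicA and B subscribes to topicB, and topicA has 8 message queues. When A rebalances topicA, the consumer ID list contains both A and B, so A takes four queues and the other four are assigned to B. But B does not subscribe to topicA and never rebalances or pulls it, so half of topicA's messages are not consumed; they may only be consumed later, after a reallocation.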
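The sketch below follows the approach of AllocateMessageQueueAveragely, with queues represented as plain strings rather than MessageQueue objects; AverageAllocationSketch is a hypothetical name and this is an illustration, not the library class. The first `mod` consumers each get one extra queue when the division is uneven. Running it for the groupA scenario shows A taking half of topicA, while the other half is assigned to B, which never pulls it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AverageAllocationSketch {
    // Returns the queues assigned to currentCid; both lists are assumed already sorted
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0) {
            return result; // this client is not in the group's consumer list
        }
        int mod = mqAll.size() % cidAll.size();
        // The first `mod` consumers get one extra queue when queues don't divide evenly
        int averageSize = mqAll.size() <= cidAll.size() ? 1
                : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> topicAQueues = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            topicAQueues.add("topicA-q" + i);
        }
        // The broker returns every consumer ID in the group, whatever each one subscribes to
        List<String> cidAll = Arrays.asList("A", "B");
        System.out.println(allocate("A", topicAQueues, cidAll)); // [topicA-q0, topicA-q1, topicA-q2, topicA-q3]
        // B's share; topicA is not in B's own subscription table, so B never runs this
        System.out.println(allocate("B", topicAQueues, cidAll)); // [topicA-q4, topicA-q5, topicA-q6, topicA-q7]
    }
}
```

Because both lists are sorted before allocation (as in the excerpt above), every client computes the same split independently without coordinating.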
Continuing, rebalancing yields the queue set allocateResult, and processQueueTable is updated according to it:
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
For newly assigned queues, PullRequest objects are generated and put into pullRequestQueue, the queue of pending pull requests.
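Rebalancing and pulling are decoupled through this blocking queue: the rebalance side only enqueues requests, and a single pull thread consumes them. The sketch below models that hand-off with the JDK only. PullHandOffSketch and PullReq are hypothetical stand-ins, "pulling" just records the queue name, and the loop uses poll with a timeout instead of take() so it can notice a stop flag. As far as I know, the real client puts the same PullRequest back into the queue after each pull so it keeps cycling; this sketch omits that.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PullHandOffSketch {
    // Hypothetical stand-in for PullRequest
    static final class PullReq {
        final String messageQueue;

        PullReq(String messageQueue) {
            this.messageQueue = messageQueue;
        }
    }

    final BlockingQueue<PullReq> pullRequestQueue = new LinkedBlockingQueue<>();
    final List<String> pulled = Collections.synchronizedList(new ArrayList<>());
    volatile boolean stopped = false;

    // Rebalance side: one request per newly assigned queue
    void dispatch(List<String> newlyAssigned) {
        for (String mq : newlyAssigned) {
            pullRequestQueue.add(new PullReq(mq));
        }
    }

    // Pull-service side: wait for requests until stopped
    Thread startPullService() {
        Thread t = new Thread(() -> {
            while (!stopped) {
                try {
                    // poll with a timeout so the loop can observe `stopped`
                    PullReq req = pullRequestQueue.poll(50, TimeUnit.MILLISECONDS);
                    if (req != null) {
                        pulled.add(req.messageQueue); // a real client would send a pull request here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        t.start();
        return t;
    }
}
```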
In addition, the client starts the message pull thread at startup:
// MQClientInstance.start()
this.pullMessageService.start();
// Its run() method
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) { // loop until stopped
        try {
            // take() blocks while pullRequestQueue is empty
            PullRequest pullRequest = this.pullRequestQueue.take();
            if (pullRequest != null) {
                this.pullMessage(pullRequest); // send the pull request for this queue
            }
        } catch (InterruptedException e) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
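The pull thread keeps taking PullRequests from pullRequestQueue and sending pull requests to the broker. Now suppose GroupA has two consumers, A and B, which subscribe to TopicA and TopicB respectively. A sends its heartbeat first and registers its TopicA subscription, and A's PullRequest has already been added to pullRequestQueue. Then B sends its heartbeat, which updates GroupA's subscriptions to TopicB. A keeps using the PullRequest in its pullRequestQueue to send pull requests to the broker, but by now the broker can no longer find a TopicA subscription, because B's heartbeat has replaced it. The broker then rejects the pull with "The consumer's subscription not exist":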
// Broker side: PullMessageProcessor.processRequest (excerpt)
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
    LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
    return response;
}
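The race can be replayed end to end with a tiny model. BrokerModel is hypothetical: a heartbeat leaves the group's table equal to the subscriptions it carries (the net effect of the two steps of updateSubscription, ignoring the version check), and pull performs the same null check as the excerpt above. The returned strings only imitate the broker's outcome. With inconsistent subscriptions A's pull for topicA fails; when both instances subscribe to the same topics, it succeeds.

```java
import java.util.HashMap;
import java.util.Map;

class SubscriptionNotExistDemo {
    // Hypothetical broker model: group -> (topic -> tag expression)
    static final class BrokerModel {
        final Map<String, Map<String, String>> groups = new HashMap<>();

        // After a heartbeat, the group's table holds exactly the subscriptions it carried
        void heartbeat(String group, Map<String, String> subscriptions) {
            groups.put(group, new HashMap<>(subscriptions));
        }

        // Mirrors the findSubscriptionData(...) null check on the pull path
        String pull(String group, String topic) {
            Map<String, String> table = groups.get(group);
            if (table == null || !table.containsKey(topic)) {
                return "SUBSCRIPTION_NOT_EXIST";
            }
            return "OK";
        }
    }

    // Builds a topic -> tag map from alternating arguments
    static Map<String, String> subs(String... topicTagPairs) {
        Map<String, String> m = new HashMap<>();
        for (int i = 0; i + 1 < topicTagPairs.length; i += 2) {
            m.put(topicTagPairs[i], topicTagPairs[i + 1]);
        }
        return m;
    }

    public static void main(String[] args) {
        BrokerModel broker = new BrokerModel();
        // Inconsistent: A subscribes to topicA, B to topicB
        broker.heartbeat("GroupA", subs("topicA", "*")); // A's heartbeat
        broker.heartbeat("GroupA", subs("topicB", "*")); // B's heartbeat
        System.out.println(broker.pull("GroupA", "topicA")); // SUBSCRIPTION_NOT_EXIST

        // Consistent: both instances subscribe to topicA and topicB
        broker.heartbeat("GroupA", subs("topicA", "*", "topicB", "*"));
        broker.heartbeat("GroupA", subs("topicA", "*", "topicB", "*"));
        System.out.println(broker.pull("GroupA", "topicA")); // OK
    }
}
```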