背景
rocketmq支持顺序消费,是很多业务中要用的一个场景,我就好奇他是怎么实现的,需要了解背后的原理,是怎么支持顺序消费的,这样有问题的时候我们才能快速的定位问题,这是一个合格的架构师必备的能力。
分配MessageQueue
rocketmq 在启动消费时,会对topic的mq进行reblance,如果是新分配的message queue,如果是顺序消费,即isorder为true。则需要先对该
message queue 获取分布式锁,获取成功才能真正开始消费,代码入心:
boolean allMQLocked = true;
List<PullRequest> pullRequestList = new ArrayList<>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//新分配的message queue 如果是顺序消费,需要先获取锁,获取成功
//则创建messagequeue 开始拉起数据,否则不能消费给mq。
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
//如果获取失败,则不消费这个mq。
allMQLocked = false;
continue;
}
//如果是顺序消费,只有获取成功,才开始消费的准备工作。
this.removeDirtyOffset(mq);
ProcessQueue pq = createProcessQueue(topic);
pq.setLocked(true);
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
获取锁
获取锁的代码不需要看,我们只需要关心下请求参数即可,因为关键实现在broker端:
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
顺序消费获取锁的代码可用看出,需要告诉broker端三个参数:
- consumer group 消费分组。
- 客户端id,即consumer的标识
- mq,即message queue 是对那个queue的顺序消费。
请求类型是LOCK_BATCH_MQ,broker server 会用默认的processor来处理这个请。如果没有获取到锁,则lockedMq是空的,没有直,则返回false,所以接下来,我们看下服务端是怎么做的,来保证这个顺序消费。
Broker锁实现
broker server 处理LOCK_BATCH_MQ
的请求时通过defaultRequestProcessorPair来负责处理,defaultRequestProcessorPair是AdminBrokerProcessor,实现逻辑在lockBatchMQ方法,代码如下:
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = new HashSet<>();
//根据group和mq,尝试对没有被其他consumer锁定会加锁,只有没有枷锁的messagequeue,或者其他的锁已经过期了,才能上锁。
//selfLockOKMQSet 是成功获取锁的message queue
Set<MessageQueue> selfLockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
requestBody.getClientId());
//看是否要请求其他的server,客户端发起的时false,broker发起的是true
if (requestBody.isOnlyThisBroker() || !brokerController.getBrokerConfig().isLockInStrictMode()) {
lockOKMQSet = selfLockOKMQSet;
} else {
//设置OnlyThisBroker为true,让其他的server接到请求时不再请求其他的server了
requestBody.setOnlyThisBroker(true);
//获取副本数
int replicaSize = this.brokerController.getMessageStoreConfig().getTotalReplicas();
//计算过半quorum
int quorum = replicaSize / 2 + 1;
if (quorum <= 1) {
//如果就一个,则不需要再请求其他的broker server
lockOKMQSet = selfLockOKMQSet;
} else {
//有多个副本,对所有broker尝试加锁。
final ConcurrentMap<MessageQueue, Integer> mqLockMap = new ConcurrentHashMap<>();
//先对本地加锁的mq 标记为1
for (MessageQueue mq : selfLockOKMQSet) {
if (!mqLockMap.containsKey(mq)) {
mqLockMap.put(mq, 0);
}
mqLockMap.put(mq, mqLockMap.get(mq) + 1);
}
BrokerMemberGroup memberGroup = this.brokerController.getBrokerMemberGroup();
if (memberGroup != null) {
Map<Long, String> addrMap = new HashMap<>(memberGroup.getBrokerAddrs());
addrMap.remove(this.brokerController.getBrokerConfig().getBrokerId());
final CountDownLatch countDownLatch = new CountDownLatch(addrMap.size());
requestBody.setMqSet(selfLockOKMQSet);
requestBody.setOnlyThisBroker(true);
for (Long brokerId : addrMap.keySet()) {
try {
this.brokerController.getBrokerOuterAPI().lockBatchMQAsync(addrMap.get(brokerId),
requestBody, 1000, new LockCallback() {
@Override
public void onSuccess(Set<MessageQueue> lockOKMQSet) {
for (MessageQueue mq : lockOKMQSet) {
if (!mqLockMap.containsKey(mq)) {
mqLockMap.put(mq, 0);
}
//加锁成功,对加锁次数加1
mqLockMap.put(mq, mqLockMap.get(mq) + 1);
}
countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
countDownLatch.countDown();
}
});
} catch (Exception e) {
LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
countDownLatch.countDown();
}
}
try {
countDownLatch.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("lockBatchMQ exception on {}, {}", this.brokerController.getBrokerConfig().getBrokerName(), e);
}
}
//计算哪些mq是成功实现过半加锁的,返回给客户端
for (MessageQueue mq : mqLockMap.keySet()) {
if (mqLockMap.get(mq) >= quorum) {
lockOKMQSet.add(mq);
}
}
}
}
上面的代码挺多,主要是实现了两个关键点,分别是对本地mq 加锁,和对其他的broker server 获取锁,计算加锁成功的broker server是否过半,过半则成功,否则失败。
- 对本地message queue 加锁
看本broker server 的message queue 尝试获取锁,能加锁成功的条件是没有加锁的mq,或者已经加锁了,但是已经过期了,其他的都是被其他的客户端锁定中,关键代码如下:
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
//检查clientid和是否过期
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
- 分布式锁
分布式我们都知道需要通过zk,redis,consul等实现,但是rocketmq并没有这样做,个人理解是rocketmq 不想因为这个问题要依赖其他的外部组件,因为依赖一个组件你还要对依赖组件的稳定性,所以自己巧妙的实现了对所有broker server message queue 加锁时,应用了leader选举的思想,因为broker肯定是集群部署,不同的客户端同时发起顺序消费时,很有可能链接的不同的broker server,如果只对单broker server判断获取锁成功是有问题的,通过对所有的broker server都获取锁,如果有一半以上获取锁成功,则肯定是只有一个客户端能获取到锁,类似leader选举的思路,是值得学习的地方。
定期刷新锁
顺序消费的这个锁也是一个锁租约的机制,到了时间不续租,就释放了,所以broker分布式锁除了两看consumer的客户端id,还有一个时间的限制,如果客户端出现问题,没有主动更新锁的时间,则会被其他的客户端获取到锁,续租也有可能是和其他的客户端并发的,所以就有可能锁续租失败,失败了就不能消费这个message queue了,所以在消费的时候需要检查是否持有锁,更新是通过一个定时任务更新的,时间周期为20秒一次,通过rocketmq.client.rebalance.lockInterval
变量控制。
还有一个值得注意的是,一个topic有多个message queue,两个客户端同时发起顺序消费时,在获取分布式锁时,有可能两个分别获得部分mq的锁,rocketmq的顺序是保证在mq级别的。
分发消息
获取到对应message queue的锁后,就可以创建pullRequest请求到队列messageRequestQueue
中,这时候拉消息的线程就会被换醒,去拉消息,拉到消息后,会把消息缓存在一个treeMap中,这个和并发消费是一样的,添加到treeMap中,返回结果判断是否需要提交新的ConsumeRequest task,如果前面的消费任务已经消费完了,则会返回true,即需要提交新的ConsumeRequest,代码如下:
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
顺序消费在分发的时候,不像并发消费一样,默认一个请求提交一个ConsumeRequest task到线程执行,来实现并发消费。
顺序消费如果没有入在消费的判断,在把消息加入到processQueue时会判断有没有线程在消费,如果有,则不能提交消费任务,只有没有线程消费的时候,才创建一个ConsumeRequest task到线程池执行, 因为有提交一个任务后,会不断的从processQueue 的treemap 里获取message,如果获取不到了,才把consuming的标记设置为false,下次拉到消息时,就重新提交一个新的ConsumeRequest。
ConsumeRequest 的run 方法如下:
public void run() {
.....
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
//consumeBatchSize 默认是1,从tree map里取出一批消息,默认是一条消息
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
//.... hook partion
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//这里需要加锁,一定是等前面一条消息处理完后,才能继续消费下一条消息。
this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//执行业务的消费代码
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
this.processQueue.getConsumeLock().unlock();
}
//去掉部分代码
long consumeRT = System.currentTimeMillis() - beginTimestamp;
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
代码有点多,省掉了部分非关键的代码,ConsumeRequest 的run 方法主要干了如下几件事情:
- 首先获取锁,这个锁是以message queue为单位的,就是为每个message queue 创建了一个object,通过对synchronized 对object 加锁,防止并发执行。
- 检查processqueue 是否还被锁住,就是前面说的,会定期更新锁,即续租成功,就还是locked,如果失败,则不能消费。
- 检查消费的时间,如果持续消费超过了1分钟,说明消费有瓶颈,则等10毫秒再继续消费。
- 取消息,从msgTreeMap里获取消息,默认是一次获取1条,这里还有对这条消息做了一个暂存,存在consumingMsgOrderlyTreeMap里面,是用来消费成功后,做commit offset的。
5.获取 processqueue的consumer lock,拿到锁后,即开始执行业务的消费代码,这里的锁不是很理解,顺序消费的task 同时只有一个线程在运行,前面已经对message queue加了一个大锁。
6.执行业务的消费代码,获取消费结果。
7.处理消费结果,如果成功的情况下,会更新本地的offset,这里不更新到broker server端,还是统一通过定时任务上报给broker server的。
总结时刻
本文对rocketmq 的顺序消费模式的代码撸了一遍,让我们了解了顺序消费背后的原理和逻辑,即是怎么保证客户端能顺序消费消息的,主要有下几点:
- 顺序消费时group级别对message queue保证有顺序。
- 开始消费message queue前需要获取分布式锁,这里和选举leader一样的思路,通过对集群的broker都获取锁,有一半获取成功就说明加锁成功。
- 顺序消费时拉到消息后,只提交一个ConsumeRequest任务,甚至有可能不提交,如果前面一个还在消费的情况下,通过一个ConsumeRequest来循环从msgTree里获取,默认一次取一条消息,来执行业务的消费代码,也就是单线程在执行,虽然是线程池。
- 每消费完一条消息,更新一次消费的offset。
注:目前看机会中,关注基础架构,中间件,高并发系统建设和治理。