RocketMQ 顺序消费源码分析

背景

rocketmq支持顺序消费,是很多业务中要用的一个场景,我就好奇他是怎么实现的,需要了解背后的原理,是怎么支持顺序消费的,这样有问题的时候我们才能快速的定位问题,这是一个合格的架构师必备的能力。

分配MessageQueue

rocketmq 在启动消费时,会对topic的mq进行reblance,如果是新分配的message queue,如果是顺序消费,即isorder为true。则需要先对该
message queue 获取分布式锁,获取成功才能真正开始消费,代码入心:

boolean allMQLocked = true;
        List<PullRequest> pullRequestList = new ArrayList<>();
        for (MessageQueue mq : mqSet) {
            if (!this.processQueueTable.containsKey(mq)) {
                //新分配的message queue 如果是顺序消费,需要先获取锁,获取成功
                //则创建messagequeue 开始拉起数据,否则不能消费给mq。
                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                    //如果获取失败,则不消费这个mq。
                    allMQLocked = false;
                    continue;
                }
                //如果是顺序消费,只有获取成功,才开始消费的准备工作。
                this.removeDirtyOffset(mq);
                ProcessQueue pq = createProcessQueue(topic);
                pq.setLocked(true);
                long nextOffset = this.computePullFromWhere(mq);
                if (nextOffset >= 0) {
                    ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                    if (pre != null) {
                        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                    } else {
                        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                        PullRequest pullRequest = new PullRequest();
                        pullRequest.setConsumerGroup(consumerGroup);
                        pullRequest.setNextOffset(nextOffset);
                        pullRequest.setMessageQueue(mq);
                        pullRequest.setProcessQueue(pq);
                        pullRequestList.add(pullRequest);
                        changed = true;
                    }
                } else {
                    log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                }
            }

        }

获取锁

获取锁的代码不需要看,我们只需要关心下请求参数即可,因为关键实现在broker端:

 LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.getMqSet().add(mq);

            try {
                Set<MessageQueue> lockedMq =
                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                for (MessageQueue mmqq : lockedMq) {
                    ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                    if (processQueue != null) {
                        processQueue.setLocked(true);
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    }
                }

                boolean lockOK = lockedMq.contains(mq);
                log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
                return lockOK;
            } catch (Exception e) {
                log.error("lockBatchMQ exception, " + mq, e);
            }

顺序消费获取锁的代码可用看出,需要告诉broker端三个参数:

  • consumer group 消费分组。
  • 客户端id,即consumer的标识
  • mq,即message queue 是对那个queue的顺序消费。

请求类型是LOCK_BATCH_MQ,broker server 会用默认的processor来处理这个请。如果没有获取到锁,则lockedMq是空的,没有直,则返回false,所以接下来,我们看下服务端是怎么做的,来保证这个顺序消费。

Broker锁实现

broker server 处理LOCK_BATCH_MQ 的请求时通过defaultRequestProcessorPair来负责处理,defaultRequestProcessorPair是AdminBrokerProcessor,实现逻辑在lockBatchMQ方法,代码如下:

private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);

        Set<MessageQueue> lockOKMQSet = new HashSet<>();
        //根据group和mq,尝试对没有被其他consumer锁定会加锁,只有没有枷锁的messagequeue,或者其他的锁已经过期了,才能上锁。
        //selfLockOKMQSet 是成功获取锁的message queue
        Set<MessageQueue> selfLockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
            requestBody.getConsumerGroup(),
            requestBody.getMqSet(),
            requestBody.getClientId());
        //看是否要请求其他的server,客户端发起的时false,broker发起的是true
        if (requestBody.isOnlyThisBroker() || !brokerController.getBrokerConfig().isLockInStrictMode()) {
            lockOKMQSet = selfLockOKMQSet;
        } else {
            //设置OnlyThisBroker为true,让其他的server接到请求时不再请求其他的server了
            requestBody.setOnlyThisBroker(true);
            //获取副本数
            int replicaSize = this.brokerController.getMessageStoreConfig().getTotalReplicas();
            //计算过半quorum
            int quorum = replicaSize / 2 + 1;

            if (quorum <= 1) {
                //如果就一个,则不需要再请求其他的broker server
                lockOKMQSet = selfLockOKMQSet;
            } else {
                //有多个副本,对所有broker尝试加锁。
                final ConcurrentMap<MessageQueue, Integer> mqLockMap = new ConcurrentHashMap<>();
                //先对本地加锁的mq 标记为1
                for (MessageQueue mq : selfLockOKMQSet) {
                    if (!mqLockMap.containsKey(mq)) {
                        mqLockMap.put(mq, 0);
                    }
                    mqLockMap.put(mq, mqLockMap.get(mq) + 1);
                }

                BrokerMemberGroup memberGroup = this.brokerController.getBrokerMemberGroup();

                if (memberGroup != null) {
                    Map<Long, String> addrMap = new HashMap<>(memberGroup.getBrokerAddrs());
                    addrMap.remove(this.brokerController.getBrokerConfig().getBrokerId());
                    final CountDownLatch countDownLatch = new CountDownLatch(addrMap.size());
                    requestBody.setMqSet(selfLockOKMQSet);
                    requestBody.setOnlyThisBroker(true);
                    for (Long brokerId : addrMap.keySet()) {
                        try {
                            this.brokerController.getBrokerOuterAPI().lockBatchMQAsync(addrMap.get(brokerId),
                                requestBody, 1000, new LockCallback() {
                                    @Override
                                    public void onSuccess(Set<MessageQueue> lockOKMQSet) {
                                        for (MessageQueue mq : lockOKMQSet) {
                                            if (!mqLockMap.containsKey(mq)) {
                                                mqLockMap.put(mq, 0);
                                            }
                                            //加锁成功,对加锁次数加1
                                            mqLockMap.put(mq, mqLockMap.get(mq) + 1);
                                        }
                                        countDownLatch.countDown();
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
                                        countDownLatch.countDown();
                                    }
                                });
                        } catch (Exception e) {
                            LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
                            countDownLatch.countDown();
                        }
                    }
                    try {
                        countDownLatch.await(2000, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        LOGGER.warn("lockBatchMQ exception on {}, {}", this.brokerController.getBrokerConfig().getBrokerName(), e);
                    }
                }
                //计算哪些mq是成功实现过半加锁的,返回给客户端
                for (MessageQueue mq : mqLockMap.keySet()) {
                    if (mqLockMap.get(mq) >= quorum) {
                        lockOKMQSet.add(mq);
                    }
                }
            }
        }

上面的代码挺多,主要是实现了两个关键点,分别是对本地mq 加锁,和对其他的broker server 获取锁,计算加锁成功的broker server是否过半,过半则成功,否则失败。

  • 对本地message queue 加锁
    看本broker server 的message queue 尝试获取锁,能加锁成功的条件是没有加锁的mq,或者已经加锁了,但是已经过期了,其他的都是被其他的客户端锁定中,关键代码如下:
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
        if (groupValue != null) {
            LockEntry lockEntry = groupValue.get(mq);
            if (lockEntry != null) {
                //检查clientid和是否过期
                boolean locked = lockEntry.isLocked(clientId);
                if (locked) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                }

                return locked;
            }
        }

        return false;
    }
  • 分布式锁
    分布式我们都知道需要通过zk,redis,consul等实现,但是rocketmq并没有这样做,个人理解是rocketmq 不想因为这个问题要依赖其他的外部组件,因为依赖一个组件你还要对依赖组件的稳定性,所以自己巧妙的实现了对所有broker server message queue 加锁时,应用了leader选举的思想,因为broker肯定是集群部署,不同的客户端同时发起顺序消费时,很有可能链接的不同的broker server,如果只对单broker server判断获取锁成功是有问题的,通过对所有的broker server都获取锁,如果有一半以上获取锁成功,则肯定是只有一个客户端能获取到锁,类似leader选举的思路,是值得学习的地方。

定期刷新锁

顺序消费的这个锁也是一个锁租约的机制,到了时间不续租,就释放了,所以broker分布式锁除了两看consumer的客户端id,还有一个时间的限制,如果客户端出现问题,没有主动更新锁的时间,则会被其他的客户端获取到锁,续租也有可能是和其他的客户端并发的,所以就有可能锁续租失败,失败了就不能消费这个message queue了,所以在消费的时候需要检查是否持有锁,更新是通过一个定时任务更新的,时间周期为20秒一次,通过rocketmq.client.rebalance.lockInterval 变量控制。

还有一个值得注意的是,一个topic有多个message queue,两个客户端同时发起顺序消费时,在获取分布式锁时,有可能两个分别获得部分mq的锁,rocketmq的顺序是保证在mq级别的。

分发消息

获取到对应message queue的锁后,就可以创建pullRequest请求到队列messageRequestQueue 中,这时候拉消息的线程就会被换醒,去拉消息,拉到消息后,会把消息缓存在一个treeMap中,这个和并发消费是一样的,添加到treeMap中,返回结果判断是否需要提交新的ConsumeRequest task,如果前面的消费任务已经消费完了,则会返回true,即需要提交新的ConsumeRequest,代码如下:

public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }

顺序消费在分发的时候,不像并发消费一样,默认一个请求提交一个ConsumeRequest task到线程执行,来实现并发消费。

顺序消费如果没有入在消费的判断,在把消息加入到processQueue时会判断有没有线程在消费,如果有,则不能提交消费任务,只有没有线程消费的时候,才创建一个ConsumeRequest task到线程池执行, 因为有提交一个任务后,会不断的从processQueue 的treemap 里获取message,如果获取不到了,才把consuming的标记设置为false,下次拉到消息时,就重新提交一个新的ConsumeRequest。

ConsumeRequest 的run 方法如下:

public void run() {
            
            .....
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
                    final long beginTime = System.currentTimeMillis();
                    for (boolean continueConsume = true; continueConsume; ) {
                        if (this.processQueue.isDropped()) {
                            log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                            break;
                        }

                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && !this.processQueue.isLocked()) {
                            log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                            && this.processQueue.isLockExpired()) {
                            log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                            ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                            break;
                        }

                        long interval = System.currentTimeMillis() - beginTime;
                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                            break;
                        }
                        //consumeBatchSize 默认是1,从tree map里取出一批消息,默认是一条消息
                        final int consumeBatchSize =
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                        if (!msgs.isEmpty()) {
                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                            ConsumeOrderlyStatus status = null;

                            ConsumeMessageContext consumeMessageContext = null;
                                                  
                            //.... hook partion
                            long beginTimestamp = System.currentTimeMillis();
                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                            boolean hasException = false;
                            try {
                                //这里需要加锁,一定是等前面一条消息处理完后,才能继续消费下一条消息。
                                this.processQueue.getConsumeLock().lock();
                                if (this.processQueue.isDropped()) {
                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                        this.messageQueue);
                                    break;
                                }
                                //执行业务的消费代码
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                                log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                                    UtilAll.exceptionSimpleDesc(e),
                                    ConsumeMessageOrderlyService.this.consumerGroup,
                                    msgs,
                                    messageQueue), e);
                                hasException = true;
                            } finally {
                                this.processQueue.getConsumeLock().unlock();
                            }

                            //去掉部分代码
                            long consumeRT = System.currentTimeMillis() - beginTimestamp;
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } else {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        return;
                    }

                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
                }
            }
        }

代码有点多,省掉了部分非关键的代码,ConsumeRequest 的run 方法主要干了如下几件事情:

    1. 首先获取锁,这个锁是以message queue为单位的,就是为每个message queue 创建了一个object,通过对synchronized 对object 加锁,防止并发执行。
    1. 检查processqueue 是否还被锁住,就是前面说的,会定期更新锁,即续租成功,就还是locked,如果失败,则不能消费。
    1. 检查消费的时间,如果持续消费超过了1分钟,说明消费有瓶颈,则等10毫秒再继续消费。
    1. 取消息,从msgTreeMap里获取消息,默认是一次获取1条,这里还有对这条消息做了一个暂存,存在consumingMsgOrderlyTreeMap里面,是用来消费成功后,做commit offset的。
  • 5.获取 processqueue的consumer lock,拿到锁后,即开始执行业务的消费代码,这里的锁不是很理解,顺序消费的task 同时只有一个线程在运行,前面已经对message queue加了一个大锁。

  • 6.执行业务的消费代码,获取消费结果。

  • 7.处理消费结果,如果成功的情况下,会更新本地的offset,这里不更新到broker server端,还是统一通过定时任务上报给broker server的。

总结时刻

本文对rocketmq 的顺序消费模式的代码撸了一遍,让我们了解了顺序消费背后的原理和逻辑,即是怎么保证客户端能顺序消费消息的,主要有下几点:

  1. 顺序消费时group级别对message queue保证有顺序。
  2. 开始消费message queue前需要获取分布式锁,这里和选举leader一样的思路,通过对集群的broker都获取锁,有一半获取成功就说明加锁成功。
  3. 顺序消费时拉到消息后,只提交一个ConsumeRequest任务,甚至有可能不提交,如果前面一个还在消费的情况下,通过一个ConsumeRequest来循环从msgTree里获取,默认一次取一条消息,来执行业务的消费代码,也就是单线程在执行,虽然是线程池。
  4. 每消费完一条消息,更新一次消费的offset。

注:目前看机会中,关注基础架构,中间件,高并发系统建设和治理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容