RocketMQ源码解析(九)-Broker#消息存储ConsumeQueue

ConsumeQueue的作用

上一篇文章讲到Broker在收到消息后,通过MessageStore将消息存储到commitLog中,但是consumer在消费消息的时候是按照topic+queue的维度来拉取消息的。为了方便读取,MessageStoreCommitLog中消息的offset按照topic+queueId划分后,存储到不同的文件中,这就是ConsumeQueue

文件组织方式

回顾一下数据结构图中ConsumeQueue相关的部分。

ConsumeQueue存储结构

底层储存跟CommitLog一样使用MappedFile,每个CQUnit的大小是固定的,存储了消息的offset、消息size和tagCode。存tag是为了在consumer取到消息offset后时候先根据tag做一次过滤,剩下的才需要到CommitLog中取消息详情。
之前讲过,MessageStore通过ReputMessageService来将消息的offset写道ConsumeQueue中,我们看下这部分代码实现

ReputMessageService

这个Service是一个单线程的任务,一直循环的调用doReput()方法:

        private boolean isCommitLogAvailable() {
            return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
        }

        private void doReput() {
            //1、判断commitLog的maxOffset是否比上次读取的offset大
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }
                //2、从上次的结束offset开始读取commitLog文件中的消息
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            //3、检查message数据完整性并封装成DispatchRequest
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getMsgSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    //4、分发消息到CommitLogDispatcher,1)构建索引; 2)更新consumeQueue
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);
                                    //5、分发消息到MessageArrivingListener,唤醒等待的PullReqeust接收消息,Only Master?
                                    if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                        DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                            dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                            dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                            dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                    }
                                    //5、更新offset
                                    this.reputFromOffset += size;
                                    readSize += size;
                                    if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                        DefaultMessageStore.this.storeStatsService
                                            .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                            .addAndGet(dispatchRequest.getMsgSize());
                                    }
                                } else if (size == 0) {
                                    //6、如果读到文件结尾,则切换到新文件
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {
                                //7、解析消息出错,跳过。commitLog文件中消息数据损坏的情况下才会进来
                                if (size > 0) {
                                    log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                    this.reputFromOffset += size;
                                } else {
                                    doNext = false;
                                    if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                        log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                            this.reputFromOffset);

                                        this.reputFromOffset += result.getSize() - readSize;
                                    }
                                }
                            }
                        }
                    } finally {
                        //8、release对MappedFile的引用
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }
    /**
     * 消息分发
     */
    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }
  • 第1步,每次处理完读取消息后,都将当前已经处理的最大offset记录下来,下次处理从这个offset开始读取消息
  • 第2步,从commitLog文件中读取消息详情
  • 第4步,分发读取到的消息,MessageStore在初始化的时候会往dispatcherList中添加两个Dispatcher.
this.dispatcherList = new LinkedList<>();
//consumeQueue构建Dispatcher
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
//索引更新Dispatcher
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

具体Dispatcher的处理逻辑,我们下面详细说

  • 第8步,在通过commitLog读取消息时,不会把消息数据复制到堆内存中,只是返回文件映射的byteBuffer,所以MappedFile记录了有多少个引用,在数据使用完后需要释放。

Dispatcher构建ConsumeQueue

CommitLogDispatcherBuildConsumeQueue实现比较简单,直接调用的MessageStore的接口

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                /** 对于非事务消息和commit事务消息 */
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

MessageStore中的实现:

    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        //找到对应的ComsumeQueue文件
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

前面已经讲过consumeQueue的数据存储结构,每个topic+queueId对应一个ConsumeQueue,每个ConsumeQueue包含一系列MappedFile。所以,这里第一步就是获取对应的ConsumeQueue,如果不存在的话就会新建一个。后面就是调用CQ的put方法:

public void putMessagePositionInfoWrapper(DispatchRequest request) {
        //1、写入重试次数,最多30次
        final int maxRetries = 30; 
        //2、判断CQ是否是可写的
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                //3、如果需要写ext文件,则将消息的tagscode写入
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
                }
            }
            //4、写入文件
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                //5、记录check point
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                ...
                ...
            }
        }

       ...
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }
  • 第3步,将tagcode和bitMap记录进CQExt文件中,这个是一个过滤的扩展功能,采用的bloom过滤器先记录消息的bitMap,这样consumer来读取消息时先通过bloom过滤器判断是否有符合过滤条件的消息
  • 第4步,将消息offset写入CQ文件中,这边代码如下:
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset <= this.maxPhysicOffset) {
            return true;
        }
        //一个CQUnit的大小是固定的20字节
        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        //获取最后一个MappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {
            //对新创建的文件,写将所有CQUnit初始化0值
            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }

            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }

                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset;
            //CQUnit写入文件中
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

写文件的逻辑和写CommitLog的逻辑是一样的,首先封装一个CQUnit,这里面offset占8个字节,消息size占用4个字节,tagcode占用8个字节。然后找最后一个MappedFile,对于新建的文件,会有一个预热的动作,写把所有CQUnit初始化成0值。最后将Unit写入到文件中。

总结

ConsumeQueue文件数据生成的整个步骤就讲到这里了。Consumer来读取文件的时候,只要指定要读的topic和queueId,以及开始offset。因为每个CQUnit的大小是固定的,所以很容易就可以在文件中定位到。找到开始的位置后,只需要连续读取后面指定数量的Unit,然后根据Unit中存的CommitLog的offset就可以到CommitLog中读取消息详情了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容