rocket mq 底层存储源码分析(6)-存储恢复

本章节,我们主要从rmq的broker启动后,会如何初始化【业务消息】、【逻辑位移索引】以及【key查询索引】这三种消息来分析底层实现。


直接上源码,存储相关的初始化的入口:

    public boolean load() {
        boolean result = true;

        try {
            //step1
            //通过abort文件判断上次rmq上次退出是否正常,如果正常退出,就可以把abort文件删除,即不存在。
            boolean lastExitOK = !this.isTempFileExist();
            log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");

            ...

            //step2
            // load Commit Log,加载消息存储映射文件
            result = result && this.commitLog.load();

            // load Consume Queue,加载逻辑消费队列
            result = result && this.loadConsumeQueue();

            if (result) {
                //step3
                this.storeCheckpoint =
                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
                
                //step4
                //加载索引文件
                this.indexService.load(lastExitOK);

                //step5
                this.recover(lastExitOK);

            }
        }

        ...

        return result;
    }

我们先来梳理一下上述的加载流程:
step1,通过abort文件判断上次rmq上次退出是否正常,如果正常退出,就可以把abort文件删除,即不存在。否则,我们需要通过快照方式恢复这些存储文件。

step2,我们依次初始化业务消息存储文件(commitLog.load())以及逻辑位移存储文件(this.loadConsumeQueue())对应的连续映射文件抽象。

step3,加载存储快照,里面存储上述三类存储文件的最后一次完整刷盘的时间戳。里面有三个long类型的字段属性,分别是physicMsgTimestamp业务消息存储文件最后一次完整刷盘时间戳,logicsMsgTimestamp逻辑位移存储文件最后一次完整刷盘时间戳 以及 indexMsgTimestampkey查询索引存储文件最后一次完整刷盘时间戳。

step4,加载key查询索引存储文件。这里有可能使用快照方式。结合步骤三,我们先大致说一下使用快照方式的流程。这里以【key查询索引】存储文件为例。【key查询索引】d刷盘形式为,直到上一个indexFile 满了以后,才开线程异步刷盘。每次【key查询索引】存储文件刷盘后,同时会更新 indexMsgTimestamp属性。然后,在某个【key查询索引】存储文件刷盘的时间点上,broker突然宕机了,那么在重启恢复时,就会根据存储文件的文件头属性endTimestamp(该属性是每构建一条索引时,均会更新)与存储快照属性indexMsgTimestamp作比较,把文件头属性endTimestamp大于indexMsgTimestamp的文件全都删除。

step5,加载部分业务消息以及逻辑位移索引值pagecache中。

上述的步骤1-4都比较简单,读者可以自行根据描述自行分析,我们重点分析一下step5:

    private void recover(final boolean lastExitOK) {
        //step1.加载 逻辑物理位至pagecache
        this.recoverConsumeQueue();

        //step2.加载业务消息至pagecache
        if (lastExitOK) {
            this.commitLog.recoverNormally();
        } else {
            this.commitLog.recoverAbnormally();
        }

        //step3.加载TopicQueueTable到内存中
        this.recoverTopicQueueTable();
    }

接下来,我们将从三个步骤分析recover(...)

1、加载 逻辑物理位至pagecache

2、加载业务消息至pagecache

3、加载TopicQueueTable到内存中

1、加载 逻辑物理位至pagecache

    private void recoverConsumeQueue() {
        //遍历所有topic
        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            //遍历所有queueId
            for (ConsumeQueue logic : maps.values()) {
                logic.recover();
            }
        }
    }

这里主要是遍历所有的topic-queueId 为维度下,加载对应的业务视图消费队列ConsumeQueue,我们接着进入logic.recover()

    public void recover() {
        //h获取逻辑位移连续映射文件抽象
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            
            //这里说明初始化时,最多只加载最新三个文件内容至pagecache中
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            int mappedFileSizeLogics = this.mappedFileSize;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    //读取一条完整的 逻辑位移索引
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();
                      
                    //这里只需要保证读完整一条
                    if (offset >= 0 && size > 0) {
                        //更新单个文件的处理位置
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        //基于消息时递增的,因此每次读取的offset均可以认为是最大的物理位移
                        this.maxPhysicOffset = offset;
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }


                if (mappedFileOffset == mappedFileSizeLogics) {
                    //代码走到这里,说明这个索引文件已处理完,需滚到到下一个文件。
                    index++;
                    if (index >= mappedFiles.size()) {

                        log.info("recover last consume queue file over, last maped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        //继续获取下一个逻辑位移索引映射文件
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        //更新方法内全局处理位置
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    break;
                }
            }

            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
        }

总结一下上述代码段逻辑:
总的来说,就是最多只加载最新三个文件的逻辑位移内容至pagecache中,遍历这三个文件,然后逐条读取 【逻辑位移索引】字节内容。

其中,while(true)循环就是逐条读取 【逻辑位移索引】。

这里,仅仅通过if (offset >= 0 && size > 0)这个判断,即可认为这条【逻辑位移索引】是完整的?我认为该判断是有误的。首先,我们知道直接内存映射的刷盘操作并不是原子性,如果在刷盘过程中,机子突然宕机了,就无法将pagecache中的字节内容完整的保存在磁盘中。另外,在broker启动过程中,是对整个【逻辑位移索引】的存储文件做直接内存映射,换言之,对于没写入内容的空白存储部分,例如执行byteBuffer.getLong(),读取pagecache中下一个offset(8字节,64位),但出于宕机的原因,只将offset前面63位存放到磁盘中,因此,会以‘0’作为补充位 来补充缺失的字段。这样一来,byteBuffer.getLong()操作读取到的offset即使是大于0,也可能是不准确的。但是,作为offset的相邻存储字段size ,如果size > 0,虽然不能保证读取到的size是完整的字节内容,却可以保证offset字段是完整的。因此,如果通过该判断仅仅能保证offset是正确的,去无法保证sizetagsCode字段正确。

假设,读取到一条正确的【逻辑位移索引】后,由于业务消息的物理存储位移是递增的,因此每次从【逻辑位移索引】读取的offset字段均可以认为是最大的物理位移,可以直接更新至ConsumeQueue的业务消息最大物理位移实例属性this.maxPhysicOffset

而循环内的mappedFileOffset表明一个存储文件的当前处理位置,每读取一条【逻辑位移索引】,均加上20,对应代码mappedFileOffset = i + CQ_STORE_UNIT_SIZE 。当mappedFileOffset到达存储文件的尾部是,即 if (mappedFileOffset == mappedFileSizeLogics),则说明这个索引文件已处理完,需滚到到下一个文件。

processOffset代表连续存储索引文件的全局处理位置,因此,每次处理完一个完整的存储文件时,均需要 processOffset = mappedFile.getFileFromOffset()更新全局处理位置。

当循环结束后,更新最后一个文件的处理位置processOffset += mappedFileOffset

当然,为了容错处理,还需要清除一些脏数据,我们直接分析this.mappedFileQueue.truncateDirtyFiles(processOffset)

    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        for (MappedFile file : this.mappedFiles) {
             //获取文件尾部的物理位移
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            if (fileTailOffset > offset) {
                if (offset >= file.getFileFromOffset()) {
                    //校正文件的缓存读写位置
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    //代码走到这里,说明本文件是需要删除的脏文件。
                    file.destroy(1000);
                    willRemoveFiles.add(file);
                }
            }
        }

        this.deleteExpiredFile(willRemoveFiles);
    }

先说一下入参offset,其含义是目前最大的【逻辑位移索引】储物理位移。清除脏文件的逻辑如下,遍历目前所有的【逻辑位移索引】存储文件,然后比较offset与文件尾部的存储物理位移,如果offset大,则认为该文件正常;如果小于,则继续与文件开始的存储物理位移(file.getFileFromOffset()),如果offset还比文件开始的存储物理位移还要小,则说明该文件可以删除了;否则,校正文件的缓存读写位置。
到这里就分析完【逻辑位移索引】载入pagecache流程


2、加载业务消息至pagecache

这一步,我们在分两种情况去分析,一种是正常的加载业务消息的方式,另一种是通过快照方式加载。

首先,如果标志位lastExitOK位true,说明该broker可以正常退出,则按正常方式,即 this.commitLog.recoverNormally()

   public void recoverNormally() {
        //default :true
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Began to recover from the last third file
            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();
                // Normal data
                if (dispatchRequest.isSuccess() && size > 0) {
                    mappedFileOffset += size;
                }
                // Come the end of the file, switch to the next file Since the
                // return 0 representatives met last hole,
                // this can not be included in truncate offset
                else if (dispatchRequest.isSuccess() && size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // Current branch can not happen
                        log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
                // Intermediate file read error
                else if (!dispatchRequest.isSuccess()) {
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                }
            }

            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
        }
    }

上述的恢复流程与处理方式基本与【逻辑位移索引】载入pagecache的流程一致,都是连续加载最多3个最新的存储文件内容,只不过,加载的内容则是具体的业务消息字节内容而已,即DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)。之前在构建【逻辑位移索引】有分析过this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)这个方法,这里,即把业务消息字节按照存储格式加载至pagecache中,只不过这里需要对内容进行crc检测。

我们在来看看通过快照形式加载,即进入this.commitLog.recoverAbnormally()

    public void recoverAbnormally() {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file
            //step1、判断从哪个映射文件开始恢复,即从指定的开始文件到结束都需要通过快照恢复
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this maped file " + mappedFile.getFileName());
                    break;
                }
            }

            if (index < 0) {
                index = 0;
                mappedFile = mappedFiles.get(index);
            }

            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            while (true) {
                //step2、读取一条完整的消息
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();

                // Normal data
                if (size > 0) { //说明该条消息是一条完整的消息
                    mappedFileOffset += size;

                     //
                    //尝试去构建索引,因为有可能索引已存在,如果存在,则跳过。
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                } else if (size == -1) {
                    //代码走到这里,说明读取到了一条不完整的消息
                    log.info("recover physics file end, " + mappedFile.getFileName());
                    break;
                } else if (size == 0) {
                    //滚动到下一个待处理的文件
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last maped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            }

            //step3、
            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);
            //Clear ConsumeQueue redundant data(清除多余的逻辑位移索引数据)
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        } else {
            //代码走到这里,说明所有的业务文件都删除了,因此,对应的位移索引文件也需要
            //全部清空
            this.mappedFileQueue.setFlushedWhere(0);
            this.mappedFileQueue.setCommittedWhere(0);
            this.defaultMessageStore.destroyLogics();
        }
   }

分析一下上述代码流程。可以看出,不管是业务视图消费队列ConsumeQueue的恢复,或是【逻辑位移索引】恢复,还是业务消息的正常加载,他们的处理流程基本上都一致,而业务消息的非正常加载,也是采取了相同的处理流程。

对于业务消息的非正常加载,step1中先找出需要进行快照恢复的业务消息存储映射文件开始索引位置(List 中的index),然后遍历所有大于或等于该索引的业务消息存储映射文件(只有最后一个内存映射文件所存储的消息才有可能不完整)。

step2滚动读取映射文件中的每条消息,在判断返回的读取字节大小size,如果size大于0,说明该条消息是一条完整的消息,然后尝试去构建索引,因为有可能索引已存在,如果存在,则跳过。如果size等于-1,说明读取到了一条不完整的消息,进一步说明,从上一条完整的消息的存储物理位移的尾部开始,往后的存储字节内容我们均可认为是无效的,因此可以重写这部分的存储。如果size等于0,这完成遍历,跳出循环,否则,滚动到下一个映射文件。

step3中,processOffset记录最大的完整业务消息存储物理位移,再更新全局的pagecache读写位移(mappedFileQueue.setFlushedWhere(processOffset)以及mappedFileQueue.setCommittedWhere(processOffset))。
mappedFileQueue.truncateDirtyFiles(processOffset)则是清除脏数据文件,清除逻辑这里在总结一下,如果映射文件的存储起始位置大于processOffset,则直接删除该文件。如果processOffset位于映射文件的起始位置以及结束位置之间,则直接更新本文件的读写位置,值同样为processOffset。最后的this.defaultMessageStore.truncateDirtyLogicFiles(processOffset)则是根据processOffset,清除那些与之对应的业务消息存储物理位移大于processOffset的逻辑位移索引。由于逻辑比较简单,这里就不在详细展开,有兴趣的读者可以自行分析。

这里,我们再进一步分析分第一步中,是如何断从哪个映射文件开始恢复,跟进this.isMappedFileMatchedRecover(mappedFile)

    private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        
        //读取映射文件中,第一条业务消息的的存储时间
        int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
        if (magicCode != MESSAGE_MAGIC_CODE) {
            return false;
        }
        long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
        if (0 == storeTimestamp) {
            return false;
        }

        //根据条件,消息存储时间与快照时间比较,决定改文件是否正常恢复
        if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
            && this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
                log.info("find check timestamp, {} {}", //
                    storeTimestamp, //
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        } else {
            if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
                log.info("find check timestamp, {} {}", //
                    storeTimestamp, //
                    UtilAll.timeMillisToHumanString(storeTimestamp));
                return true;
            }
        }

        return false;
    }

根据上述代码逻辑,首先,获取本映射文件的第一条存储消息的存储时间戳(即写入pagecache的时间刻度)storeTimestamp。这里,我们假设允许【key查询索引】开启,即this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()为true,并且this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()也为true(要求【key查询索引】可以安全查询),则storeTimestampthis.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()比较,在来看看getMinTimestampIndex()

    public long getMinTimestampIndex() {
        return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
    }

    public long getMinTimestamp() {
        long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);

        // fixed https://github.org/apache/rocketmqissues/467
        min -= 1000 * 3;
        if (min < 0)
            min = 0;

        return min;

很明显,getMinTimestampIndex()返回的是indexMsgTimestampphysicMsgTimestamp以及logicsMsgTimestamp三者中的最小值。

其中,
physicMsgTimestamp代表最后一条刷盘成功的【业务消息】的storeTimestamp(写入pagecache中的时间戳);
logicsMsgTimestamp代表最后一条刷盘成功的【逻辑位移索引】所对应【业务消息】的storeTimestamp
indexMsgTimestamp代表最后一条刷盘成功的【key查询索引】所对应【业务消息】的storeTimestamp

换言之,如果storeTimestamp小于getMinTimestampIndex(),方法isMappedFileMatchedRecover(...)则返回true否则返回false。我们在来结合外层的非正常恢复方法recoverAbnormally()

    public void recoverAbnormally() {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file
            //step1、判断从哪个映射文件开始恢复,即从指定的开始文件到结束都需要通过快照恢复
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this maped file " + mappedFile.getFileName());
                    break;
                }
            }

      ...
     }

for (; index >= 0; index--)循环,我们可以看出是从this.mappedFileQueue.getMappedFiles()(连续存储文件集合)的尾部开始遍历,如果this.isMappedFileMatchedRecover(mappedFile)为true,则跳出循环,换言之,index记录的是开始恢复的索引位置。如何理解结束循环的条件是storeTimestampindexMsgTimestampphysicMsgTimestamp以及logicsMsgTimestamp三者中的最小值比较,而不是storeTimestampphysicMsgTimestamp比较??

原因如下:说原因之前,我们先来分析一个业务场景。假设broker第一次启动并始化,之前没有接受过任何producer的消息发送请求,并且消息时同步刷盘。突然间,并发上来了,一下子写满了两个文件,现在在写第三个文件,并且是该文件的第三个消息,还没刷盘,由于消息时同步刷盘的,因此我们可以知道前两个文件以及第三个文件的前两个消息均能完整落地。另一方面,根据之前的章节中,我们知道【逻辑位移索引】以及【key查询索引】的构建以及刷盘均有单独的线程异步进行的。在我们假设的场景中,【逻辑位移索引】以及【key查询索引】都没有对应的线程都没有进行刷盘。就在这时,机器突然宕机了,最后一个消息至刷盘了一半。因此,我们在按照非正常的恢复流程中,则需要根据storeTimestampindexMsgTimestampphysicMsgTimestamp以及logicsMsgTimestamp三者中的最小值比较,找出最早那条已经构建索引失败的业务消息。并根据这条业务消息所在的映射文件,滚动构建这条消息以及之后存储的消息的索引。对于最后一个文件的第三条消息,由于无法完整刷盘,因此,需要截掉这部分不完整的消息。

到这里,我们已近分析完业务消息正常或非正常的恢复流程。


3、加载TopicQueueTable到内存中

我们先来看看CommitLog的实例属性topicQueueTable

private HashMap<String/* topic-queueid /, Long/ offset */> topicQueueTable = new HashMap<String, Long>(1024);

从数据结构中我们可以看出,它是一个HashMap, key为topic-queueId,value为消息位移。我们可以通过topic-queueId ,确定一条队列目前最大的可消费位移。这里简单在说一下,通过offset * CQ_STORE_UNIT_SIZE(【逻辑位移索引大小】),可以算出具体的【逻辑位移索引】的物理存储位移,然后在通过【逻辑位移索引】,查询出具体的业务消息内容。

我们在来分析是如何加载TopicQueueTable到内存中recoverTopicQueueTable()

    private void recoverTopicQueueTable() {
        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        //获取未删除,有效并且最旧的消息的开始位置
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();
                //logic.getMaxOffsetInQueue():这里获取当前队列最大的逻辑位移(即已经持久化的最大的位置索引消息)
                table.put(key, logic.getMaxOffsetInQueue());
                logic.correctMinOffset(minPhyOffset);
            }
        }

        this.commitLog.setTopicQueueTable(table);
    }

分析一下上述流程:在步骤一 recoverConsumeQueue()中,我们已近分析了consumeQueueTable的加载流程。而TopicQueueTable主要还是依据consumeQueueTable来加载。

从代码中,我们可以看出,遍历所有的ConsumeQueue,即可得到 对应的最大消费位移logic.getMaxOffsetInQueue()

    public long getMaxOffsetInQueue() {
        return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
    }

    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }

其中, getMaxOffset()的语义为返回最后一个映射文件的最新可读位置,换言之,即最后一条存储内容的物理位移,最后在通过`this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE即可算出最大消费位移。

接着是logic.correctMinOffset(minPhyOffset),该方法主要是校正ConsumeQueue的实例属性minLogicOffset(目前该队列可消费最小【逻辑位移索引】的 物理位移),之前已近分析过该方法,这里就不在详细展开。

到这里,TopicQueueTable的加载流程全部分析完


以上就是存储恢复的底层实现细节。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容