Broker启动历史数据加载

概览

Broker启动需要知道commitlog上次的commitWhere、flushWhere上次的位置,启动流程加载commitlog、consumequeue、index、abort、checkpoint文件是为了获取这些值,为接受即将到来的消息做准备。

由于RocketMQ存储首先将消息全量存储在commitlog文件中,然后异步生成转发请求更新consumequeue、index索引文件。如果消息成功存储到commitlog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,导致commitlog、consumequeue、indexfile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在commitlog文件中存在,但由于并没有转发到consumequeue,这部分消息将永远不会被消费者消费。RocketMQ是如何使commitlog、consumequeue达到最终一致性的呢?

加载过程

先从DefaultMessageStore这个消息入口说起,首先加载commitlog、consumequeue文件对应的mappedFile对象,做好文件映射,还有indexFile对象的加载和IndexHedaer文件加载到内存。然后再根据abort、checkpoint文件,再去加载恢复最近的mappedFile的文件,确定commitWhere、flushWhere等上次写入消息的值,恢复上次加载时的模样,准备再次接受消息,存储消息到磁盘。

  1. 首先它会加载abort文件,abort文件在项目启动的时候创建,项目正常结束删除,项目异常关闭的时候保留,从而判断项目是否正常结束。
    在退出是通过主次JVM钩子函数删除abort文件。

  2. 然后是把commitlog文件下的mappedFile对象加载到内存,并做好MappedFile文件内存到磁盘的映射,此时mappedFile文件的数据并没有从磁盘加载到内存。

  3. 加载 consumequeue 文件夹下对应文件的ConsumeQueue对象,只是创建了对应的ConsumeQueue对象,做好了文件映射,并没有从磁盘加载文件。

  4. 加载checkpoint文件,这个是真正加载文件了。进行它上次写入消息的commitlog文件刷盘时间点、消息的消费队列文件刷盘时间点、索引文件刷盘时间点的对比,找到正确的写入消息的各个时间,进行消息是否正确处理的判断。

  5. 加载Index索引文件对应的IndexFile对象,做好了文件映射,实际是只加载了IndexHeader汇总信息,建立好了mmap磁盘映射。

  6. 数据恢复

(1) 先恢复 consumequeue 文件,把不符合的 consumequeue 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。

(2) 如果 abort文件存在,此时找到第一个正常的 commitlog 文件,然后对该文件重新进行转发,依次更新 consumequeue,index文件。然后在恢复次commitlog文件

/**
 * 项目启动加载 commitlog、consumequeue、index、checkpoint、abort 文件及文件夹下历史数据
 * @throws IOException
 */
public boolean load() {
    boolean result = true;

    try {
        // ${RocketMQ_HOME}/store/abort文件是否存在,存在:异常退出;不存在:正常退出。
        // false:代表异常退出,true:代表正常退出
        boolean lastExitOK = !this.isTempFileExist();
        log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
        // 处理延迟定时消息,加载
        if (null != scheduleMessageService) {
            result = result && this.scheduleMessageService.load();
        }

        // load Commit Log
        // 项目启动,加载 commitlog 文件夹下对应文件的mappedFile对象,只是创建了对应的MappedFile,做好了文件映射,并没有从磁盘加载文件
        result = result && this.commitLog.load();

        // load Consume Queue
        // 加载 consumequeue 文件夹下对应文件的ConsumeQueue对象,只是创建了对应的ConsumeQueue对象,做好了文件映射,并没有从磁盘加载文件
        result = result && this.loadConsumeQueue();

        if (result) {
            // 加载checkpoint文件,这个是真正加载文件了
            this.storeCheckpoint =
                new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
            // 加载Index索引文件对应的IndexFile对象,做好了文件映射,实际是只加载了IndexHeader汇总信息,建立好了mmap磁盘映射
            this.indexService.load(lastExitOK);
            // 数据恢复
            // 1. 先恢复 consumequeue 文件,把不符合的 consumequeue 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
            // 2. 如果 abort文件存在,此时找到第一个正常的 commitlog 文件,然后对该文件重新进行转发,依次更新 consumequeue,index文件。然后在恢复次commitlog文件。
            this.recover(lastExitOK);

            log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
        }
    } catch (Exception e) {
        log.error("load exception", e);
        result = false;
    }

    if (!result) {
        this.allocateMappedFileService.shutdown();
    }

    return result;
}

数据恢复

这里详细讲一下数据恢复的流程:

  1. 先恢复 consumequeue 文件,把不符合的 consumequeue 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。

  2. 如果 abort文件存在,此时找到第一个正常的 commitlog 文件,然后对该文件重新进行转发,依次更新 consumequeue,index文件,然后在恢复次commitlog文件。

  3. 设置consumeQueueTable数据表中的topic-queueId的consumequeue中最大的mappedFile对应的offset。

consumequeue的恢复

把不符合的 consueme 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
最终目的:加载磁盘中的数据到MappedFile,并更新mappedFileQueue中的commitWhere、flushWhere值,并删除错误的mappedFile文件。

/**
 * 恢复 consumequeue 文件,把不符合的 consume 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
 * @return
 */
private long recoverConsumeQueue() {
    long maxPhysicOffset = -1;
    // queueId-> ConsumeQueue;
    // 对应的consumequeue对象遍历恢复
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            //一个topic下的一个queueId,恢复一次consumequeue
            logic.recover();
            // consumequeue文件的最大的offset
            if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
                // 赋值maxPhysicOffset = 当前文件最大的maxPhysicOffset,供下次比较
                maxPhysicOffset = logic.getMaxPhysicOffset();
            }
        }
    }

    return maxPhysicOffset;
}

public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // 从倒数第三个文件开始恢复,这是一个经验吧,从倒数第三个consumequeue来恢复数据,不够3个从第0个开始
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;
        // 逻辑文件大小
        int mappedFileSizeLogics = this.mappedFileSize;
        // 获取mappedFile文件
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // 为commitlog已经确认的偏移量,为commitlog需要恢复的偏移量
        long processOffset = mappedFile.getFileFromOffset();
        // 为当前文件检验已经通过的offset
        long mappedFileOffset = 0;
        long maxExtAddr = 1;
        // while循环,接连恢复3个文件,结束后跳出,while循环
        while (true) {
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                // tag的hashcode
                long tagsCode = byteBuffer.getLong();

                if (offset >= 0 && size > 0) {
                    mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                    // 最大的恢复的byte位置
                    this.maxPhysicOffset = offset + size;
                    // 处理ConsumeQueueExt扩展
                    if (isExtAddr(tagsCode)) {
                        maxExtAddr = tagsCode;
                    }
                } else {
                    log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                        + offset + " " + size + " " + tagsCode);
                    break;
                }
            }
            // 这个文件没有问题,检测下一个文件,未下一个文件检测做准备,继续在while里面循环
            if (mappedFileOffset == mappedFileSizeLogics) {
                // 一下文件的下标
                index++;
                // 大于最大文件数
                if (index >= mappedFiles.size()) {

                    log.info("recover last consume queue file over, last mapped file "
                        + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next consume queue file, " + mappedFile.getFileName());
                }
            } else {
                log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                    + (processOffset + mappedFileOffset));
                break;
            }
        }
        // 最后三个文件一共恢复了多少个offset,加上起始processOffset,为当前consumequeue的处理offset,多个queueId下的consumequeue,去更新对应的的mappedFileQueue的最后flush、commit位置,并删除错误的mappedFile数据
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // 删除这个offset之后的错误的mappedFile数据
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // 处理ConsumeQueueExt扩展
        if (isExtReadEnable()) {
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }
}
commitlog 数据恢复

commitlog数据恢复分为两种情况,broker正常退出,从倒数第三个mappedfile文件恢复;异常退出,从最后一个mappedFile文件往前找确认最后一个正常消息所在的mappedFile,然后开始恢复,异常的commitlog文件的mappedFile的message会进行重新分发构建新的consumequeue、index索引文件。

正常恢复

正常退出,数据恢复,所有内存数据将会被刷新,从倒数第三个文件恢复。

/**
 * 正常退出,数据恢复,所有内存数据将会被刷新,从倒数第三个文件恢复
 * When the normal exit, data recovery, all memory data have been flush
 */
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
    // 恢复时是否检查CRC
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Began to recover from the last third file
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // 为commitlog已经确认的偏移量,为commitlog需要恢复的偏移量
        long processOffset = mappedFile.getFileFromOffset();
        // 为当前文件检验已经通过的offset
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            // Normal data
            if (dispatchRequest.isSuccess() && size > 0) {
                mappedFileOffset += size;
            }
            // Come the end of the file, switch to the next file Since the
            // return 0 representatives met last hole,
            // this can not be included in truncate offset

            // 构造下一个待检测文件,继续在while里面循环
            else if (dispatchRequest.isSuccess() && size == 0) {
                index++;
                if (index >= mappedFiles.size()) {
                    // Current branch can not happen
                    log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
                    break;
                } else {
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next physics file, " + mappedFile.getFileName());
                }
            }
            // Intermediate file read error
            // 终止跳出循环
            else if (!dispatchRequest.isSuccess()) {
                log.info("recover physics file end, " + mappedFile.getFileName());
                break;
            }
        }
        // 最后commitlog的mappdFile文件一共恢复了多少个offset,加上起始processOffset,为当前commitlog的处理offset,去更新对应的的mappedFileQueue的最后flush、commit位置,并删除错误的mappedFile数据
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        // 删除phyOffset之后的consumequeue的mappdFile
        // Clear ConsumeQueue redundant data
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    } else {
        // commitlog 为空,不需要加载数据
        // Commitlog case files are deleted
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}
异常恢复

异常退出commitlog恢复,从最后一个mappedFile文件往前找确认最后一个正常消息所在的mappedFile,然后开始恢复,异常的commitlog文件的mappedFile的message会进行重新分发构建新的consumequeue、index索引文件,然后在恢复commitlog文件。

public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    // recover by the minimum time stamp
    // 恢复时是否检查CRC
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Looking beginning to recover from which file
        // 从倒数第一个文件开始恢复
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        // 根据checkpoint找到开始被恢复的文件
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            // mappedFile的存储时间和checkpoint的commitlog的最后刷盘时间进行对比,确认最后一个正常消息所在的mappedFile
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        // <0,重置为第一个
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        // 错误所在的mappedFile
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // 为commitlog已经确认的偏移量,为commitlog需要恢复的偏移量
        long processOffset = mappedFile.getFileFromOffset();
        // 为当前文件检验已经通过的offset
        long mappedFileOffset = 0;
        while (true) {
            // 检测消息,并构造DispatchRequest
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();

            if (dispatchRequest.isSuccess()) {
                // Normal data
                if (size > 0) {
                    // 增加已经恢复的offset记录
                    mappedFileOffset += size;
                    // 进行分发请求再次构造
                    // 是否循序重复转发,不允许进行commitlog已确认的offset的对比
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                // Come the end of the file, switch to the next file
                // Since the return 0 representatives met last hole, this can
                // not be included in truncate offset
                // 构造下一个待检测文件,继续在while里面循环
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        // The current branch under normal circumstances should
                        // not happen
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }
        // 最后commitlog的mappdFile文件一共恢复了多少个offset,加上起始processOffset,为当前commitlog的处理offset,去更新对应的的mappedFileQueue的最后flush、commit位置,并删除错误的mappedFile数据
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
        // 删除phyOffset之后的consumequeue的mappdFile,indexFile索引文件脏数据并没有被删除,构建Index文件的offset时,和会已确认的offset已经对比,如果存在就不在构建直接返回,参考buildIndex()方法;
        // 同样构建consumequeue的时候也会和已确认的offset已经对比,如果存在就不在构建直接返回,参考putMessagePositionInfo()方法。
        // Clear ConsumeQueue redundant data
        if (maxPhyOffsetOfConsumeQueue >= processOffset) {
            log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
            this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
        }
    }
    // commitlog 为空,不需要加载数据
    // Commitlog case files are deleted
    else {
        log.warn("The commitlog files are deleted, and delete the consume queue files");
        this.mappedFileQueue.setFlushedWhere(0);
        this.mappedFileQueue.setCommittedWhere(0);
        this.defaultMessageStore.destroyLogics();
    }
}
consumeQueueTable恢复

设置consumeQueueTable数据表中的topic-queueId的consumequeue中最大的mappedFile对应的offset。

public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            logic.correctMinOffset(minPhyOffset);
        }
    }

    this.commitLog.setTopicQueueTable(table);
}

注意

存储启动时所谓的文件恢复主要完成 flushedPosition, committedWhere 指针的设置 、消息消费队列最大偏移 加载到内存,并删除 flushedPosition之后所有的文件。

如果Broker异常启动, 在文件恢复过程中RocketMQ会将最后一个有效文件中的所有消息重新转发到消息消费队列与索引文件,确保不丢失消息,但同时会带来消息重复的问题,纵观RocketMQ的整体设计思想,RocketMQ保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幕等设计。
但RocketMQ也做了一些优化,比如构建Index索引文件的offset时,和会已确认的offset已经对比,如果存在就不在构建直接返回,参考buildIndex()方法;同样构建consumequeue的时候也会和已确认的offset已经对比,如果存在就不在构建直接返回,参考putMessagePositionInfo()方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容