概览
Broker启动需要知道commitlog上次的commitWhere、flushWhere上次的位置,启动流程加载commitlog、consumequeue、index、abort、checkpoint文件是为了获取这些值,为接受即将到来的消息做准备。
由于RocketMQ存储首先将消息全量存储在commitlog文件中,然后异步生成转发请求更新consumequeue、index索引文件。如果消息成功存储到commitlog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,导致commitlog、consumequeue、indexfile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在commitlog文件中存在,但由于并没有转发到consumequeue,这部分消息将永远不会被消费者消费。RocketMQ是如何使commitlog、consumequeue达到最终一致性的呢?
加载过程
先从DefaultMessageStore这个消息入口说起,首先加载commitlog、consumequeue文件对应的mappedFile对象,做好文件映射,还有indexFile对象的加载和IndexHedaer文件加载到内存。然后再根据abort、checkpoint文件,再去加载恢复最近的mappedFile的文件,确定commitWhere、flushWhere等上次写入消息的值,恢复上次加载时的模样,准备再次接受消息,存储消息到磁盘。
首先它会加载abort文件,abort文件在项目启动的时候创建,项目正常结束删除,项目异常关闭的时候保留,从而判断项目是否正常结束。
在退出是通过主次JVM钩子函数删除abort文件。然后是把commitlog文件下的mappedFile对象加载到内存,并做好MappedFile文件内存到磁盘的映射,此时mappedFile文件的数据并没有从磁盘加载到内存。
加载 consumequeue 文件夹下对应文件的ConsumeQueue对象,只是创建了对应的ConsumeQueue对象,做好了文件映射,并没有从磁盘加载文件。
加载checkpoint文件,这个是真正加载文件了。进行它上次写入消息的commitlog文件刷盘时间点、消息的消费队列文件刷盘时间点、索引文件刷盘时间点的对比,找到正确的写入消息的各个时间,进行消息是否正确处理的判断。
加载Index索引文件对应的IndexFile对象,做好了文件映射,实际是只加载了IndexHeader汇总信息,建立好了mmap磁盘映射。
数据恢复
(1) 先恢复 consumequeue 文件,把不符合的 consumequeue 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
(2) 如果 abort文件存在,此时找到第一个正常的 commitlog 文件,然后对该文件重新进行转发,依次更新 consumequeue,index文件。然后在恢复次commitlog文件
/**
* 项目启动加载 commitlog、consumequeue、index、checkpoint、abort 文件及文件夹下历史数据
* @throws IOException
*/
public boolean load() {
boolean result = true;
try {
// ${RocketMQ_HOME}/store/abort文件是否存在,存在:异常退出;不存在:正常退出。
// false:代表异常退出,true:代表正常退出
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
// 处理延迟定时消息,加载
if (null != scheduleMessageService) {
result = result && this.scheduleMessageService.load();
}
// load Commit Log
// 项目启动,加载 commitlog 文件夹下对应文件的mappedFile对象,只是创建了对应的MappedFile,做好了文件映射,并没有从磁盘加载文件
result = result && this.commitLog.load();
// load Consume Queue
// 加载 consumequeue 文件夹下对应文件的ConsumeQueue对象,只是创建了对应的ConsumeQueue对象,做好了文件映射,并没有从磁盘加载文件
result = result && this.loadConsumeQueue();
if (result) {
// 加载checkpoint文件,这个是真正加载文件了
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
// 加载Index索引文件对应的IndexFile对象,做好了文件映射,实际是只加载了IndexHeader汇总信息,建立好了mmap磁盘映射
this.indexService.load(lastExitOK);
// 数据恢复
// 1. 先恢复 consumequeue 文件,把不符合的 consumequeue 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
// 2. 如果 abort文件存在,此时找到第一个正常的 commitlog 文件,然后对该文件重新进行转发,依次更新 consumequeue,index文件。然后在恢复次commitlog文件。
this.recover(lastExitOK);
log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}
} catch (Exception e) {
log.error("load exception", e);
result = false;
}
if (!result) {
this.allocateMappedFileService.shutdown();
}
return result;
}
数据恢复
这里详细讲一下数据恢复的流程:
先恢复 consumequeue 文件,把不符合的 consumequeue 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
如果 abort文件存在,此时找到第一个正常的 commitlog 文件,然后对该文件重新进行转发,依次更新 consumequeue,index文件,然后在恢复次commitlog文件。
设置consumeQueueTable数据表中的topic-queueId的consumequeue中最大的mappedFile对应的offset。
consumequeue的恢复
把不符合的 consueme 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
最终目的:加载磁盘中的数据到MappedFile,并更新mappedFileQueue中的commitWhere、flushWhere值,并删除错误的mappedFile文件。
/**
* 恢复 consumequeue 文件,把不符合的 consume 文件删除,一个 consume 条目正确的标准(commitlog偏移量 >0 size > 0)[从倒数第三个文件开始恢复]。
* @return
*/
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
// queueId-> ConsumeQueue;
// 对应的consumequeue对象遍历恢复
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
//一个topic下的一个queueId,恢复一次consumequeue
logic.recover();
// consumequeue文件的最大的offset
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
// 赋值maxPhysicOffset = 当前文件最大的maxPhysicOffset,供下次比较
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}
return maxPhysicOffset;
}
public void recover() {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// 从倒数第三个文件开始恢复,这是一个经验吧,从倒数第三个consumequeue来恢复数据,不够3个从第0个开始
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
// 逻辑文件大小
int mappedFileSizeLogics = this.mappedFileSize;
// 获取mappedFile文件
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 为commitlog已经确认的偏移量,为commitlog需要恢复的偏移量
long processOffset = mappedFile.getFileFromOffset();
// 为当前文件检验已经通过的offset
long mappedFileOffset = 0;
long maxExtAddr = 1;
// while循环,接连恢复3个文件,结束后跳出,while循环
while (true) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
// tag的hashcode
long tagsCode = byteBuffer.getLong();
if (offset >= 0 && size > 0) {
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
// 最大的恢复的byte位置
this.maxPhysicOffset = offset + size;
// 处理ConsumeQueueExt扩展
if (isExtAddr(tagsCode)) {
maxExtAddr = tagsCode;
}
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
// 这个文件没有问题,检测下一个文件,未下一个文件检测做准备,继续在while里面循环
if (mappedFileOffset == mappedFileSizeLogics) {
// 一下文件的下标
index++;
// 大于最大文件数
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last mapped file "
+ mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
+ (processOffset + mappedFileOffset));
break;
}
}
// 最后三个文件一共恢复了多少个offset,加上起始processOffset,为当前consumequeue的处理offset,多个queueId下的consumequeue,去更新对应的的mappedFileQueue的最后flush、commit位置,并删除错误的mappedFile数据
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
// 删除这个offset之后的错误的mappedFile数据
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// 处理ConsumeQueueExt扩展
if (isExtReadEnable()) {
this.consumeQueueExt.recover();
log.info("Truncate consume queue extend file by max {}", maxExtAddr);
this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
}
}
}
commitlog 数据恢复
commitlog数据恢复分为两种情况,broker正常退出,从倒数第三个mappedfile文件恢复;异常退出,从最后一个mappedFile文件往前找确认最后一个正常消息所在的mappedFile,然后开始恢复,异常的commitlog文件的mappedFile的message会进行重新分发构建新的consumequeue、index索引文件。
正常恢复
正常退出,数据恢复,所有内存数据将会被刷新,从倒数第三个文件恢复。
/**
* 正常退出,数据恢复,所有内存数据将会被刷新,从倒数第三个文件恢复
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
// 恢复时是否检查CRC
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 为commitlog已经确认的偏移量,为commitlog需要恢复的偏移量
long processOffset = mappedFile.getFileFromOffset();
// 为当前文件检验已经通过的offset
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
// 构造下一个待检测文件,继续在while里面循环
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
// 终止跳出循环
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
// 最后commitlog的mappdFile文件一共恢复了多少个offset,加上起始processOffset,为当前commitlog的处理offset,去更新对应的的mappedFileQueue的最后flush、commit位置,并删除错误的mappedFile数据
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// 删除phyOffset之后的consumequeue的mappdFile
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// commitlog 为空,不需要加载数据
// Commitlog case files are deleted
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
异常恢复
异常退出commitlog恢复,从最后一个mappedFile文件往前找确认最后一个正常消息所在的mappedFile,然后开始恢复,异常的commitlog文件的mappedFile的message会进行重新分发构建新的consumequeue、index索引文件,然后在恢复commitlog文件。
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
// 恢复时是否检查CRC
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
// 从倒数第一个文件开始恢复
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
// 根据checkpoint找到开始被恢复的文件
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
// mappedFile的存储时间和checkpoint的commitlog的最后刷盘时间进行对比,确认最后一个正常消息所在的mappedFile
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this mapped file " + mappedFile.getFileName());
break;
}
}
// <0,重置为第一个
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
// 错误所在的mappedFile
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// 为commitlog已经确认的偏移量,为commitlog需要恢复的偏移量
long processOffset = mappedFile.getFileFromOffset();
// 为当前文件检验已经通过的offset
long mappedFileOffset = 0;
while (true) {
// 检测消息,并构造DispatchRequest
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
// 增加已经恢复的offset记录
mappedFileOffset += size;
// 进行分发请求再次构造
// 是否循序重复转发,不允许进行commitlog已确认的offset的对比
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
// 构造下一个待检测文件,继续在while里面循环
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
break;
}
}
// 最后commitlog的mappdFile文件一共恢复了多少个offset,加上起始processOffset,为当前commitlog的处理offset,去更新对应的的mappedFileQueue的最后flush、commit位置,并删除错误的mappedFile数据
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// 删除phyOffset之后的consumequeue的mappdFile,indexFile索引文件脏数据并没有被删除,构建Index文件的offset时,和会已确认的offset已经对比,如果存在就不在构建直接返回,参考buildIndex()方法;
// 同样构建consumequeue的时候也会和已确认的offset已经对比,如果存在就不在构建直接返回,参考putMessagePositionInfo()方法。
// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// commitlog 为空,不需要加载数据
// Commitlog case files are deleted
else {
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
consumeQueueTable恢复
设置consumeQueueTable数据表中的topic-queueId的consumequeue中最大的mappedFile对应的offset。
public void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
table.put(key, logic.getMaxOffsetInQueue());
logic.correctMinOffset(minPhyOffset);
}
}
this.commitLog.setTopicQueueTable(table);
}
注意
存储启动时所谓的文件恢复主要完成 flushedPosition, committedWhere 指针的设置 、消息消费队列最大偏移 加载到内存,并删除 flushedPosition之后所有的文件。
如果Broker异常启动, 在文件恢复过程中RocketMQ会将最后一个有效文件中的所有消息重新转发到消息消费队列与索引文件,确保不丢失消息,但同时会带来消息重复的问题,纵观RocketMQ的整体设计思想,RocketMQ保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幕等设计。
但RocketMQ也做了一些优化,比如构建Index索引文件的offset时,和会已确认的offset已经对比,如果存在就不在构建直接返回,参考buildIndex()方法;同样构建consumequeue的时候也会和已确认的offset已经对比,如果存在就不在构建直接返回,参考putMessagePositionInfo()方法。