本章节,我们主要从rmq的broker启动后,会如何初始化【业务消息】、【逻辑位移索引】以及【key查询索引】这三种消息来分析底层实现。
直接上源码,存储相关的初始化的入口:
public boolean load() {
boolean result = true;
try {
//step1
//通过abort文件判断上次rmq上次退出是否正常,如果正常退出,就可以把abort文件删除,即不存在。
boolean lastExitOK = !this.isTempFileExist();
log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");
...
//step2
// load Commit Log,加载消息存储映射文件
result = result && this.commitLog.load();
// load Consume Queue,加载逻辑消费队列
result = result && this.loadConsumeQueue();
if (result) {
//step3
this.storeCheckpoint =
new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
//step4
//加载索引文件
this.indexService.load(lastExitOK);
//step5
this.recover(lastExitOK);
}
}
...
return result;
}
我们先来梳理一下上述的加载流程:
step1,通过abort文件判断上次rmq上次退出是否正常,如果正常退出,就可以把abort文件删除,即不存在。否则,我们需要通过快照方式恢复这些存储文件。
step2,我们依次初始化业务消息存储文件(commitLog.load()
)以及逻辑位移存储文件(this.loadConsumeQueue()
)对应的连续映射文件抽象。
step3,加载存储快照,里面存储上述三类存储文件的最后一次完整刷盘的时间戳。里面有三个long类型的字段属性,分别是physicMsgTimestamp
业务消息存储文件最后一次完整刷盘时间戳,logicsMsgTimestamp
逻辑位移存储文件最后一次完整刷盘时间戳 以及 indexMsgTimestamp
key查询索引存储文件最后一次完整刷盘时间戳。
step4,加载key查询索引存储文件。这里有可能使用快照方式。结合步骤三,我们先大致说一下使用快照方式的流程。这里以【key查询索引】存储文件为例。【key查询索引】d刷盘形式为,直到上一个indexFile 满了以后,才开线程异步刷盘。每次【key查询索引】存储文件刷盘后,同时会更新 indexMsgTimestamp
属性。然后,在某个【key查询索引】存储文件刷盘的时间点上,broker突然宕机了,那么在重启恢复时,就会根据存储文件的文件头属性endTimestamp
(该属性是每构建一条索引时,均会更新)与存储快照属性indexMsgTimestamp
作比较,把文件头属性endTimestamp
大于indexMsgTimestamp
的文件全都删除。
step5,加载部分业务消息以及逻辑位移索引值pagecache中。
上述的步骤1-4都比较简单,读者可以自行根据描述自行分析,我们重点分析一下step5:
private void recover(final boolean lastExitOK) {
//step1.加载 逻辑物理位至pagecache
this.recoverConsumeQueue();
//step2.加载业务消息至pagecache
if (lastExitOK) {
this.commitLog.recoverNormally();
} else {
this.commitLog.recoverAbnormally();
}
//step3.加载TopicQueueTable到内存中
this.recoverTopicQueueTable();
}
接下来,我们将从三个步骤分析recover(...)
:
1、加载 逻辑物理位至pagecache
2、加载业务消息至pagecache
3、加载TopicQueueTable到内存中
1、加载 逻辑物理位至pagecache
private void recoverConsumeQueue() {
//遍历所有topic
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
//遍历所有queueId
for (ConsumeQueue logic : maps.values()) {
logic.recover();
}
}
}
这里主要是遍历所有的topic-queueId 为维度下,加载对应的业务视图消费队列ConsumeQueue,我们接着进入logic.recover()
:
public void recover() {
//h获取逻辑位移连续映射文件抽象
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
//这里说明初始化时,最多只加载最新三个文件内容至pagecache中
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
int mappedFileSizeLogics = this.mappedFileSize;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
//读取一条完整的 逻辑位移索引
long offset = byteBuffer.getLong();
int size = byteBuffer.getInt();
long tagsCode = byteBuffer.getLong();
//这里只需要保证读完整一条
if (offset >= 0 && size > 0) {
//更新单个文件的处理位置
mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
//基于消息时递增的,因此每次读取的offset均可以认为是最大的物理位移
this.maxPhysicOffset = offset;
} else {
log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
+ offset + " " + size + " " + tagsCode);
break;
}
}
if (mappedFileOffset == mappedFileSizeLogics) {
//代码走到这里,说明这个索引文件已处理完,需滚到到下一个文件。
index++;
if (index >= mappedFiles.size()) {
log.info("recover last consume queue file over, last maped file "
+ mappedFile.getFileName());
break;
} else {
//继续获取下一个逻辑位移索引映射文件
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
//更新方法内全局处理位置
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next consume queue file, " + mappedFile.getFileName());
}
} else {
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
}
总结一下上述代码段逻辑:
总的来说,就是最多只加载最新三个文件的逻辑位移内容至pagecache中,遍历这三个文件,然后逐条读取 【逻辑位移索引】字节内容。
其中,while(true)
循环就是逐条读取 【逻辑位移索引】。
这里,仅仅通过if (offset >= 0 && size > 0)
这个判断,即可认为这条【逻辑位移索引】是完整的?我认为该判断是有误的。首先,我们知道直接内存映射的刷盘操作并不是原子性,如果在刷盘过程中,机子突然宕机了,就无法将pagecache中的字节内容完整的保存在磁盘中。另外,在broker启动过程中,是对整个【逻辑位移索引】的存储文件做直接内存映射,换言之,对于没写入内容的空白存储部分,例如执行byteBuffer.getLong()
,读取pagecache中下一个offset
(8字节,64位),但出于宕机的原因,只将offset
前面63位存放到磁盘中,因此,会以‘0’作为补充位 来补充缺失的字段。这样一来,byteBuffer.getLong()
操作读取到的offset
即使是大于0,也可能是不准确的。但是,作为offset
的相邻存储字段size
,如果size > 0
,虽然不能保证读取到的size是完整的字节内容,却可以保证offset
字段是完整的。因此,如果通过该判断仅仅能保证offset
是正确的,去无法保证size
与tagsCode
字段正确。
假设,读取到一条正确的【逻辑位移索引】后,由于业务消息的物理存储位移是递增的,因此每次从【逻辑位移索引】读取的offset字段均可以认为是最大的物理位移,可以直接更新至ConsumeQueue的业务消息最大物理位移实例属性this.maxPhysicOffset
。
而循环内的mappedFileOffset
表明一个存储文件的当前处理位置,每读取一条【逻辑位移索引】,均加上20,对应代码mappedFileOffset = i + CQ_STORE_UNIT_SIZE
。当mappedFileOffset
到达存储文件的尾部是,即 if (mappedFileOffset == mappedFileSizeLogics)
,则说明这个索引文件已处理完,需滚到到下一个文件。
而processOffset
代表连续存储索引文件的全局处理位置,因此,每次处理完一个完整的存储文件时,均需要 processOffset = mappedFile.getFileFromOffset()
更新全局处理位置。
当循环结束后,更新最后一个文件的处理位置processOffset += mappedFileOffset
。
当然,为了容错处理,还需要清除一些脏数据,我们直接分析this.mappedFileQueue.truncateDirtyFiles(processOffset)
:
public void truncateDirtyFiles(long offset) {
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
for (MappedFile file : this.mappedFiles) {
//获取文件尾部的物理位移
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset > offset) {
if (offset >= file.getFileFromOffset()) {
//校正文件的缓存读写位置
file.setWrotePosition((int) (offset % this.mappedFileSize));
file.setCommittedPosition((int) (offset % this.mappedFileSize));
file.setFlushedPosition((int) (offset % this.mappedFileSize));
} else {
//代码走到这里,说明本文件是需要删除的脏文件。
file.destroy(1000);
willRemoveFiles.add(file);
}
}
}
this.deleteExpiredFile(willRemoveFiles);
}
先说一下入参offset
,其含义是目前最大的【逻辑位移索引】储物理位移。清除脏文件的逻辑如下,遍历目前所有的【逻辑位移索引】存储文件,然后比较offset
与文件尾部的存储物理位移,如果offset
大,则认为该文件正常;如果小于,则继续与文件开始的存储物理位移(file.getFileFromOffset()
),如果offset
还比文件开始的存储物理位移还要小,则说明该文件可以删除了;否则,校正文件的缓存读写位置。
到这里就分析完【逻辑位移索引】载入pagecache流程
2、加载业务消息至pagecache
这一步,我们在分两种情况去分析,一种是正常的加载业务消息的方式,另一种是通过快照方式加载。
首先,如果标志位lastExitOK
位true,说明该broker可以正常退出,则按正常方式,即 this.commitLog.recoverNormally()
:
public void recoverNormally() {
//default :true
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
// Come the end of the file, switch to the next file Since the
// return 0 representatives met last hole,
// this can not be included in truncate offset
else if (dispatchRequest.isSuccess() && size == 0) {
index++;
if (index >= mappedFiles.size()) {
// Current branch can not happen
log.info("recover last 3 physics file over, last maped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
// Intermediate file read error
else if (!dispatchRequest.isSuccess()) {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
}
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
}
}
上述的恢复流程与处理方式基本与【逻辑位移索引】载入pagecache的流程一致,都是连续加载最多3个最新的存储文件内容,只不过,加载的内容则是具体的业务消息字节内容而已,即DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)
。之前在构建【逻辑位移索引】有分析过this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover)
这个方法,这里,即把业务消息字节按照存储格式加载至pagecache中,只不过这里需要对内容进行crc检测。
我们在来看看通过快照形式加载,即进入this.commitLog.recoverAbnormally()
:
public void recoverAbnormally() {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
//step1、判断从哪个映射文件开始恢复,即从指定的开始文件到结束都需要通过快照恢复
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this maped file " + mappedFile.getFileName());
break;
}
}
if (index < 0) {
index = 0;
mappedFile = mappedFiles.get(index);
}
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
//step2、读取一条完整的消息
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (size > 0) { //说明该条消息是一条完整的消息
mappedFileOffset += size;
//
//尝试去构建索引,因为有可能索引已存在,如果存在,则跳过。
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else if (size == -1) {
//代码走到这里,说明读取到了一条不完整的消息
log.info("recover physics file end, " + mappedFile.getFileName());
break;
} else if (size == 0) {
//滚动到下一个待处理的文件
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last maped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
}
//step3、
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
//Clear ConsumeQueue redundant data(清除多余的逻辑位移索引数据)
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
} else {
//代码走到这里,说明所有的业务文件都删除了,因此,对应的位移索引文件也需要
//全部清空
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
}
分析一下上述代码流程。可以看出,不管是业务视图消费队列ConsumeQueue的恢复,或是【逻辑位移索引】恢复,还是业务消息的正常加载,他们的处理流程基本上都一致,而业务消息的非正常加载,也是采取了相同的处理流程。
对于业务消息的非正常加载,step1中先找出需要进行快照恢复的业务消息存储映射文件开始索引位置(List 中的index),然后遍历所有大于或等于该索引的业务消息存储映射文件(只有最后一个内存映射文件所存储的消息才有可能不完整)。
step2滚动读取映射文件中的每条消息,在判断返回的读取字节大小size
,如果size大于0,说明该条消息是一条完整的消息,然后尝试去构建索引,因为有可能索引已存在,如果存在,则跳过。如果size
等于-1,说明读取到了一条不完整的消息,进一步说明,从上一条完整的消息的存储物理位移的尾部开始,往后的存储字节内容我们均可认为是无效的,因此可以重写这部分的存储。如果size等于0,这完成遍历,跳出循环,否则,滚动到下一个映射文件。
step3中,processOffset
记录最大的完整业务消息存储物理位移,再更新全局的pagecache读写位移(mappedFileQueue.setFlushedWhere(processOffset)
以及mappedFileQueue.setCommittedWhere(processOffset)
)。
而mappedFileQueue.truncateDirtyFiles(processOffset)
则是清除脏数据文件,清除逻辑这里在总结一下,如果映射文件的存储起始位置大于processOffset
,则直接删除该文件。如果processOffset
位于映射文件的起始位置以及结束位置之间,则直接更新本文件的读写位置,值同样为processOffset
。最后的this.defaultMessageStore.truncateDirtyLogicFiles(processOffset)
则是根据processOffset
,清除那些与之对应的业务消息存储物理位移大于processOffset
的逻辑位移索引。由于逻辑比较简单,这里就不在详细展开,有兴趣的读者可以自行分析。
这里,我们再进一步分析分第一步中,是如何断从哪个映射文件开始恢复,跟进this.isMappedFileMatchedRecover(mappedFile)
:
private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
//读取映射文件中,第一条业务消息的的存储时间
int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
return false;
}
long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
if (0 == storeTimestamp) {
return false;
}
//根据条件,消息存储时间与快照时间比较,决定改文件是否正常恢复
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
log.info("find check timestamp, {} {}", //
storeTimestamp, //
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
log.info("find check timestamp, {} {}", //
storeTimestamp, //
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
}
return false;
}
根据上述代码逻辑,首先,获取本映射文件的第一条存储消息的存储时间戳(即写入pagecache的时间刻度)storeTimestamp
。这里,我们假设允许【key查询索引】开启,即this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
为true,并且this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()
也为true(要求【key查询索引】可以安全查询),则storeTimestamp
与this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()
比较,在来看看getMinTimestampIndex()
:
public long getMinTimestampIndex() {
return Math.min(this.getMinTimestamp(), this.indexMsgTimestamp);
}
public long getMinTimestamp() {
long min = Math.min(this.physicMsgTimestamp, this.logicsMsgTimestamp);
// fixed https://github.org/apache/rocketmqissues/467
min -= 1000 * 3;
if (min < 0)
min = 0;
return min;
很明显,getMinTimestampIndex()
返回的是indexMsgTimestamp
、physicMsgTimestamp
以及logicsMsgTimestamp
三者中的最小值。
其中,
physicMsgTimestamp
代表最后一条刷盘成功的【业务消息】的storeTimestamp
(写入pagecache中的时间戳);
logicsMsgTimestamp
代表最后一条刷盘成功的【逻辑位移索引】所对应【业务消息】的storeTimestamp
;
indexMsgTimestamp
代表最后一条刷盘成功的【key查询索引】所对应【业务消息】的storeTimestamp
。
换言之,如果storeTimestamp
小于getMinTimestampIndex()
,方法isMappedFileMatchedRecover(...)
则返回true否则返回false。我们在来结合外层的非正常恢复方法recoverAbnormally()
public void recoverAbnormally() {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
// Looking beginning to recover from which file
//step1、判断从哪个映射文件开始恢复,即从指定的开始文件到结束都需要通过快照恢复
int index = mappedFiles.size() - 1;
MappedFile mappedFile = null;
for (; index >= 0; index--) {
mappedFile = mappedFiles.get(index);
if (this.isMappedFileMatchedRecover(mappedFile)) {
log.info("recover from this maped file " + mappedFile.getFileName());
break;
}
}
...
}
从for (; index >= 0; index--)
循环,我们可以看出是从this.mappedFileQueue.getMappedFiles()
(连续存储文件集合)的尾部开始遍历,如果this.isMappedFileMatchedRecover(mappedFile)
为true,则跳出循环,换言之,index
记录的是开始恢复的索引位置。如何理解结束循环的条件是storeTimestamp
与indexMsgTimestamp
、physicMsgTimestamp
以及logicsMsgTimestamp
三者中的最小值比较,而不是storeTimestamp
与physicMsgTimestamp
比较??
原因如下:说原因之前,我们先来分析一个业务场景。假设broker第一次启动并始化,之前没有接受过任何producer的消息发送请求,并且消息时同步刷盘。突然间,并发上来了,一下子写满了两个文件,现在在写第三个文件,并且是该文件的第三个消息,还没刷盘,由于消息时同步刷盘的,因此我们可以知道前两个文件以及第三个文件的前两个消息均能完整落地。另一方面,根据之前的章节中,我们知道【逻辑位移索引】以及【key查询索引】的构建以及刷盘均有单独的线程异步进行的。在我们假设的场景中,【逻辑位移索引】以及【key查询索引】都没有对应的线程都没有进行刷盘。就在这时,机器突然宕机了,最后一个消息至刷盘了一半。因此,我们在按照非正常的恢复流程中,则需要根据storeTimestamp
与indexMsgTimestamp
、physicMsgTimestamp
以及logicsMsgTimestamp
三者中的最小值比较,找出最早那条已经构建索引失败的业务消息。并根据这条业务消息所在的映射文件,滚动构建这条消息以及之后存储的消息的索引。对于最后一个文件的第三条消息,由于无法完整刷盘,因此,需要截掉这部分不完整的消息。
到这里,我们已近分析完业务消息正常或非正常的恢复流程。
3、加载TopicQueueTable到内存中
我们先来看看CommitLog
的实例属性topicQueueTable
:
private HashMap<String/* topic-queueid /, Long/ offset */> topicQueueTable = new HashMap<String, Long>(1024);
从数据结构中我们可以看出,它是一个HashMap
, key为topic-queueId,value为消息位移。我们可以通过topic-queueId ,确定一条队列目前最大的可消费位移。这里简单在说一下,通过offset * CQ_STORE_UNIT_SIZE(【逻辑位移索引大小】)
,可以算出具体的【逻辑位移索引】的物理存储位移,然后在通过【逻辑位移索引】,查询出具体的业务消息内容。
我们在来分析是如何加载TopicQueueTable到内存中recoverTopicQueueTable()
:
private void recoverTopicQueueTable() {
HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
//获取未删除,有效并且最旧的消息的开始位置
long minPhyOffset = this.commitLog.getMinOffset();
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
String key = logic.getTopic() + "-" + logic.getQueueId();
//logic.getMaxOffsetInQueue():这里获取当前队列最大的逻辑位移(即已经持久化的最大的位置索引消息)
table.put(key, logic.getMaxOffsetInQueue());
logic.correctMinOffset(minPhyOffset);
}
}
this.commitLog.setTopicQueueTable(table);
}
分析一下上述流程:在步骤一 recoverConsumeQueue()
中,我们已近分析了consumeQueueTable
的加载流程。而TopicQueueTable
主要还是依据consumeQueueTable
来加载。
从代码中,我们可以看出,遍历所有的ConsumeQueue
,即可得到 对应的最大消费位移logic.getMaxOffsetInQueue()
:
public long getMaxOffsetInQueue() {
return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
}
public long getMaxOffset() {
MappedFile mappedFile = getLastMappedFile();
if (mappedFile != null) {
return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
}
return 0;
}
其中, getMaxOffset()
的语义为返回最后一个映射文件的最新可读位置,换言之,即最后一条存储内容的物理位移,最后在通过`this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE即可算出最大消费位移。
接着是logic.correctMinOffset(minPhyOffset)
,该方法主要是校正ConsumeQueue
的实例属性minLogicOffset
(目前该队列可消费最小【逻辑位移索引】的 物理位移),之前已近分析过该方法,这里就不在详细展开。
到这里,TopicQueueTable
的加载流程全部分析完
以上就是存储恢复的底层实现细节。