DefaultMessageStore.load()->commitLog(MappedFileQueue).load
->loadConsumeQueue
文件管理对象
DefaultMessageStore:默认文件存储管理类
CommitLog:日志消息提交文件
ConsumeQueue:消费索引文件
MappedFileQueue:映射文件组
MappedFile:具体的一个个映射文件
StoreCheckpoint:检查点文件(异常恢复时依据其中记录的时间戳定位需要恢复的文件)
文件加载
CommitLog和ConsumeQueue各自的load方法,最终都委托给自己持有的MappedFileQueue字段的load方法来完成文件加载
文件自我检查
检查相邻两个文件的起始偏移量之差是否等于单个文件的长度(mappedFileSize),不相等时只记录错误日志;统一调用MappedFileQueue.checkSelf
/**
 * Sanity-checks the mapped-file queue by verifying that every pair of adjacent
 * files starts exactly {@code mappedFileSize} apart.
 *
 * <p>A mismatch is only reported through the error logger; the queue itself is
 * not modified.
 */
public void checkSelf() {
    if (this.mappedFiles.isEmpty()) {
        return;
    }

    MappedFile previous = null;
    for (Iterator<MappedFile> it = mappedFiles.iterator(); it.hasNext(); ) {
        final MappedFile current = it.next();
        // The first file has no predecessor, so there is nothing to compare it against.
        final boolean gapMismatch = previous != null
            && current.getFileFromOffset() - previous.getFileFromOffset() != this.mappedFileSize;
        if (gapMismatch) {
            LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
                previous.getFileName(), current.getFileName());
        }
        previous = current;
    }
}
系统是否正常退出
通过判断abort文件是否存在来确定:abort文件仍然存在,说明上次没有正常关闭
/**
 * Reports whether the abort marker file exists under the configured store root directory.
 *
 * @return {@code true} if the abort file is present on disk
 */
private boolean isTempFileExist() {
    return new File(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir())).exists();
}
文件重启恢复
文件恢复的总控逻辑在DefaultMessageStore.recover中:先恢复ConsumeQueue并得到其记录的最大物理偏移量,再根据上次是否正常退出选择CommitLog的正常恢复或异常恢复,最后恢复TopicQueueTable
/**
 * Top-level recovery sequence: recover the consume queues first, then the commit log
 * (using the normal or abnormal path), and finally the topic queue table.
 *
 * @param lastExitOK {@code true} selects {@code recoverNormally}, {@code false} selects
 *                   {@code recoverAbnormally}
 */
private void recover(final boolean lastExitOK) {
    // The consume-queue result is handed to the commit log recovery as its comparison point.
    final long consumeQueueMaxPhyOffset = this.recoverConsumeQueue();

    if (!lastExitOK) {
        this.commitLog.recoverAbnormally(consumeQueueMaxPhyOffset);
    } else {
        this.commitLog.recoverNormally(consumeQueueMaxPhyOffset);
    }

    this.recoverTopicQueueTable();
}
文件正常重启恢复
ConsumeQueue文件恢复
从倒数第3个文件开始(不足3个则从第1个开始),恢复的逻辑就是逐条扫描、找出最大有效偏移量,据此设置提交位移和刷盘位移,并截断其后的脏数据
/**
 * Recovers this consume queue by scanning its mapped files for the last valid entry.
 *
 * <p>Scanning starts at the third-from-last mapped file (or the first one when fewer
 * than three exist). Each entry is read as a long offset, an int size and a long tags
 * code; an entry counts as valid when {@code offset >= 0 && size > 0}. The scan stops
 * at the first invalid entry or after the last file. The resulting position is then
 * used as the flushed and committed position and passed to {@code truncateDirtyFiles}.
 * When the extend file is enabled it is recovered and truncated as well.
 */
public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (mappedFiles.isEmpty()) {
        return;
    }

    int index = Math.max(mappedFiles.size() - 3, 0);
    final int mappedFileSizeLogics = this.mappedFileSize;

    MappedFile mappedFile = mappedFiles.get(index);
    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    long processOffset = mappedFile.getFileFromOffset();
    long mappedFileOffset = 0;
    long maxExtAddr = 1;

    while (true) {
        // Walk the current file one entry at a time.
        for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
            final long offset = byteBuffer.getLong();
            final int size = byteBuffer.getInt();
            final long tagsCode = byteBuffer.getLong();

            if (offset < 0 || size <= 0) {
                log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
                    + offset + " " + size + " " + tagsCode);
                break;
            }

            // Valid entry: advance the in-file position and remember the physical end offset.
            mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
            this.maxPhysicOffset = offset + size;
            if (isExtAddr(tagsCode)) {
                maxExtAddr = tagsCode;
            }
        }

        // The file was not fully valid, so the scan ends inside it.
        if (mappedFileOffset != mappedFileSizeLogics) {
            log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                + (processOffset + mappedFileOffset));
            break;
        }

        index++;
        if (index >= mappedFiles.size()) {
            log.info("recover last consume queue file over, last mapped file "
                + mappedFile.getFileName());
            break;
        }

        // Entire file was valid and another one follows: continue with it from its start.
        mappedFile = mappedFiles.get(index);
        byteBuffer = mappedFile.sliceByteBuffer();
        processOffset = mappedFile.getFileFromOffset();
        mappedFileOffset = 0;
        log.info("recover next consume queue file, " + mappedFile.getFileName());
    }

    // Base offset of the last scanned file plus the valid bytes found inside it.
    processOffset += mappedFileOffset;
    this.mappedFileQueue.setFlushedWhere(processOffset);
    this.mappedFileQueue.setCommittedWhere(processOffset);
    this.mappedFileQueue.truncateDirtyFiles(processOffset);

    if (isExtReadEnable()) {
        this.consumeQueueExt.recover();
        log.info("Truncate consume queue extend file by max {}", maxExtAddr);
        this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
    }
}
CommitLog正常文件恢复
类似ConsumeQueue的恢复逻辑:先由文件内容得到CommitLog的有效偏移量,再与ConsumeQueue恢复得到的最大物理偏移量对比;若后者不小于前者,则截断ConsumeQueue中CommitLog里没有记录的那部分消息
// Excerpt from the normal-exit CommitLog recovery. The enclosing method header and the
// declarations of mappedFiles, checkCRCOnRecover and maxPhyOffsetOfConsumeQueue are
// outside this excerpt.
if (!mappedFiles.isEmpty()) {
// Began to recover from the last third file
int index = mappedFiles.size() - 3;
if (index < 0)
index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
// processOffset starts at the chosen file's base offset; mappedFileOffset accumulates
// the sizes of the messages accepted below.
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
while (true) {
// NOTE(review): presumably validates the next message in the buffer; the helper is not shown here.
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();
// Normal data
if (dispatchRequest.isSuccess() && size > 0) {
mappedFileOffset += size;
}
/// (the rest of the loop body, including its exit conditions, is omitted in these notes)
}
// The computed position becomes both the flushed and the committed position, and is
// passed to truncateDirtyFiles.
processOffset += mappedFileOffset;
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
// Clear ConsumeQueue redundant data
// When the consume queue's max physical offset is not below the commit log position,
// the logic (consume queue) files are truncated to that position.
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
} else {
// Commitlog case files are deleted
// With no commit log files, both positions are reset to 0 and the logic files are destroyed.
log.warn("The commitlog files are deleted, and delete the consume queue files");
this.mappedFileQueue.setFlushedWhere(0);
this.mappedFileQueue.setCommittedWhere(0);
this.defaultMessageStore.destroyLogics();
}
CommitLog异常文件恢复
与正常恢复相比,会先根据检查点的时间戳信息找到需要开始恢复的文件,再把其中的消息逐条取出并重新放入ConsumeQueue,最后同样会截断ConsumeQueue中CommitLog里没有记录的那部分消息
.........
如何控制写入线程安全性
通过putMessageLock加锁保证(根据配置选用可重入锁或自旋锁),该锁在CommitLog构造函数中初始化,如下
/**
 * Builds the commit log on top of the given message store and wires its collaborators
 * from the store's configuration.
 *
 * <p>The flush service is {@code GroupCommitService} when the configured flush type is
 * {@code SYNC_FLUSH} and {@code FlushRealTimeService} otherwise. The put-message lock is
 * a {@code PutMessageReentrantLock} or a {@code PutMessageSpinLock}, chosen by
 * {@code isUseReentrantLockWhenPutMessage()}.
 *
 * @param defaultMessageStore the owning store, used here as the source of configuration
 */
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.defaultMessageStore = defaultMessageStore;
    this.mappedFileQueue = new MappedFileQueue(
        defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
        defaultMessageStore.getAllocateMappedFileService());

    final boolean syncFlush =
        FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType();
    this.flushCommitLogService = syncFlush ? new GroupCommitService() : new FlushRealTimeService();

    this.commitLogService = new CommitRealTimeService();

    this.appendMessageCallback =
        new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());

    // One encoder per thread; the max message size is read when a thread first asks for its encoder.
    batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
        @Override
        protected MessageExtBatchEncoder initialValue() {
            return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        }
    };

    if (defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
        this.putMessageLock = new PutMessageReentrantLock();
    } else {
        this.putMessageLock = new PutMessageSpinLock();
    }
}
放入消息时如下使用
// Excerpt showing how putMessageLock guards the put path; the actual body (and the
// return statement) is omitted in these notes.
public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch){
putMessageLock.lock(); // spin lock or reentrant lock, depending on store config
try {
/// (body omitted in these notes)
} finally {
// Released in finally so the lock is given back even if the guarded body throws.
putMessageLock.unlock();
}
}