经过前面四个章节的分析,我们已近知道rocket mq 主要有三类消息的持久化,分别是【业务消息】、【逻辑位移索引】以及【key查询索引】,它们均会以文件形式落地到磁盘。但我们想一下,磁盘的容量是有限的,总不可能一直把这些消息存放于磁盘中。因此,接下来,我们来分析rmq是如何执行持久化文件的清除策略。
为了让读者有一个全局认识,先从总体概括一下持久化文件的清除策略。
首先,对于【业务消息】持久化文件来说,如果【业务消息】存储文件的存储容量到达了所占的磁盘分区空间使用百分比或者存储时间到期了,就会进行删除。而【逻辑位移索引】以及【key查询索引】会根据【业务消息】存储文件删除的物理位移,在进行删除。而删除逻辑由定时任务定时执行。
定时任务在broker启动时,进行注册的:
其中,
cleanFilesPeriodically()
就是清除逻辑的入口
private void cleanFilesPeriodically() {
//1、清除业务消息持久化文件
this.cleanCommitLogService.run();
//2、清除索引文件
this.cleanConsumeQueueService.run();
}
根据上述代码片段,接下来会分两大步去分析rmq清除流程
1、清除业务消息持久化文件
2、清除索引文件
1、清除业务消息持久化文件
this.cleanCommitLogService.run()
该逻辑是委托CleanCommitLogService
实例类完成的,跟进run()
方法:
public void run() {
try {
//删除过期文件,物理删除
this.deleteExpiredFiles();
//删除挂载文件,内存删除, 该步骤删除因deleteExpiredFiles()还没删除成功的文件
this.redeleteHangedFile();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
业务消息的存储文件通过两步删除,第一步通过deleteExpiredFiles()
物理删除满足条件的文件,但在删除过程中,某些文件有可能还在被引用,因此,通过redeleteHangedFile()
进一步删除第一步漏删除的文件。而redeleteHangedFile()
逻辑基本与deleteExpiredFiles()
一致,因此,我们只分析deleteExpiredFiles()
。
进入this.deleteExpiredFiles()
private void deleteExpiredFiles() {
int deleteCount = 0;
//获取文件的存活时间,默认为72小时
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
///删除物理文件的时间间隔 默认为100
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// 强制销毁 MapedFile 间隔时间 默认为 1000 * 120
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 回收硬盘存储 ,default is at 4 am 可以设置 为 03:04:05 ,表示3点,4点,五点都可以回收
boolean timeup = this.isTimeToDelete();
//总的来说不管是commitlog(消息存储文件) 或者是consumequeue(消费进度存储文件) 各自所占的磁盘分区空间使用百分比,如果大于75%
//则返回isSpaceToDelete = true ,如果大于85%,就设置cleanImmediately状态位为true
boolean spacefull = this.isSpaceToDelete();
//或者发起手动删除也可以
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
//时间到了,或者存储空间比率到了,又或者手动删除次数大于零,都要删除
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
//是否立刻删除 cleanFileForciblyEnable == true && cleanImmediately == true;; getMessageStoreConfig().isCleanFileForciblyEnable()默认为true
//cleanImmediately 会在commitlog(消息存储文件) 或者是consumequeue(消费进度存储文件)各自所占的磁盘分区空间使用百分比大于85%设置为true
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
...
//文件保留时间 1小时
fileReservedTime *= 60 * 60 * 1000;
//删除消息的持久化文件
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
...
}
}
从代码上,我们可以通过 if (timeup || spacefull || manualDelete)
,只要timeup
、spacefull
以及manualDelete
这三个条件中,有一个满足,就可以进一步进入删除逻辑,因此我们简单分析一下这三个标志位为true时的条件。
首先,标志位timeup
,它的语义就是更具用户设置的小时数(默认为04),例如,我们设置03:04:05
,则表示表示3点,4点,5点这整个点数的时间段都可以回收,则为true。
接着,标志位spacefull
,表示所有业务消息存储文件的总大小所占的磁盘分区空间使用百分比大于指定的配置比例(默认为75%),则为true。
最后,标志位manualDelete
,表明如果我们手动设置了删除,则为true。
进入 if (timeup || spacefull || manualDelete)
判断以后,在看看标志位cleanAtOnce
,该标志位表明是否需要立即对部分业务消息存储文件进行删除,条件为存储文件的总大小占的磁盘分区空间使用百分比大于85%,直到存储文件的总大小小于85%为止。
最后,在说一个属性,fileReservedTime
,该字段是文件保留时间,默认为72小时。
条件达到后,我们接着进入
commitLog.deleteExpiredFile(...)
:
public int deleteExpiredFile(//
final long expiredTime, //
final int deleteFilesInterval, //
final long intervalForcibly, //
final boolean cleanImmediately//
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
CommitLog
,即业务消息持久化抽象,其删除逻辑委托MappedFileQueue
,映射文件连续存储抽象完成。继续进入mappedFileQueue.deleteExpiredFileByTime(...):
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
//step1,获取MappedFiles快照
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
//存活最大时间戳 = 文件最后一次修改的时间戳(创建时间) + 60 * 60 * 1000 * 72(72小时)
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
//step2,如果当前系统时间 > 存活最大时间戳 或者立刻清除标志位为true
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
//step3,尝试销毁映射文件
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
...
} else {
break;
}
}
}
}
//step4,从缓存中删除所有过期,也即上述步骤中,物理删除成功的MappedFile
deleteExpiredFile(files);
return deleteCount;
}
上述逻辑主要分4个步骤来执行删除流程,step1,第一步,导出一份映射文件集合的快照,然后遍历快照。遍历过程中,step2,如果存储文件符合删除条件,即文件保留时间到了,或者磁盘空间超过指定百分比,step3,则对映射文件尝试进行销毁,如果销毁成功,则加入内存删除文件集合。step4,最后在从内存上删除销毁映射文件成功的存储文件。在step3中,如果尝试销毁映射文件成功后,会有一个files.size() >= DELETE_FILES_BATCH_MAX
的判断,如果符合,则结束销毁映射文件的遍历。我们可以思考一下为什么会需要改判断?。。。
其实原因也很简单,假如cleanImmediately
标志位为true,该标志位就是存储文件的总大小占的磁盘分区空间使用百分比大于85%满足时,如果没有该判断,那岂不是会删除该broker下所有的业务消息存储文件。因此,加上该判断,确保每次最多只能删除DELETE_FILES_BATCH_MAX(其值为10) 个存储文件。下次需要在通过时间,存储空间等判断,才会执行删除逻辑。
刚刚提到step3中,为什么是对映射文件进行尝试销毁,而不是强制销毁?我们接着跟进mappedFile.destroy(intervalForcibly)
方法:
public boolean destroy(final long intervalForcibly) {
//intervalForcibly设置,并符合,则释放所有的引用,并clean(this.mappedByteBuffer)
this.shutdown(intervalForcibly);
//确保释放了所有的引用即
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
//物理删除文件
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeEclipseTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
我们继续分析一下上述方法流程。首先,通过this.shutdown(intervalForcibly)
,该方法才是正在尝试销毁内存映射文件,如果销毁成功,即this.isCleanupOver()
的判断为true,就会this.fileChannel.close()
关闭文件通道以及this.file.delete()
物理删除该存储文件。
继续进入
this.shutdown(intervalForcibly)
:
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
public void release() {
long value = this.refCount.decrementAndGet();
if (value > 0)
return;
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}
在《rocket mq 底层存储源码分析(1)-存储总概》章节中,我们曾经分析过,rmq通过引用计数法来对内存映射文件进行GC,其中this.available
标志位代表该内存映射文件是否有效,如果改标志位设为false,则表明该内存映射文件已无效,无法再使用。this.getRefCount()
代表当前的引用个数。什么情况下,这个引用个数会增加呢。例如,我们需要对业务消息进行构建索引时,我们就需要对消息所在的内存文件进行引用,即引用数加1。
我们不妨假设,此时,该映射文件还在被引用,那么,代码逻辑肯定会先运行第一个判断if (this.available)
,时this.available
为false
,表明该映射文件无法再被新的操作引用,并记下第一次尝试回收的时间this.firstShutdownTimestamp
。换言之,第一次回收不成功。一直回收到指定的时间间隔后intervalForcibly
(默认为两分钟),如果原来的引用因某些操作还完成二无法释放该映射文件的情况下,即代码中,既满足else if (this.getRefCount() > 0)
条件,又满足(System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly
条件,则会强行使this.refCount.set(-1000 - this.getRefCount())
该映射文件的引用为负数,即小于零,最后在根据release()
方法的逻辑,即可执行this.cleanup(value)
销毁方法。这里就是通过反射的方式调用Cleaner.clean,对堆外内存的释放的核心了。
到这里,我们已经对业务消息存储文件清除逻辑分析完成。
最后在总结一下上述流程逻辑,如果业务消息存储文件的总大小占的磁盘分区空间使用百分比大于85%,则忽视时间条件,从最旧的文件开始,依次删除该类型文件,直到存储文件的总大小占的磁盘分区空间使用百分比大于85%为止。否则,就删除保留时间超过3天的文件。当然磁盘空间比较充裕的情况下,只会在指定时间段删除【有业务消息存储文件的总大小所占的磁盘分区空间使用百分比小于指定的配置比例(默认为75%)】,否则, 会每隔10秒扫描一次。
2、清除索引文件
分析完 清除业务消息持久化文件 的流程后,我们直接分析索引文件的清除流程。
进入CleanConsumeQueueService.run()
:
public void run() {
try {
this.deleteExpiredFiles();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
这里先说一下,业务消息存储文件与索引文件是同步删除的,并且索引文件删除是紧跟在业务消息存储文件删除之后,这样一来,就可以确保索引文件所删除的范围不会超过业务消息存储文件删除范围。
继续进入
this.deleteExpiredFiles()
:
private void deleteExpiredFiles() {
//消费队列物理文件删除间隔 默认为100
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
//获取 最小 offset = commitLog.mappedFileQueue.getFirstMappedFile().getFileFromOffset()
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
int deleteCount = logic.deleteExpiredFile(minOffset);
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {
}
}
}
}
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
这里,我们先从总体上说明方法流程:依据long minOffset = DefaultMessageStore.this.commitLog.getMinOffset()
获取业务消息目前最小的物理存储位移,然后在遍历所有的ConsumeQueue,把所有小于该minOffset
的逻辑位移存储内容中所对应的业务消息的物理位移,进行删除或者内存校正。怎么理解这句话呢,我们接着进入 logic.deleteExpiredFile(minOffset)
分析:
public int deleteExpiredFile(long offset) {
//step1
int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
//step2
this.correctMinOffset(offset);
return cnt;
}
先看step1、
mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE)
:
public int deleteExpiredFileByOffset(long offset, int unitSize) {
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
int mfsLength = mfs.length - 1;
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
//获取映射文件最后一个位置的索引
//如果result == null,表明该映射文件还没有填充完,即不存在下一个位置索引文件
//因此无需删除当前的位置索引文件。
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
//获取该位置索引所对应的 业务消息 开始物理位移
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
//调用mappedFile.selectMappedBuffer方法时,持有计数器加1,
//因此,查询完后,要释放引用,持有计数器减1.
result.release();
//如果该位置索引文件的最大 业务消息物理位移 都比指定的offset小
//则说明该位置索引文件可以删除
destroy = maxOffsetInLogicQueue < offset;
if (destroy) {
log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
+ maxOffsetInLogicQueue + ", delete it");
}
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
log.warn("Found a hanged consume queue file, attempting to delete it.");
destroy = true;
} else {
log.warn("this being not executed forever.");
break;
}
// TODO: Externalize this hardcoded value
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
deleteExpiredFile(files);
return deleteCount;
}
整段代码的删除过程几乎和业务消息存储文件的删除一致,都是存储时间最早的文件开始遍历,找出所有可以删除的存储文件,先销毁映射文件,把文件从磁盘移除,最后在从内存上移除。唯独不同点在于复合删除条件的判断。我们就分析一下什么情况下,逻辑位移存储文件需要被删除。
我们先分析关键代码SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize)
,之前在分析构建索引的文章已近分析过selectMappedBuffer(long pos)
的含义,在根据this.mappedFileSize - unitSize
,可以得出result 的结果就是该逻辑索引文件最后一条逻辑索引的字节内容。先明确一点,业务消息的物理存储物理位移一定是按照插入顺序单调递增的,因此,逻辑位移的存储物理位移也一定是单调递增。如果result 为空,从业务上则表明该存储文件还没满,因此需要进一步判断该存储文件对应的映射文件是否有效(对应代码else if (!mappedFile.isAvailable())
),如果如果无效则删除,否则,结束该次清除流程。
我们在来分析result 不为空的情况下,先来回顾一下一条【逻辑位移索引】的存储格式,大小20字节,8字节的业务消息存储物理位移、4字节业务消息总长度 以及 8字节 的producer端指定消息的tags属性的hashcode。因此long maxOffsetInLogicQueue = result.getByteBuffer().getLong()
获取的是该【逻辑位移索引】所对应的 业务消息 开始物理位移。通过destroy = maxOffsetInLogicQueue < offset
,即当前【逻辑位移索引】存储文件的最大业务消息物理位移 与 目前业务消息存储文件最小物理位移的的比较,如果后者大于前者,则表明整个存储文件都需要删除;否则,则保留。换言之,经过上述逻辑遍历以后,留下来的第一个【逻辑位移索引】存储文件,一定有一部分的【逻辑位移索引】所关联的【业务消息的存储物理位移】一定大于业务消息最小的存储物理位移minOffset
,但也可能导致部分小于minOffset的【逻辑位移索引】存在于索引文件中。
到这里,我们接着分析step2.this.correctMinOffset(offset)
,看它是如何校正这部分小于minOffset的【逻辑位移索引】:
public void correctMinOffset(long phyMinOffset) {
//step1
MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
if (mappedFile != null) {
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
if (result != null) {
try {
//step2
for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = result.getByteBuffer().getLong();
result.getByteBuffer().getInt();
result.getByteBuffer().getLong();
//step3
if (offsetPy >= phyMinOffset) {
this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;
log.info("compute logics min offset: " + this.getMinOffsetInQueue() + ", topic: "
+ this.topic + ", queueId: " + this.queueId);
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
result.release();
}
}
}
}
经过一次【逻辑位移索引】存储文件删除以后,step1this.mappedFileQueue.getFirstMappedFile()
,所获取的第一个存储文件,一定有一部分的【逻辑位移索引】所关联的【业务消息的存储物理位移】一定大于业务消息最小的存储物理位移minOffset
,但也可能导致部分小于minOffset的【逻辑位移索引】存在于文件中。因此step2中,我们从头开始遍历该存储文件中所有的【逻辑位移索引】,直到找到大于或等于minOffset
的【逻辑位移索引】,最后在以该【逻辑位移索引】的物理位移更新至ConsumerQueue实例中的minLogicOffset
属性即可。这样一来,消费者端就不能消费小于minLogicOffset
的业务消息了。从而达到校正的效果。
到这里我们已近分析完了【逻辑位移索引】的清除流程。
最后在分析一下【key查询索引】索引文件的删除,我们回过来看一下删除索引文件的入口this.deleteExpiredFiles()
,该方法最后一步DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset)
就是【key查询索引】索引文件的删除清除流程:该流程主要是从最早的索引文件开始遍历,在根据索引文件头的endPhyOffset
(即该索引文件所构建的最大的业务消息物理位移)与minOffset
(业务消息最小的存储物理位移) 相比较,如果后者大,则该索引文件需要删除。由于比较简单,读者感兴趣的,可自行解读。
索引文件清除流程分析完成。
以上就是rmq清除持久化文件策略的底层细节!