创建 MappedFile 文件
创建 MappedFile 文件实现如下:
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// 从 requestQueue 阻塞队列中获取 AllocateRequest 任务。
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
// 判断是否开启 isTransientStorePoolEnable ,如果开启则使用直接内存进行写入数据,最后从直接内存中 commit 到 FileChannel 中。
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 使用 mmap 方式创建 MappedFile
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// 判断 mappedFile 大小,只有 CommitLog 才进行文件预热
// 预写入数据。按照系统的 pagesize 进行每个pagesize 写入一个字节数据。
//为了把mmap 方式映射的文件都加载到内存中。
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMapedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}
从代码中可以看出,只有 MappedFile 的大小等于或大于 CommitLog 的大小并且开启文件预热功能才会预加载文件。
CommitLog 文件的大小默认为 1 G。
文件预热
文件预热的时候需要了解的知识点 操作系统的 Page Cache 和 内存映射技术 mmap 。
Page Cache
Page Cache 叫做页缓存,而每一页的大小通常是4K,在Linux系统中写入数据的时候并不会直接写到硬盘上,而是会先写到Page Cache中,并打上dirty标识,由内核线程flusher定期将被打上dirty的页发送给IO调度层,最后由IO调度决定何时落地到磁盘中,而Linux一般会把还没有使用的内存全拿来给Page Cache使用。而读的过程也是类似,会先到Page Cache中寻找是否有数据,有的话直接返回,如果没有才会到磁盘中去读取并写入Page Cache然后再次读取Page Cache并返回。而且读的这个过程中操作系统也会有一个预读的操作,你的每一次读取操作系统都会帮你预读出后面一部分数据,而且当你一直在使用预读数据的时候,系统会帮你预读出更多的数据(最大到128K)。
mmap
mmap是一种将文件映射到虚拟内存的技术,可以将文件在磁盘位置的地址和在虚拟内存中的虚拟地址通过映射对应起来,之后就可以在内存这块区域进行读写数据,而不必调用系统级别的read,wirte这些函数,从而提升IO操作性能,另外一点就是mmap后的虚拟内存大小必须是内存页大小(通常是4K)的倍数,之所以这么做是为了匹配内存操作。
预热代码
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// 如果是同步写盘操作,则进行强行刷盘操作
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// prevent gc (有什么用?)
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// 把剩余的数据强制刷新到磁盘中
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
this.mlock();
}
这里 MappedFile 已经创建,对应的 Buffer 为 mappedByteBuffer。
mappedByteBuffer 已经通过 mmap 映射,此时操作系统中只是记录了该文件和该 Buffer 的映射关系,而没有映射到物理内存中。这里就通过对该 MappedFile 的每个 Page Cache 进行写入一个字节,通过读写操作把 mmap 映射全部加载到物理内存中。
锁定内存 mlock()
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
{
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}
该方法主要是实现文件预热后,防止把预热过的文件被操作系统调到swap空间中。当程序在次读取交换出去的数据的时候会产生缺页异常。
LibC.INSTANCE.mlock 和 LibC.INSTANCE.madvise 都是调用的 Native 方法。
- LibC.INSTANCE.mlock 方法
实现是将锁住指定的内存区域避免被操作系统调到swap空间中。 - LibC.INSTANCE.madvise 方法
实现是一次性先将一段数据读入到映射内存区域,这样就减少了缺页异常的产生。