[ RocketMQ源码阅读7 ] CommitLog文件刷盘方式

之前我们进行RocketMQ的搭建,其中有一个参数是用来配置刷盘方式的。存在“同步”和“异步”两种方式。

flushDiskType=ASYNC_FLUSH|SYNC_FLUSH

和刷新磁盘逻辑相关的代码可以从这里开始看DefaultFlushManager

    class DefaultFlushManager implements FlushManager {
        private final FlushCommitLogService flushCommitLogService;
        //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
        private final FlushCommitLogService commitRealTimeService;

        public DefaultFlushManager() {
            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                this.flushCommitLogService = new CommitLog.GroupCommitService();//同步
            } else {
                this.flushCommitLogService = new CommitLog.FlushRealTimeService();//异步
            }
            this.commitRealTimeService = new CommitLog.CommitRealTimeService();
        }

        @Override public void start() {
            this.flushCommitLogService.start();
            if (defaultMessageStore.isTransientStorePoolEnable()) {
                this.commitRealTimeService.start();
            }
        }

从构造函数可以看到,要理解刷盘的行为,需要搞懂GroupCommitService同步刷盘,FlushRealTimeService 异步刷盘和CommitRealTimeService这三个类的名称取的不是很便于记忆和理解。

为了理解刷盘操作,我们去看更上一层的逻辑。在Broker接收到客户端的消息写入请求,并完成一系列的解析、校验放入内存等工作后。后续就需要将消息持久化到磁盘。

        public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
            // Synchronization flush
            if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {//同步刷盘
                final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                if (messageExt.isWaitStoreMsgOK()) {//消息中是否存在需要等待消息落盘的属性
                    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                    flushDiskWatcher.add(request);
                    service.putRequest(request);//提交刷盘请求,将request存入队列后,会立马调用一次wakeup()
                    return request.future(); //返回刷盘请求,异步等待句柄
                } else {
                    service.wakeup();//唤醒刷盘线程
                    return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
                }
            }
            // Asynchronous flush
            else {
                if (!CommitLog.this.defaultMessageStore.isTransientStorePoolEnable()) {//按照配置不同,分别唤醒不同的刷盘线程
                    flushCommitLogService.wakeup();
                } else {
                    commitRealTimeService.wakeup();
                }
                return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
            }
        }

GroupCommitService和FlushRealTimeService的主要区别在于调用flush传入的刷新数据页数(RocketMQ内部逻辑概念,和计算机系统的页无关)。GroupCommitService每次都做刷新都传入0,FlushRealTimeService则按照规则进行计算出页数。我这边按照原逻辑改写的容易理解的伪代码:

int flushPhysicQueueLeastPages = 4;
 if (currentTimeMillis >= (lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
       flushPhysicQueueLeastPages = 0;
 } 
CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);

这段代码避免了短时间内进行多次全量刷盘,从而提高了刷盘效率和性能呢。正因为异步刷盘不是每次都是全量刷盘,从这个角度来看才会被称为异步刷盘,其实本质上都是异步进行刷盘的。

transientStorePoolEnable

我们在上述刷盘代码中看到了此配置项。该配置项是为了提高IO性能,但是在Broker JVM进程异常退出的时候增加丢消息的风险。感兴趣的同学可以看这篇文章

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容