rocketMq高性能存储设计

  消息中间价存储一般都是利用磁盘,在廉价的PC机上一般是使用机械硬盘,但机械硬盘的速度比访问内存慢了n个数量级,但一款优秀的消息中间件必然会将硬件资源压榨到极致,接下来看看rocketMq是如何做到高效存储的。

1、rocketMq存储结构

rocketMq存储

这张流程图简单介绍了rocketMq的存储实现,先简单说明下各自的含义

  • MappedFile 所有的topic数据都写到同一个文件中,文件的大小默认为1G,使用mmap与磁盘文件做映射,初始化时使用mlock将内存锁定,防止pagecache被os交换到swap区域。数据是顺序写,数据写满后自动创建下个MappedFile顺序写入。
  • MappedFileQueue MappedFile的队列,存储封装了所有的MappedFile实例。
  • CommitLog 封装了写入消息和读取消息的实现,根据MappedFileQueue找到正在写的MappedFile,之后将消息写入到pagecache。
  • ConsumerQueue 一个topic可以设置多个queue,每个consumerQueue对应一个topic下的queue,相当于kafka里的partition概念。里面存储了msg在commitLog中的offset、size、tagsCode,固定长度是20字节,consumer可以根据消息的offset在commitLog找到具体的消息。

2、高性能存储实现

2.1、mmap&&page cache

  先简单介绍下mmap,mmap一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。
  rocketMq默认的文件大小为1G,即将1G的文件映射到物理内存上。但mmap初始化时只是将文件磁盘地址和进程虚拟地址做了个映射,并没有真正的将整个文件都映射到内存中,当程序真正访问这片内存时产生缺页异常,这时候才会将文件的内容拷贝到page cache。试想,如果一开始只是做个映射,而到具体写消息时才将文件的部分页加载到pagecache,那效率将会是多么的低下。MappedFile初始化的操作是由单独的线程(AllocateMappedFileService)实现的,就是对应的生产消费模型。还好rocketMq在初始化MappedFile时做了内存预热,事先向page cache 中写入一些数据flush到磁盘,使整个文件都加载到page cache中。接下来简单看下如何预热的

public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        // mappedByteBuffer在java里面对应了mmap的实现
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // force flush when flush disk type is sync
            if (type == FlushDiskType.SYNC_FLUSH) {
                // 同步刷盘机制,OS_PAGE_SIZE为4K
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }

            // prevent gc
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }

        // force flush when prepare load finished
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);
        // 将page cache 这片内存锁定
        this.mlock();
    }

2.2、mlock 内存锁定

  os在内存充足的情况下,会将文件加载到 page cache 提高文件的读写效率,但是当内存不够用时,os会将page cache 回收掉。试想如果MappedFile对应的pagecache 被os回收,那就又产生缺页异常再次从磁盘加载到pagecache,会对系统性能产生很大的影响。rocketMq在创建完MappedFile并且内存预热完成后调用了c的mlock函数将这片内存锁定了,具体来看下是怎么实现的

// java 调用c
LibC INSTANCE = (LibC) Native.loadLibrary(Platform.isWindows() ? "msvcrt" : "c", LibC.class);
// 具体实现
public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }

        {
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
    }

2.3、刷盘机制

  写消息时是先写入到pagecache,rocketMq提供了两种刷盘机制,同步刷盘和异步刷盘,同步刷盘适用于对消息可靠性比较高的场合,同步刷盘性能比较低下,这样即使系统宕机消息也不会丢失。如图所示,此图来自rocketMq社区


刷盘机制

  下面简单介绍下同步刷盘的原理,同步刷盘机制下,发送线程实例化一个GroupCommitRequest,成员变量中有CountDownLatch,然后push到单独的刷盘线程(GroupCommitService)中的阻塞队列中,刷盘线程从阻塞队列中获取,刷盘其实就是调用了mappedByteBuffer.force()方法,刷盘成功后通过countdownlatch唤醒刷盘等待的线程,原理很简单。

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // 同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
           // 对应一个单独的线程
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                 // GroupCommitRequest 封装了CountDownLatch,GroupCommitService刷盘完毕后唤醒等待线程
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // 异步刷盘
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

  异步刷盘原理 发送消息线程写到pagecache成功之后就返回,消息保存在page cache 中,异步刷盘对应了一个单独线程,源码中刷盘默认一次刷4个pageSize,也就是16k的数据。异步刷盘有可能会丢失数据,当jvm程序死掉 但机器没有宕机,pagecache 中的脏页还是能人工刷到磁盘的,但是当机器宕机之后,数据就永远丢失了。

2.4、堆外内存池机制

堆外内存池

  如上图所示,rocketMq提供了堆外内存池机制即 TransientStorePool,TransientStorePool初始化时实例化5个堆外内存,大小和MappedFile的大小1G,然后mlock锁定此内存区域。发送消息时如果开启了堆外内存机制,MappedFile在实例化时从堆外内存池中获取一个directBuffer实例,写消息先写到堆外内存中,然后有单独的线程(CommitRealTimeService)刷到pagecache,之后再由单独的线程(FlushRealTimeService)从pagecahce刷到磁盘。
  开启堆外内存池的好处:写消息时先写到堆外内存,纯内存操作非常快。读消息时是从pagecache中读,相当于实现了读写分离

3、消息生产

  由最开始的总体图可知,所有发送消息的线程是串行执行的,所有topic的数据放一块顺序写到pagecache中,因此效率十分的高。在写 page cache 成功后,再由单独的线程异步构建consumerQueue和 indexFile(基于磁盘实现的hashMap,实现消息的查找),构建完成consumerQueue成功后 consumer 就能消费到最新的消息了,当然构建consumerQueue也是顺序写,每次只写入20个字节,占用的空间也不大。

4、消息消费

  每个topic可以对应多个consumerQueue,就相当于kafka里面的分区概念。rocketmq里面的消费者与consumerQueue的分配算法和kafka的相似。由于consumerQueue中只保存了消息在commitLog中的offset、msgSize、tagsCode,因此需要拿到offset去commitlog中把这条消息捞出来,注意,这时候读相当与随机读,由前面的mlock内存锁定再加上消费的数据一般是最近生产的,所有数据还在pagecache中,对性能的影响也不大。有一点,当consumer消费很远的数据时,pagecache中肯定是没有缓存的,这时候rocketMq建议consumer去slave上读,多好的设计啊。

5、总结

  rocketMq所有topic共用一个commitLog,磁盘顺序写,这一点实现也是参考了kafka,读消息时根据consumerQueue去commitLog中吧数据捞出来,虽然是随机读,但是最新的数据一般在pagecahce中也无关紧要。一款优秀的中间件要把硬件的性能发挥到极致和考虑到操作系统的相关特性,比如使用内存锁定避免内存swap交换,堆外内存和pagecache的读写分离。以上这些都是看了看rocketMq的存储源码总结出来的,如有错误欢迎指正~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容