RocketMQ消息存储

从存储方式和效率来看,文件系统高于KV存储,KV存储又高于关系型数据库,直接操作文件系统肯定是最快的,但可靠性是最低的,而关系型数据库的性能和可靠性与文件系统恰恰相反。本章主要分析RocketMQ的消息存储机制:

  • RocketMQ存储概要设计。
  • 消息发送存储流程。
  • 存储文件组织与内存映射机制。
  • RocketMQ存储文件。
  • 消息消费队列、索引文件构建机制。
  • RocketMQ文件恢复机制。
  • RocketMQ刷盘机制。
  • RocketMQ文件删除机制。
  • 同步双写机制。

1 存储概要设计

RocketMQ是一款高性能的消息中间件,存储部分的设计是重点,存储的核心是I/O访问性能,本章也会重点剖析RocketMQ是如何提高I/O访问性能的。我们先看一下RocketMQ数据流向,如图所示:


image.png
  1. CommitLog:消息存储,所有消息主题的消息都存储在CommitLog文件中。
  2. ConsumeQueue:消息消费队列,消息到达CommitLog文件后,将异步转发到ConsumeQuene文件中,供消息消费者消费。
  3. Index:消息索引,主要存储消息key与offset的对应关系。

1.1 存储文件的组织方式

RocketMQ在消息写入过程中追求极致的磁盘顺序写,所有主题的消息全部写入一个文件,即CommitLog文件。所有消息按抵达顺序依次追加到CommitLog文件中,消息一旦写入,不支持修改。

1.1.1 CommitLog文件

正如关系型数据库会为每条数据引入一个ID字段,基于文件编程也会为每条消息引入一个身份标志:消息物理偏移量,即消息存储在文件的起始位置。

正是有了物理偏移量的概念,CommitLog文件的命名方式也是极具技巧性,使用存储在该文件的第一条消息在整个CommitLog文件组中的偏移量来命名,例如第一个CommitLog文件为0000000000000000000,第二个CommitLog文件为00000000001073741824,依次类推。这样做的好处是给出任意一个消息的物理偏移量,可以通过二分法进行查找,快速定位这个文件的位置,然后用消息物理偏移量减去所在文件的名称,得到的差值就是在该文件中的绝对地址。

1.1.2 ConsumeQueue文件

CommitlLog文件的设计理念是追求极致的消息写,但我们知道消息消费模型是基于主题订阅机制的,即一个消费组是消费特定主题的消息。根据主题从CommitlLog文件中检索消息,这绝不是一个好主意,这样只能从文件的第一条消息逐条检索,其性能可想而知,为了解决基于topic的消息检索问题,RocketMQ引入了ConsumeQueue文件,ConsumeQueue文件的结构如图所示:

image.png

ConsumeQueue文件是消息消费队列文件,是CommitLog文件基于topic的索引文件,主要用于消费者根据topic消费消息,其组织方式为/topic/queue,同一个队列中存在多个消息文件。

ConsumeQueue的设计极具技巧,每个条目长度固定(8字节CommitLog物理偏移量、4字节消息长度、8字节tag哈希码)。这里不是存储tag的原始字符串,而是存储哈希码,目的是确保每个条目的长度固定,可以使用访问类似数组下标的方式快速定位条目,极大地提高了ConsumeQueue文件的读取性能。

消息消费者根据topic、消息消费进度(ConsumeQueue逻辑偏移量),即第几个ConsumeQueue条目,这样的消费进度去访问消息,通过逻辑偏移量logicOffset×20,即可找到该条目的起始偏移量(ConsumeQueue文件中的偏移量),然后读取该偏移量后20个字节即可得到一个条目,无须遍历ConsumeQueue文件。

1.1.3 Index文件

RocketMQ与Kafka相比具有一个强大的优势,就是支持按消息属性检索消息,引入ConsumeQueue文件解决了基于topic查找消息的问题,但如果想基于消息的某一个属性进行查找,ConsumeQueue文件就无能为力了。故RocketMQ又引入了Index索引文件,实现基于文件的哈希索引。Index文件的存储结构如图所示:

image.png

Index文件基于物理磁盘文件实现哈希索引。Index文件由40字节的文件头、500万个哈希槽、2000万个Index条目组成,每个哈希槽4字节、每个Index条目含有20个字节,分别为4字节索引key的哈希码、8字节消息物理偏移量、4字节时间戳、4字节的前一个Index条目(哈希冲突的链表结构)。

1.2 内存映射

虽然基于磁盘的顺序写消息可以极大提高I/O的写效率,但如果基于文件的存储采用常规的Java文件操作API,例如FileOutputStream等,将性能提升会很有限,故RocketMQ又引入了内存映射,将磁盘文件映射到内存中,以操作内存的方式操作磁盘,将性能又提升了一个档次。

在Java中可通过FileChannel的map方法创建内存映射文件。在Linux服务器中由该方法创建的文件使用的就是操作系统的页缓存(pagecache)。Linux操作系统中内存使用策略时会尽可能地利用机器的物理内存,并常驻内存中,即页缓存。在操作系统的内存不够的情况下,采用缓存置换算法,例如LRU将不常用的页缓存回收,即操作系统会自动管理这部分内存。

如果RocketMQ Broker进程异常退出,存储在页缓存中的数据并不会丢失,操作系统会定时将页缓存中的数据持久化到磁盘,实现数据安全可靠。不过如果是机器断电等异常情况,存储在页缓存中的数据也有可能丢失。

1.3 灵活多变的刷盘策略

有了顺序写和内存映射的加持,RocketMQ的写入性能得到了极大的保证,但凡事都有利弊,引入了内存映射和页缓存机制,消息会先写入页缓存,此时消息并没有真正持久化到磁盘。那么Broker收到客户端的消息后,是存储到页缓存中就直接返回成功,还是要持久化到磁盘中才返回成功呢?

这是一个“艰难”的选择,是在性能与消息可靠性方面进行权衡。为此,RocketMQ提供了两种策略:同步刷盘、异步刷盘。

1.3.1 同步刷盘

同步刷盘在RocketMQ的实现中称作组提交,其设计理念如图所示:


image.png

1.3.2 异步刷盘

同步刷盘的优点是能保证消息不丢失,即向客户端返回成功就代表这条消息已被持久化到磁盘,但这是以牺牲写入性能为代价的,不过因为RocketMQ的消息是先写入pagecache,所以消息丢失的可能性较小,如果能容忍一定概率的消息丢失或者在丢失后能够低成本的快速重推,可以考虑使用异步刷盘策略。

异步刷盘指的是broker将消息存储到pagecache后就立即返回成功,然后开启一个异步线程定时执行FileChannel的force方法,将内存中的数据定时写入磁盘,默认间隔时间为500ms。

1.4 transientStorePoolEnable机制

RocketMQ为了降低pagecache的使用压力,引入了transientStorePoolEnable机制,即内存级别的读写分离机制。

默认情况下,RocketMQ将消息写入pagecache,消息消费时从pagecache中读取,这样在高并发时pagecache的压力会比较大,容易出现瞬时broker busy的异常。RocketMQ通过transientStorePoolEnable机制,将消息先写入堆外内存并立即返回,然后异步将堆外内存中的数据提交到pagecache,再异步刷盘到磁盘中。因为堆外内存中的数据并未提交,所以认为是不可信的数据,消息在消费时不会从堆外内存中读取,而是从pagecache中读取,这样就形成了内存级别的读写分离,即写入消息时主要面对堆外内存,而读取消息时主要面对pagecache。

该机制使消息直接写入堆外内存,然后异步写入pagecache,相比每条消息追加直接写入pagechae,最大的优势是实现了批量化消息写入。

该机制的缺点是如果由于某些意外操作导致broker进程异常退出,已经放入pagecache的数据不会丢失,而存储在堆外内存的数据会丢失。

2 存储文件

2.1 CommitLog文件

CommitLog文件存储格式如图所示,每条消息的前面4个字节存储该条消息的总长度。


image.png

CommitLog文件的存储目录默认为${ROCKET_HOME}/store/commitlog,可以通过在broker配置文件中设置storePathRootDir属性改变默认路径。CommitLog文件默认大小为1GB,可通过在broker配置文件中设置mapedFileSizeCommitLog属性改变默认大小。本节将基于上述存储结构,重点分析消息的查找实现。

Commitlog#getMinOffset。获取当前CommitLog目录的最小偏移量,首先获取目录下的第一个文件,如果该文件可用,则返回该文件的起始偏移量,否则返回下一个文件的起始偏移量

public long getMinOffset() {
    MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
    if (mappedFile != null) {
        if (mappedFile.isAvailable()) {
            return mappedFile.getFileFromOffset();
        } else {
            return this.rollNextFile(mappedFile.getFileFromOffset());
        }
    }
        return -1;
}

CommitLog#rollNextFile

public long rollNextFile(final long offset) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig()
                .getMapedFileSizeCommitLog();
    return offset + mappedFileSize - offset % mappedFileSize;
}

CommitLog#getMessage。

public SelectMappedBufferResult getMessage(final long offset, final int size) { int
    mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize); 
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}

根据偏移量与消息长度查找消息。首先根据偏移找到文件所在的物理偏移量,然后用offset与文件长度取余,得到在文件内的偏移量,从该偏移量读取size长度的内容并返回。如果只根据消息偏移量查找消息,则首先找到文件内的偏移量,然后尝试读取4字节,获取消息的实际长度,最后读取指定字节。

2.2 ConsumeQueue文件

RocketMQ基于主题订阅模式实现消息消费,消费者关心的是一个主题下的所有消息,但同一主题的消息是不连续地存储在CommitLog文件中的。如果消息消费者直接从消息存储文件中遍历查找订阅主题下的消息,效率将极其低下。

RocketMQ为了适应消息消费的检索需求,设计了ConsumeQueue文件,该文件可以看作CommitLog关于消息消费的“索引”文件,ConsumeQueue的第一级目录为消息主题,第二级目录为主题的消息队列


image.png

为了加速ConsumeQueue消息条目的检索速度并节省磁盘空间,每一个ConsumeQueue条目不会存储消息的全量信息,存储格式如图所示。


image.png

单个ConsumeQueue文件中默认包含30万个条目,单个文件的长度为3×106×20字节,单个ConsumeQueue文件可以看作一个ConsumeQueue条目的数组,其下标为ConsumeQueue的逻辑偏移量,消息消费进度存储的偏移量即逻辑偏移量。

ConsumeQueue即为CommitLog文件的索引文件,其构建机制是当消息到达CommitLog文件后,由专门的线程产生消息转发任务,从而构建ConsumeQueue文件与下文提到的Index文件,如代码所示:ConsumeQueue#getIndexBuffer

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
     if (offset >= this.getMinLogicOffset()) {
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int)(offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

根据startIndex获取消息消费队列条目。通过startIndex×20得到在ConsumeQueue文件的物理偏移量,如果该偏移量小于minLogicOffset,则返回null,说明该消息已被删除,如果大于minLogicOffset,则根据偏移量定位到具体的物理文件。通过将该偏移量与物理文件的大小取模获取在该文件的偏移量,从偏移量开始连续读取20个字节即可。

2.3 Index文件

RocketMQ引入哈希索引机制为消息建立索引,HashMap的设计包含两个基本点:哈希槽与哈希冲突的链表结构。RocketMQ索引文件Index存储格式如图所示。


image.png

Index包含Index文件头、哈希槽、Index条目(数据)。Index文件头包含40字节,记录该Index的统计信息。

一个Index默认包含500万个哈希槽。哈希槽存储的是落在该哈希槽的哈希码最新的Index索引。默认一个Index文件包含2000万个条目,每个Index条目结构如下。

  1. hashcode: key的哈希码。
  2. phyoffset:消息对应的物理偏移量。
  3. timedif:该消息存储时间与第一条消息的时间戳的差值,若小于0,则该消息无效。
  4. pre index no:该条目的前一条记录的Index索引,当出现哈希冲突时,构建链表结构。

RocketMQ将消息索引键与消息偏移量的映射关系写入Index的实现方法为publicboolean putKey(final String key, final long phyOffset, final longstoreTimestamp),参数含义分别为消息索引、消息物理偏移量、消息存储时间。
IndexFile#putKey

if (this.indexHeader.getIndexCount() < this.indexNum) {
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
}

值得注意的是,Index文件条目中存储的不是消息索引key,而是消息属性key的哈希,在根据key查找时需要根据消息物理偏移量找到消息,进而验证消息key的值。之所以只存储哈希,而不存储具体的key,是为了将Index条目设计为定长结构,才能方便地检索与定位条目。

2.4 checkpoint文件

checkpoint(检查点)文件的作用是记录ComitLog、ConsumeQueue、Index文件的刷盘时间点,文件固定长度为4KB,其中只用该文件的前面24字节,其存储格式如图所示。


image.png
  1. physicMsgTimestamp: CommitLog文件刷盘时间点。
  2. logicsMsgTimestamp: ConsumeQueue文件刷盘时间点。
  3. indexMsgTimestamp: Index文件刷盘时间点

3 实时更新ConsumeQueue与Index文件

因为ConsumeQueue文件、Index文件都是基于CommitLog文件构建的,所以当消息生产者提交的消息存储到CommitLog文件中时,ConsumeQueue文件、Index文件需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。

RocketMQ通过开启一个线程ReputMessageServcie来准实时转发CommitLog文件的更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue文件、Index文件,如代码所示。
DefaultMessageStore#start

if (this.getMessageStoreConfig().isDuplicationEnable()) {
    this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
} else {
    this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
}
this.reputMessageService.start();

Broker服务器在启动时会启动ReputMessageService线程,并初始化一个非常关键的参数reputFromOffset,该参数的含义是ReputMessageService从哪个物理偏移量开始转发消息给ConsumeQueue和Index文件。如果允许重复转发,将reputFromOffset设置为CommitLog文件的提交指针。如果不允许重复转发,将reputFromOffset设置为CommitLog文件的内存中最大偏移量。

DefaultMessageStore#run

public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try { 
            Thread.sleep(1);
            this.doReput();
        } catch (Exception e) {
             DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

ReputMessageService线程每执行一次任务推送,休息1ms后继续尝试推送消息到Consume Queue和Index文件中,消息消费转发由doReput()方法实现。
DefaultMessageStore#doReput

SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();
if (dispatchRequest.isSuccess()) {
    if (size > 0) {
        DefaultMessageStore.this.doDispatch(dispatchRequest);
    }
}

3.1 根据消息更新ConsumeQueue文件

消息消费队列转发任务实现类为CommitLogDispatcherBuildConsumeQueue,内部最终将调用putMessagePositionInfo()方法
DefaultMessageStore#putMessagePositionInfo

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

第一步:根据消息主题与队列ID,先获取对应的ConsumeQueue文件,其逻辑比较简单,因为每一个消息主题对应一个ConsumeQueue目录,主题下每一个消息队列对应一个文件夹,所以取出该文件夹最后的ConsumeQueue文件即可。

第二步:依次将消息偏移量、消息长度、tag哈希码写入ByteBuffer,并根据consume-QueueOffset计算ConsumeQueue中的物理地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追加,不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘。

3.2 根据消息更新Index文件

哈希索引文件转发任务实现类为CommitLogDispatcherBuildIndex,如代码所示:
CommitLogDispatcherBuildIndex#dispatch

public void dispatch(DispatchRequest request) {
    if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}

第一步:获取或创建Index文件并获取所有文件最大的物理偏移量。如果该消息的物理偏移量小于Index文件中的物理偏移量,则说明是重复数据,忽略本次索引构建。

第二步:如果消息的唯一键不为空,则添加到哈希索引中,以便加速根据唯一键检索消息。

第三步:构建索引键,RocketMQ支持为同一个消息建立多个索引,多个索引键用空格分开。

4 ConsumeQueue与Index文件恢复

如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,就会导致CommitLog文件、ConsumeQueue文件、Index文件中的数据不一致。如果不加以人工修复,会有一部分消息即便在CommitLog文件中存在,由于并没有转发到ConsumeQueue文件,也永远不会被消费者消费。

那RocketMQ是如何使文件达到最终一致性的呢?下面详细分析RocketMQ关于存储文件的加载流程来一窥端倪。
DefaultMessageStore#load

boolean lastExitOK = !this.isTempFileExist(); 
private boolean isTempFileExist() {
    String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
    File file = new File(fileName);
    return file.exists();
}

第一步:判断上一次退出是否正常。其实现机制是Broker在启动时创建${ROCKET_HOME}/store/abort文件,在退出时通过注册JVM钩子函数删除abort文件。如果下一次启动时存在abort文件。说明Broker是异常退出的,CommitLog与ConsumeQueue数据有可能不一致,需要进行修复。
DefaultMessageStore#load

if (null != scheduleMessageService) {
    result = result&&this.scheduleMessageService.load();
}

第二步:加载延迟队列。MappedFileQueue#load

Arrays.sort(files);
for (File file : files) {
    if (file.length() != this.mappedFileSize) {
       return true;
    }
    try {
       MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
       mappedFile.setWrotePosition(this.mappedFileSize);
       mappedFile.setFlushedPosition(this.mappedFileSize);
       mappedFile.setCommittedPosition(this.mappedFileSize);
       this.mappedFiles.add(mappedFile);
       log.info("load " + file.getPath() + " OK");
    } catch (IOException e) {
       log.error("load file " + file + " error", e); return false;
    }
}

第三步:加载CommitLog文件,加载${ROCKET_HOME}/store/commitlog目录下所有文件并按照文件名进行排序。如果文件与配置文件的单个文件大小不一致,将忽略该目录下的所有文件,然后创建MappedFile对象。注意load()方法将wrotePosition、flushedPosition、committedPosition三个指针都设置为文件大小。

第四步:加载消息消费队列,调用DefaultMessageStore#loadConsumeQueue,其思路与CommitLog大体一致,遍历消息消费队列根目录,获取该Broker存储的所有主题,然后遍历每个主题目录,获取该主题下的所有消息消费队列,最后分别加载每个消息消费队列下的文件,构建ConsumeQueue对象,主要初始化ConsumeQueue的topic、queueId、storePath、mappedFileSize属性。

第五步:加载并存储checkpoint文件,主要用于记录CommitLog文件、ConsumeQueue文件、Inde文件的刷盘点。

第六步:加载Index文件,如果上次异常退出,而且Index文件刷盘时间小于该文件最大的消息时间戳,则该文件将立即销毁

第七步:根据Broker是否为正常停止,执行不同的恢复策略,下文将分别介绍异常停止、正常停止的文件恢复机制,如代码清单所示。
DefaultMessageStore#recoverTopicQueueTable

private void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new
            HashMap<String, Long>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            logic.correctMinOffset(minPhyOffset);
        }
    }
    this.commitLog.setTopicQueueTable(table);
}

第八步:恢复ConsumeQueue文件后,将在CommitLog实例中保存每个消息消费队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID还存储了消息队列偏移量的关键所在。

4.1 Broker正常停止文件恢复

Broker正常停止文件恢复的实现为CommitLog#recoverNormally,CommitLog#recoverNormally

boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
    int index = mappedFiles.size() - 3;
if (index < 0)
        index = 0;
// 省略部分代码
}

第一步:Broker正常停止再重启时,从倒数第3个文件开始恢复,如果不足3个文件,则从第一个文件开始恢复。

第二步:遍历CommitLog文件,每次取出一条消息。

第三步:更新MappedFileQueue的flushedWhere和committedPosition指针。

第四步:删除offset之后的所有文件。遍历目录下的文件,如果文件的尾部偏移量小于offset则跳过该文件,如果尾部的偏移量大于offset,则进一步比较offset与文件的开始偏移量。如果offset大于文件的起始偏移量,说明当前文件包含了有效偏移量,设置MappedFile的flushedPosition和committedPosition。如果offset小于文件的起始偏移量,说明该文件是有效文件后面创建的,则调用MappedFile#destory方法释放MappedFile占用的内存资源(内存映射与内存通道等),然后加入待删除文件列表中,最终调用deleteExpiredFile将文件从物理磁盘上删除。

4.2 Broker异常停止文件恢复

Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复与正常停止文件恢复的步骤基本相同,主要差别有两个:

  • 首先,Broker正常停止默认从倒数第三个文件开始恢复,而异常停止则需要从最后一个文件倒序推进,找到第一个消息存储正常的文件;
  • 其次,如果CommitLog目录没有消息文件,在ConsuneQueue目录下存在的文件则需要销毁。

如何判断一个消息文件是否正确呢?请看代码CommitLog#isMappedFileMatchedRecover

int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);
if (magicCode != MESSAGE_MAGIC_CODE) {
    return false;
}

第一步:判断文件的魔数,如果不是MESSAGE_MAGIC_CODE,则返回false,表示该文件不符合CommitLog文件的存储格式。

第二步:如果文件中第一条消息的存储时间等于0,则返回false,说明该消息的存储文件中未存储任何消息。

第三步:对比文件第一条消息的时间戳与检测点。如果文件第一条消息的时间戳小于文件检测点,说明该文件的部分消息是可靠的,则从该文件开始恢复。checkpoint文件中保存了CommitLog、ConsumeQueue、Index的文件刷盘点,RocketMQ默认选择CommitLog文件与ConsumeQueue这两个文件的刷盘点中较小值与CommitLog文件第一条消息的时间戳做对比,如果messageIndexEnable为true,表示Index文件的刷盘时间点也参与计算。

第四步:如果根据前3步算法找到MappedFile,则遍历MappedFile中的消息,验证消息的合法性,并将消息重新转发到ConsumeQueue与Index文件。

第五步:如果未找到有效的MappedFile,则设置CommitLog目录的flushedWhere、committedWhere指针都为0,并销毁ConsumeQueue文件。

如果Broker异常停止,在文件恢复过程中,RocketMQ会将最后一个有效文件中的所有消息重新转发到ConsumeQueue和Index文件中,确保不丢失消息,但同时会带来消息重复的问题。纵观RocktMQ的整体设计思想,RocketMQ保证消息不丢失但不保证消息不会重复消费,故消息消费业务方需要实现消息消费的幂等设计。

5 文件刷盘机制

RocketMQ的存储与读写是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储时首先将消息追加到内存中,再根据配置的刷盘策略在不同时间刷盘。

  • 如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法;
  • 如果是异步刷盘,在消息追加到内存后会立刻返回给消息发送端。RocketMQ使用一个单独的线程按照某一个设定的频率执行刷盘操作。通过在broker配置文件中配置flushDiskType来设定刷盘方式,可选值为ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘),默认为异步刷盘。

值得注意的是,Index文件的刷盘并不是采取定时刷盘机制,而是每更新一次Index文件就会将上一次的改动写入磁盘。

5.1 Broker同步刷盘

同步刷盘指的是在消息追加到内存映射文件的内存中后,立即将数据从内存写入磁盘文件,由CommitLog的handleDiskFlush方法实现,如代码所示。
CommitLog#handleDiskFlush

final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if(!flushOK){
    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}

这里先简单记录下,后续有用到了,再回来补吧。

5.2 Broker异步刷盘

异步刷盘操作如代码所示,CommitLog#handleDiskFlush

// 异步刷盘
if (!this.defaultMessageStore.getMessageStoreConfig().
        isTransientStorePoolEnable()) {
    flushCommitLogService.wakeup();
} else {
    commitLogService.wakeup();
}

开启transientStorePoolEnable机制则启动异步刷盘方式,刷盘实现较同步刷盘有细微差别。

  • 如果transientStorePoolEnable为true, RocketMQ会单独申请一个与目标物理文件(CommitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到与物理文件的内存映射中,再经flush操作到磁盘。
  • 如果transientStorePoolEnable为false,消息将追加到与物理文件直接映射的内存中,然后写入磁盘。

transientStorePoolEnable为true的刷盘流程如图所示:


image.png
  1. 将消息直接追加到ByteBuffer(堆外内存DirectByteBuffer),wrotePosition随着消息的不断追加向后移动。
  2. CommitRealTimeService线程默认每200ms将ByteBuffer新追加(wrotePosition减去commitedPosition)的数据提交到FileChannel中。
  3. FileChannel在通道中追加提交的内容,其wrotePosition指针向前后移动,然后返回。
  4. commit操作成功返回,将commitedPosition向前后移动本次提交的内容长度,此时wrotePosition指针依然可以向前推进。
  5. FlushRealTimeService线程默认每500ms将FileChannel中新追加的内存(wrotePosition减去上一次写入位置flushedPositiont),通过调用FileChannel#force()方法将数据写入磁盘。

6 同步双写

RocketMQ为了优化同步复制的性能,在RocketMQ 4.7.0中正式对原先的同步复制做了重大改造,大大提高了同步复制的性能。我们不妨先来简单回顾一下之前关于同步复制的基本流程,如图所示。


image.png

那么RocketMQ 4.7.0中又是如何进行优化的呢?因为同步复制的语义就是将消息同步到从节点,所以这个复制过程没有什么可优化的,那么,是不是可以减少SendMessageThread线程的等待时间,即在同步复制的过程中,SendMessageThread线程可以继续处理其他消息,只是收到从节点的同步结果后再向客户端返回结果。提高Broker的消息处理能力,重复利用Broker的资源,就是将上述putMessage同步方式修改为异步方式。

image.png

CommitLog向HaService提交数据同步请求后并没有被阻塞,而是返回了一个CompletableFuture对象,SendMessageProcessor在收到返回结果后,将继续处理新的消息,等到消息被成功同步到从节点后,会调用CompletableFuture的complete方法,触发网络通信,将结果返回到客户端。其核心代码在SendMessageProcessor#asyncSendMessage中,这里不再赘述了。

7 小结

RocketMQ主要存储文件包含CommitLog文件、ConsumeQueue文件、IndexFile文件、Checkpoint文件、abort文件。

  • RocketMQ组织文件以文件的起始偏移量来命名文件,这样根据偏移量能快速定位到真实的物理文件。
  • RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题的消息,保证消息存储是完全的顺序写。
  • 但这样给文件读取同样带来了不便,为此RocketMQ构建了ConsumeQueue文件,基于主题与队列进行组织
  • 同时RocketMQ为消息实现了哈希索引,可以为消息设置索引键,根据索引能够快速从CommitLog文件中检索消息。
  • RocketMQ基于内存映射文件机制提供了同步刷盘与异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的数据写入磁盘。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容