RocketMQ基础篇Broker存储消息

Broker是如何存储消息的

流程图

RocketMQ基础篇 Broker存储消息 流程图.png

代码解释

写入CommitLog

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery 延时等级 > 0,替换原有的消费主题为 SCHEDULE_TOPIC_XXXX,队列id 为 延时的等级-1
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }

        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) {
            return CompletableFuture.completedFuture(encodeResult);
        }
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;

        // 申请 putMessageLock 锁 (将消息存储到CommitLog文件中是串行的)
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            // 获取当前可以写入的CommitLog文件
            // CommitLog 存储地址:${ROCKET_HOME}/store/commitlog 文件默认大小1g。一个文件写满后就再创建另一个文件
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            //  ${ROCKET_HOME}/store/commitlog 目录下没有任何文件
            if (null == mappedFile || mappedFile.isFull()) {
                // 以偏移量为0 创建commitLog文件
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            // 处理完追加逻辑就会释放锁
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());

        // 刷盘
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        // 此处会处理主从同步的结果(HA)
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
            if (flushStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(flushStatus);
            }
            if (replicaStatus != PutMessageStatus.PUT_OK) {
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                }
            }
            return putMessageResult;
        });
    }

从代码中,我们可以看到写入CommitLog这个操作主要做了以下几件事情

  1. 设置存储的消息的基本信息
    1. 如果消息是延时消息,会将原有的消息topic替换为 SCHEDULE_TOPIC_XXXX,队列id为延迟等级
  2. 申请putMessageLock(该操作确保了只会有一个线程去对CommitLog进行修改)
  3. 如果CommitLog不存在或者已经写满,需要创建新的CommitLog
  4. 往MappedFile中追加消息(追加消息的时候才会生成消息唯一ID)
    1. 如果文件剩余空间不足,会创建新的文件 (消息长度 + END_FILE_MIN_BLANK_LENGTH > CommitLog文件空闲空间,返回END_OF_FILE)
    2. 其他的一些异常
  5. 追加完消息,释放锁

那么,这边来解释下CommitLog和MappedFile的关系

CommitLog和MappedFile的关系

CommitLog里面有什么

RocketMQ基础篇 Broker存储消息 CommitLog结构.png

CommitLog里面记录了消息的完整内容,我们在读取消息的时候,首先先通过前4个字节记录了当前消息的实际长度,然后再往后读对应的长度,就可以将消息完全读取出来
CommitLog文件在磁盘中的存储路径 ${ROCKET_HOME}/store/commitlog/

CommitLog和MappedFile的关系

RocketMQ采用内存映射文件的方式来提高IO访问性能,无论是CommitLog,ConsumeQueue还是IndexFile,单个文件都被设置为固定长度。然后RocketMQ使用MappedFile和MappedFileQueue来封装存储文件。具体对MappedFile和MappedFileQueue的概念我会在后面在具体介绍

CommitLog与ConsumeQueue,IndexFile的关系

既然要探讨ConsumeQueue和IndexFile与CommitLog之间的关系,不如直接来讲一讲这两个文件是干什么用的,从他们的用处中我们也就能够得知为什么在已经有了CommitLog的情况下,RocketMQ还需要ConsumeQueue和IndexFile这两种文件

ConsumeQueue

首先,我们来看看ConsumeQueue的结构

ConsumeQueue的结构

RocketMQ基础篇 Broker存储消息 ConsumeQueue目录结构.png

RocketMQ基础篇 Broker存储消息 ConsumeQueue结构.png

从图片中,我们不难看出,RocketMQ对ConsumeQueue的层级目录为
ConsumeQueue - 具体的Topic - 具体的某一个队列 - ConsumeQueue的单元 (记录了消息在CommitLog中的偏移量,消息的长度,消息的tags)

为什么需要CommitLog和ConsumeQueue的映射关系

那么为什么RocketMQ要设计这样的映射关系呢?主要要从两个方面来考虑,同时这两个方面最终的目的也是为了保证RocketMQ的性能

  • 写入消息:如果我们没有CommitLog,直接就用一个ConsumeQueue,那么在Producer生产消息给Broker的时候,不同的Topic和队列的消息我们要写到不同的ConsumeQueue文件中。那么就会存在随机写的问题,这样写入消息的效率就会变的非常的低
  • 读取消息:如果为了保证消息的写入效率,我们将消息存放在CommitLog中,但是,在消费者拉取消息的时候,它肯定是要拉取自己感兴趣的Topic的消息。此时,我们去CommitLog中寻找对应的Topic和对应MessageQueue下对应偏移量的消息,挨个去遍历CommitLog一看就是一个效率非常低的操作。此时我们使用ConsumeQueue就可以快速定位到消息处于哪个CommitLog,他对应的偏移量是多少。这样读取消息的效率就会变得非常的高

ConsumeQueue的tag hashcode

这边补充一点,关于ConsumeQueue中的tag hashcode。
在RocketMQ中,我们不仅可以监听我们感兴趣的Topic消息,同时我们还可以只监听Topic下部分的消息。这个操作就是通过消息的tag来实现的(一种实现方式,另外还有类过滤和SQL92过滤的方式)【RocketMQ服务端过滤根据tag的hashcode】
在消费者拉取消息的时候,不仅消费者自己会对消息进行过滤,Broker也会根据消费者的情况对消息进行一次预过滤。此处我们就不展开讲述,具体可以消息消费 Consumer消费消息 的文章

IndexFile

IndexFile的结构

RocketMQ基础篇 Broker存储消息 IndexFile结构.png

他的目的是为了方便我们根据消息的key快速检索消息,快速检索主要是依靠文件头部的Hash槽。

哈希冲突的解决

既然说到了hash,那么就一定会存在哈希冲突的情况。那么,IndexFile是如何解决哈希冲突的呢?
在我们写IndexFile的时候,假如IndexFile还有足够的空间,那么我们就会对当前这条消息的消息key进行一次哈希计算,假如对应的hash槽中没有记录对应的index条目偏移量,那么直接将偏移量记录到hash槽中。否则这是在Index条目中的pre index no记录原先的偏移量,然后再将这个条目的偏移量记录到hash槽中

对于为什么要设计CommitLog,ConsumeQueue,IndexFile的总结

因为消费者消费消息的时候,是根据具体的Topic去进行消费的。所以为了加快消费者消费的速度,需要根据具体的Topic去存储消息(ConsumeQueue)。但是如果直接把消息存储在ConsumeQueue,会导致生产者发送的消息Topic不同,出现大量随机写的情况。所以为了提高消费者和生产者的速度。相同的,为了加快对消息key检索消息的速度,所以添加了IndexFile
RocketMQ首先先将生产者的消息写入到CommitLog中(顺序写),然后在通过一个异步线程将消息基本信息(偏移量等)写入到ConsumeQueue和IndexFile中

class ReputMessageService extends ServiceThread {
    // ...省略其他代码
    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            try {
                // 每执行一次任务推送休息1毫秒,就继续尝试推送消息到消息消费队列和索引文件
                Thread.sleep(1);
                // 消息消费转发核心方法
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
    // ...省略其他代码
}

// 最终调用到该方法
public void doDispatch(DispatchRequest req) {
        // CommitLogDispatcher 有三个实现
        // 其中就有我们用到的 CommitLogDispatcherBuildConsumeQueue 写ConsumeQueue
        // CommitLogDispatcherBuildIndex 写IndexFile
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

MappedFile和MappedFileQueue

MappedFile

为什么需要MappedFile

RocketMQ底层使用 mmap + write (零拷贝)的方式减少用户态和内存态的切换次数
具体内容参考 mmap解释 这篇文章
这里只简单介绍一下:
加入没有mmap,我们读取文件需要经历两次数据拷贝,一次是从磁盘拷贝到page cache中,一次从page cache拷贝到用户空间内存。
而有了mmap,就能够减少page cache到用户空间内存的拷贝mmap对page cache和用户空间虚拟地址进行了直接的映射操作虚拟地址就等同于操作page cache

MappedFileQueue

MappedFileQueue是MappedFile的管理容器,它是对存储目录的封装
比如说在CommitLog文件存储目录下有好多个CommitLog文件,对应在内存中也会有好多个MappedFile对象,而MappedFileQueue就是来管理这些MappedFile对象的

MappedFile刷盘操作

消息刷盘有两种:

  • 同步刷盘:只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
  • 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

RocketMQ定期删除已经消费的消息文件

另外,RocketMQ会定时将已经消费的消息从存储文件中删除,以CommitLog为例,第一个CommitLog文件的偏移量不一定 00000000000000000000

主干逻辑流程图

RocketMQ基础篇 Broker存储消息 定期删除CommitLog.png

对应源码和注释

// 默认每个文件过期时间为72小时,通过Broker配置文件中设置fileReservedTime来改变
// 如果非当前写文件在一定时间间隔内没有再次被更新,则会被认为是过期文件,可以被删除(RocketMQ不会关注这个文件上的消息是否被全部消费)
private void deleteExpiredFiles() {
    int deleteCount = 0;
    // 文件保留时间 72小时
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // 删除物理文件的间隔(在一次清除过程中,可能需要删除的文件不止一个,该值指定两次删除文件的时间间隔)
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    // 在清除过期文件时,如果该文件被其他线程所占用(引用次数大于0,比如读取消息),此时会阻止此次删除任务
    // 同时在第一次试图删除该文件时记录当前时间戳 destroyMapedFileIntervalForcibly 表示第一次拒绝删除之后能保留的最大时间
    // 在此时间内,同样可以被拒绝删除,同时会将引用减少1000个,超过该时间间隔后,文件将被强制删除
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();

    // 指定删除文件的时间点
    boolean timeup = this.isTimeToDelete();
    // 磁盘空间满了
    boolean spacefull = this.isSpaceToDelete();
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;

    if (timeup || spacefull || manualDelete) {

        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;

        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
                 fileReservedTime,
                 timeup,
                 spacefull,
                 manualDeleteFileSeveralTimes,
                 cleanAtOnce);

        fileReservedTime *= 60 * 60 * 1000;

        // 清除过期文件
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                                                                           destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}
public int deleteExpiredFileByTime(final long expiredTime,
                                   final int deleteFilesInterval,
                                   final long intervalForcibly,
                                   final boolean cleanImmediately) {
    Object[] mfs = this.copyMappedFiles(0);

    if (null == mfs)
        return 0;

    // 从倒数第二个文件开始
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    // 通过MappedFile,没有磁盘i/o的情况
    List<MappedFile> files = new ArrayList<MappedFile>();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 文件最大存活时间=文件的最后一次更新时间+文件存活时间(默认72小时)
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            // 当前时间>=最大存活时间 或 需要立即删除
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                // 删除文件(将该文件加入到待删除文件列表中,然后统一执行File#delete方法将文件从物理磁盘中删除)
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;

                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                        break;
                    }

                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }

    deleteExpiredFile(files);

    return deleteCount;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351