RocketMQ源码阅读(四)-消息存储二

RocketMQ的消息存储过程非常复杂, 本文先介绍存储模块中几个重要对象.

1. MappedFile

MappedByteBuffer的封装, 具有创建文件(使用非堆区内存), 写入, 提交, 读取, 释放, 关闭等功能, RocketMQ使用该类实现数据从内存到磁盘的持久化.

关键字段

  • fileChannel: 该类对应的文件通道.
  • mappedByteBuffer: 文件在内存中的映射. 如前文所述RocketMQ使用内存映射的方式来操作文件, 这种方式要比流的方式快很多.
  • fileSize: 文件尺寸
  • wrotePosition: 当前写到哪一个位置.
  • committedPosition: 已经提交(已经持久化到磁盘)的位置.
  • flushedPosition: 已经提交(已经持久化到磁盘)的位置.
  • writeBuffer: 内存字节缓冲区, RocketMQ提供两种数据落盘的方式: 一种是直接将数据写到映射文件字节缓冲区(mappedByteBuffer), 映射文件字节缓冲区(mappedByteBuffer)flush; 另一种是先写到writeBuffer, 再从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel), 然后文件通道(fileChannel)flush.
  • fileFromOffset: fileFromOffset: 映射的起始偏移量, 拿commitlog文件来举例, 下面有很多个文件夹(假设为1KB, 默认是1G大小), 第一个文件名为00000000000000000000, 第二个文件名为00000000000000001024, 那么第一个文件的fileFromOffset就是0, 第二个文件的fileFromOffset就是1024

关键方法

  • appendMessage: 插入消息到MappedFile, 并返回插入结果.
  • selectMappedBuffer: 返回指定位置的内存映射, 用于读取数据.
    (1) appendMessage
    源代码如下:
public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
    assert msg != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();  //获取当前写的位置

    if (currentPos < this.fileSize) {   //currentPos小于文件尺寸才能写入
        //获取获取需要写入的字节缓冲区, 之所以会有writeBuffer != null的判断与使用的刷盘服务有关.
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);    //设置写入的postion
        AppendMessageResult result =
            cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);  //执行写入
        this.wrotePosition.addAndGet(result.getWroteBytes()); //更新wrotePosition
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    //返回错误信息
    log.error("MappedFile.appendMessage return null, wrotePosition: " + currentPos + " fileSize: "
        + this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}

可以看到MappedFile调用AppendMessageCallback来执行msg到字节缓冲区的写入.事实上整个RocketMQ只有一个类实现了AppendMessageCallback接口, 就是DefaultAppendMessageCallback. doAppend方法的具体实现与消息格式有关, 并且不属于MappedFile的范畴, 后文再分析.
(2) selectMappedBuffer
源代码如下:

//返回从pos到 pos + size的内存映射
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
    int readPosition = getReadPosition();   //获取当前有效数据的最大位置
    if ((pos + size) <= readPosition) {    //内存映射的最大位置必须小于readPosition

        if (this.hold()) {    //引用计数
            ByteBuffer byteBuffer = this.mappedByteBuffer.slice();  // 复制一个byteBuffer(与原byteBuffer共享数据, 只是指针位置独立)
            byteBuffer.position(pos);    //设置position
            //获取目标数据
            ByteBuffer byteBufferNew = byteBuffer.slice();
            byteBufferNew.limit(size);
            return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
        } else {
            log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                + this.fileFromOffset);
        }
    } else {
        log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
            + ", fileFromOffset: " + this.fileFromOffset);
    }

    return null;
}

2. MappedFileQueue

顾名思义, 该类代表了MappedFile组成的队列(由大小相同的多个文件构成). 无论是CommitLog(消息主体以及元数据), 还是ConsumeQueue(逻辑队列), 底层使用的组件都是MappedFileQueue.

关键字段

  • storePath: 文件队列的存储路径
  • mappedFiles: 存储MappedFile的map
  • mappedFileSize: MappedFile的尺寸
  • flushedWhere: 已经刷到磁盘的位置
  • committedWhere: 已经提交的位置

关键方法

  • getLastMappedFile: 获取队列中最后一个MappedFile对象
  • findMappedFileByOffset: 根据offset/filesize计算该offset所在那个文件中

(1) getLastMappedFile
源代码如下:

public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    long createOffset = -1;
    //获取当前Queue中最后一个MappedFile
    MappedFile mappedFileLast = getLastMappedFile();

    //一个文件都不存在时, 计算起始文件的offset
    if (mappedFileLast == null) {
        createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    //计算需要新创建的文件的offset
    if (mappedFileLast != null && mappedFileLast.isFull()) {
        createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }

    //创建新的MappedFile
    if (createOffset != -1 && needCreate) {
        //计算文件名
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        String nextNextFilePath = this.storePath + File.separator
            + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
        MappedFile mappedFile = null;

        if (this.allocateMappedFileService != null) {
            //使用AllocateMappedFileService创建文件主要是更加安全一些, 会将一些并行的操作穿行化
            mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                nextNextFilePath, this.mappedFileSize);
        } else {
            try {
                mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
            } catch (IOException e) {
                log.error("create mappedFile exception", e);
            }
        }

        //将新创建的文件添加到队列中
        if (mappedFile != null) {
            if (this.mappedFiles.isEmpty()) {
                mappedFile.setFirstCreateInQueue(true);
            }
            this.mappedFiles.add(mappedFile);
        }

        return mappedFile;
    }

    return mappedFileLast;
}

从源码中可见, 只有当文件写满或者找不到文件时, 才会创建新的文件.
(2) findMappedFileByOffset
主要是根据offset寻找对应的MappedFile, 具体源代码不再贴出.
为了理解findMapedFileByOffset, 我们假设每个文件的大小是1024K, 参考以下图示:



如果现在想查找3021在那个文件中, 可以按如下计算:
(3021 - 0)/1024=2 即可知其在队列下标为2的MappedFile中
释义如下: (offset-第一个文件的fileFromeOffset)/mappedFileSize

3. CommitLog

用于存储消息的抽象封装, 内部采用MapedFileQueue实现了消息文件队列功能.

关键字段

  • HashMap topicQueueTable: 用于记录某个topic在某个queueId共写入了多少个消息, put一个消息加1.

关键方法

  • putMessage: 存储消息.
  • getMessage: 读取消息

(1) putMessage
存储消息主要分3步: 查找文件(getLastMapedFile), 写入数据(DefaultAppendMessageCallback), 刷盘(FlushRealTimeService). 最终产生实际存储消息的队列文件如下:
${storePathRootDir}/commitlog/消息队列文件. (消息队列文件名规则如MappedFileQueue).

(2)getMessage(final long offset, final int size)
offset: 绝对偏移量, 可以用其调用findMappedFileByOffset查询MappedFile.
size: 欲查询的数据大小.

4. ConsumeQueue

消费队列的实现, 该消费队列主要存储了消息在CommitLog的位置, 与CommitLog类似, 内部采用MappedFileQueue实现了消息位置文件队列功能.
一个topic和一个queueId对应一个ConsumeQueue.
默认queue存储30W条消息, 每个消息大小为20个字节, 详细如下:
offset(long 8字节) + size(int 4字节) + tagsCode(long 8字节)

关键方法

  • putMessagePositionInfo: 消息位置的存储
  • getIndexBuffer: 该方法返回从offset之后的字节映射
    (1)putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset)
    offset: 消息在commitLog中的起始位置
    size: 消息长度
    tagsCode: 消息tag的hash code
    cqOffset: 该消息在topic对应的queue中的下标
    该方法主要实现了消息位置的存储, 并产生消息文件:
    storePathRootDir/consumequeue/{topic}/${queueId}/消息位置队列文件
    消息数(30W)*消息位置固定大小(20字节)=6000000字节
    故每6000000字节一个文件, 文件名依次递增, 前缀不够20位补0, 类似如下:
    00000000000000000000
    00000000000006000000
    00000000000012000000

(2)getIndexBuffer(final long startIndex)
该方法源代码如下:

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    int mappedFileSize = this.mappedFileSize;
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}

startIndex代表了起始偏移量索引.
该方法先根据startIndex找到对应的MappedFile, 再在该MappedFile中找到对应的字节映射.

5. 总结

RocketMQ的消息存储非常复杂, 本文介绍了消息存储中使用到的基础组件类以及一些重要的API. 后文会进一步介绍消息存储的详细流程.

参考资料:
1.http://www.tuicool.com/articles/6FFR7v
2.http://blog.csdn.net/a417930422/article/details/50606732

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容