store模块阅读15:ConsumeQueueExt

说明

这个是consumeQueue类的拓展,记录一些不重要的信息

在MessageStoreConfig#enableConsumeQueueExt为true时生效(默认false)
存储路径默认 {user.home}/store/consumequeue_ext/{topic}/{queueId}/
里面利用MappedFileQueue管理一个MappedFile的队列,进行put,get,truncate,recover等操作
每个mappedFile默认最大48M,存放CqExtUnit(存储单元,由头部和内容两部分组成,详见下面内部类解析),标志位 short -1(代表数据结尾),尾部预留4个字节作为END_BLANK
整个consumequeue_ext记录的文件大小不得超过属性 MAX_REAL_OFFSET(见属性说明)

PS:ConsumeQueue后面再讲

属性

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final MappedFileQueue mappedFileQueue;//对应的mappedFileQueue
    private final String topic;
    private final int queueId;

    private final String storePath;//存储路径,默认 user.home/store/consumequeue_ext

    //consumeQueueExt文件大小,默认48M
    private final int mappedFileSize;

    private ByteBuffer tempContainer;

    public static final int END_BLANK_DATA_LENGTH = 4;//结尾需要预留4字节空白

    /**
     * Addr can not exceed this value.For compatible.
     */
    public static final long MAX_ADDR = Integer.MIN_VALUE - 1L;//extAddr的上限
    public static final long MAX_REAL_OFFSET = MAX_ADDR - Long.MIN_VALUE;//ext文件记录的偏移量上限

备注:ConsumeQueue需要Topic和queueId信息,ConsumeQueueExt因此也需要

内部类 CqExtUnit

作为mappedFile里面存放的数据结构,由头部和内容两部分组成
头部为20字节,依次为short size,long tagsCode, long msgStoreTime, short bitMapSize 共 2+8+8+2=20
内容为byte[] filterBitMap;
其中 bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
size = 20 + bitMapSize ,规定不能超过32k

CqExtUnit属性

        //size(short) + tagsCode(long) + msgStoreTime(long) + bitMapSize(short) = 20字节
        public static final short MIN_EXT_UNIT_SIZE
            = 2 * 1 // size, 32k max
            + 8 * 2 // msg time + tagCode
            + 2; // bitMapSize

        public static final int MAX_EXT_UNIT_SIZE = Short.MAX_VALUE;//一个unit最大32k

         /**
         * unit size
         */
        private short size;
        /**
         * has code of tags
         */
        private long tagsCode;
        /**
         * the time to store into commit log of message
         */
        private long msgStoreTime;
        /**
         * size of bit map
         */
        private short bitMapSize;
        /**
         * filter bit map
         */
        private byte[] filterBitMap;

CqExtUnit 构造函数

        public CqExtUnit() {
        }

        public CqExtUnit(Long tagsCode, long msgStoreTime, byte[] filterBitMap) {
            this.tagsCode = tagsCode == null ? 0 : tagsCode;
            this.msgStoreTime = msgStoreTime;
            this.filterBitMap = filterBitMap;
            this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
            this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);//size的值 为 20字节(头信息) + bitMapSize(实际内容)
        }

CqExtUnit函数

object相关函数不讲,下面主要注意read函数和write函数即可

        /**
         * build unit from buffer from current position.
         * 将buffer数据读入到内存记录的数据结构中
         */
        private boolean read(final ByteBuffer buffer) {
            if (buffer.position() + 2 > buffer.limit()) {//还没有写size
                return false;
            }

            this.size = buffer.getShort();

            if (this.size < 1) {
                return false;
            }

            this.tagsCode = buffer.getLong();
            this.msgStoreTime = buffer.getLong();
            this.bitMapSize = buffer.getShort();

            if (this.bitMapSize < 1) {
                return true;
            }

            if (this.filterBitMap == null || this.filterBitMap.length != this.bitMapSize) {
                this.filterBitMap = new byte[bitMapSize];
            }

            buffer.get(this.filterBitMap);
            return true;
        }

        /**
         * Only read first 2 byte to get unit size.
         * <p>
         * if size > 0, then skip buffer position with size.
         * </p>
         * <p>
         * if size <= 0, nothing to do.
         * </p>
         */
        //如果size>0,buffer跳过对应长度的大小
        private void readBySkip(final ByteBuffer buffer) {
            ByteBuffer temp = buffer.slice();
            short tempSize = temp.getShort();//上面slice使得,这里temp读取 不会导致buffer.position变化
            this.size = tempSize;
            if (tempSize > 0) {
                buffer.position(buffer.position() + this.size);
            }
        }

        //内存数据结构 放到 ByteBuffer中,转化成byte[]返回
        private byte[] write(final ByteBuffer container) {
            this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
            this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);

            ByteBuffer temp = container;

            if (temp == null || temp.capacity() < this.size) {
                temp = ByteBuffer.allocate(this.size);
            }

            temp.flip();
            temp.limit(this.size);

            temp.putShort(this.size);
            temp.putLong(this.tagsCode);
            temp.putLong(this.msgStoreTime);
            temp.putShort(this.bitMapSize);
            if (this.bitMapSize > 0) {
                temp.put(this.filterBitMap);
            }

            return temp.array();
        }

        /**
         * Calculate unit size by current data.
         * 实际大小为 头部长度20字节 + map的长度
         */
        private int calcUnitSize() {
            int sizeTemp = MIN_EXT_UNIT_SIZE + (filterBitMap == null ? 0 : filterBitMap.length);
            return sizeTemp;
        }

        //这里其实size没有更新,上层确保调用calcUnitSize函数
        public void setFilterBitMap(final byte[] filterBitMap) {
            this.filterBitMap = filterBitMap;
            // not safe transform, but size will be calculate by #calcUnitSize
            this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
        }

ConsumeQueueExt函数

flush,destroy,checkSelf,load函数直接调用的mappedFileQueue对应方法,不讲
getMaxAddress,getMinAddress只在测试类用,不讲

构造函数

    /**
     * Constructor.
     *
     * @param topic topic
     * @param queueId id of queue
     * @param storePath root dir of files to store.
     * @param mappedFileSize file size
     * @param bitMapLength bit map length.
     */
    public ConsumeQueueExt(final String topic,
        final int queueId,
        final String storePath,
        final int mappedFileSize,
        final int bitMapLength) {

        this.storePath = storePath;
        this.mappedFileSize = mappedFileSize;

        this.topic = topic;
        this.queueId = queueId;

        String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;//对应queueId的目录是 user.home/store/consumequeue_ext/topic/queueId/

        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);

        if (bitMapLength > 0) {
            this.tempContainer = ByteBuffer.allocate(
                bitMapLength / Byte.SIZE
            );
        }
    }

注意每个ConsumeQueueExt的目录为 {user.home}/store/consumequeue_ext/{topic}/{queueId}/即可

编解码相关

    public static boolean isExtAddr(final long address) {
        return address <= MAX_ADDR;
    }

    public long unDecorate(final long address) {
        if (isExtAddr(address)) {
            return address - Long.MIN_VALUE;
        }
        return address;
    }

    public long decorate(final long offset) {
        if (!isExtAddr(offset)) {
            return offset + Long.MIN_VALUE;
        }
        return offset;
    }

就是put到mappedFile时,返回的是编码之后的addr(即decorate,一个负数)
从mappedFile利用addr获取CqExtUnit时,要进行解码(即unDecorate,一个正数)
并不清楚为什么要这样设计,猜测是历史兼容性原因

get相关

两个函数,就是根据编码之后的addr进行解码,从mappedFileQueue中找对应的mappedFile,
通过%mappedFileSize拿到偏移,继而拿到buffer
读取buffer来给cqExtUnit的数据结构赋值

    /**
     * Get data from buffer.
     *
     * @param address less than 0
     */
    //根据修饰后的addr,拿到mappedFile中对应位置描述的CqExtUnit
    public CqExtUnit get(final long address) {
        CqExtUnit cqExtUnit = new CqExtUnit();
        if (get(address, cqExtUnit)) {
            return cqExtUnit;
        }

        return null;
    }

    /**
     * Get data from buffer, and set to {@code cqExtUnit}
     *
     * @param address less than 0
     */
    //根据修饰后的addr,从MappedFileQueue中找到对应的file
    //通过%mappedFileSize拿到偏移,继而拿到buffer
    //读取buffer来给cqExtUnit的数据结构赋值
    public boolean get(final long address, final CqExtUnit cqExtUnit) {
        if (!isExtAddr(address)) {
            return false;
        }

        final int mappedFileSize = this.mappedFileSize;
        final long realOffset = unDecorate(address);//真实偏移

        //realOffset找到对应的MappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0);
        if (mappedFile == null) {
            return false;
        }

        int pos = (int) (realOffset % mappedFileSize);

        SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos);//到特定的位置读取 buffer
        if (bufferResult == null) {
            log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset);
            return false;
        }
        boolean ret = false;
        try {
            ret = cqExtUnit.read(bufferResult.getByteBuffer());//读buffer到cqExtUnit中的内存中
        } finally {
            bufferResult.release();//引用-1
        }

        return ret;
    }

put相关

put函数,存放一个cqExtUnit,返回编码后的地址
fullFillToEnd函数,wrotePosition存放-1代表结束,更新wrotePosition为mappedFileSize代表该mappedFile写结束

    /**
     * Save to mapped buffer of file and return address.
     * <p>
     * Be careful, this method is not thread safe.
     * </p>
     *
     * @return success: < 0: fail: >=0
     */
    //存放一个cqExtUnit,返回编码后的地址
    public long put(final CqExtUnit cqExtUnit) {
        final int retryTimes = 3;
        try {
            int size = cqExtUnit.calcUnitSize();//计算大小
            if (size > CqExtUnit.MAX_EXT_UNIT_SIZE) {//太大了,超过了32k
                log.error("Size of cq ext unit is greater than {}, {}", CqExtUnit.MAX_EXT_UNIT_SIZE, cqExtUnit);
                return 1;
            }
            if (this.mappedFileQueue.getMaxOffset() + size > MAX_REAL_OFFSET) {//ext文件记录数据过多
                log.warn("Capacity of ext is maximum!{}, {}", this.mappedFileQueue.getMaxOffset(), size);
                return 1;
            }
            // unit size maybe change.but, the same most of the time.
            if (this.tempContainer == null || this.tempContainer.capacity() < size) {
                this.tempContainer = ByteBuffer.allocate(size);
            }

            for (int i = 0; i < retryTimes; i++) {
                MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

                if (mappedFile == null || mappedFile.isFull()) {
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);//之前最新的写满了,自动创建一个
                }

                if (mappedFile == null) {
                    log.error("Create mapped file when save consume queue extend, {}", cqExtUnit);
                    continue;
                }
                final int wrotePosition = mappedFile.getWrotePosition();
                final int blankSize = this.mappedFileSize - wrotePosition - END_BLANK_DATA_LENGTH;

                // check whether has enough space.
                if (size > blankSize) {//空间不够了
                    fullFillToEnd(mappedFile, wrotePosition);//标记这个mappedFile为写满了
                    log.info("No enough space(need:{}, has:{}) of file {}, so fill to end",
                        size, blankSize, mappedFile.getFileName());
                    continue;//重新再试
                }
                //mappedFile追加消息
                if (mappedFile.appendMessage(cqExtUnit.write(this.tempContainer), 0, size)) {
                    return decorate(wrotePosition + mappedFile.getFileFromOffset());
                }
            }
        } catch (Throwable e) {
            log.error("Save consume queue extend error, " + cqExtUnit, e);
        }

        return 1;
    }

    //wrotePosition存放-1代表结束,更新wrotePosition为mappedFileSize代表该mappedFile写满
    protected void fullFillToEnd(final MappedFile mappedFile, final int wrotePosition) {
        ByteBuffer mappedFileBuffer = mappedFile.sliceByteBuffer();
        mappedFileBuffer.position(wrotePosition);

        // ending.
        mappedFileBuffer.putShort((short) -1);//-1标记结束位

        mappedFile.setWrotePosition(this.mappedFileSize);//标记写满了,isFull
    }

recover相关

    public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (mappedFiles == null || mappedFiles.isEmpty()) {
            return;
        }

        // load all files, consume queue will truncate extend files.
        int index = 0;

        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        CqExtUnit extUnit = new CqExtUnit();
        while (true) {
            extUnit.readBySkip(byteBuffer);//在一个mappedFile之间skip

            // check whether write sth.
            if (extUnit.getSize() > 0) {
                mappedFileOffset += extUnit.getSize();
                continue;
            }
            //最后肯定getSize返回-1,是文件尾部写入的short标志位
            index++;
            if (index < mappedFiles.size()) {
                mappedFile = mappedFiles.get(index);//下一个mappedFile
                byteBuffer = mappedFile.sliceByteBuffer();
                processOffset = mappedFile.getFileFromOffset();
                mappedFileOffset = 0;
                log.info("Recover next consume queue extend file, " + mappedFile.getFileName());
                continue;
            }

            log.info("All files of consume queue extend has been recovered over, last mapped file "
                + mappedFile.getFileName());
            break;
        }

        processOffset += mappedFileOffset;//拿到最后一个mappedFile最后的绝对偏移
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);
    }

这里就是为了获取最后一个mappedFile#getFileFromOffset(); 而且这个函数,设置标志位然后truncate,见吐槽

truncate相关

truncateByMinAddress: 删除minAddress之前的mappedFile
truncateByMaxAddress: 删除maxAddress之后的mappedFile以及同一个mappedFile之后的记录

    /**
     * Delete files before {@code minAddress}.
     *
     * @param minAddress less than 0
     */
    //删除minAddress之前的mappedFile
    public void truncateByMinAddress(final long minAddress) {
        if (!isExtAddr(minAddress)) {
            return;
        }

        log.info("Truncate consume queue ext by min {}.", minAddress);

        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();

        List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        final long realOffset = unDecorate(minAddress);

        for (MappedFile file : mappedFiles) {
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;

            if (fileTailOffset < realOffset) {
                log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", file.getFileName(),
                    fileTailOffset, realOffset);
                if (file.destroy(1000)) {
                    willRemoveFiles.add(file);
                }
            }
        }

        this.mappedFileQueue.deleteExpiredFile(willRemoveFiles);
    }

    /**
     * Delete files after {@code maxAddress}, and reset wrote/commit/flush position to last file.
     *
     * @param maxAddress less than 0
     */
    //删除maxAddress之后的mappedFile以及同一个mappedFile之后的记录
    public void truncateByMaxAddress(final long maxAddress) {
        if (!isExtAddr(maxAddress)) {
            return;
        }

        log.info("Truncate consume queue ext by max {}.", maxAddress);

        CqExtUnit cqExtUnit = get(maxAddress);//拿到mappedFile对应的CqExtUnit记录,用来获取size属性
        if (cqExtUnit == null) {
            log.error("[BUG] address {} of consume queue extend not found!", maxAddress);
            return;
        }

        final long realOffset = unDecorate(maxAddress);
        //之后的偏移全部清空
        this.mappedFileQueue.truncateDirtyFiles(realOffset + cqExtUnit.getSize());
    }

思考

结束标记,与END_BLANK属性

fullFillToEnd 函数中,一个mappedFile剩余空间不够了,此时会放置short -1,来填充,short 2字节就够了。
文件最后一定会留END_BLANK即4字节,在put函数中计算

final int blankSize = this.mappedFileSize - wrotePosition - END_BLANK_DATA_LENGTH;

时体现

CqExtUnit 与 IndexFile和IndexHeader

CqExtUnit其实也是有头部和内容两部分的,后IndexFile以及IndexHeader 这点比较像

问题

注释中 week reliable?

这个是怎么体现的?

CqExtUnit作用是什么

filterBitMap以及tagsCode,值到底是什么,有什么用,暂时不是很清楚,涉及ConsumeQueue,等ConsumeQueue看懂了再补充

吐槽

ConsumeQueueExt.CqExtUnit#setFilterBitMap 以及 calcUnitSize函数

一定要再调用了setFilterBitMap 函数之后,再自行调用calcUnitSize
确保bitMapSize变更之后,再单独变更size,不如直接在setFilterBitMap 这里设置size好了

unDecorate & decorate

不明白为什么要编解码,很绕,感觉是历史,兼容性等原因

recover函数

这里并不懂为什么要readBySkip各个mappedFile直到最后一个,不如直接mappedFileQueue#getLastMappedFile不就好了吗,
而且processOffset已经是最大的offset了,那么调用

this.mappedFileQueue.truncateDirtyFiles(processOffset);

还有什么意义呢,看ConsumeQueueExtTest#testRecovery也没有删除后续的file,因为本来就没有后续的file了
不知道要这个函数干嘛

refer

org.apache.rocketmq.store.ConsumeQueueExtTest

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容