RocketMQ 的 IndexFile 索引文件

1、前言

最近 rocketmq 系列要做起来了,一来是为了自己能多学点东西,而是为了年底的跳槽。相信我们大家看过 rocketmq 的 github 文档时,都看过这样一张图:


rocketmq 消息存储

它告诉我们,查询 rocketmq 的 commitlog 中消息有两种方式,一种是通过 ConsumerQueue 去查找,一种是通过 key(还可以在 key 的基础上加上时间范围)去查找。

通过 ConsumerQueue 查找的方式比较简单,因为 ConsumerQueue 的每一块都是固定的,每一块由[CommitLog Offset, size, Message Tag HashCode] 组成,我们有了相对于起始地址的 CommitLog Offset 就知道从哪查找消息(baseOffSet + CommitLog Offset),再加上 size 就能读到完整的消息了。

但是官方文档对于索引如何查找消息的描述比较模糊,它只说明了怎样根据 key 构建 hashmap,但是没说到底是怎样构建的。所以我们今天根据源码以及官方图解,来说明。

2、IndexFile 索引文件

IndexFile 首先它是一个固定大小的完整 file,它由 Header、Slot Table、Index Linked List 组成,在文件组织上,它们是存在一起的。然后关于 Index Linked List 区域,它是真正存储索引的地方,而 Slot Table 是存储 HashMap 的 table(槽)。看到这,就有疑惑了,它到底是怎么组织成的 HashMap 呢?


indexFile

在说明之前,我们先说一下 IndexFile 这几部分的结构。Header 占 40 Bytes


Header 结构
    1. beginTimestamp : 该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
    1. endTimestamp : 该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
    1. beginPhyoffset : 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
    1. endPhyoffset : 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
    1. hashSlotCount : 该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
    1. indexCount : 该索引文件目前的索引个数 (pos: 36-39) 4bytes

Slot Table 每个 slot 占 4 Byte,有500w 个,共占用 4 * 500w Byte。slot 就放一个值,即当前 hashmap 此槽中最后一个的节点的位置(只是形象的说位置,其实不是存的地址,是计算出来的,实际上存的是最后一个索引在所有索引顺序上是第几个)。

Index Linked List 每个占用 20 Byte,共有 2000w个,共占用 20 * 2000w Byte。每个索引区域存的值分别是:


索引区域
    1. key hash value: message key的hash值(key = topic + “#” + KEY,然后针对 key 计算 hashcode)
    1. phyOffset: message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
    1. timeDiff: message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
    1. prevIndex: hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的。

3、流程

putkey 的源码为:

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            
            // 1、计算 key 的 hashCode
            int keyHash = indexKeyHashMethod(key);
            
            // 2、算出在哪个槽,hashSlotNum 是 500w
            int slotPos = keyHash % this.hashSlotNum;
            
            // 3、算出要槽在哪个位置,Header + (slotPos * 每个 slot 大大小)
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // 3、取出 slot 处的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                // 4、算出要存的 index 的值,跳过 Header + slot Table + 当前已经存的索引数 * 每个索引的大小
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                // 5、开始写入20 Byte,分别为 keyHash、phyOffset、timeDiff、slotValue(这个很重要)
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                // 6、在 slot 中写入当前 index 是第几个
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

根据 key 或者再加上时间区间查找消息的值为:

public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {

            // 1、计算 key 的 hashCode
            int keyHash = indexKeyHashMethod(key);

            // 2、算出在哪个槽,hashSlotNum 是 500w
            int slotPos = keyHash % this.hashSlotNum;

            // 3、算出要槽在哪个位置,Header + (slotPos * 每个 slot 的大小)
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }

                // 4、取出 slot 处的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }

                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    
                    // 从当前 index 不停往前找
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }


                        // 5、算出要存的 index 的值,跳过 Header + slot Table + 当前已经存的索引数 * 每个索引的大小
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        
                        // 这个为什么是 pre,因为 index 是递增的,最后一位虽然存的是 slot 的值,但是也是上一个 index 的 slot
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        // 判断是否符合是当前的 key 并且符合区间
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        // 6、往上一位找
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }

假设我们初始的 IndexFile 如下:


初始 IndexFile

然后我们给他插入2个 key,变成了这样:


插入2个 key

假设插入第三个 key 后,与位置2冲突了,那么此时第3个 index 应该记录第2个 slot 的值,然后再第2个 slot 后写入,并把第2个 slot 的值改成2:


插入第3个

然后我们查找第3个 key 的时候,首先找到第3个 key 的位置,然后发现 hashCode 相同,那么便把它加入;然后取出 slotValue,计算上一个的位置,即第二个,然后把它取出来,发现 key 不一样,不加入;接着取出 slotValue 的位置,发现值是0,即前面没有 index 了,然后直接返回了。

总结来说,index 索引文件模仿了 hashmap 的实现方式,在 hash 冲突时也是使用链表的方式防止冲突。但是这个链表只是逻辑上的,实际上是这样的。

4、后记

多思考,多对比。还有,项目的单测很重要,看不懂可跑跑单测。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容