RocketMQ 的 IndexFile 索引文件

1、前言

最近 rocketmq 系列要做起来了,一来是为了自己能多学点东西,而是为了年底的跳槽。相信我们大家看过 rocketmq 的 github 文档时,都看过这样一张图:


rocketmq 消息存储

它告诉我们,查询 rocketmq 的 commitlog 中消息有两种方式,一种是通过 ConsumerQueue 去查找,一种是通过 key(还可以在 key 的基础上加上时间范围)去查找。

通过 ConsumerQueue 查找的方式比较简单,因为 ConsumerQueue 的每一块都是固定的,每一块由[CommitLog Offset, size, Message Tag HashCode] 组成,我们有了相对于起始地址的 CommitLog Offset 就知道从哪查找消息(baseOffSet + CommitLog Offset),再加上 size 就能读到完整的消息了。

但是官方文档对于索引如何查找消息的描述比较模糊,它只说明了怎样根据 key 构建 hashmap,但是没说到底是怎样构建的。所以我们今天根据源码以及官方图解,来说明。

2、IndexFile 索引文件

IndexFile 首先它是一个固定大小的完整 file,它由 Header、Slot Table、Index Linked List 组成,在文件组织上,它们是存在一起的。然后关于 Index Linked List 区域,它是真正存储索引的地方,而 Slot Table 是存储 HashMap 的 table(槽)。看到这,就有疑惑了,它到底是怎么组织成的 HashMap 呢?


indexFile

在说明之前,我们先说一下 IndexFile 这几部分的结构。Header 占 40 Bytes


Header 结构
    1. beginTimestamp : 该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
    1. endTimestamp : 该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
    1. beginPhyoffset : 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
    1. endPhyoffset : 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
    1. hashSlotCount : 该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
    1. indexCount : 该索引文件目前的索引个数 (pos: 36-39) 4bytes

Slot Table 每个 slot 占 4 Byte,有500w 个,共占用 4 * 500w Byte。slot 就放一个值,即当前 hashmap 此槽中最后一个的节点的位置(只是形象的说位置,其实不是存的地址,是计算出来的,实际上存的是最后一个索引在所有索引顺序上是第几个)。

Index Linked List 每个占用 20 Byte,共有 2000w个,共占用 20 * 2000w Byte。每个索引区域存的值分别是:


索引区域
    1. key hash value: message key的hash值(key = topic + “#” + KEY,然后针对 key 计算 hashcode)
    1. phyOffset: message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
    1. timeDiff: message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
    1. prevIndex: hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的。

3、流程

putkey 的源码为:

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            
            // 1、计算 key 的 hashCode
            int keyHash = indexKeyHashMethod(key);
            
            // 2、算出在哪个槽,hashSlotNum 是 500w
            int slotPos = keyHash % this.hashSlotNum;
            
            // 3、算出要槽在哪个位置,Header + (slotPos * 每个 slot 大大小)
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // 3、取出 slot 处的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }

                // 4、算出要存的 index 的值,跳过 Header + slot Table + 当前已经存的索引数 * 每个索引的大小
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;

                // 5、开始写入20 Byte,分别为 keyHash、phyOffset、timeDiff、slotValue(这个很重要)
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

                // 6、在 slot 中写入当前 index 是第几个
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }

                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

根据 key 或者再加上时间区间查找消息的值为:

public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {

            // 1、计算 key 的 hashCode
            int keyHash = indexKeyHashMethod(key);

            // 2、算出在哪个槽,hashSlotNum 是 500w
            int slotPos = keyHash % this.hashSlotNum;

            // 3、算出要槽在哪个位置,Header + (slotPos * 每个 slot 的大小)
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }

                // 4、取出 slot 处的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }

                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    
                    // 从当前 index 不停往前找
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }


                        // 5、算出要存的 index 的值,跳过 Header + slot Table + 当前已经存的索引数 * 每个索引的大小
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        
                        // 这个为什么是 pre,因为 index 是递增的,最后一位虽然存的是 slot 的值,但是也是上一个 index 的 slot
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                        // 判断是否符合是当前的 key 并且符合区间
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }

                        // 6、往上一位找
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }

假设我们初始的 IndexFile 如下:


初始 IndexFile

然后我们给他插入2个 key,变成了这样:


插入2个 key

假设插入第三个 key 后,与位置2冲突了,那么此时第3个 index 应该记录第2个 slot 的值,然后再第2个 slot 后写入,并把第2个 slot 的值改成2:


插入第3个

然后我们查找第3个 key 的时候,首先找到第3个 key 的位置,然后发现 hashCode 相同,那么便把它加入;然后取出 slotValue,计算上一个的位置,即第二个,然后把它取出来,发现 key 不一样,不加入;接着取出 slotValue 的位置,发现值是0,即前面没有 index 了,然后直接返回了。

总结来说,index 索引文件模仿了 hashmap 的实现方式,在 hash 冲突时也是使用链表的方式防止冲突。但是这个链表只是逻辑上的,实际上是这样的。

4、后记

多思考,多对比。还有,项目的单测很重要,看不懂可跑跑单测。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容