RocketMQ源码解析(十)-Broker#消息存储Index

IndexFile作用

MessageStore中存储的消息除了通过ConsumeQueue提供给consumer消费之外,还支持通过MessageID或者MessageKey来查询消息。使用ID查询时,因为ID就是用broker+offset生成的,所以很容易就找到对应的commitLog文件来读取消息。对于用MessageKey来查询消息,MessageStore通过构建一个index来提高读取速度。

Index文件组织方式

在了解源码之前,最重要的是要知道Index文件的存储结构是怎么样的,下面的图引用自CSDN斩秋的文章RocketMQ原理解析

索引文件结构

上图中下半部分是索引文件的结构,一个索引文件时有文件头,slotTable,和index List组成的。
Header中存储的信息有:文件中第一个和最后一个索引对应的消息的存储时间,第一个和最后一个索引对应消息的offset的最大和最小值,文件中的索引个数。
整个slotTable+indexLinkedList可以理解成java的HashMap。每当放一个新的消息的index进来,首先取MessageKey的hashCode,然后用hashCode对slot总数取模,得到应该放到哪个slot中,slot总数系统默认500W个。只要是取hash就必然面临hash冲突的问题,跟HashMap一样,IndexFile也是使用一个链表结构来解决hash冲突。只是这里跟HashMap稍微有点区别的地方是,slot中放的是最新index的指针。这个是因为一般查询的时候肯定是优先查最近的消息。
每个slot中放的指针值是索引在indexFile中的偏移量,如上图,每个索引大小是20字节,所以根据当前索引是这个文件中的第几个(偏移量),就很容易定位到索引的位置。然后每个索引都保存了跟它同一个slot的前一个索引的位置,以此类推形成一个链表的结构。下面通过代码来看下新建一个索引的过程:

Index生成过程

上一篇讲ConsumeQueue的时候,有一个ReputMessageService在分发消息的时候还会调用CommitLogDispatcherBuildIndex用来创建index。这个类实现就是直接调用的IndexService.buildIndex()

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }

直接看IndexService的实现如下:

public void buildIndex(DispatchRequest req) {
        //1、获取或者新建当前可写入的index file
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            //2、获取当前indexFile中记录的最大offset
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            //3、新来消息是之前的,不应该出现
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }

            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE://rollback消息不更新index
                    return;
            }
            //4、单条消息
            if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }
            //5、多个msg,循环逐个存入
            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }
  • 第1步,跟CommitLogConsumeQueue一样,IndexFile也是用MappedFile来存储数据,每个MappedFile大小也是固定的。所以这里第一步获取当前正在写入的文件,没有的话则新建,indexFile是用当前时间戳作为文件名的。
  • 第2、3步,判断是否是新的消息,不是则跳过
  • 第4、5步,这里分为单条消息和批量消息,其实最终的逻辑是一样的,都是调用putKey的方法
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }

            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }

这里就是调用IndexFile的putKey的方法,包含了重试逻辑,因为有可能在写index的时候,上一个文件已经写满了,需要创建一个新的文件写入。

IndexFile数据写入

    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        //1、判断index是否已满,已满返回失败,由调用方来处理
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //2、计算key的非负hashCode,调用的java String的hashcode方法
            int keyHash = indexKeyHashMethod(key);
            //3、key应该放的slot
            int slotPos = keyHash % this.hashSlotNum;
            //4、slot的数据存储位置
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;//获取slot的position

            FileLock fileLock = null;

            try {

                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                //5、如果存在hash冲突,获取这个slot存的前一个index的计数,如果没有则值为0
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                //6、计算当前msg的存储时间和第一条msg相差秒数
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                //7、获取该条index实际存储position
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                //8、生成一个index的unit内容
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);//key的hash,不会记录完整的key
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);//消息在commitlog中偏移
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);//时间差
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);//相同hashcode的前一条index的顺序号
                //9、更新slot中的值为本条消息的顺序号
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                //10、如果是第一条消息,更新header中的起始offset和起始time
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                //11、更新header中的计数
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();//增加index计数
                this.indexHeader.setEndPhyOffset(phyOffset);//最后一条消息的offset
                this.indexHeader.setEndTimestamp(storeTimestamp);//最后一个index的时间

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }

以上的逻辑中,第5步获取slot之前是否已经有value,如果有代表hash冲突了,则在第8步中会把value设置成当前index的前一个index,同时将slot中的value更新成当前消息的序号。这样整个索引的生成就结束了,我们看下使用MsgKey来查询消息的时候是怎么使用索引文件的。

通过IndexFile查询消息

MessageStore中提供了根据MessageKey查询消息的接口,接口如下:

public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
        QueryMessageResult queryMessageResult = new QueryMessageResult();

        long lastQueryMsgTime = end;

        for (int i = 0; i < 3; i++) {
            //1、从indexService中查询所有offset
            QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
            if (queryOffsetResult.getPhyOffsets().isEmpty()) {
                break;
            }

            Collections.sort(queryOffsetResult.getPhyOffsets());

            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
            queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());

            for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
                long offset = queryOffsetResult.getPhyOffsets().get(m);

                try {

                    boolean match = true;
                    MessageExt msg = this.lookMessageByOffset(offset);
                    if (0 == m) {
                        lastQueryMsgTime = msg.getStoreTimestamp();
                    }
                    if (match) {
                        //2、根据offset,到CommitLog读取消息详情
                        SelectMappedBufferResult result = this.commitLog.getData(offset, false);
                        if (result != null) {
                            int size = result.getByteBuffer().getInt(0);
                            result.getByteBuffer().limit(size);
                            result.setSize(size);
                            queryMessageResult.addMessage(result);
                        }
                    } else {
                        log.warn("queryMessage hash duplicate, {} {}", topic, key);
                    }
                } catch (Exception e) {
                    log.error("queryMessage exception", e);
                }
            }
            ...
        }

        return queryMessageResult;
    }

以上的接口输入参数如下:
topic:只能按topic维度来查询消息,因为索引生成的时候key是用的topic+MessageKey
key: messageKey
maxNum : 最多返回的消息数,因为key是由用户设置的,并不保证唯一,所以可能取到多个消息;同时index中只存储了hash,所以hash相同的消息也会取出来
begin,end:起始和结束时间,只会查询指定时间段的消息
这个接口的具体实现就是,先从IndexService中读取到offset,然后到CommitLog中读取消息详情。下面主要看下indexService中的实现:

public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
        List<Long> phyOffsets = new ArrayList<Long>(maxNum);

        long indexLastUpdateTimestamp = 0;
        long indexLastUpdatePhyoffset = 0;
        maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
        try {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                //1、从最新的index文件开始向前找
                for (int i = this.indexFileList.size(); i > 0; i--) {
                    IndexFile f = this.indexFileList.get(i - 1);
                    boolean lastFile = i == this.indexFileList.size();
                    if (lastFile) {
                        indexLastUpdateTimestamp = f.getEndTimestamp();
                        indexLastUpdatePhyoffset = f.getEndPhyOffset();
                    }
                    //2、index文件的时间包含了begin和end的全部或者部分
                    if (f.isTimeMatched(begin, end)) {
                        //3、从文件中读取index中的offset
                        f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                    }

                    if (f.getBeginTimestamp() < begin) {
                        break;
                    }

                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                }
            }
        } catch (Exception e) {
            log.error("queryMsg exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
    }
  • 第1步,循环扫描indexFile,先扫描最近的
  • 第2步,从header中读出最小和最大的timestamp,然后看是否符合请求的时间范围
  • 第3步,从符合条件的index文件中获取offset。下面看下indexFile的读取步骤
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {//根据key来获取消息offset
        if (this.mappedFile.hold()) {
            //1、跟生成索引时一样,找到key的slot
            int keyHash = indexKeyHashMethod(key);
            int slotPos = keyHash % this.hashSlotNum;
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;
            try {
                if (lock) {
                }
                //2、获取该槽位上的最后一条索引的序号
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {//到达最大返回条数
                            break;
                        }
                        //3、找到index的位置
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;

                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        //4、在commitlog中偏移
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        //5、相同hashcode的前一条消息的序号
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                        if (timeDiff < 0) {
                            break;
                        }

                        timeDiff *= 1000L;

                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                        //6、Hash和time都符合条件,加入返回列表
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }

                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }
                        //7、前一条不等于0,继续读入前一条
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }

                this.mappedFile.release();
            }
        }
    }
  • 第1步,首先根据Key获取索引对应的slot,这里逻辑跟存入索引时时一样的
  • 第2步,slot中value存储的是最后一个index的序号
  • 第3-6步,将符合条件的offset加入返回列表
  • 第7步,如果存在相同hash的前一条index,并且返回列表没到最大值,则继续向前搜索

总结

从通过index查询消息的逻辑可以看出,相同的hashCode的message都会返回客户端,如果调用这个接口通过key来查询消息,需要在客户端再做一次过滤。为了提高查询效率,在发送消息时应该在保证便于查询的同时,尽量在一段时间内让消息有不同key。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容