RocketMQ源码-Index索引介绍


1 概述
2 入口方法介绍
3 索引结构介绍
4 索引操作
5 索引查询

1 概述

RocketMQ中Broker在收到生产者发送的消息时,会将消息存储下来,写入CommitLog,但是此时消息是不可消费也不可查询的。需要等待专门的服务对刚写入的消息进行Reput操作,将消息信息记录到ConsumeQueueIndex中,消息记录到ConsumeQueue完成后该消息就可被消费,消息完成索引到Index中之后就可以根据时间戳和Key进行查询了。

DefaultMessageStore中用于进行Reput操作的服务实现类为ReputMessageService,该类扩展自ServiceThread,自身就是一个线程,其run方法定义如下:

//ReputMessageService
//@Override
public void run() {
    DefaultMessageStore.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //sleep(1)则保证基本上时刻在尝试数据构建,
            //如果速度够快,即消息刚完成写入就进行reput操作
            //则消息还在缓冲中,可避免从硬盘读取数据
            Thread.sleep(1);
            //进行reput操作,完成ConsumeQueue和Index
            //数据的构建
            this.doReput();
        } catch (Exception e) {
            DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

ReputMessageService.doReput方法具体实现不展开介绍,就是将还没有进行ConsumeQueueIndex构建的消息提取出来,进行ConsumeQueueIndex构建。

构建ConsumeQueue则入口类为CommitLogDispatcherBuildConsumeQueue,而Index的构建入口类为CommitLogDispatcherBuildIndex

本文我们主要介绍IndexService的实现。

2 入口方法介绍

第一节概述已经提到,Index构建的口类为CommitLogDispatcherBuildIndex,其源码如下:

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        //如果启用了索引,则调用indexService.buildIndex
        //进行索引构建
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}

3 索引结构介绍

IndexFile是索引结构的具体实现,因为索引也会持久化到硬盘中,所以IndexFile也通过MappedFile进行文件写入操作,关于MappedFile的介绍可以参考笔者文章RocketMQ源码-MappedFile介绍

RocketMQ中的索引文件分为三个部分,分别为头部、SlotTable和index,如下图所示:

索引文件结构.jpg

默认的Slot数量为5000000,默认的index数量为4*5000000个,可配置。

因为可能消息数目较多,一个索引文件不能保存所有的消息索引信息,所以会使用多个索引文件,索引文件的头部保存了偏移等信息,其结构如下图所示:

index header.jpg

各字段含义如下:

  1. beginTimestamp : 该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
  2. endTimestamp : 该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
  3. beginPhyoffset : 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
  4. beginPhyoffset : 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
  5. hashSlotCount : 该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
  6. indexCount : 该索引文件目前的索引个数 (pos: 36-39) 4bytes

每个消息在索引文件尾部的占用一个节点,保存key的hash,还保存了该消息在消息文件中的物理位置,插入时间,解决hash冲突用的上一个冲突索引的位置,具体结构如下图所示:

index结构.jpg
  1. key hash value: message key的hash值
  2. phyOffset: message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
  3. timeDiff: message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
  4. prevIndex: hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的。

上面截图和部分说明引用自文章rocketMq - index介绍

根据加入索引的时间依次放置,第一个加入索引的放在索引的第一个位置,第二个则在索引的第二个位置,以此类推。

每个消息根据key的hash值被映射到slotTable节点上,在对应的slot节点上保存的是其在索引的位置,如果发生冲突,即该slot上的值不为0,则表示已经有其他消息索引占用了该slot,那么使用链表方法处理冲突,该slot更新为最新索引的消息在索引中的位置,先前加入的冲突索引位置则记录在该索引的prevIndex字段中。

使用的映射方法如下:

//keyHash为key的hash,hashSlotNum默认500w
//也就是hash算法为求余法
int slotPos = keyHash % this.hashSlotNum;

4 索引操作

在介绍索引操作之前,我们先看下建立索引依赖的消息key字段到底是什么,其实根据源码发现就是消息的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX属性,该属性可在生产者生产消息时自己指定,如果不指定则会在发送之前调用MessageClientIDSetter.setUniqID(msg);进行初始化,具体如何产生唯一的ID算法这里不做介绍。

索引操作主要实现在IndexFile.putKey方法中:

//IndexFile
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    //如果该indexFile还没有达到最大的索引数目,则
    //可以继续写入,否则发挥失败
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        //计算key的hash值
        int keyHash = indexKeyHashMethod(key);
        //根据hash值计算在slotTable中的位置
        int slotPos = keyHash % this.hashSlotNum;
        //根据在slotTable中的位置、索引头部大小(40b)、
        //每个slot的大小(4b)计算该slot在文件中的物理位置
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;

        try {

            // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
            // false);
            //获取slotTable该slot位置上的值
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            //如果值小于invalidIndex(该值为0)或者值大于
            //当天存在的索引个数,则置为0
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
            //根据消息的存储时间和该索引文件头部记录的
            //第一个消息存储时间计算时间差
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

            timeDiff = timeDiff / 1000;

            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            //计算该索引在索引链表中的位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;

            //先存key的hash
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            //记录该消息的在消息文件中的物理位置
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            //记录落地时间差
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            //记录上一个落在该位置上的冲突索引位置
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

            //在slotTable中记录该消息在索引中的位置
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

            //如果是第一次索引消息,则记录开始物理偏移和
            //开始时间
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }

            //每次追加一个新的索引,递增slotTable中slot
            //数量、索引数量
            //同时每次新增的索引也就是最后一个索引,记录
            //最后一个索引物理偏移以及最后落地时间
            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);

            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }

    return false;
}

5 索引查询

索引查询方法为IndexFile.selectPhyOffset:

//IndexFile
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end, boolean lock) {
    if (this.mappedFile.hold()) {
        //根据key计算hash
        int keyHash = indexKeyHashMethod(key);
        //根据hash值计算在slotTable中的位置
        int slotPos = keyHash % this.hashSlotNum;
        //根据在slotTable中的位置、索引头部大小(40b)、
        //每个slot的大小(4b)计算该slot在文件中的物理位置
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        FileLock fileLock = null;
        try {
            if (lock) {
                // fileLock = this.fileChannel.lock(absSlotPos,
                // hashSlotSize, true);
            }

            //获取该slot位置上的值
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // if (fileLock != null) {
            // fileLock.release();
            // fileLock = null;
            // }
            //如果slot上的值为无效值或者该值大于最大索引
            //数量,则表示没有符合条件的索引数据,不作任何
            //操作
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                || this.indexHeader.getIndexCount() <= 1) {
            } else {
                //下面的实现也比较简单,因为使用链地址法
                //解决hash冲突,所以这里读取链表上的每个
                //数据,如果时间满足要求并且key的hash
                //一致,则加入到返回列表中
                for (int nextIndexToRead = slotValue; ; ) {
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }

                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + nextIndexToRead * indexSize;

                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);

                    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);

                    if (timeDiff < 0) {
                        break;
                    }

                    timeDiff *= 1000L;

                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    //消息落地时间符合要求
                    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);

                    //key的hash一致且消息落地时间符合要求
                    if (keyHash == keyHashRead && timeMatched) {
                        //加入到返回列表中
                        phyOffsets.add(phyOffsetRead);
                    }

                    if (prevIndexRead <= invalidIndex
                        || prevIndexRead > this.indexHeader.getIndexCount()
                        || prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }
                    //读取链表中的下一个冲突索引
                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();
                } catch (IOException e) {
                    log.error("Failed to release the lock", e);
                }
            }

            this.mappedFile.release();
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351