1 概述
2 入口方法介绍
3 索引结构介绍
4 索引操作
5 索引查询
1 概述
RocketMQ中Broker在收到生产者发送的消息时,会将消息存储下来,写入CommitLog
,但是此时消息是不可消费也不可查询的。需要等待专门的服务对刚写入的消息进行Reput
操作,将消息信息记录到ConsumeQueue
和Index
中,消息记录到ConsumeQueue
完成后该消息就可被消费,消息完成索引到Index
中之后就可以根据时间戳和Key进行查询了。
DefaultMessageStore
中用于进行Reput
操作的服务实现类为ReputMessageService
,该类扩展自ServiceThread
,自身就是一个线程,其run
方法定义如下:
//ReputMessageService
//@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//sleep(1)则保证基本上时刻在尝试数据构建,
//如果速度够快,即消息刚完成写入就进行reput操作
//则消息还在缓冲中,可避免从硬盘读取数据
Thread.sleep(1);
//进行reput操作,完成ConsumeQueue和Index
//数据的构建
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}
ReputMessageService.doReput
方法具体实现不展开介绍,就是将还没有进行ConsumeQueue
和Index
构建的消息提取出来,进行ConsumeQueue
和Index
构建。
构建ConsumeQueue
则入口类为CommitLogDispatcherBuildConsumeQueue
,而Index
的构建入口类为CommitLogDispatcherBuildIndex
。
本文我们主要介绍IndexService
的实现。
2 入口方法介绍
第一节概述已经提到,Index
构建的口类为CommitLogDispatcherBuildIndex
,其源码如下:
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
//如果启用了索引,则调用indexService.buildIndex
//进行索引构建
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}
3 索引结构介绍
IndexFile
是索引结构的具体实现,因为索引也会持久化到硬盘中,所以IndexFile
也通过MappedFile
进行文件写入操作,关于MappedFile
的介绍可以参考笔者文章RocketMQ源码-MappedFile介绍。
RocketMQ中的索引文件分为三个部分,分别为头部、SlotTable和index,如下图所示:
默认的Slot数量为5000000,默认的index数量为4*5000000个,可配置。
因为可能消息数目较多,一个索引文件不能保存所有的消息索引信息,所以会使用多个索引文件,索引文件的头部保存了偏移等信息,其结构如下图所示:
各字段含义如下:
- beginTimestamp : 该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
- endTimestamp : 该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
- beginPhyoffset : 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
- beginPhyoffset : 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
- hashSlotCount : 该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
- indexCount : 该索引文件目前的索引个数 (pos: 36-39) 4bytes
每个消息在索引文件尾部的占用一个节点,保存key的hash,还保存了该消息在消息文件中的物理位置,插入时间,解决hash冲突用的上一个冲突索引的位置,具体结构如下图所示:
- key hash value: message key的hash值
- phyOffset: message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
- timeDiff: message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
- prevIndex: hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的。
上面截图和部分说明引用自文章rocketMq - index介绍
根据加入索引的时间依次放置,第一个加入索引的放在索引的第一个位置,第二个则在索引的第二个位置,以此类推。
每个消息根据key的hash值被映射到slotTable节点上,在对应的slot节点上保存的是其在索引的位置,如果发生冲突,即该slot上的值不为0,则表示已经有其他消息索引占用了该slot,那么使用链表方法处理冲突,该slot更新为最新索引的消息在索引中的位置,先前加入的冲突索引位置则记录在该索引的prevIndex字段中。
使用的映射方法如下:
//keyHash为key的hash,hashSlotNum默认500w
//也就是hash算法为求余法
int slotPos = keyHash % this.hashSlotNum;
4 索引操作
在介绍索引操作之前,我们先看下建立索引依赖的消息key字段到底是什么,其实根据源码发现就是消息的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
属性,该属性可在生产者生产消息时自己指定,如果不指定则会在发送之前调用MessageClientIDSetter.setUniqID(msg);
进行初始化,具体如何产生唯一的ID算法这里不做介绍。
索引操作主要实现在IndexFile.putKey
方法中:
//IndexFile
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//如果该indexFile还没有达到最大的索引数目,则
//可以继续写入,否则发挥失败
if (this.indexHeader.getIndexCount() < this.indexNum) {
//计算key的hash值
int keyHash = indexKeyHashMethod(key);
//根据hash值计算在slotTable中的位置
int slotPos = keyHash % this.hashSlotNum;
//根据在slotTable中的位置、索引头部大小(40b)、
//每个slot的大小(4b)计算该slot在文件中的物理位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
//获取slotTable该slot位置上的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
//如果值小于invalidIndex(该值为0)或者值大于
//当天存在的索引个数,则置为0
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
//根据消息的存储时间和该索引文件头部记录的
//第一个消息存储时间计算时间差
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
//计算该索引在索引链表中的位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
//先存key的hash
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
//记录该消息的在消息文件中的物理位置
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
//记录落地时间差
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
//记录上一个落在该位置上的冲突索引位置
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
//在slotTable中记录该消息在索引中的位置
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//如果是第一次索引消息,则记录开始物理偏移和
//开始时间
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
//每次追加一个新的索引,递增slotTable中slot
//数量、索引数量
//同时每次新增的索引也就是最后一个索引,记录
//最后一个索引物理偏移以及最后落地时间
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
5 索引查询
索引查询方法为IndexFile.selectPhyOffset
:
//IndexFile
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
//根据key计算hash
int keyHash = indexKeyHashMethod(key);
//根据hash值计算在slotTable中的位置
int slotPos = keyHash % this.hashSlotNum;
//根据在slotTable中的位置、索引头部大小(40b)、
//每个slot的大小(4b)计算该slot在文件中的物理位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
//获取该slot位置上的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
//如果slot上的值为无效值或者该值大于最大索引
//数量,则表示没有符合条件的索引数据,不作任何
//操作
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
//下面的实现也比较简单,因为使用链地址法
//解决hash冲突,所以这里读取链表上的每个
//数据,如果时间满足要求并且key的hash
//一致,则加入到返回列表中
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
//消息落地时间符合要求
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
//key的hash一致且消息落地时间符合要求
if (keyHash == keyHashRead && timeMatched) {
//加入到返回列表中
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
//读取链表中的下一个冲突索引
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}