1、前言
最近 rocketmq 系列要做起来了,一来是为了自己能多学点东西,而是为了年底的跳槽。相信我们大家看过 rocketmq 的 github 文档时,都看过这样一张图:
它告诉我们,查询 rocketmq 的 commitlog 中消息有两种方式,一种是通过 ConsumerQueue 去查找,一种是通过 key(还可以在 key 的基础上加上时间范围)去查找。
通过 ConsumerQueue 查找的方式比较简单,因为 ConsumerQueue 的每一块都是固定的,每一块由[CommitLog Offset, size, Message Tag HashCode] 组成,我们有了相对于起始地址的 CommitLog Offset 就知道从哪查找消息(baseOffSet + CommitLog Offset),再加上 size 就能读到完整的消息了。
但是官方文档对于索引如何查找消息的描述比较模糊,它只说明了怎样根据 key 构建 hashmap,但是没说到底是怎样构建的。所以我们今天根据源码以及官方图解,来说明。
2、IndexFile 索引文件
IndexFile 首先它是一个固定大小的完整 file,它由 Header、Slot Table、Index Linked List 组成,在文件组织上,它们是存在一起的。然后关于 Index Linked List 区域,它是真正存储索引的地方,而 Slot Table 是存储 HashMap 的 table(槽)。看到这,就有疑惑了,它到底是怎么组织成的 HashMap 呢?
在说明之前,我们先说一下 IndexFile 这几部分的结构。Header 占 40 Bytes
- beginTimestamp : 该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
- endTimestamp : 该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
- beginPhyoffset : 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
- endPhyoffset : 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
- hashSlotCount : 该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
- indexCount : 该索引文件目前的索引个数 (pos: 36-39) 4bytes
Slot Table 每个 slot 占 4 Byte,有500w 个,共占用 4 * 500w Byte。slot 就放一个值,即当前 hashmap 此槽中最后一个的节点的位置(只是形象的说位置,其实不是存的地址,是计算出来的,实际上存的是最后一个索引在所有索引顺序上是第几个)。
Index Linked List 每个占用 20 Byte,共有 2000w个,共占用 20 * 2000w Byte。每个索引区域存的值分别是:
- key hash value: message key的hash值(key = topic + “#” + KEY,然后针对 key 计算 hashcode)
- phyOffset: message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
- timeDiff: message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
- prevIndex: hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的。
3、流程
putkey 的源码为:
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
// 1、计算 key 的 hashCode
int keyHash = indexKeyHashMethod(key);
// 2、算出在哪个槽,hashSlotNum 是 500w
int slotPos = keyHash % this.hashSlotNum;
// 3、算出要槽在哪个位置,Header + (slotPos * 每个 slot 大大小)
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
// 3、取出 slot 处的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 4、算出要存的 index 的值,跳过 Header + slot Table + 当前已经存的索引数 * 每个索引的大小
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 5、开始写入20 Byte,分别为 keyHash、phyOffset、timeDiff、slotValue(这个很重要)
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 6、在 slot 中写入当前 index 是第几个
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
根据 key 或者再加上时间区间查找消息的值为:
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
// 1、计算 key 的 hashCode
int keyHash = indexKeyHashMethod(key);
// 2、算出在哪个槽,hashSlotNum 是 500w
int slotPos = keyHash % this.hashSlotNum;
// 3、算出要槽在哪个位置,Header + (slotPos * 每个 slot 的大小)
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
// 4、取出 slot 处的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
// 从当前 index 不停往前找
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
// 5、算出要存的 index 的值,跳过 Header + slot Table + 当前已经存的索引数 * 每个索引的大小
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
// 这个为什么是 pre,因为 index 是递增的,最后一位虽然存的是 slot 的值,但是也是上一个 index 的 slot
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
// 判断是否符合是当前的 key 并且符合区间
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
// 6、往上一位找
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
假设我们初始的 IndexFile 如下:
然后我们给他插入2个 key,变成了这样:
假设插入第三个 key 后,与位置2冲突了,那么此时第3个 index 应该记录第2个 slot 的值,然后再第2个 slot 后写入,并把第2个 slot 的值改成2:
然后我们查找第3个 key 的时候,首先找到第3个 key 的位置,然后发现 hashCode 相同,那么便把它加入;然后取出 slotValue,计算上一个的位置,即第二个,然后把它取出来,发现 key 不一样,不加入;接着取出 slotValue 的位置,发现值是0,即前面没有 index 了,然后直接返回了。
总结来说,index 索引文件模仿了 hashmap 的实现方式,在 hash 冲突时也是使用链表的方式防止冲突。但是这个链表只是逻辑上的,实际上是这样的。
4、后记
多思考,多对比。还有,项目的单测很重要,看不懂可跑跑单测。