说明
这个是consumeQueue类的拓展,记录一些不重要的信息
在MessageStoreConfig#enableConsumeQueueExt为true时生效(默认false)
存储路径默认 {user.home}/store/consumequeue_ext/{topic}/{queueId}/
里面利用MappedFileQueue管理一个MappedFile的队列,进行put,get,truncate,recover等操作
每个mappedFile默认最大48M,存放CqExtUnit(存储单元,由头部和内容两部分组成,详见下面内部类解析),标志位 short -1(代表数据结尾),尾部预留4个字节作为END_BLANK
整个consumequeue_ext记录的文件大小不得超过属性 MAX_REAL_OFFSET(见属性说明)
PS:ConsumeQueue后面再讲
属性
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final MappedFileQueue mappedFileQueue;//对应的mappedFileQueue
private final String topic;
private final int queueId;
private final String storePath;//存储路径,默认 user.home/store/consumequeue_ext
//consumeQueueExt文件大小,默认48M
private final int mappedFileSize;
private ByteBuffer tempContainer;
public static final int END_BLANK_DATA_LENGTH = 4;//结尾需要预留4字节空白
/**
* Addr can not exceed this value.For compatible.
*/
public static final long MAX_ADDR = Integer.MIN_VALUE - 1L;//extAddr的上限
public static final long MAX_REAL_OFFSET = MAX_ADDR - Long.MIN_VALUE;//ext文件记录的偏移量上限
备注:ConsumeQueue需要Topic和queueId信息,ConsumeQueueExt因此也需要
内部类 CqExtUnit
作为mappedFile里面存放的数据结构,由头部和内容两部分组成
头部为20字节,依次为short size,long tagsCode, long msgStoreTime, short bitMapSize 共 2+8+8+2=20
内容为byte[] filterBitMap;
其中 bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
size = 20 + bitMapSize ,规定不能超过32k
CqExtUnit属性
//size(short) + tagsCode(long) + msgStoreTime(long) + bitMapSize(short) = 20字节
public static final short MIN_EXT_UNIT_SIZE
= 2 * 1 // size, 32k max
+ 8 * 2 // msg time + tagCode
+ 2; // bitMapSize
public static final int MAX_EXT_UNIT_SIZE = Short.MAX_VALUE;//一个unit最大32k
/**
* unit size
*/
private short size;
/**
* has code of tags
*/
private long tagsCode;
/**
* the time to store into commit log of message
*/
private long msgStoreTime;
/**
* size of bit map
*/
private short bitMapSize;
/**
* filter bit map
*/
private byte[] filterBitMap;
CqExtUnit 构造函数
public CqExtUnit() {
}
public CqExtUnit(Long tagsCode, long msgStoreTime, byte[] filterBitMap) {
this.tagsCode = tagsCode == null ? 0 : tagsCode;
this.msgStoreTime = msgStoreTime;
this.filterBitMap = filterBitMap;
this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);//size的值 为 20字节(头信息) + bitMapSize(实际内容)
}
CqExtUnit函数
object相关函数不讲,下面主要注意read函数和write函数即可
/**
* build unit from buffer from current position.
* 将buffer数据读入到内存记录的数据结构中
*/
private boolean read(final ByteBuffer buffer) {
if (buffer.position() + 2 > buffer.limit()) {//还没有写size
return false;
}
this.size = buffer.getShort();
if (this.size < 1) {
return false;
}
this.tagsCode = buffer.getLong();
this.msgStoreTime = buffer.getLong();
this.bitMapSize = buffer.getShort();
if (this.bitMapSize < 1) {
return true;
}
if (this.filterBitMap == null || this.filterBitMap.length != this.bitMapSize) {
this.filterBitMap = new byte[bitMapSize];
}
buffer.get(this.filterBitMap);
return true;
}
/**
* Only read first 2 byte to get unit size.
* <p>
* if size > 0, then skip buffer position with size.
* </p>
* <p>
* if size <= 0, nothing to do.
* </p>
*/
//如果size>0,buffer跳过对应长度的大小
private void readBySkip(final ByteBuffer buffer) {
ByteBuffer temp = buffer.slice();
short tempSize = temp.getShort();//上面slice使得,这里temp读取 不会导致buffer.position变化
this.size = tempSize;
if (tempSize > 0) {
buffer.position(buffer.position() + this.size);
}
}
//内存数据结构 放到 ByteBuffer中,转化成byte[]返回
private byte[] write(final ByteBuffer container) {
this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
this.size = (short) (MIN_EXT_UNIT_SIZE + this.bitMapSize);
ByteBuffer temp = container;
if (temp == null || temp.capacity() < this.size) {
temp = ByteBuffer.allocate(this.size);
}
temp.flip();
temp.limit(this.size);
temp.putShort(this.size);
temp.putLong(this.tagsCode);
temp.putLong(this.msgStoreTime);
temp.putShort(this.bitMapSize);
if (this.bitMapSize > 0) {
temp.put(this.filterBitMap);
}
return temp.array();
}
/**
* Calculate unit size by current data.
* 实际大小为 头部长度20字节 + map的长度
*/
private int calcUnitSize() {
int sizeTemp = MIN_EXT_UNIT_SIZE + (filterBitMap == null ? 0 : filterBitMap.length);
return sizeTemp;
}
//这里其实size没有更新,上层确保调用calcUnitSize函数
public void setFilterBitMap(final byte[] filterBitMap) {
this.filterBitMap = filterBitMap;
// not safe transform, but size will be calculate by #calcUnitSize
this.bitMapSize = (short) (filterBitMap == null ? 0 : filterBitMap.length);
}
ConsumeQueueExt函数
flush,destroy,checkSelf,load函数直接调用的mappedFileQueue对应方法,不讲
getMaxAddress,getMinAddress只在测试类用,不讲
构造函数
/**
* Constructor.
*
* @param topic topic
* @param queueId id of queue
* @param storePath root dir of files to store.
* @param mappedFileSize file size
* @param bitMapLength bit map length.
*/
public ConsumeQueueExt(final String topic,
final int queueId,
final String storePath,
final int mappedFileSize,
final int bitMapLength) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.topic = topic;
this.queueId = queueId;
String queueDir = this.storePath
+ File.separator + topic
+ File.separator + queueId;//对应queueId的目录是 user.home/store/consumequeue_ext/topic/queueId/
this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
if (bitMapLength > 0) {
this.tempContainer = ByteBuffer.allocate(
bitMapLength / Byte.SIZE
);
}
}
注意每个ConsumeQueueExt的目录为 {user.home}/store/consumequeue_ext/{topic}/{queueId}/即可
编解码相关
public static boolean isExtAddr(final long address) {
return address <= MAX_ADDR;
}
public long unDecorate(final long address) {
if (isExtAddr(address)) {
return address - Long.MIN_VALUE;
}
return address;
}
public long decorate(final long offset) {
if (!isExtAddr(offset)) {
return offset + Long.MIN_VALUE;
}
return offset;
}
就是put到mappedFile时,返回的是编码之后的addr(即decorate,一个负数)
从mappedFile利用addr获取CqExtUnit时,要进行解码(即unDecorate,一个正数)
并不清楚为什么要这样设计,猜测是历史兼容性原因
get相关
两个函数,就是根据编码之后的addr进行解码,从mappedFileQueue中找对应的mappedFile,
通过%mappedFileSize拿到偏移,继而拿到buffer
读取buffer来给cqExtUnit的数据结构赋值
/**
* Get data from buffer.
*
* @param address less than 0
*/
//根据修饰后的addr,拿到mappedFile中对应位置描述的CqExtUnit
public CqExtUnit get(final long address) {
CqExtUnit cqExtUnit = new CqExtUnit();
if (get(address, cqExtUnit)) {
return cqExtUnit;
}
return null;
}
/**
* Get data from buffer, and set to {@code cqExtUnit}
*
* @param address less than 0
*/
//根据修饰后的addr,从MappedFileQueue中找到对应的file
//通过%mappedFileSize拿到偏移,继而拿到buffer
//读取buffer来给cqExtUnit的数据结构赋值
public boolean get(final long address, final CqExtUnit cqExtUnit) {
if (!isExtAddr(address)) {
return false;
}
final int mappedFileSize = this.mappedFileSize;
final long realOffset = unDecorate(address);//真实偏移
//realOffset找到对应的MappedFile
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(realOffset, realOffset == 0);
if (mappedFile == null) {
return false;
}
int pos = (int) (realOffset % mappedFileSize);
SelectMappedBufferResult bufferResult = mappedFile.selectMappedBuffer(pos);//到特定的位置读取 buffer
if (bufferResult == null) {
log.warn("[BUG] Consume queue extend unit({}) is not found!", realOffset);
return false;
}
boolean ret = false;
try {
ret = cqExtUnit.read(bufferResult.getByteBuffer());//读buffer到cqExtUnit中的内存中
} finally {
bufferResult.release();//引用-1
}
return ret;
}
put相关
put函数,存放一个cqExtUnit,返回编码后的地址
fullFillToEnd函数,wrotePosition存放-1代表结束,更新wrotePosition为mappedFileSize代表该mappedFile写结束
/**
* Save to mapped buffer of file and return address.
* <p>
* Be careful, this method is not thread safe.
* </p>
*
* @return success: < 0: fail: >=0
*/
//存放一个cqExtUnit,返回编码后的地址
public long put(final CqExtUnit cqExtUnit) {
final int retryTimes = 3;
try {
int size = cqExtUnit.calcUnitSize();//计算大小
if (size > CqExtUnit.MAX_EXT_UNIT_SIZE) {//太大了,超过了32k
log.error("Size of cq ext unit is greater than {}, {}", CqExtUnit.MAX_EXT_UNIT_SIZE, cqExtUnit);
return 1;
}
if (this.mappedFileQueue.getMaxOffset() + size > MAX_REAL_OFFSET) {//ext文件记录数据过多
log.warn("Capacity of ext is maximum!{}, {}", this.mappedFileQueue.getMaxOffset(), size);
return 1;
}
// unit size maybe change.but, the same most of the time.
if (this.tempContainer == null || this.tempContainer.capacity() < size) {
this.tempContainer = ByteBuffer.allocate(size);
}
for (int i = 0; i < retryTimes; i++) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);//之前最新的写满了,自动创建一个
}
if (mappedFile == null) {
log.error("Create mapped file when save consume queue extend, {}", cqExtUnit);
continue;
}
final int wrotePosition = mappedFile.getWrotePosition();
final int blankSize = this.mappedFileSize - wrotePosition - END_BLANK_DATA_LENGTH;
// check whether has enough space.
if (size > blankSize) {//空间不够了
fullFillToEnd(mappedFile, wrotePosition);//标记这个mappedFile为写满了
log.info("No enough space(need:{}, has:{}) of file {}, so fill to end",
size, blankSize, mappedFile.getFileName());
continue;//重新再试
}
//mappedFile追加消息
if (mappedFile.appendMessage(cqExtUnit.write(this.tempContainer), 0, size)) {
return decorate(wrotePosition + mappedFile.getFileFromOffset());
}
}
} catch (Throwable e) {
log.error("Save consume queue extend error, " + cqExtUnit, e);
}
return 1;
}
//wrotePosition存放-1代表结束,更新wrotePosition为mappedFileSize代表该mappedFile写满
protected void fullFillToEnd(final MappedFile mappedFile, final int wrotePosition) {
ByteBuffer mappedFileBuffer = mappedFile.sliceByteBuffer();
mappedFileBuffer.position(wrotePosition);
// ending.
mappedFileBuffer.putShort((short) -1);//-1标记结束位
mappedFile.setWrotePosition(this.mappedFileSize);//标记写满了,isFull
}
recover相关
public void recover() {
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (mappedFiles == null || mappedFiles.isEmpty()) {
return;
}
// load all files, consume queue will truncate extend files.
int index = 0;
MappedFile mappedFile = mappedFiles.get(index);
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
CqExtUnit extUnit = new CqExtUnit();
while (true) {
extUnit.readBySkip(byteBuffer);//在一个mappedFile之间skip
// check whether write sth.
if (extUnit.getSize() > 0) {
mappedFileOffset += extUnit.getSize();
continue;
}
//最后肯定getSize返回-1,是文件尾部写入的short标志位
index++;
if (index < mappedFiles.size()) {
mappedFile = mappedFiles.get(index);//下一个mappedFile
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("Recover next consume queue extend file, " + mappedFile.getFileName());
continue;
}
log.info("All files of consume queue extend has been recovered over, last mapped file "
+ mappedFile.getFileName());
break;
}
processOffset += mappedFileOffset;//拿到最后一个mappedFile最后的绝对偏移
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);
}
这里就是为了获取最后一个mappedFile#getFileFromOffset(); 而且这个函数,设置标志位然后truncate,见吐槽
truncate相关
truncateByMinAddress: 删除minAddress之前的mappedFile
truncateByMaxAddress: 删除maxAddress之后的mappedFile以及同一个mappedFile之后的记录
/**
* Delete files before {@code minAddress}.
*
* @param minAddress less than 0
*/
//删除minAddress之前的mappedFile
public void truncateByMinAddress(final long minAddress) {
if (!isExtAddr(minAddress)) {
return;
}
log.info("Truncate consume queue ext by min {}.", minAddress);
List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
final long realOffset = unDecorate(minAddress);
for (MappedFile file : mappedFiles) {
long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
if (fileTailOffset < realOffset) {
log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", file.getFileName(),
fileTailOffset, realOffset);
if (file.destroy(1000)) {
willRemoveFiles.add(file);
}
}
}
this.mappedFileQueue.deleteExpiredFile(willRemoveFiles);
}
/**
* Delete files after {@code maxAddress}, and reset wrote/commit/flush position to last file.
*
* @param maxAddress less than 0
*/
//删除maxAddress之后的mappedFile以及同一个mappedFile之后的记录
public void truncateByMaxAddress(final long maxAddress) {
if (!isExtAddr(maxAddress)) {
return;
}
log.info("Truncate consume queue ext by max {}.", maxAddress);
CqExtUnit cqExtUnit = get(maxAddress);//拿到mappedFile对应的CqExtUnit记录,用来获取size属性
if (cqExtUnit == null) {
log.error("[BUG] address {} of consume queue extend not found!", maxAddress);
return;
}
final long realOffset = unDecorate(maxAddress);
//之后的偏移全部清空
this.mappedFileQueue.truncateDirtyFiles(realOffset + cqExtUnit.getSize());
}
思考
结束标记,与END_BLANK属性
fullFillToEnd 函数中,一个mappedFile剩余空间不够了,此时会放置short -1,来填充,short 2字节就够了。
文件最后一定会留END_BLANK即4字节,在put函数中计算
final int blankSize = this.mappedFileSize - wrotePosition - END_BLANK_DATA_LENGTH;
时体现
CqExtUnit 与 IndexFile和IndexHeader
CqExtUnit其实也是有头部和内容两部分的,后IndexFile以及IndexHeader 这点比较像
问题
注释中 week reliable?
这个是怎么体现的?
CqExtUnit作用是什么
filterBitMap以及tagsCode,值到底是什么,有什么用,暂时不是很清楚,涉及ConsumeQueue,等ConsumeQueue看懂了再补充
吐槽
ConsumeQueueExt.CqExtUnit#setFilterBitMap 以及 calcUnitSize函数
一定要再调用了setFilterBitMap 函数之后,再自行调用calcUnitSize
确保bitMapSize变更之后,再单独变更size,不如直接在setFilterBitMap 这里设置size好了
unDecorate & decorate
不明白为什么要编解码,很绕,感觉是历史,兼容性等原因
recover函数
这里并不懂为什么要readBySkip各个mappedFile直到最后一个,不如直接mappedFileQueue#getLastMappedFile不就好了吗,
而且processOffset已经是最大的offset了,那么调用
this.mappedFileQueue.truncateDirtyFiles(processOffset);
还有什么意义呢,看ConsumeQueueExtTest#testRecovery也没有删除后续的file,因为本来就没有后续的file了
不知道要这个函数干嘛
refer
org.apache.rocketmq.store.ConsumeQueueExtTest