1.CommitLog
消息内容原文的存储文件,同Kafka一样,消息是变长的,顺序写入
生成规则:
每个文件的默认1G =1024 * 1024 * 1024,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824, 消息存储的时候会顺序写入文件,当文件满了则写入下一个文件
文件的消息单元存储结构
顺序编号 | 字段简称 | 字段大小(字节) | 字段含义 |
---|---|---|---|
1 | msgSize | 4 | 代表这个消息的大小 |
2 | MAGICCODE | 4 | MAGICCODE = daa320a7 |
3 | BODY CRC | 4 | 消息体BODY CRC 当broker重启recover时会校验 |
4 | queueId | 4 | broker中队列的id |
5 | flag | 4 | |
6 | QUEUEOFFSET | 8 | 这个值是个自增值不是真正的consume queue的偏移量,可以代表这个consumeQueue队列或者tranStateTable队列中消息的个数,若是非事务消息或者commit事务消息,可以通过这个值查找到consumeQueue中数据,QUEUEOFFSET * 20才是偏移地址;若是PREPARED或者Rollback事务,则可以通过该值从tranStateTable中查找数据 |
7 | PHYSICALOFFSET | 8 | 代表消息在commitLog中的物理起始地址偏移量 |
8 | SYSFLAG | 4 | 指明消息是事物事物状态等消息特征,二进制为四个字节从右往左数:当4个字节均为0(值为0)时表示非事务消息;当第1个字节为1(值为1)时表示表示消息是压缩的(Compressed);当第2个字节为1(值为2)表示多消息(MultiTags);当第3个字节为1(值为4)时表示prepared消息;当第4个字节为1(值为8)时表示commit消息;当第3/4个字节均为1时(值为12)时表示rollback消息;当第3/4个字节均为0时表示非事务消息; |
9 | BORNTIMESTAMP | 8 | 消息产生端(producer)的时间戳 |
10 | BORNHOST | 8 | 消息产生端(producer)地址(address:port) |
11 | STORETIMESTAMP | 8 | 消息在broker存储时间 |
12 | STOREHOSTADDRESS | 8 | 消息存储到broker的地址(address:port) |
13 | RECONSUMETIMES | 8 | 消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了,成功消费一次记录为0; |
14 | PreparedTransaction Offset | 8 | 表示是prepared状态的事物消息 |
15 | messagebodyLength | 4 | 消息体大小值 |
16 | messagebody | bodyLength | 消息体内容 |
17 | topicLength | 1 | topic名称内容大小 |
18 | topic | topicLength | topic的内容值 |
19 | propertiesLength | 2 | 属性值大小 |
20 | properties | propertiesLength | propertiesLength大小的属性数据 |
2.ConsumeQueue
ConsumeQueue中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说,ConsumeQueue其实是CommitLog的一个索引文件。
一个ConsumeQueue文件对应topic下的一个队列
ConsumeQueue是定长的结构,每1条记录固定的20个字节。很显然,Consumer消费消息的时候,要读2次:先读ConsumeQueue得到offset,再读CommitLog得到消息内容
ConsumeQueue的作用
- 通过broker保存的offset可以在ConsumeQueue中获取消息,从而快速的定位到commitLog的消息位置
- 过滤tag是也是通过遍历ConsumeQueue来实现的(先比较hash(tag)符合条件的再到consumer比较tag原文)
- 并且ConsumeQueue还能保存于操作系统的PageCache进行缓存提升检索性能
下面是我解析的ConsumeQueue
public class ConsumeQueueFileRead {
public static void main(String[] args) throws IOException {
decodeCQ(new File("D:\\00000000000000000000"));
}
static void decodeCQ(File consumeQueue) throws IOException {
FileInputStream fis = new FileInputStream(consumeQueue);
DataInputStream dis = new DataInputStream(fis);
System.out.printf(" %s %s %s\n", "offset", "size", "tag");
while (true) {
long offset = dis.readLong();
int size = dis.readInt();
long tag = dis.readLong();
if (size == 0) {
break;
}
System.out.printf(" %d %d %d\n", offset, size, tag);
}
fis.close();
}
}
3.indexFile
如果我们需要根据消息ID,来查找消息,consumequeue 中没有存储消息ID,如果不采取其他措施,又得遍历 commitlog文件了,indexFile就是为了解决这个问题的文件
由于必须以msgId或者生产者指定的消息key作为索引key,所以其结构更复杂一些,分为三部分:文件头indexHeader,一系列槽位slots,真正的索引数据index
中可以看出,indexFile结构与hash表很相似,固定数量的slot组成数组,每个slot对应一条index链,index之间通过链表方式组织在一起。slot的值对应当前slot下最新的那个index的序号,index中存储了当前slot下、当前index的前一个index序号,这就把slot下的所有index链起来了
由于indexHeader,slot,index都是固定大小,所以:
公式1:第n个slot在indexFile中的起始位置是这样:40+(n-1)*4
公式2: 第s个index在indexFile中的起始位置是这样:40+5000000*4+(s-1)*20
查询的流程:查询的传入值除了key外,还包含一个时间起始值以及截止值
4.offset
offset就是message queue的下标(和commitLog的offset不是一回事,这个offset是ConsumeQueue文件的下标/行数),一条消息进入队列下标就会+1
offset持久化
类型(父类是OffsetStore):
本地文件类型
DefaultMQPushConsumer的BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在Consumer本地
Broker代存储类型
DefaultMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore
{
"offsetTable":{
"my-topic-filter@filter_consumer_group_name":{0:6,1:8,2:5,3:6
},
"%RETRY%filter_consumer_group_name@filter_consumer_group_name":{0:0
},
"my-topic-filter@woodie":{0:0,1:2,2:0,3:0
},
"%RETRY%woodie@woodie":{0:0
}
}
}