磁盘文件结构
总体结构
│ abort
│ checkpoint
│ lock
├─commitlog
│ 00000000000000000000
│ 00000000000000010240
│ 00000000000000020480
├─consumequeue
│ └─FooBar
│ ├─0
│ │ 00000000000000000000
│ ├─1
│ │ 00000000000000000000
│ ├─2
│ │ 00000000000000000000
│ ├─3
│ │ 00000000000000000000
│ ├─4
│ │ 00000000000000000000
│ ├─5
│ │ 00000000000000000000
│ ├─6
│ │ 00000000000000000000
│ └─7
│ 00000000000000000000
└─index
20210512152103416
文件格式
commitLog
消息主体和元数据的物理存储,生产者发送的消息会持久化到这个文件,文件大小默认1G,文件名称为20位整数,表示当前commitLog中消息的起始偏移量,消息是顺序追加的,文件满了,则写入下一个。Broker单个实例下所有队列共用一个commitLog,优点是主题队列很多时,能利用PageCache保证消息写入的高效,缺点是消息的消费是随机读,需要额外维护消费队列,虽然是随机读,但总体是有序的,只要消费的消息偏移量跨度不是太大,随机的那部分区域落在PageCache热点范围内,就仍能命中pagecache
编号 | 消息存储结构 | 备注 | 长度(字节数) |
---|---|---|---|
1 | TOTALSIZE | 消息大小 | 4 |
2 | MAGICCODE | 消息魔数,-626843481 | 4 |
3 | BODYCRC | 消息体的CRC32校验码,broker重启时会校验 | 4 |
4 | QUEUEID | 队列ID | 4 |
5 | FLAG | 保留字,不处理 | 4 |
6 | QUEUEOFFSET | 队列逻辑偏移量,真正在consumeQueue的偏移量=queueoffset*20 | 8 |
7 | PHYSICALOFFSET | 消息在commitLog中的物理偏移量 | 8 |
8 | SYSFLAG | 消息标志,记录消息的事务状态(Prepared,commit,rollback)、消息是否压缩,是否多个tags | 4 |
9 | BORNTIMESTAMP | 生产者生成的消息发送时间戳 | 8 |
10 | BORNHOST(IP+PORT) | 生产者地址 | 8 |
11 | STORETIMESTAMP | broker生成的存储时间戳,排它锁内生成,确保有序 | 8 |
12 | STOREHOST(IP+PORT) | 消息所在broker地址 | 8 |
13 | RECONSUMETIMES | 消费失败重试次数 | 8 |
14 | Prepared Transaction Offset | 事务消息中,消息提交后,提交消息对应的prepare消息的物理偏移量 | 8 |
15 | BODY | 消息体,前4byte为消息体大小,其余为消息体 | 4+body lenth |
16 | TOPIC | 主题,前1byte为主题大小,其余是主题 | 1+topic lenth |
17 | PROPERTIES | 消息属性,前2byte存放属性大小,其余是属性 | 2+prop lenth |
consumerQueue
消费的逻辑队列,每个主题队列对应一至多个consumerQueue。可以看成是数组元素为20byte的数组,存储了消息在commitLog中的物理偏移量、消息大小、TAGS hashcode,文件名是20位整数,表示消息的起始逻辑偏移量。逻辑队列由30万条数据组成,大小固定为30w*20=600w字节=5.72M,写满了则写下一个。构建机制是一个异步分发线程ReputMessageService
定时sleep(1),调用CommitLogDispatcherBuildConsumeQueue.dispatch
去根据commitLog中的消息,构建队列
编号 | 消息存储结构 | 长度(byte) |
---|---|---|
1 | 消息的物理偏移量 | 8 |
2 | 消息大小 | 4 |
3 | TAGS hashcode,便于快速tags过滤 | 8 |
indexFile
因为所有消息都放到commitLog中,如果要根据key来查询某个消息,就会很耗费资源,indexFile是根据消息的key建立的索引文件,方便消息查询。文件名为文件创建的时间戳,大小固定为200M左右,可存储2000w个索引。
构建机制是异步分发线程ReputMessageService
定时sleep(1),调用CommitLogDispatcherBuildIndex.dispatch
,会根据key、物理偏移量、消息存储时间来构建索引。
索引文件包含索引头、哈希槽、索引条目,索引条目由key hashcode、commitLogOffset、存储时间、下一个索引条目,通过拉链法解决哈希冲突,链表以倒序存储。key hashcode%slotNum获取应该落到哪个槽,哈希槽指向最新插入的索引条目,因为是顺序写,只能根据最近插入的2个索引条目来建立链表。
存储时间记录的是消息存储时间与索引头开始时间的差值,再除以1000
查询时,查询条件是主题名称、key、时间范围,先根据主题名称和key组合得出消息的哈希槽,再遍历链表,判断存储时间是否满足传入条件,可限制查询结果最大条数,默认64条
消息读写流程
消息存储整体架构
发送消息
CommitLog中准备向MappedFile写消息时,会先尝试获取锁,保证顺序写入,锁可以是可重入锁或自旋锁,这是一个调优参数,broker配置参数useReentrantLockWhenPutMessage。默认false,使用自旋锁,基于AtomicBoolean CAS自旋实现,如果cpu性能较差,则推荐用可重入锁。