RocketMQ消息存储
1 CommitLog
要想知道RocketMQ如何存储消息,我们先看看CommitLog。在RocketMQ中,所有topic的消息都存储在一个称为CommitLog的文件中,该文件默认最大为1GB,超过1GB后会轮到下一个CommitLog文件。通过CommitLog,RocketMQ将所有消息存储在一起,以顺序IO的方式写入磁盘,充分利用了磁盘顺序写减少了IO争用提高数据存储的性能,
消息在CommitLog中的存储格式如下:
· 4字节表示消息的长度,消息的长度是整个消息体所占用的字节数的大小
· 4字节的魔数,是固定值,有MESSAGE_MAGIC_CODE和BLANK_MAGIC_CODE
· 4字节的CRC,是消息体的校验码,用于防止网络、硬件等故障导致数据与发送时不一样带来的问题
· 4字节的queueId,表示消息发到了哪个MessageQueue(逻辑上相当于kakka的partition)
· 4字节的flag,flag是创建Message对象时由生产者通过构造器设定的flag值
· 8字节的queueOffset,表示在queue中的偏移量
· 8字节的physicalPosition,表示在存储文件中的偏移量
· 4字节sysFlag,是生产者相关的信息标识,具体生产逻辑可以看相关代码
· 8字节消息创建时间
· 8字节消息生产者的host
· 8字节消息存储时间
· 8字节消息存储的机器的host
· 4字节表示重复消费次数
· 8字节消息事务相关偏移量
· 4字节表示消息体的长度
· 消息休,不是固定长度,和前面的4字节的消息体长度值相等
· 1字节表示topic的长度,因此topc的长度最多不能超过127个字节,超过的话存储会出错(有前置校验)
· Topic,存储topic,因为topic不是固定长度,所以这里所占的字节是不固定的,和前一个表示topic长度的字节的值相等
· 2字节properties的长度,properties是创建消息时添加到消息中的,因此,添加在消息中的poperties不能太多太大,所有的properties的kv对在拼接成string后,所占的字节数不能超过2^15-1
· Properties的内容,也不是固定长度,和前面的2字节properties长度的值相同
2 ConsumeQueue
一个ConsumeQueue表示一个topic的一个queue,类似于kafka的一个partition,但是rocketmq在消息存储上与kafka有着非常大的不同,RocketMQ的ConsumeQueue中不存储具体的消息,具体的消息由CommitLog存储,ConsumeQueue中只存储路由到该queue中的消息在CommitLog中的offset,消息的大小以及消息所属的tag的hash(tagCode),一共只占20个字节,整个数据包如下:
3 消息存储方式
前文已经描述过,RocketMQ的消息存储由CommitLog和ConsumeQueue两部分组成,其中CommitLog用于存储原始的消息,而ConsumeQueue用于存储投递到某一个queue中的消息的位置信息,消息的存储如下图所示:
消费者在读取消息时,先读取ConsumeQueue,再通过ConsumeQueue中的位置信息读取CommitLog,得到原始的消息。
4 消息存储与kafka的对比
Kafka中,每个partition有独立的消息存储,投递到每个partition的消息,存储在partition自己的存储文件中,示意图如下:
在消息的存储上,RocketMQ与Kafka的主要区别在于,RocketMQ将所有消息存储在同一个CommitLog中且ConsumeQueue中每个消息只存储20个字节的消息位置信息,而Kafka将每个partition的消息分开存储,这导致RocketMQ单个broker能支持更多的topic和partition。
因为在RocketMQ中,所有消息都存储在同一个文件中,这使得RocketMQ的消息存储是磁盘的顺序写,而kafka将消息按partition存储在不同的文件中,因此kafka在消息存储上是随机IO,磁盘的顺序IO要比随机IO快得多,顺序IO可以接近内存的速度。将partition的数量非常大时,kafka中的随机IO将非常多,这将导致kafka在所有topic的partition变大了之后broker性能会明显下降。
但是RocketMQ的ConsumeQueue也是随机IO,为何相比kafka能支持更多的partition呢,原因是RocketMQ通过MappedFile的方式读写ConsumeQueue,操作系统对内存映射文件有page cache而ConsumeQueue中的数据都非常小(只有20bytes),读写几乎都是page cache的操作,因此虽然是随机IO但效率也非常高。
5 源码阅读
RocketMQ中的相关源码在DefaultMessageStore类、CommitLog类和ConsumeQueue类中,其中DefaultMessageStore的内部类ReputMessageService会将存储到CommitLog中的消息写入到ConsumeQueue:
从doDispatch方法进去,依次进入调用链,在DefaultMessageStore类的内部类CommitLogDispatcherBuildConsumeQueue中,我们可以看到将消息的位置信息写入到ConsumeQueue中的相关代码:
而putMessagePositionInfo方法的实现就非常简单了,它先选出正确的MessageQueue,然后调用其方法写入数据:
最后附上ConsumeQueue中的关键代码片断,表示如何写入那20个字节的:
Long + int + long 一共20字节。