前言:
从本节开始,我们将分七个章节从底层源码来分析Rocket mq 是如何实现自己的消息持久化的:
第(1)章节主要总从体上分析rmq消息的存储格式和文件组织以及实现上的技术选用。
接下来的(2)、(3)、(4)、(5)、(6)、(7)章分别从源码上深入分析消息的具体内容、高可用、索引、存储文件删除、启动存储状态恢复以及业务消息查询是如何实现的。
以下用4小节分析:(1)存储总概。
1、消息【内容】
说明:
consumer客户端消费的就是这些【内容】。消息会按照broker接收的顺序串行写入存储文件,在写入文件之前,先判断当前文件的剩余空间是否能够写入该条完整的消息,如果不足,就创建下一个文件,把这条消息写入新创建的存储文件。
文件组织:
存储地址:user_root_path/store/commitlog/{fileName}
文件大小:1G = 1073741824 byte
文件起名:20位数字填充位
-
user_root_path/store/commitlog/
-00000000000000000000
-00000000001073741824
-00000000002147483648
这里说明一下文件如何起名:
我们知每个存储道文件的大小为1G,文件的起名方式为,文件所在的物理存储位置映射的起始地址。
第n个文件的起名方式为:
(n-1) * 1073741824 ,然后左填充0,直至20位
消息体的存储格式及内容
存储序列 | 字段名称 | 长度 | 注析 |
---|---|---|---|
1 | TOTALSIZE | 4字节 | 消息总长度 |
2 | MAGICCODE | 4字节 | 消息魔数 |
3 | BODYCRC | 4字节 | 消息完整校验码 |
4 | QUEUEID | 4字节 | 所在队列 |
5 | FLAG | 4字节 | 一般标识 |
6 | QUEUEOFFSET | 8字节 | 所在队列逻辑位移 |
7 | PHYSICALOFFSET | 8字节 | 物理位移 |
8 | SYSFLAG | 4字节 | 系统标识 |
9 | BORNTIMESTAMP | 8字节 | producer端的消息生成时间戳 |
10 | BORNHOST | 8字节 | producer端的通讯address |
11 | STORETIMESTAMP | 8字节 | broker端存储时间戳 |
12 | STOREHOSTADDRESS | 8字节 | broker端地址 |
13 | RECONSUMETIMES | 4字节 | 重消费次数 |
14 | Prepared Transaction Offset | 8字节 | 预提交位移 |
15 | BODY LENGTH | 4字节 | 消息体长度 |
16 | BODY | value(BODY LENGTH) 字节 | 具体消息体内容 |
17 | TOPIC LENGTH | 4字节 | topic 长度 |
18 | TOPIC | value(TOPIC LENGTH) 字节 | 具体topic内容 |
19 | PROPERTIES LENGTH | 2字节 | 属性长度 |
20 | PROPERTIES | value(PROPERTIES LENGTH) 字节 | 属性内容 |
解析:
3-BODYCRC: CRC是一种数据错误检查技术,它可以确保最初写入镜像文件的数据与从镜像文件中使用>的数据保持一致,以确保在将该文件还原到磁盘时能够检测到它是否已经损坏。4-QUEUEID:rockmq中 topic-brokerName-queueId可以确定集群中的一条具体的队列。
5-FLAG:由producer消息生产方指定的一个int类型的标志,默认为0。
6-QUEUEOFFSET:消息所在队列中的逻辑位移,即第几条消息。
7-PHYSICALOFFSET:消息存储的物理位移,即存储的开始位置。
8-SYSFLAG:系统标识位;例如消息是否压缩,消息的类型(普通类型消息或是事务类型)。
20-PROPERTIES:producer端指定的消息属性,以字符1和2作为分割符,如下key-a1value-a2key-b1valueb-2拼接字符串,其中性客户端生成的消息id是必有属,PROPERTIES用于构建消息的【key查询索引】。
2、消息的【逻辑位移索引】存储格式
说明:
类比于List的index,conusmer端可以通过该index,查询出具体的消息【内容】。
例如我们consumer指定消费哪个broker下的topic,哪条队列(queueId)的第n条消息,broker端会根据topic-queueId 以及逻辑索引n,先查询出【逻辑位移索引】的具体内容,然后通过【逻辑位移索引】存储对应的消息【内容】的物理位移,最后再找出具体的消息内容。
文件组织:
存储地址:user_root_path/store/consumequeue/{topic}/{queueId}/{fileName}
文件大小:300000 * 20 byte
文件起名:20位数字填充位
第n个文件的起名方式为:
(n-1) * 300000 * 20 ,然后左填充0,直至20位
-
user_root_path/store/consumequeue/
- topic1
- 0
-00000000000000000000
-00000000000006000000
-00000000000012000000 - 1
-00000000000000000000
-00000000000006000000 - 2
-00000000000000000000
- 0
- topic1
消息的【逻辑位移索引】存储格式及内容
存储序列 | 字段名称 | 长度 | 注析 |
---|---|---|---|
1 | offset | 8字节 | 消息存储的物理位移 |
2 | size | 4字节 | 消息总长度 |
2 | tagsCode | 8字节 | producer端指定消息的tags属性的hashcode |
3、消息的【key查询索引】存储格式
说明:
在producer生产消息时,我们可以为消息指定一个业务上的String 字符串类型的key。那么,当该消息落地成功后,我们可以通过指定的key,去查询指定的消息,broker端会返回通过key的散列值,落在相同的槽的一定数量的消息给客户端。
文件组织:
存储地址:user_root_path/store/index/{fileName}
文件大小:40 + (5,000,000[hashSlotNum] * 4[hashSlotSize]) + ((4 * 5,000,000)[indexNum] * 20[indexSize]) byte
文件起名:yyyyMMddhhmmss(年月日时分秒) + mmm(毫秒)
-
user_root_path/store/index/
- yyyyMMddhhmmssmmm
消息的【key查询索引】存储格式及内容
查询索引文件是由【索引头】 + 【索引内容】两部分组成:
【索引头】
存储序列 | 字段名称 | 长度 | 注析 |
---|---|---|---|
1 | beginTimestampIndex | 8字节 | 表示索引文件 第一个构建的 消息 的存储时间(即消息放入缓存时的时间戳) |
2 | endTimestampIndex | 8字节 | 表示索引文件 最后一个构建的 消息 的存储时间 |
3 | beginPhyoffsetIndex | 8字节 | 表该索引文件 第一个构建的 消息 的物理存储地址 |
4 | beginPhyoffsetIndex | 8字节 | 表该索引文件 最后一个构建的 消息 的物理存储地址 |
5 | hashSlotCount | 4字节 | 散列槽个数,每构建一个消息的索引,个数加一 |
6 | indexCount | 4字节 | 索引个数,每构建一个消息的索引,个数加一 |
【索引内容】
存储序列 | 字段名称 | 长度 | 注析 |
---|---|---|---|
1 | keyHash | 4字节 | 查询消息的 key.hashCode()的绝对值 |
2 | phyOffset | 8字节 | 消息的物理存储地址 |
3 | timeDiff | 4字节 | 与第一个消息的存储时间差 |
4 | slotValue | 8字节 | 上一条索引消息所在的逻辑索引 |
4、rmq消息持久化技术
4.1系统标准IO与直接内存
传统I/O操作中,java io文件的read和write操作都是通过调用系统底层的标准IO函数 read()和write() 实现的。对于写操作,即通过java调用write方法以后,首先将当前的用户态(即java进程)转成内核态,并把java 内存(heap)的字节数据拷贝到内核态的io缓存区(pagecache),最后在把内核态的缓存区数据拷贝到磁盘文件(pagecache->disk),这样就完成了一次写操作;对于读操作,也是先将用户态转成内核态,然后由操作系统的内核代码将磁盘文件拷贝到内核态的缓冲区中,最后再从内核态拷贝到用户态的缓存中,这样就完成了一次读操作。当然,根据空间局部性原理,也就是当我们的应用程序需要访问磁盘文件上的指定的一段数据时,操作系统会认为我们可能也需要访问该段数据的下一段数据,并且,由于磁盘io的操作速度比内存操作的速度慢几个数量级,因此,会同时把指定的一段数据以及下一段数据一并从磁盘文件拷贝到内核态的缓冲区中,即预读更多的数据。所以,当我们读完指定的一段数据后,再读下一段数据时,就可以直接从内核态拷贝到用户态的缓存中,减少了低效率的io操作。
内存映射和标准的io操作一样需要从磁盘文件中获取数据,但它并不需要从内核态的缓冲区拷贝到用户态的缓冲区,而是直接将进程中一部分私有地址空间区域与磁盘文件建立起映射关系,通过缺页,把磁盘文件的内容直接拷贝到进程的缓冲区。相对标准io操作,这里有两个最主要的优势,第一,性能上,由于直接内存映射少一次内存拷贝,因此会比标准io操作要快,文件越大,优势越明显;第二,直接内存映射可以加载普通方式无法访问的大文件,这里的大文件指,例如1G的文件,如果是标准io操作,我们需要将1G的文件直接加载进内存,才可以访问,明显这是不可能的,但通过虚拟内存的映射方式以及操作系统层面上发起的页面请求,便可将所需数据加载到程序内存中。
下面快速总结一下Java内存映射文件和IO
1)Java语言通过java.nio包支持内存映射文件和IO。
2)内存映射文件用于对性能要求高的系统中,如繁忙的电子交易系统
3)使用内存映射IO你可以将文件的一部分加载到内存中
4)如果被请求的缓存页面不在内存中,内存映射文件会导致页面错误
5)将一个文件区间映射到内存中的能力取决于内存的可寻址范围。在32位机器中,不能超过4GB,即2^32比特。
6)Java中的内存映射文件比流IO要快(译注:对于大文件而言是对的,小文件则未必)
7)用于加载文件的内存在Java的堆内存之外,存在于共享内存中,允许两个不同进程访问文件。顺便说一下,这依赖于你用的是direct还是non-direct字节缓存。
8)读写内存映射文件是操作系统来负责的,因此,即使你的Java程序在写入内存后就挂掉了,只要操作系统工作正常,数据就会写入磁盘。
9)Direct字节缓存比non-direct字节缓存性能要好
10)不要经常调用MappedByteBuffer.force()方法,这个方法强制操作系统将内存中的内容写入硬盘,所以如果你在每次写内存映射文件后都调用force()方法,你就不能真正从内存映射文件中获益,而是跟disk IO差不多。
11)如果电源故障或者主机瘫痪,有可能内存映射文件还没有写入磁盘,意味着可能会丢失一些关键数据。
12)MappedByteBuffer和文件映射在缓存被GC之前都是有效的。sun.misc.Cleaner可能是清除内存映射文件的唯一选择。
这里在说一下MappedByteBuffer,可能会导致JVM crash ,因为MappedByteBuffer可以通过特殊的方法释放,实际上调用了unmap的方法。此时,之前映射到jvm的地址空间就是非法地址,如果此后仍然对MappedByteBuffer进行读写,系统就会向jvm发送sigbus信号来通知进程非法操作,这个问题一般是由于程序没有处理好并发问题导致的。
因此rmq通过引用计数法,即只要引用计数不为0,MappedByteBuffer对象就不会释放来解决这个问题。具体的抽象实现为ReferenceResource,使用AtomicLong原子变量来保证并发,性能上会比较好。
当然,这种方式也会存在弊端,就是程序不能正确操作引用计数,可能会导致文件无法删除,因此,rmq增加了一个补救措施,就是一旦文件被关闭了状态位available会设置为false,并且开始计时,如果超过2分钟,引用计数还没有变为0,就强行释放。上文提到,MappedByteBuffer会在jvm发生gc时,可能被回收,但不是一定,rmq通过反射的方式调用Cleaner.clean,手动清除。
DirectByteBuffer本身是一个java heap内的对象,自身所占用的内存并不会很大,只是其实例所映射的堆外内存可能会比较大,当jvm发起young gc时,如果DirectByteBuffer实例是非可达性对象,那么,jvm就会将DirectByteBuffer实例回收,在回收前,会通过Cleaner.clean方法,委托Deallocator释放堆外内存;但DirectByteBuffer经过多次ygc后,会晋升到老年代,此时,如果不通过full gc 或old gc,就无法释放堆外内存;
因此我们可以通过程序手动释放。
Cleaner是 PhantomReference虚引用的子类,并通过自身的next和prev字段维护的一个双向链表。PhantomReference的作用在于跟踪垃圾回收过程,并不会对对象的垃圾回收过程造成任何的影响。
所以cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); 用于对当前构造的DirectByteBuffer对象的垃圾回收过程进行跟踪。
当DirectByteBuffer对象从pending状态 ——> enqueue状态时,会触发Cleaner的clean(),而Cleaner的clean()的方法会实现通过unsafe对堆外内存的释放。
4.2消息在java堆,物理内存,虚拟内存以及磁盘间的流动
(1)对于producer消息生产来说,消息通过socket转入java heap,然后通过直接内存映射写到pagecache(内核态共享缓存,内存的一种),最后在通过刷盘线程异步写到flush到磁盘文件。如果broker端设置为sync,则同步等待刷盘结果。
(2)消费者(consumer1)正常拉取消息,消息直接从pagecache转入socket,不经过java heap,这张场景最多,例如向上96G物理内存,按照1K消息算,可在物理内存缓存1亿条消息
(3)消费者(consumer2)异常拉取消息,由于socket所访问的消息不在pagecache中,因此需要通过虚拟内存,发生缺页中断,产生磁盘IO,从磁盘把消息加载到pagecache,最后在直接从socket发出去。
参考:
1:http://javarevisited.blogspot.hk/2012/01/memorymapped-file-and-io-in-java.html
2:RocketMQ STORE Q&A
3:http://www.jianshu.com/p/007052ee3773