字典里最重要的三个词,就是意志、工作、等待。我将要在这三块基石上建立我成功的金字塔。 —— 巴斯德
消息持久化方式
消息持久化是保证消息不丢失的重要方式。ActiveMQ提供了以下三种的消息存储方式:
(1) 基于KahaDB的消息存储方式,这种方式是现在的默认存储方式。它提供了容量的提升和恢复能力。
(2) 基于JDBC的消息存储方式-数据存储于数据库中。
(3) Memory 消息存储-基于内存的消息存储。
1 KahaDB消息存储
1.1 KahaDB消息存储的配置方式
KahaDB是目前默认的存储方式,这种方式的消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。KahaDB存储配置的配置在conf/activemq.xml,如下:
<broker brokerName="broker" ... >
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="100mb"/>
</persistenceAdapter>
...
</broker>
<persistenceAdapter>中指定了存储方式为kahaDB,并表明数据存储在 "data/activemq-data"目录下,日志文件最大长度是100MB。
1.2 KahaDB消息存储的实现
KahaDB存储方式下的数据目录如下:
从上图可以看出,该目录下共有四个文件:
(1) db.data
它是消息的索引文件。本质上是B-Tree的实现,使用B-Tree作为索引指向db-.log里面存储的消息。
(2)db.redo
该文件主要用来进行消息恢复。
(3)db-.log
该文件用来存储消息的内容。对于一个消息而言,不仅仅有消息本身的数据(message data),而且还有(Destinations、订阅关系、事务...等方面的信息)
data log以日志形式存储消息,而且新的数据总是以APPEND的方式追加到日志文件末尾。因此,消息的存储是很快的。比如,对于持久化消息,Producer把消息发送给Broker,Broker先把消息存储到磁盘中(enableJournalDiskSyncs配置选项),然后再向Producer返回Acknowledge。Append方式在一定程度上减少了Broker向Producer返回Acknowledge的时间。
(4) lock
主要是用来存放一些锁的信息。
另外,一些KahaDB的配置选项如下:
1)indexWriteBatchSize: 默认值1000,当Metadata Cache中更新的索引到达了1000时,才同步到磁盘上的Metadata Store中。不是每次更新都写磁盘,而是批量更新写磁盘。
2)indexCacheSize: 默认值10000,(number of index pages cached in memory),在内存中最多分配多个页面来缓存index。缓存的index越多,命中的概率就越大,检索的效率就越高。
3)journalMaxFileLength: 默认值32MB,当存储的消息达到32MB时,新建一个新文件来保存消息。
4)enableJournalDiskSyncs: 默认值true,默认采用同步写磁盘,即消息先存储到磁盘中再向Producer返回ACK。
5)cleanupInterval: 默认值30000ms,当消息被消息者成功消费之后,Broker就可以将消息删除了。
5)checkpointInterval: 默认值5s,每隔5s将内存中的Index(Metadata Cache)更新到磁盘的Index文件中(Metadata Store)。
6)director: KahaDB存放的路径,默认值activemq-data。
KahaDB的实现原理如下图所示:
在内存(cache)中的那部分B-Tree是Metadata Cache。通过将索引缓存到内存中,可以加快查询的速度,但是需要定时将 Metadata Cache 与 Metadata Store同步。这个同步过程就称为:check point。由checkpointInterval选项 决定每隔多久时间进行一次check point操作。
BTree Indexes则是保存在磁盘上的,称为Metadata Store,它对应于文件db.data,它就是对Data Logs以B树的形式 索引。Broker就是根据Metadata Store进行快速地重启恢复。若是Metadata Store损坏了,则只能扫描整个Data Logs来重建Metadata Store了。
Data Logs则对应于文件 db-*.log,是消息存储的真正载体。
2 JDBC消息存储
2.1 JDBC消息存储的配置方式
<!--JDBC Jdbc用于master/slave模式的数据库分享 -->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
<!--配置数据库连接池-->
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/db_activemq?useUnicode=true&characterEncoding=UTF-8" />
<property name="username" value="root" />
<property name="password" value="123456"/>
</bean>
注意:此处需要上传数据库驱动包到/activemq/lib下。
2.1 JDBC消息存储的实现
重启ActiveMQ后,在数据库中就可以看到有如下几张表:
从图中可以看出,数据库中有activemq_acks、activemq_lock、activemq_msgs三张表。
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录最后消费过的消息的IDactivemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。
主要的数据库字段如下:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker。
3 Memory消息存储
Memory消息存储主要是存储所有的持久化的消息在内存中。使用这种方式必须注意设置你的broker所在的JVM和内存限制。
配置方式如下:
<broker ... persistent="false" ...></broker>
因是基于内存的,也就是说当服务器重启后,存储在内存中的所有消息都会丢失。