个人专题目录
六、ActiveMQ的持久化
ActiveMQ中,持久化是指对消息数据的持久化。在ActiveMQ中,默认的消息是保存在内存中的。当内存容量不足的时候,或ActiveMQ正常关闭的时候,会将内存中的未处理的消息持久化到磁盘中。具体的持久化策略由配置文件中的具体配置决定。
ActiveMQ的默认存储策略是kahadb。如果使用JDBC作为持久化策略,则会将所有的需要持久化的消息保存到数据库中。
所有的持久化配置都在conf/activemq.xml中配置,配置信息都在broker标签内部定义。
1 kahadb方式
是ActiveMQ默认的持久化策略。kahadb是一个文件型数据库。是使用内存+文件保证数据的持久化的。kahadb可以限制每个数据文件的大小。不代表总计数据容量。
<persistenceAdapter>
<!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>
特性是:1、日志形式存储消息;2、消息索引以B-Tree结构存储,可以快速更新;3、完全支持JMS事务;4、支持多种恢复机制;
2 AMQ方式
只适用于5.3版本之前。
AMQ也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。
<persistenceAdapter>
<!-- directory:保存数据的目录 ; maxFileLength:保存消息的文件大小 -->
<amqPersistenceAdapter directory="${activemq.data}/amq" maxFileLength="32mb"/>
</persistenceAdapter>
性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。
当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档。
主要的缺点是AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间。
而且由于索引巨大,一旦Broker(ActiveMQ应用实例)崩溃,重建索引的速度会非常慢。
虽然AMQ性能略高于Kaha DB方式,但是由于其重建索引时间过长,而且索引文件占用磁盘空间过大,所以已经不推荐使用。
3 JDBC持久化方式
ActiveMQ将数据持久化到数据库中。 不指定具体的数据库。 可以使用任意的数据库中。 本环节中使用MySQL数据库。
下述文件为activemq.xml配置文件部分内容。不要完全复制。
首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置jdbcPersistenceAdapter并且引用刚才定义的数据源。
dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。
<broker brokerName="test-broker" persistent="true"
xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"
createTablesOnStartup="false"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url"
value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
配置成功后,需要在数据库中创建对应的database,否则无法访问。表格ActiveMQ可以自动创建。
activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高
activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
主要的数据库字段如下:
CONTAINER:消息的Destination
SUB_DEST:如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID:记录消费过的消息的ID。
表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,
其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。
这个表用于记录哪个Broker是当前的Master Broker。
只有在消息必须保证有效,且绝对不能丢失的时候。使用JDBC存储策略。
如果消息可以容忍丢失,或使用集群/主备模式保证数据安全的时候,建议使用levelDB或Kahadb。