ActiveMQ事物/持久化/其他

事务
jms中事务分为生产者和消费者两块,消息的生产和消费不能包含在同一个事务中。

生产者:
在事务状态下进行发送操作,消息并未真正投递到中间件。而只有进行session.commit操作之后,消息才会发送到中间件,再转发到适当的消费者进行处理。如果是调用rollback操作,则表明,当前事务期间内所发送的消息都取消掉。
在支持事务的session中,producer发送message时在message中带有transactionID。broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。

消费者:
在Spring整合JMS的应用中,我们要进行本地的事务管理,只需要指定对应的监听容器的sessionTransacted属性为true。对于SessionAwareMessageListener,在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。

    <bean id="jmsContainer"  
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="connectionFactory" />  
        <property name="destination" ref="queueDestination" />  
        <property name="messageListener" ref="consumerMessageListener" />  
        <property name="sessionTransacted" value="true"/>  
    </bean> 

jta事务:
如果想要接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="messageListener" ref="jmsQueueReceiver" />
        <property name="destination" ref="queueDestination" />
        <property name="sessionTransacted" value="true"/>
        <property name="transactionManager" ref="jtaTransactionManager"/>
    </bean>
    <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

当指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。

持久化
默认持久化到文件中:
打开安装目录下的配置文件,注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器。以日志形式存储消息,消息索引以B-Tree结构存储。在D:\ActiveMQ\apache-activemq\conf\activemq.xml中会发现默认的配置项:

    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>

消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

如需持久化到数据库中:
首先需要把MySql的驱动放到ActiveMQ的Lib目录下,例如:mysql-connector-java-5.0.4-bin.jar,在conf/acticvemq.xml中更改persistenceAdapter节点配置并且引用定义的mysql-ds数据源。

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
            <kahaDB directory="${activemq.data}/kahadb"/>
        -->
        <persistenceAdapter> 
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> 
        </persistenceAdapter>

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

在conf/acticvemq.xml中定义mysql-ds,如下。

  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value=""/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

然后重新启动消息队列,你会发现多了3张表activemq_acks,activemq_lock,activemq_msgs。

  • activemq_msgs用于存储消息,然后启动消费者,发现Mysql中已经没有这条消息了。
  • activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
  • activemq_lock在集群环境中才有用。

PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息),默认为持久消息。持久化的消息在MQ服务器宕机之后,消息不会丢失,在重启服务的时候,消息将恢复。

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
    <property name="connectionFactory" ref="cachingConnectionFactory"></property>  
    <property name="defaultDestination" ref="dest" />  
    <property name="messageConverter" ref="messageConverter" />  
    <property name="pubSubDomain" value="false" />  
    <property name="explicitQosEnabled" value="true" />  
    <!-- 发送模式  DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久-->
    <property name="deliveryMode" value="1" />   
</bean>  

prefetch:
ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。
假设有三个消费者,接收从1到99,共99条消息:
consumer A:1,4,7...
consumer B:2,5,8...
consumer C:3,6,9...
按照默认分配策略,将会把消息如上预分配。

这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除。如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。

假设一种情况:某个consumer C性能较差,处理信息速度很慢。会导致consumer C任有消息积压,但consumer A, consumer B已经空闲。
解决方案:将consumer C 的 prefetch设为1,每次处理1条消息,处理完再去取。

    prefetchPolicy.setQueuePrefetch(1);
    connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);//实例化连接工厂
    connectionFactory.setPrefetchPolicy(prefetchPolicy);

TimeToLive:

表示一个消息的有效期。只有在这个有效期内,消息消费者才可以消费这个消息。默认值为0,表示消息永不过期。

如果使用TTL来判定消息的过期,那么就首先需要确保Producer、broker两者的系统时间要尽可能的一致,Consumer也尽可能的和broker的时间保持一致。Broker将会在接收Producer消息时,以及将消息发送给Consumer之前都会检测消息是否过期,判断过期的方法也就是根据JMSExpiration和当前时间戳比较。

可以通过下面的方式设置:

    producer.setTimeToLive(3600000); //有效期1小时 (1000毫秒 * 60秒 * 60分)

可以在消息发送时,为当前消息设定ttl。

    messageProducer.send(Message message, int deliveryMode, int priority, long timeToLive)

如果消息过期,将会把消息发送到DLQ中,此消息不会被Consumer消费。如果broker端对DLQ使用Discard策略或者Broker没有开启DLQ相关策略,这些过期的消息可能将不复存在。在conf/activemq.xml中:

    <destinationPolicy>
        <policyMap>
         <policyEntries>
           <policyEntry queue=">" topic=">">
             <deadLetterStrategy>
               <sharedDeadLetterStrategy processExpired="false" />
             </deadLetterStrategy>
             <!-- discard all -->
             <!--
             <discardingDeadLetterStrategy />
             -->
           </policyEntry>
         </policyEntries>
        </policyMap>
    </destinationPolicy>

Priority:

我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。
JMS标准中约定priority可以为09的数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持09。但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。在broker端,默认是不支持priority排序的,我们需要手动开启。在conf/activemq.xml中::

    <policyEntry queue=">" prioritizedMessages="true"/>

设置message的优先级:
TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);//创建一条文本消息
message.setJMSPriority(9);

failover:
如果集群中的某一台消息服务器宕机,与该台消息服务器相连接的生产者和消费者需要能够自动连接到其他正常工作的消息服务器。对此ActiveMQ提供了一种叫做失效转移(也叫故障转移,FailOver)的策略。失效转移提供了在传输层上重新连接到其他任何传输器的功能。

只需要在uri中配置,语法如下:

    failover:(uri1,...,uriN)?transportOptions 或者 failover:uri1,...,uriN

例子:

    failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
    failover:(tcp://localhost:61616,tcp://remotehost:61616)?randomize=false&initialReconnectDelay=100 

如果某个ActiveMQ客户端发现uri1地址失效了,它会立即转向uri地址列表中其他可以连接的消息服务器进行重连,以保证继续正常工作,这种选择其他地址的方式默认是随机的,以保证负载均衡。如果你想关闭随机,可以transportOptions中加入randomize=false。

Java例子:

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    "failover:(
    tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration=0,
    tcp://192.168.0.87:61617?wireFormat.maxInactivityDuration=0
    )");    
    Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);    
  • transportOptions有多种参数可以选择,如下:
  • initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之前等待的时间。
  • maxReconnectDelay:默认30000,单位毫秒,表示两次重连之间的最大时间间隔。
  • useExponentialBackOff:默认为true,表示重连时是否加入避让指数来避免高并发。
  • reconnectDelayExponent:默认为2.0,重连时使用的避让指数。
  • maxReconnectAttempts:5.6版本之前默认为-1, 5.6版本及其以后,默认为0。 0表示重连的次数无限,配置大于0可以指定最大重连次数。
  • randomize:默认为true,表示在URI列表中选择URI连接时是否采用随机策略。如果为true的话,有可能生产者连接的是第一个,而消费者连接的是第二个,造成一个服务器上只有生产者,一个服务器上只有消费者。
  • backup:默认为false,表示是否在连接初始化时将URI列表中的所有地址都初始化连接,以便快速的失效转移,默认是不开启。
  • timeout:默认为-1,单位毫秒,是否允许在重连过程中设置超时时间来中断的正在阻塞的发送操作。-1表示不允许,其他表示超时时间。

5.9版本使用levelDB+zookeeper的方式来实现HA了。


参考:
http://haohaoxuexi.iteye.com/blog/1983532
http://manzhizhen.iteye.com/blog/2105572

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,654评论 18 139
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,469评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,831评论 8 167
  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,827评论 4 54
  • 烟鬼的自白 我就爱抽烟, 一抽十几年, 提起那滋味, 飘飘欲仙。点燃细细的烟, 燃烧在纤细的指间, 淡定从容, 显...
    阿超Lilian阅读 147评论 0 0