人生就象弈棋,一步失误,全盘皆输,这是令人悲哀之事;而且人生还不如弈棋,不可能再来一局,也不能悔棋。 —— 弗洛伊德
1. 消息分发的方式
ActiveMQ消息分发有两种方式:游标以及异步发送。
1.1 消息游标
当Producer发送的持久化消息到达Broker后,Broker首先会把消息保存在持久存储中。
(一)若发现当前有活跃的Consumer,且这个Consumer消费消息的速度能跟得上Producer产生消息的速度,那么ActiveMQ会直接把消息传递给Broker内部跟这个Consumer相关联的Dispatch Queue。
(二)若当前没有活跃的Consumer或者Consumer消费消息的速度跟不上Producer产生消息的速度,那么ActiveMQ会使用Pending Message Cursors保存堆消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给Broker内部跟这个Consumer相关联的Dispatch Queue。
ActiveMQ有以下两种Pending Message Cursors:
- VM Cursor:在内存中保存消息。
- File Cursor:首先在内存中保存消息的引用,如果内存使用达到上限时,那么会把消息引用保存到临时文件中。
在缺省情况下,ActiveMQ会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。
对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor以及可以使用的PendingDurableSubscriberMessageStoragePolicy有vmDurableCursor 和 fileDurableSubscriberCursor。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="org.apache.>">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
<PendingDurableSubscriberMessageStoragePolicy>
<vmDurableCursor/>
</PendingDurableSubscriberMessageStoragePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="org.apache.>">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="Test.AQ."/>
</deadLetterStrategy>
<pendingQueuePolicy>
<vmQueueCursor />
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
1.2 消息异步发送
消息的异步发送适用于慢消费者,因为对于慢消费者,使用同步发送会使Producer出现阻塞现象。
异步发送的配置方式有以下几种:
1. ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Fast Consumer则使用dispatcheAsync=false。
2. 在Connection URI级别来配置使用Async Send
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
3. 在ConnectionFactory级别来配置使用Async Send
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
4. 在Connection级别来配置使用Async Send
((ActiveMQConnection)connection).setUseAsyncSend(true);
2. 消息确认
ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。若希望禁止使用经过优化的确认方式,有以下几种方式:
(1) 在Connection URI 上禁止启用Optimized Acknowledgements。
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
(2) 在ConnectionFactory 上禁止启用Optimized Acknowledgements。
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
(3) 在Connection上禁止启用Optimized Acknowledgements。
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);