Message发送之高级特性(一)

  人生就象弈棋,一步失误,全盘皆输,这是令人悲哀之事;而且人生还不如弈棋,不可能再来一局,也不能悔棋。 —— 弗洛伊德

1. 消息分发的方式

  ActiveMQ消息分发有两种方式:游标以及异步发送

1.1 消息游标

  当Producer发送的持久化消息到达Broker后,Broker首先会把消息保存在持久存储中。
  (一)若发现当前有活跃的Consumer,且这个Consumer消费消息的速度能跟得上Producer产生消息的速度,那么ActiveMQ会直接把消息传递给Broker内部跟这个Consumer相关联的Dispatch Queue。


  (二)若当前没有活跃的Consumer或者Consumer消费消息的速度跟不上Producer产生消息的速度,那么ActiveMQ会使用Pending Message Cursors保存堆消息的引用。在需要的时候,Pending Message Cursors把消息引用传递给Broker内部跟这个Consumer相关联的Dispatch Queue。


  ActiveMQ有以下两种Pending Message Cursors:

  • VM Cursor:在内存中保存消息。
  • File Cursor:首先在内存中保存消息的引用,如果内存使用达到上限时,那么会把消息引用保存到临时文件中。

  在缺省情况下,ActiveMQ会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配置Message Cursors。
  对于topic,可以使用的pendingSubscriberPolicy 有vmCursor和fileCursor以及可以使用的PendingDurableSubscriberMessageStoragePolicy有vmDurableCursor 和 fileDurableSubscriberCursor。

<destinationPolicy>
   <policyMap>
     <policyEntries>
       <policyEntry topic="org.apache.>">
       <pendingSubscriberPolicy>
         <vmCursor />
       </pendingSubscriberPolicy>
       <PendingDurableSubscriberMessageStoragePolicy>
         <vmDurableCursor/>
       </PendingDurableSubscriberMessageStoragePolicy>
       </policyEntry>
     </policyEntries>
   </policyMap>
</destinationPolicy>

  对于queue,可以使用的pendingQueuePolicy有vmQueueCursor 和 fileQueueCursor。

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue="org.apache.>">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="Test.AQ."/>
                </deadLetterStrategy>
                <pendingQueuePolicy>
                    <vmQueueCursor />
                </pendingQueuePolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

1.2 消息异步发送

  消息的异步发送适用于慢消费者,因为对于慢消费者,使用同步发送会使Producer出现阻塞现象。
  异步发送的配置方式有以下几种:
  1. ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Fast Consumer则使用dispatcheAsync=false。
  2. 在Connection URI级别来配置使用Async Send

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

  3. 在ConnectionFactory级别来配置使用Async Send

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

  4. 在Connection级别来配置使用Async Send

((ActiveMQConnection)connection).setUseAsyncSend(true);

2. 消息确认

  ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。若希望禁止使用经过优化的确认方式,有以下几种方式:
  (1) 在Connection URI 上禁止启用Optimized Acknowledgements。

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");

  (2) 在ConnectionFactory 上禁止启用Optimized Acknowledgements。

((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);

  (3) 在Connection上禁止启用Optimized Acknowledgements。

((ActiveMQConnection)connection).setOptimizeAcknowledge(true);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,831评论 13 425
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,178评论 19 139
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 10,823评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 14,412评论 8 167
  • 大家有没有这样的经历,朋友聚会,为了不打扰别人的兴致,不冷场。我们总是在将就,我们说不出,“酒吧太吵我不想去你们去...
    闵太半袭白衣阅读 2,405评论 0 0

友情链接更多精彩内容