Message发送之高级特性(二)

人生活在希望之中,旧的希望实现了,或者泯灭了,新的希望的烈焰随之燃烧起来。如果一个人只管活一天算一天,什么希望也没有,他的生命实际上也就停止了。- 莫泊桑

1. 消息的延迟和定时

  若不希望消息马上被Broker发送给Comsumer,而是延迟1分钟发送给Comsumer,又或者需要消息每隔一定时间发送一次,一共发送N次。对于这样的需求,ActiveMQ提供了一种broker端消息定时调度机制。只需要在activemq.xml中配置schedulersupport属性值为true。

<broker xmlns="http://activemq.apache.org/schema/core" schedulersupport="true" brokerName="localhost" dataDirectory="${activemq.data}">

  提供了以下四种属性:

 1:AMQ_SCHEDULED_DELAY :延迟投递的时间
 2:AMQ_SCHEDULED_PERIOD :重复投递的时间间隔
 3:AMQ_SCHEDULED_REPEAT:重复投递次数
 4:AMQ_SCHEDULED_CRON:Cron表达式

  以下是示例:
  (1) 延迟60秒

MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("延迟60秒");
long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message);

  (2) 延迟60秒,共投递10次,每次间隔10秒

TextMessage message = session.createTextMessage("延迟60秒,共投递10次");
long delay = 60 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);

  (3) 使用 CRON 表达式,每隔1小时发送一次

TextMessage message = session.createTextMessage("每隔1小时发送一次");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");

  CRON表达式的优先级高于另外三个参数,如果在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。

2. 消息策略

  ActiveMQ中提供了众多的“策略”(policy),它们可以在broker端为每个通道“定制”消息的管理方式。

2.1 DispatchPolcicy:转发策略(Topic)

  此策略表明Broker端消息转发给多个Consumer时,消息被发送的顺序性,这个顺序通常指Consumer的顺序,只对Topic有效,它有4种常用的类型:
  1) RoundRobinDispatchPolicy:“轮询”,消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列,在转发消息时,对于匹配消息的第一个订阅者,将会被移动到“订阅者”列表的尾部,这也意味着“下一条”消息,将会较晚的转发给它。
   2) StrictOrderDispatchPolicy:严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。它和RoundRobin最大的区别是,没有移动“订阅者”顺序的操作。
   3) PriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定priority,默认每个consumer的权重都一样。
  4) SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其子类。
  其中,轮询是比较常用的策略,示例如下:

<policyEntry topic=">">                 
  <dispatchPolicy>    
    <roundRobinDispatchPolicy/>  
  </dispatchPolicy>  
</policyEntry>

2.2 SubscriptionRecoveryPolicy:恢复策略(Topic)

   在非持久订阅者“失效”期间或一个新的Topic,broker可以保留的可追溯的消息量。前提是Topic必须是“retroactive”,我们可以在distination地址中指定此属性,例如:"order.topic?consumer.retroactive=true"。默认情况下,订阅者只能获取“订阅”开始之后的消息,如果retroactive=true,那么订阅者就可以获取其创建之前的消息列表。此Policy就是用来控制“retroactive”的消息量。
   (1) FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker将为此Topic开辟定额的RAM用来保存最新的消息。

<!-- 保存1K的消息 -->  
<fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>

   (2) FixedCountSubscriptionRecoveryPolicy:保存一定条数的消息。

<!-- 保存100条消息 -->  
<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> 

   (3) LastImageSubscriptionRecoveryPolicy:只保留最新的一条数据。
   (4) QueryBasedSubscriptionRecoveryPolicy:符合置顶selector的消息都将被保存,具体能够“恢复”多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还存在,则都可以恢复。
   (5) TimedSubscriptionRecoveryPolicy:保留最近一段时间的消息。

<!-- 可追溯最近1分钟的消息-->  
<timedSubscriptionRecoveryPolicy recoverDuration="60000" />

   (6) NoSubscriptionRecoveryPolicy:关闭“恢复机制”。默认值。

2.3 DeadLetterStrategy:死信策略

  当消息过期后,或者“重发”几次之后仍然不能被正常消费,那么此消息将会被移除到DeadLetter队列中。此后,我们可以通过侦听死信队列,来获取相关通知或者对消息做额外的操作。
  (1) IndividualDeadLetterStrategy:把DeadLetter放入各自的死信通道中,对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”,Topic为“ActiveMQ.DLQ.Topic.”。比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。
   默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。

<policyEntry queue="order">  
  <deadLetterStrategy>  
    <individualDeadLetterStrategy  
      queuePrefix="DLQ." useQueueForQueueMessages="false" />  
  </deadLetterStrategy>  
</policyEntry>  

  (2) SharedDeadLetterStrategy:将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。

<deadLetterStrategy>  
    <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>  
 </deadLetterStrategy> 

  (3) SharedDeadLetterStrategy:broker将直接抛弃DeadLeatter。如果开发者不需要关心DeadLetter,可以使用此策略。AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。

<broker>  
    <plugins>  
      <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />  
    </plugins>  
</broker>  

2.4 PendingMessageLimitStrategy:消息限制策略(面向慢消费者)

  此策略只对Topic有效,只对未持久化订阅者有效,当通道中有大量的消息积压时,broker可以保留的消息量。为了防止Topic中有慢速消费者,导致整个通道消息积压。
  (1) ConstantPendingMessageLimitStrategy:保留固定条数的消息,如果消息量超过limit,将使用消息剔除策略移除消息。

<policyEntry topic="ORDERS.>">  
    <!-- lets force old messages to be discarded for slow consumers -->  
    <pendingMessageLimitStrategy>  
        <constantPendingMessageLimitStrategy limit="50"/>  
    </pendingMessageLimitStrategy>  
</policyEntry>

  (2) PrefetchRatePendingMessageLimitStrategy:保留prefetchSize倍数条消息。

<!-- 若prefetchSize为100,则保留2.5 * 100条消息 -->  
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

2.5 SlowConsumerStrategy:慢速消费者策略

  对于慢消费者,broker会启动一个后台线程用来检测所有的慢速消费者,并定期的关闭慢消费者。
  (1) AbortSlowConsumerStrategy abortConnection:中断慢速消费者,慢速消费将会被关闭。

<slowConsumerStrategy>    
    <abortSlowConsumerStrategy abortConnection="false"/><!-- 不关闭底层链接 -->    
</slowConsumerStrategy>

  (2) AbortSlowConsumerStrategy maxTimeSinceLastAck:如果慢速消费者最后一个ACK距离现在的时间间隔超过阀值,则中断慢速消费者。

<slowConsumerStrategy>    
    <abortSlowConsumerStrategy  maxTimeSinceLastAck="30000"/><!-- 30秒滞后 -->    
</slowConsumerStrategy>

2.6 MessageEvictionStrategy:消息剔除策略(面向慢消费者)

  对于多余的消息,ActiveMQ提供了以下策略来移除。
  (1) OldestMessageEvictionStrategy:移除旧消息,默认策略。
  (2) OldestMessageWithLowestPriorityEvictionStrategy:旧数据中权重较低的消息,将会被移除。
  (3) UniquePropertyMessageEvictionStrategy:移除具有指定property的旧消息。

<policyEntry topic="ORDER.WEIGHT.>">   
    <pendingMessageLimitStrategy>  
        <constantPendingMessageLimitStrategy limit="1000"/>  
    </pendingMessageLimitStrategy>  
    <messageEvictionStrategy>  
        <uniquePropertyMessageEvictionStrategy propertyName="ORDER" />  
    </messageEvictionStrategy>  
</policyEntry>  

  上述例子中,对于ORDER.WEIGHT Topic,只保留1000条消息,超出时,将ORDER值相同的消息列表中较旧的消息移除(只保留最新的一条消息)。

2.7 PendingQueueMessageStoragePolicy:消息剔除策略(面向慢消费者)

  当通道中存在大量的慢消费者时,此时便会产生大量的Pending Message(待消费消息)。对于这些Pending Message,ActiveMQ提供了几种Cursor来保存。
  (1) vmQueueCursor:将待转发消息保存在额外的内存(JVM linkeList)的存储结构中。是“非持久化消息”的默认设置,如果Broker不支持Persistent,它是任何类型消息的默认设置。有OOM风险。
  (2) fileQueueCursor:将消息保存到临时文件中。文件存储方式有broker的tempDataStore属性决定。是“持久化消息”的默认设置。
  (3) storeCursor:“综合”设置,对于非持久化消息,将采用vmQueueCursor存储,对于持久化消息采用fileQueueCursor。这是强烈推荐的策略,也是效率最好的策略。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容