Consumer高级特性

人生的磨难是很多的,所以我们不可对于每一件轻微的伤害都过于敏感。在生活磨难面前,精神上的坚强和无动于衷是我们抵抗罪恶和人生意外的最好武器。 —— 洛克

1. 独占消费者(Exclusive Consumer)

  Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。
  ActiveMQ提供了Exclusive Consumer机制,Broker从多个Consumer中挑选一个Consumer来处理Queue中的所有消息。若是在此过程,这个Consumer挂掉,则会自动切换到其他的Consumer继续处理。
  可以通过Destination Options 来创建一个Exclusive Consumer。如下

queue = new ActiveMQQueue("ORDER.PRICE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

2. 消息分组(Message Groups)

  Message Groups可以看成是并发的Exclusive Consumer,Message Groups机制中是使用JMS消息属性的JMSXGroupID来区分的。Message Groups保证所有具有相同的JMSXGroupID的消息会被分发到相同的Consumer。该特性也是一种负载均衡的机制。
  在消息被分发到Consumer前,Broker首先会检查消息JMSXGroupID属性。若存在,那么会检查是否有具有该Message Group的Consumer。若没有,Broker则会选择一个Consumer,将该关联到Message Group上,此后该Consumer会接收该Message Group的所有消息。直到:
  (1) Consumer被关闭。
  (2) Message group被关闭。发送一个消息,并设置该消息的JMSXGroupSeq为0。
  示例如下:

// 发送者:
Destination destination = session.createQueue("Order.Group.Queue");
MessageProducer messageProducer = session.createProducer(destination);
for(int i = 1; i <= 3; i++){
     TextMessage textMessage = session.createTextMessage("Message From OrderGroup-A:"  + i);
     textMessage.setStringProperty("JMSXGroupID", "OrderGroup-A");
     messageProducer.send(textMessage);
}
// 消费者
Destination destination = session.createQueue(");
MessageConsumer messageConsumer = session.createConsumer("Order.Group.Queue");
messageConsumer.setMessageListener(new MessageListener(){@Override
    public void onMessage(Message message) {
        try {
            System.out.println("收到的消息:" + ((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
});
// 执行结果
INFO | Successfully connected to tcp://localhost:61616
收到的消息:Message From OrderGroup-A:1
收到的消息:Message From OrderGroup-B:1
收到的消息:Message From OrderGroup-A:2
收到的消息:Message From OrderGroup-B:2
收到的消息:Message From OrderGroup-A:3
收到的消息:Message From OrderGroup-B:3

  关闭一个Message Groups,只需要在message对象上设置属性即可,如下:

message.setStringProperty("JMSXGroupID","OrderGroup-A");
message.setIntProperty("JMSXGroupSeq", -1);

3. 消息选择器(JMS Selector)

  JMS Selectors用于在订阅中,基于消息属性对进行消息的过滤。JMS Selectors由SQL92语法定义。如下面示例:

consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");

  在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,例如:
  (1)LIKE '12%3' ('123' true,'12993' true,'1234' false)
  (2)LIKE 'l_se' ('lose' true,'loose' false)
  (3)LIKE '_%' ESCAPE '' ('_foo' true,'foo' false)
  需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换,例如:

message.setStringProperty("NumberOfOrders", "2"); //"NumberOfOrders > 1" 求值结果是false。

  Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,如:
  有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是“A”、“B”和“C”。然后令consumer A使用“JMXGroupID = ‘A’”作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:
  (1) producer必须知道当前正在运行的consumers。
  (2) 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,820评论 13 425
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 8,906评论 0 9
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,828评论 19 139
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 10,818评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 14,406评论 8 167

友情链接更多精彩内容