人生的磨难是很多的,所以我们不可对于每一件轻微的伤害都过于敏感。在生活磨难面前,精神上的坚强和无动于衷是我们抵抗罪恶和人生意外的最好武器。 —— 洛克
1. 独占消费者(Exclusive Consumer)
Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。
ActiveMQ提供了Exclusive Consumer机制,Broker从多个Consumer中挑选一个Consumer来处理Queue中的所有消息。若是在此过程,这个Consumer挂掉,则会自动切换到其他的Consumer继续处理。
可以通过Destination Options 来创建一个Exclusive Consumer。如下
queue = new ActiveMQQueue("ORDER.PRICE?consumer.exclusive=true");
consumer = session.createConsumer(queue);
2. 消息分组(Message Groups)
Message Groups可以看成是并发的Exclusive Consumer,Message Groups机制中是使用JMS消息属性的JMSXGroupID来区分的。Message Groups保证所有具有相同的JMSXGroupID的消息会被分发到相同的Consumer。该特性也是一种负载均衡的机制。
在消息被分发到Consumer前,Broker首先会检查消息JMSXGroupID属性。若存在,那么会检查是否有具有该Message Group的Consumer。若没有,Broker则会选择一个Consumer,将该关联到Message Group上,此后该Consumer会接收该Message Group的所有消息。直到:
(1) Consumer被关闭。
(2) Message group被关闭。发送一个消息,并设置该消息的JMSXGroupSeq为0。
示例如下:
// 发送者:
Destination destination = session.createQueue("Order.Group.Queue");
MessageProducer messageProducer = session.createProducer(destination);
for(int i = 1; i <= 3; i++){
TextMessage textMessage = session.createTextMessage("Message From OrderGroup-A:" + i);
textMessage.setStringProperty("JMSXGroupID", "OrderGroup-A");
messageProducer.send(textMessage);
}
// 消费者
Destination destination = session.createQueue(");
MessageConsumer messageConsumer = session.createConsumer("Order.Group.Queue");
messageConsumer.setMessageListener(new MessageListener(){@Override
public void onMessage(Message message) {
try {
System.out.println("收到的消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 执行结果
INFO | Successfully connected to tcp://localhost:61616
收到的消息:Message From OrderGroup-A:1
收到的消息:Message From OrderGroup-B:1
收到的消息:Message From OrderGroup-A:2
收到的消息:Message From OrderGroup-B:2
收到的消息:Message From OrderGroup-A:3
收到的消息:Message From OrderGroup-B:3
关闭一个Message Groups,只需要在message对象上设置属性即可,如下:
message.setStringProperty("JMSXGroupID","OrderGroup-A");
message.setIntProperty("JMSXGroupSeq", -1);
3. 消息选择器(JMS Selector)
JMS Selectors用于在订阅中,基于消息属性对进行消息的过滤。JMS Selectors由SQL92语法定义。如下面示例:
consumer = session.createConsumer(destination, "JMSType = 'car' AND weight > 2500");
在JMS Selectors表达式中,可以使用IN、NOT IN、LIKE等,例如:
(1)LIKE '12%3' ('123' true,'12993' true,'1234' false)
(2)LIKE 'l_se' ('lose' true,'loose' false)
(3)LIKE '_%' ESCAPE '' ('_foo' true,'foo' false)
需要注意的是,JMS Selectors表达式中的日期和时间需要使用标准的long型毫秒值。另外表达式中的属性不会自动进行类型转换,例如:
message.setStringProperty("NumberOfOrders", "2"); //"NumberOfOrders > 1" 求值结果是false。
Message Groups虽然可以保证具有相同message group的消息被唯一的consumer顺序处理,但是却不能确定被哪个consumer处理。在某些情况下,Message Groups可以和JMS Selector一起工作,如:
有三个consumers分别是A、B和C。你可以在producer中为消息设置三个message groups分别是“A”、“B”和“C”。然后令consumer A使用“JMXGroupID = ‘A’”作为selector。B和C也同理。这样就可以保证message group A的消息只被consumer A处理。需要注意的是,这种做法有以下缺点:
(1) producer必须知道当前正在运行的consumers。
(2) 如果某个consumer失效,那么应该被这个consumer消费的消息将会一直被积压在broker上。