版本
Stringboot: 1.5.8.RELEASE
ActiveMQ: 5.15.4
JDK: 1.8
Maven: 3.3.9
概要(先来点干货)
本文主要关于消息队列发送与监听的一些学习使用小结,基于Springboot。
- Queue - Point-to-Point (点对点)
一条消息只能被一个消费者消费, 且是持久化消息 - 当没有可用的消费者时,该消息保存直到被消费为止;当消息被消费者收到但不响应时(具体等待响应时间是多久,如何设置,暂时还没去了解),该消息会一直保留或会转到另一个消费者当有多个消费者的情况下。当一个Queue有多可用消费者时,可以在这些消费者中起到负载均衡的作用。 - Topic - Publisher/Subscriber Model (发布/订阅者)
一条消息发布时,所有的订阅者都会收到,topic有2种模式,Nondurable subscription(非持久订阅)和durable subscription (持久化订阅 - 每个持久订阅者,都相当于一个持久化的queue的客户端), 默认是非持久订阅。
- 持久化:消息产生后,会保存到文件/DB中,直到消息被消费, 如上述Queue的持久化消息。默认保存在ActiveMQ中:%ActiveMQ_Home%/data/kahadb
- 非持久化:消息不会保存,若当下没有可用的消费者时,消息丢失。
下图引用自:
https://blog.csdn.net/yan69594281/article/details/72598313
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置
Springboot 自动配置方式:
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
若用Springboot这种方式去配置ActiveMQ,代码部署到JBOSS后,连不上ActiveMQ,不知道为什么,也没有去深究。若哪位大神知道,可否告知,非常感谢!
所以我就改用下面的方式来配置ActiveMQ:
activemq.url=tcp://localhost:61616
#cluster configuration
#activemq.url=failover:(tcp://host1:port1,tcp://host2:port2)?randomize=false
activemq.username=admin
activemq.password=admin
相应的Java代码如下,这种方式可以方便地配置多个ActiveMQ,下面的Demo只配置了一个:
@Configuration
@EnableAsync // enable asynchronous task
@EnableJms
public class JmsConfiguration {
private Logger logger = LoggerFactory.getLogger(JmsConfiguration.class);
@Bean(name = "firstConnectionFactory")
public ActiveMQConnectionFactory getFirstConnectionFactory(@Value("${activemq.url}") String brokerUrl,
@Value("${activemq.username}") String userName, @Value("${activemq.password}") String password)
{
logger.debug(brokerUrl + " - " + userName);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean(name = "firstJmsTemplate")
public JmsMessagingTemplate getFirstJmsTemplate(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory) {
JmsMessagingTemplate template = new JmsMessagingTemplate(connectionFactory);
return template;
}
@Bean(name = "firstTopicListener")
public DefaultJmsListenerContainerFactory getFirstTopicListener(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory)
{
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true); // if topic, set true
// factory.setSessionAcknowledgeMode(4); // change acknowledge mode
return factory;
}
@Bean(name = "firstQueueListener")
public DefaultJmsListenerContainerFactory getFirstQueueListener(@Qualifier("firstConnectionFactory") ConnectionFactory connectionFactory)
{
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// factory.setSessionAcknowledgeMode(4); // change acknowledge mode
return factory;
}
}
消息发送
- 配置
activemq.topic=T_Topic1
activemq.queue=Q_Queue1
activemq.virtual.topic=VirtualTopic.Topic1
activemq.virtual.topic.A=Consumer.A.VirtualTopic.Topic1
activemq.virtual.topic.B=Consumer.B.VirtualTopic.Topic1
- 实现
@Component
public class JmsProducer {
@Autowired
@Qualifier("firstJmsTemplate")
private JmsMessagingTemplate jmsTemplate;
@Value("${activemq.topic}")
private String topic;
@Value("${activemq.queue}")
private String queue;
@Value("${activemq.virtual.topic}")
private String vTopic;
public void sendMsg(Destination destination, Message msg) {
jmsTemplate.convertAndSend(destination, msg);
}
/**
* send msg to queue.
* @param data
*/
public void sendToQueue(Map<String, String> data) {
ActiveMQQueue mqQueue = new ActiveMQQueue(queue);
ActiveMQMessage msg = new ActiveMQMessage();
try {
msg.setStringProperty("value", data.get("value"));
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sendMsg(mqQueue, msg);
}
/**
* send msg to topic.
* @param data
*/
public void sendToTopic(Map<String, String> data) {
ActiveMQTopic mqTopic = new ActiveMQTopic(topic);
ActiveMQMessage msg = new ActiveMQMessage();
try {
msg.setStringProperty("value", data.get("value"));
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sendMsg(mqTopic, msg);
}
/**
* send msg to virtual topic.
* @param data
*/
public void sendToVTopic(Map<String, String> data) {
ActiveMQTopic mqVTopic = new ActiveMQTopic(vTopic);
ActiveMQMessage msg = new ActiveMQMessage();
try {
msg.setStringProperty("value", data.get("value"));
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sendMsg(mqVTopic, msg);
}
}
消息监听
- 实现
@Component
public class JmsConsumer {
private Logger logger = LoggerFactory.getLogger(JmsConsumer.class);
@JmsListener(destination = "${activemq.topic}", containerFactory = "firstTopicListener")
@Async // receive msg asynchronously
//@Async("taskExecutePool")
public void receiveTopic(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": topic===========" + msg.getStringProperty("value"));
try {
Thread.sleep(1000L);
// msg.acknowledge(); //消息确认
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.debug(Thread.currentThread().getName() + ": topic===========" + msg.getStringProperty("value"));
}
@JmsListener(destination = "${activemq.queue}", containerFactory = "firstQueueListener")
@Async
public void receiveQueue(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": Queue===========" + msg.getStringProperty("value"));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.debug(Thread.currentThread().getName() + ": Queue===========" + msg.getStringProperty("value"));
}
@JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicA1(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
}
@JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicA2(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
}
@JmsListener(destination = "${activemq.virtual.topic.B}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicB(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
}
}
从上面代码可以看到,消费者并没有发送响应(消息确认)给ActiveMQ在监听到消息之后。这是因为SpringBoot在默认情况下,会自动发送消息确认给ActiveMQ.
SpringBoot可用下面这种方式进行改AcknowledgeMode:
spring.jms.listener.acknowledge-mode= # Acknowledge mode of the container. By default, the listener is transacted with automatic acknowledgment.
由于我上面不是用Springboot的配置自动注入ActiveMQ, 所以可以使用下面方法改AcknowledgeMode:
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
JMS规范的ack消息确认机制有一下四种,定于在session对象中:
AUTO_ACKNOWLEDGE = 1 :自动确认
CLIENT_ACKNOWLEDGE = 2:客户端手动确认
DUPS_OK_ACKNOWLEDGE = 3: 自动批量确认
SESSION_TRANSACTED = 0:事务提交并确认
但是在activemq补充了一个自定义的ACK模式:
INDIVIDUAL_ACKNOWLEDGE = 4:单条消息确认
但关于Spring下ActiveMQ的消息确认机制有一个问题,发现AcknowledgeMode设置成Session.CLIENT_ACKNOWLEDGE并没有什么用,还是会自动确认,以下是相关源码:
// org.springframework.jms.listener.AbstractMessageListenerContainer
protected void commitIfNecessary(Session session, Message message) throws JMSException {
// Commit session or acknowledge message.
if (session.getTransacted()) {
// Commit necessary - but avoid commit call within a JTA transaction.
if (isSessionLocallyTransacted(session)) {
// Transacted session created by this container -> commit.
JmsUtils.commitIfNecessary(session);
}
}
else if (message != null && isClientAcknowledge(session)) {
message.acknowledge();
}
}
protected boolean isClientAcknowledge(Session session) throws JMSException {
return (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
}
由源码可知,设置sessionAcknowledgeMode为2时,虽然是客户端手动确认,但是却被spring自动确认了,造成设置无效。这时只需要把sessionAcknowledgeMode的值设置成activemq自定义的类型INDIVIDUAL_ACKNOWLEDGE = 4即可
关于消息自动确认,感谢这位大哥的帖子:
https://segmentfault.com/a/1190000008707181
虚拟Topic
个人觉得,虚拟Topic在分布式部署/集群中用处很大,最起码解决了我在现实工作中遇到的问题。
- 使用场景
有一个application, 订阅一个Topic消息, 这个application部署到2台server上时(当然可以更多,这里以2台为例),2台server会同时监听这个Topic, 这样会导致一个消息重复消费2次,这不符合一些特定的应用场景。
如果设置成Queue,能确保一个消息只被消费一次,但就不能有其他的application同时消费这个消息,也不符合一些特定的应该场景。
为了达到在分布式部署的情况下,一个消息, 被同一个application只消费一次,同时也能被其他的application消费,就引入虚拟Topic. 这种方式可以说:在Topic消息层面上,实现负载均衡。
先看代码:
activemq.virtual.topic=VirtualTopic.Topic1
activemq.virtual.topic.A=Consumer.A.VirtualTopic.Topic1
activemq.virtual.topic.B=Consumer.B.VirtualTopic.Topic1
@Value("${activemq.virtual.topic}")
private String vTopic;
// 消息发送
/**
* send msg to virtual topic.
* @param data
*/
public void sendToVTopic(Map<String, String> data) {
ActiveMQTopic mqVTopic = new ActiveMQTopic(vTopic);
ActiveMQMessage msg = new ActiveMQMessage();
try {
msg.setStringProperty("value", data.get("value"));
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
sendMsg(mqVTopic, msg);
}
// 消息监听
@JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicA1(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// logger.debug(Thread.currentThread().getName() + ": vtopic A1===========" + msg.getStringProperty("value"));
}
@JmsListener(destination = "${activemq.virtual.topic.A}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicA2(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// logger.debug(Thread.currentThread().getName() + ": vtopic A2===========" + msg.getStringProperty("value"));
}
@JmsListener(destination = "${activemq.virtual.topic.B}", containerFactory = "firstQueueListener")
@Async
public void receiveVTopicB(Message msg) throws JMSException {
logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// logger.debug(Thread.currentThread().getName() + ": vtopic B===========" + msg.getStringProperty("value"));
}
- 分析
从上面代码可以看出,有3个JmsListener监听:
2个 Consumer.A.VirtualTopic.Topic1监听
1个Consumer.B.VirtualTopic.Topic1监听
看一下运行结果(共发5个消息):
A的2个监听者,不会重复监听同一条消息,A监听的同时,不会影响B的监听。
从消息类型上来说:对于虚拟Topic, 对消息生产者来说,是一个普通的topic;对于消息消费者来说,是一个queue。
从使用意义上来说,对于虚拟Topic,它会对消费者进行分组,同一组的,同一个消息只会被消费一次。比如上面例子的Consumer.A。
虚拟Topic命名规范
Topic命名: VirtualTopic.xxx
消费者命名: Consumer.yyy.VirtualTopic.xxx
关于虚拟Topic, 感谢这2位大哥:
http://blog.sina.com.cn/s/blog_7d1968e20102wyq0.html
https://www.cnblogs.com/jiangxiaoyaoblog/p/5659734.html
Junit Test
顺便提一下,在使用Junit同时测试消息发送与接收的时候,有可能会接收不到一些消息。如使用for循环发送10个消息,可能只会接收到5个(特别是同步的情况下,异步的没有充分测试所以不清楚异步时会不会有同样的情况)。
原因:当Junit发送完10个消息后,Spring服务就停掉了,导致有些还没来得及接收的消息就接收不到了。也就是说,Spring服务停了,导致Spring中的ActiveMQ客户端也停掉了,导致没法继续监听消息。
@SpringBootTest
@RunWith(SpringRunner.class)
@EnableAsync
public class JmsTest {
@Autowired
private JmsProducer producer;
private Logger logger = LoggerFactory.getLogger(JmsTest.class);
@Test
public void sendMsg() {
for (int i = 0; i < 10; i++) {
Map<String, String> map = new HashMap<String, String>();
map.put("value", "value = " + i);
producer.sendToTopic(map);
// producer.sendToQueue(map);
// producer.sendToVTopic(map);
}
}
若有错误和补充,请多多指教!