ref
http://chy2z.blog.163.com/blog/static/31668846201772313356256/
http://chy2z.blog.163.com/blog/static/31668846201772315257841/
http://chy2z.blog.163.com/blog/static/31668846201772315813775/
http://chy2z.blog.163.com/blog/static/3166884620177232330989/
http://chy2z.blog.163.com/blog/static/31668846201772321022650/
http://chy2z.blog.163.com/blog/static/31668846201772321548904/
一:配置文件
1.在配置文件application.yml中就是原来activemq的配置,应该是不用变化的
其中两个点需要注意:
-
activemq.pool.enabled=false
一旦是true,则
这个地方 jmsTemplate将会run failed,原因是 pool.enabled的意思是 是否自动创建连接池
是否替换默认的connectionFactory,是否创建PooledConnectionFactory,默认false
好吧,我也不知道为什么true会报错
对于 订阅/发布 模式来说 pub-sub-domain true代表这是 订阅/发布模式
if false说明是点对点模式 这对于单一的activemq服务来说是非常方便的配置,但同时也因为这个所谓的“方便”,让同一个activemq项目难以同时进行topic和queue,于是就要追本溯源,从源代码开始配置
那么,对于单一模式有单一的选择,两个功能都有该设置true还是false呢?
以下我测了几个情况
1.true
topic,queue 正常
2.false
topic,queue 正常
3.不要这个设置项
topic,queue 正常
好吧,这个配置项失效了
二:创建连接工厂
通过创建两个JmsListenerContainerFactory,分别是topicListenerFactory和
queueListenerFactory,其中topicListenerFactory创建的时候,将pubSubDomain设
置成了true,表示该Listener负责处理Topic;queueListenerFactory创建的时候,将
pubSubDomain设置成了false,也就是说,jms默认就是queue模式,该Listener主要
负责处理Queue。
package net.gvsun;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.*;
@Configuration
@EnableJms
public class JmsConfig {
@Value("${application.topicDestinationName1}")
public String TOPIC1;
@Value("${application.topicDestinationName2}")
public String TOPIC2;
@Value("${application.queueDestinationName1}")
public String QUEUE1;
@Value("${application.queueDestinationName2}")
public String QUEUE2;
@Bean
public Topic topic1() {
return new ActiveMQTopic(TOPIC1);
}
@Bean
public Topic topic2() {
return new ActiveMQTopic(TOPIC2);
}
@Bean
public Queue queue1() {
return new ActiveMQQueue(QUEUE1);
}
@Bean
public Queue queue2() {
return new ActiveMQQueue(QUEUE2);
}
/**
* topic模式的ListenerContainer
* @return
*/
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ActiveMQConnectionFactory connectionFactor=new ActiveMQConnectionFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactor);
return factory;
}
/**
* queue模式的ListenerContainer
* @return
*/
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ActiveMQConnectionFactory connectionFactor=new ActiveMQConnectionFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactor);
return factory;
}
}
需要注意的几个点 @value的${...}是什么
这其实是调用配置文件中的值,以下是application.yml的截图
application:
queueDestinationName1: test12
queueDestinationName2: test24
topicDestinationName1: test33
topicDestinationName2: test45
应该挺形象的。
原本写一个jms的消费者与发布者需要一大堆“无用”的方法,spring把这些方法给抽离了,springboot又抽离了spring,所以当springboot受限制了,就要回本,从createListener和createConnectionFactory开始。
三:Producer和Customer/订阅和分发的创建,也就是service的实现
Producer:
package net.gvsun.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.*;
@Component
public class QueueSender {
private final static Logger logger= LoggerFactory.getLogger(TopicSender.class);
@Autowired
JmsTemplate jmsTemplate;
@Autowired
private Queue queue1;
@Autowired
private Queue queue2;
/**
* 发送一条消息到指定的队列
* @param message 消息内容
*/
public void send1(final String message){
logger.info("===========Queue发送的消息为==============:"+message);
this.jmsTemplate.convertAndSend(queue1, message);
}
/**
* 发送一条消息到指定的队列
* @param message 消息内容
*/
public void send2(final String message){
logger.info("===========Queue发送的消息为==============:"+message);
this.jmsTemplate.convertAndSend(queue2, message);
}
}
topicSender
package net.gvsun.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.*;
@Component
public class TopicSender {
private final static Logger logger= LoggerFactory.getLogger(TopicSender.class);
@Autowired
JmsTemplate jmsTemplate;
@Autowired
private Topic topic1;
@Autowired
private Topic topic2;
/**
* 发送一条消息到指定的主题
* @param message 消息内容
*/
public void send1(final String message){
logger.info("===========主题:发送的消息为==============:"+message);
this.jmsTemplate.convertAndSend(topic1, message);
}
/**
* 发送一条消息到指定的主题
* @param message 消息内容
*/
public void send2(final String message){
logger.info("===========主题:发送的消息为==============:"+message);
this.jmsTemplate.convertAndSend(topic2, message);
}
}
queueReciever1
package net.gvsun.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class QueueReceiver1 {
/// <summary>
/// 日志记录
/// </summary>
private static Logger logger = LoggerFactory.getLogger(QueueReceiver1.class.getName());
@JmsListener(destination = "${application.queueDestinationName1}",containerFactory="queueListenerFactory")
public void receiveQueue(String text) {
logger.info("===========队列消费者1:Queue接受的消息为==============:" + text);
}
//如果多个消费者同时消费1个队列,QueueReceiver写多个
}
queueReciever2
package net.gvsun.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class QueueReceiver2 {
/// <summary>
/// 日志记录
/// </summary>
private static Logger logger = LoggerFactory.getLogger(QueueReceiver2.class.getName());
@JmsListener(destination = "${application.queueDestinationName2}",containerFactory="queueListenerFactory")
public void receiveQueue(String text) {
logger.info("===========队列消费者2:Queue接受的消息为==============:" + text);
}
}
四.调用这些生产者消费者们(controller调用service)
package net.gvsun.consumer;
import org.assertj.core.util.DateUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@EnableScheduling
public class ScheduledTaskMQ {
boolean debug=false;
@Autowired QueueSender queue;
@Autowired
TopicSender topic;
@Scheduled(fixedDelay=50)
public void sendQueueTest() {
String message = "对列:" + new Date();
queue.send1(message);
queue.send2(message);
}
@Scheduled(fixedDelay=50)
public void sendTopicTest() {
String message = "主题:" + new Date();
topic.send1(message);
topic.send2(message);
}
}