一、什么是ActiveMQ
ActiveMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。
二、安装ActiveMQ
1、先在官网现在ActiveMQ软件 http://activemq.apache.org/download.html
2、windows系统的直接下载完成之后进行解压,然后在cmd进入activemq bin目录下面的, 根据系统是32位还是64位进入win32或者win64,进去之后可以直接activemq start启动 activemq,启动服务之后可以通过浏览器进行访问http://localhost:8161 里面操作需要登 陆 的用户名和密码都是admin 这样就标识安装成功了。
三、通信模式
1、队列模式(queue,类似QQ私聊)
结构图:
2、主题模式(topic,类似群聊)
3、实现代码(队列模式和主题模式代码在一个类中解决)
因为一般消息提供方和消息接收方属于不同的两个应用所有我们就分别建两个sping-boot工程
1)消息提供者代码目录:
pom.xml需要添加依赖如下需要添加
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>1.4.7.RELEASE</version>
</dependency>
application.yml配置如下:
server:
port: 9090
servlet:
context-path: /producer
spring:
activemq:
user: admin
password: admin
broker-url: tcp://localhost:61616
pool:
max-connections: 10
enabled: true
queueName: publish.queue
topicName: publish.topic
其中server.port是应用所占端口,spring.activemq.boker-url 为链接activeMQ管理的链接默认端口一般都是61616 pool为链接池,此处默认是开启使用的。
ActiveMQConfig.java
@Configuration
public class ActiveMQConfig {
@Value("${queueName}")
private StringqueueName;
@Value("${topicName}")
private StringtopicName;
@Value("${spring.activemq.user}")
private StringuserName;
@Value("${spring.activemq.password}")
private Stringpassword;
@Value("${spring.activemq.broker-url}")
private StringbrokerUrl;
@Bean
public Queue queue(){
return new ActiveMQQueue(queueName);
}
@Bean
public Topic topic(){
return new ActiveMQTopic(topicName);
}
@Bean
public ActiveMQConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(userName,password,brokerUrl);
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactoryQueue(ActiveMQConnectionFactory activeMQConnectionFactory){
DefaultJmsListenerContainerFactory bean =new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactoryTopic(ActiveMQConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory bean =new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
PublishController.java
@RestController
@RequestMapping("publish")
public class PublishController {
@Autowired
private JmsMessagingTemplatejmsMessagingTemplate;
@Resource
private Queuequeue;
@Resource
private Topictopic;
@RequestMapping("queue")
public String queue(){
System.out.print(111);
for(int i=0;i<10;i++){
jmsMessagingTemplate.convertAndSend(queue,"queue"+i);
System.out.print(queue+"_"+i+"_已经发送"+"\n");
}
return "queue 发送成功";
}
@RequestMapping("/topic")
public String topic(){
for(int i =0 ;i<10;i++){
jmsMessagingTemplate.convertAndSend(topic,"topic"+i);
System.out.print(topic+"_"+i+"_已经发送"+"\n");
}
return "发送成功";
}
}
2)消息接受者
项目目录结构:
其中application.yml中只需要修改server.port和context-path,其他的和消息提供者一样。
pom.xml里面也跟消息提供者一样添加mq的依赖,ActiveMQConfig可以直接复制到消息接受方的应用中。
消息接受我们这里只要是根据消息监听处理。
QueueListener.java(监听队列模式消息)
@Component
public class QueueListener {
@JmsListener(destination ="publish.queue" ,containerFactory ="jmsListenerContainerFactoryQueue")
@SendTo("out.queue")
public String recevie(String text){
System.out.print(11111);
System.out.print("QueueListener:consumer-a 收到一条消息"+text);
return "consumer-a recevied:"+text;
}
TopicListener.java(监听主题模式消息)
@Component
public class TopicListener {
@JmsListener(destination ="publish.topic",containerFactory ="jmsListenerContainerFactoryTopic")
@SendTo("out.topic")
public void recevie(String text){
System.out.print("TopicListener:consumer-a 收到一条消息"+text);
}
}
测试:
启动两个应用,通过链接127.0.0.1:9090/producer/publish/queue
消息提供方打印日志:
消息接收方日志:
topic模式:127.0.0.1:9090/producer/publish/topic