一丶ActiveMQ详解
什么是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
- 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
- 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
- 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 从设计上保证了高性能的集群,客户端-服务器,点对点
- 支持Ajax
- 支持与Axis的整合
- 可以很容易得调用内嵌JMS provider,进行测试
ActiveMQ的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
• StreamMessage -- Java原始值的数据流
• MapMessage--一套名称-值对
• TextMessage--一个字符串对象
• ObjectMessage--一个序列化的 Java对象
• BytesMessage--一个字节的数据流
ActiveMQ的安装
环境要求
- VMware Linux CentOS-6.5。
- JDK 1.7.0_07
- apache-activemq-5.12.0-bin.tar.gz
- 上传工具FileZilla Client
第一步把ActiveMQ 的压缩包上传到Linux系统并解压。
第二步将ActiveMQ的目录移动到合适的目录并进入到bin目录启动ActiveMQ
启动ActiveMQ
[root@cehae bin]# ./activemq start
查看ActiveMQ状态
[root@cehae bin]# ./activemq status
关闭ActiveMQ
[root@cehae bin]# ./activemq stop
第三步进入管理后台
在windows浏览器中输入http://192.168.25.200:8161/admin
输入账户和密码 admin后看到管理后台代表成功。
ActiveMQ的使用
ActiveMQ两种消息形式结构图
使用生产者发布Queue消息
创建maven工程,把jar包添加到工程中。使用5.11.2版本的jar包。
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2。因为5.11.2版本的ActiveMQ会有Spring的源码,会有冲突。
使用生产者发布Queue消息
@Test
public void testQueueProducer() throws Exception {
// 1创建一个连接工厂,需要指定如果服务的ip和端口
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");
// 2使用工厂创建一个Connection对象,
Connection connection = factory.createConnection();
// 3开启连接,调用Connection对象的start方法
connection.start();
// 4创建一个Session对象
// 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
// 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用queue.
Queue queue = session.createQueue("test-Queue");
// 6使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
// 7创建一个Messaeg对象,可以使用TextMessage。
TextMessage textMessage = session.createTextMessage("hello,ActiveMQ-Queue");
// 8发送消息
producer.send(textMessage);
// 9关闭资源
producer.close();
session.close();
connection.close();
}
使用消费者接收Queue消息
@Test
public void testQueueConsumer() throws Exception {
// 1创建一个连接工厂连接MQ服务器。
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");
// 2创建一个连接
Connection connection = factory.createConnection();
// 3开启连接
connection.start();
// 4使用Connection创建一个Session对象
// 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
// 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用queue.
Queue queue = session.createQueue("test-Queue"); // 注意要和发布消息的在同一个队列
// 6使用Session对象创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(queue);
// 7使用consumer接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage txMessage = (TextMessage) message;
String text = "";
try {
text = txMessage.getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("message = " + text);
}
});
// 8等待接收消息 ,目的是阻塞当前线程,等待接收消息
System.in.read();
// 9关闭资源
consumer.close();
session.close();
connection.close();
}
测试结果
使用生产者发布Topic消息
使用生产者发布Topic消息
@Test
public void testTopicProducer() throws Exception {
// 1创建一个连接工厂,需要指定如果服务的ip和端口
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");
// 2使用工厂创建一个Connection对象,
Connection connection = factory.createConnection();
// 3开启连接,调用Connection对象的start方法
connection.start();
// 4创建一个Session对象
// 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
// 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用Topic.
Topic topic = session.createTopic("test-Topic");
// 6使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(topic);
// 7创建一个Messaeg对象,可以使用TextMessage。
TextMessage textMessage = session.createTextMessage("hello,ActiveMQ-Topic");
// 8发送消息
producer.send(textMessage);
// 9关闭资源
producer.close();
session.close();
connection.close();
}
使用消费者接收Topic消息
@Test
public void testTopicConsumer() throws Exception {
// 1创建一个连接工厂连接MQ服务器。
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.200:61616");
// 2创建一个连接
Connection connection = factory.createConnection();
// 3开启连接
connection.start();
// 4使用Connection创建一个Session对象
// 参数1:是否开启事务,一般不开启事务。如果开启事务自定忽略第二个参数。
// 参数2:应答模式。自动应答和手动应答,一般使用自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5使用Session对象创建一个Destination(接口)对象, 两种形式Queue和Topic,现在应该使用Topic.
Topic topic = session.createTopic("test-Topic");
// 6使用Session对象创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(topic);
// 7使用consumer接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage txMessage = (TextMessage) message;
String text = "";
try {
text = txMessage.getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("message = " + text);
}
});
// 8等待接收消息 ,目的是阻塞当前线程
System.in.read();
// 9关闭资源
consumer.close();
session.close();
connection.close();
}
测试结果
注意
Queue形式的消息(点到点):发送完消息如果没有消费者接收会保存到(队列中)服务器,自动持久化。
Topic形式的消息(广播):发送完消息如果没有消费者接收会被MQ删除,也就是无法持久化。如果广播形式也想持久化,需要在客户顿启动的时候设置客户端id也就是订阅消息,这样MQ会保存广播的消息。
二丶Spring整合ActiveMQ
2-1丶Queue模式
创建消息生产者工程springjms_producer
引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cehae.demo</groupId>
<artifactId>springjms_producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<spring.version>4.2.4.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
</project>
添加配置文件applicationContext-jms-producer_queue.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="com.cehae.demo.queen"></context:component-scan>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.200:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 文本信息 -->
<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_text" />
</bean>
</beans>
编写消息生产者QueueProducer
package com.cehae.demo.queen;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component
public class QueueProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination queueTextDestination;
/**
* 发送文本消息
*
* @param text
*/
public void sendTextMessage(final String text) {
jmsTemplate.send(queueTextDestination, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}
编写测试类TestQueue.java
package com.cehae.test.queue;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.cehae.demo.queen.QueueProducer;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-producer_queue.xml")
public class TestQueue {
@Autowired
private QueueProducer queueProducer;
@Test
public void testSend() {
queueProducer.sendTextMessage("SpringJms-queue");
}
}
创建消息消费者工程springjms_consumer
引入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cehae.demo</groupId>
<artifactId>springjms_consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<spring.version>4.2.4.RELEASE</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
</project>
添加配置文件applicationContext-jms-consumer-queue.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.200:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 文本信息 -->
<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_text" />
</bean>
<!-- 我的监听类 -->
<bean id="myQueueMessageListener" class="com.cehae.demo.queue.MyQueueMessageListener"></bean>
<!-- 消息监听容器 -->
<bean
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueTextDestination" />
<property name="messageListener" ref="myQueueMessageListener" />
</bean>
</beans>
编写queue消息监听者MyQueueMessageListener.java
package com.cehae.demo.queue;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyQueueMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
编写测试类TestQueue.java
package com.cehae.test.queue;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-consumer-queue.xml")
public class TestQueue {
@Test
public void testQueue() {
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
queue方式测试结果
2-2丶Topic模式
在springjms_producer工程添加配置文件applicationContext-jms-producer_topic.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="com.cehae.demo.topic"></context:component-scan>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.200:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory" />
</bean>
<!--这个是订阅模式 文本信息 -->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic_text" />
</bean>
</beans>
编写topic消息生成者TopicProducer.java
package com.cehae.demo.topic;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination topicTextDestination;
/**
* 发送文本消息
*
* @param text
*/
public void sendTextMessage(final String text) {
jmsTemplate.send(topicTextDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}
编写测试类TestTopic.java
package com.cehae.test.topic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.cehae.demo.topic.TopicProducer;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-producer_topic.xml")
public class TestTopic {
@Autowired
private TopicProducer topicProducer;
@Test
public void sendTextQueue() {
topicProducer.sendTextMessage("SpringJms-topic");
}
}
在springjms_consumer工程添加配置文件applicationContext-jms-consumer-topic.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.25.200:61616" />
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory" />
</bean>
<!--这个是队列目的地,点对点的 文本信息 -->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic_text" />
</bean>
<!-- 我的监听类 -->
<bean id="myTopicMessageListener" class="com.cehae.demo.topic.MyTopicMessageListener"></bean>
<!-- 消息监听容器 -->
<bean
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicTextDestination" />
<property name="messageListener" ref="myTopicMessageListener" />
</bean>
</beans>
编写topic消息消费者MyTopicMessageListener.java
package com.cehae.demo.topic;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyTopicMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
编写测试类TestTopic.java
package com.cehae.test.topic;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/applicationContext-jms-consumer-topic.xml")
public class TestTopic {
@Test
public void testQueue() {
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
topic方式测试结果