1、简介。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。
⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通过JDBC和journal提供高速的消息持久化
⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点
⒏ 支持Ajax
2、ActiveMQ的消息形式
ActiveMQ对于消息的传递有两种类型:
1). 一种是点对点的,即一个生产者和一个消费者一一对应。
2).发布订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接受。
3).JMS定义 了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接受以一些不同的形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessge : Java原始值的数据流
- MapMessage : 一套名称一对值
- TextMessage : 一个字符串对象
- ObjectMessage : 一个序列化的Java对象
- ByteMessage : 一个字节的数据流
3、ActiveMQ的使用方法
1).queue 生产者消费者一对一
/**
* Producer
* 生产者
*/
@Test
public void testQueueProducer() throws Exception {
//1.创建一个连接工厂对象ConnectionFactory对象。需要指定mq服务的ip及端口
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
//2.使用ConnectionFactory创建一个连接Connection对象
Connection connection = connectionFactory.createConnection();
//3.开启连接。调用Connection对象的start方法
connection.start();
//4.使用Connection对象创建一个Session对象
//第一个参数是是否开启事务,一般不使用事务。保证数据的最终一致,可以使用消息队列实现。
//如果第一个参数为true,第二个参数自动忽略。如果不开启事务false,第二个参数为消息的应答模式。一般自动应答就可以。
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.使用Session对象创建一个Destination对象,两种形式queue、topic。现在应该使用queue
//参数就是消息队列的名称
Queue queue = session.createQueue("test-queue");
//6.使用Session对象创建一个Producer对象
MessageProducer producer = session.createProducer(queue);
//7.创建一个TextMessage对象
/*TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("hello activemq");*/
TextMessage textMessage = session.createTextMessage("hello activemq1111");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
/* *
* 消费者
* Consumer
*/
@Test
public void testQueueConsumer() throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session创建一个Destination,Destination应该和消息的发送端一致。
Queue queue = session.createQueue("test-queue");
//使用Session创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(queue);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//取消息的内容
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印消息内容
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//系统等待接收消息
/*while(true) {
Thread.sleep(100);
}*/
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
2).topic 生产者消费者一对多(广播模式)
/**
* Producer
* 生产者
*/
@Test
public void testTopicProducer() throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建Destination,应该使用topic
Topic topic = session.createTopic("test-topic");
//创建一个Producer对象
MessageProducer producer = session.createProducer(topic);
//创建一个TextMessage对象
TextMessage textMessage = session.createTextMessage("hello activemq topic");
//发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
}
/**
* Consumer
* 消费者
*/
@Test
public void testTopicConsumser() throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session创建一个Destination,Destination应该和消息的发送端一致。
Topic topic = session.createTopic("test-topic");
//使用Session创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(topic);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//取消息的内容
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印消息内容
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
//系统等待接收消息
/*while(true) {
Thread.sleep(100);
}*/
System.out.println("topic消费者1.。。。");
System.out.println("topic消费者2.。。。");
System.out.println("topic消费者3.。。。");
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
3、ActiveMQ整合Spring
1).因为整合Spring需要用到JMS中的jmsTemplate,所以要加入Spring的jar包以及jsmTemplate的Jar包
...
2).Spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
<!-- JMS服务厂商提供的ConnectionFactory -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg name="brokerURL" value="tcp://192.168.25.168:61616"/>
</bean>
<!-- spring对象ConnectionFactory的封装 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
</bean>
<!-- 配置JMSTemplate -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 配置消息的Destination对象 -->
<bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="test-queue"></constructor-arg>
</bean>
<bean id="itemAddtopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg name="name" value="item-add-topic"></constructor-arg>
</bean>
<!-- 配置消息的接收者 -->
<bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="test-queue" />
<property name="messageListener" ref="myMessageListener" />
</bean>
<bean id="itemAddMessageListener" class="com.taotao.search.listener.ItemAddMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="itemAddTopic" />
<property name="messageListener" ref="itemAddMessageListener" />
</bean>
</beans>
3).编写监听ActiveMQ的类MyMessageListener、需要实现MessageListener接口,完成后将这个Bean配置到Spring容器中
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.xml.soap.Text;
/**
* 接收Activemq发送的消息
* <p>Title: MyMessageListener</p>
* <p>Description: </p>
* @version 1.0
*/
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
//接收到消息
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println(text);
} catch (Exception e) {
e.printStackTrace();
}
}
}