ActiveMQ 是一个非常强大的开源消息总线,并且极其简单的设计可以轻松应用于项目。以下示例仅用了必要的XML配置,以进行消息交换。
在消息发送/接收端,ActiveMQ 连接工厂须被创建,我们先创建双方公用的工厂。
公共配置 spring-common.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- ActiveMQ 连接工厂 -->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- 连接工厂定义 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="activeMQConnectionFactory"/>
</bean>
</beans>
发送方 spring-sender.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="spring-common.xml"/>
<!-- 默认的目的地队列定义 -->
<bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="queue-test"/>
</bean>
<!-- JmsTemplate 定义 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="defaultDestination" ref="activeMQQueue"/>
</bean>
<!-- 消息发送者定义 -->
<bean id="messageSender" class="com.caobug.demo.springjmsactivemq.MessageSender">
<constructor-arg index="0" ref="jmsTemplate"/>
</bean>
</beans>
我们需要创建一个类用于消息发送:
package com.caobug.demo.springjmsactivemq;
import org.springframework.jms.core.JmsTemplate;
import java.util.Map;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageSender {
private final JmsTemplate jmsTemplate;
public MessageSender(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sender(final Map<String, Object> map) {
jmsTemplate.convertAndSend(map);
}
}
接收方 spring-receiver.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="spring-common.xml"/>
<!-- 消息接收者定义 -->
<bean id="messageReceiver" class="com.caobug.demo.springjmsactivemq.MessageReceiver"/>
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationName" value="queue-test"/>
<property name="messageListener" ref="messageReceiver"/>
</bean>
</beans>
我们还需要创建类用来接收消息:
package com.caobug.demo.springjmsactivemq;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.util.Enumeration;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageReceiver implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof MapMessage) {
final MapMessage mapMessage = (MapMessage) message;
try {
Enumeration mapNames = mapMessage.getMapNames();
while (mapNames.hasMoreElements()) {
String name = (String) mapNames.nextElement();
Object value = mapMessage.getObject(name);
System.out.println("value = " + value);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
System.out.println("已接收消息");
}
}
到此为止我们的代码写完了。以下为测试发送方和消费方,我们需要执行以下三个类的JUNIT测试方法,顺序不分先后。值得注意的是,由于默认配置为点对点,因此消费者不可能会同时接受同一个消息。
测试:发送方
package com.caobug.demo.springjmsactivemq;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageSenderTest {
@Test
public void sender() throws Exception {
ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-sender.xml");
MessageSender messageSender = ctx.getBean("messageSender", MessageSender.class);
for (int i = 0; i < 148; i++) {
Map<String, Object> content = new HashMap<>();
content.put("name", "caobug");
content.put("age", i);
content.put("will", "say hello");
messageSender.sender(content);
TimeUnit.SECONDS.sleep(1);
}
}
}
测试:接收方 A
package com.caobug.demo.springjmsactivemq;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.TimeUnit;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageReceiverATest {
@Test
public void receiverA() throws Exception {
new ClassPathXmlApplicationContext("spring-receiver.xml");
TimeUnit.DAYS.sleep(Integer.MAX_VALUE);
}
}
测试:接收方 B
package com.caobug.demo.springjmsactivemq;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.TimeUnit;
/**
* Created by caobug on 16/6/23.
*
* @author caobug
*/
public class MessageReceiverBTest {
@Test
public void receiverA() throws Exception {
new ClassPathXmlApplicationContext("spring-receiver.xml");
TimeUnit.DAYS.sleep(Integer.MAX_VALUE);
}
}
** 搞定!以上示例编译环境为JDK7,JAR依赖(gradle config)如下 **
dependencies {
compile group: 'org.springframework', name: 'spring-jms', version: '4.2.5.RELEASE'
compile group: 'org.apache.activemq', name: 'activemq-core', version: '5.7.0'
testCompile group: 'junit', name: 'junit', version: '4.12'
}
** 完整项目结构看起来是这样:**