我的样例工程:D:\springboot\demo-mq-maven
maven依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
点对点
Producer 生产者
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
Consumer
消费者有两种消费方法::
1、同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
2、异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。
本例演示异步消费
异步消费
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
点对点示例
package com.neuedu.test;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class TestQueue {
@Test
public void QueueProduct()
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
Connection conn = connectionFactory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = session.createQueue("test-queue");
MessageProducer p = session.createProducer(q);
TextMessage msg = session.createTextMessage("hello world");
p.send(msg);
p.close();
session.close();
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void QueueConsumer()
{
//异步消费
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
Connection conn = connectionFactory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = session.createQueue("test-queue");
MessageConsumer c = session.createConsumer(q);
c.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message arg0) {
TextMessage m = (TextMessage)arg0;
try {
String tm = m.getText();
System.out.println(tm);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
c.close();
session.close();
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Topic
Producer
Prod使用步骤:
第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象创建一个Connection对象。
第三步:开启连接,调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Topic对象。
第六步:使用Session对象创建一个Producer对象。
第七步:创建一个Message对象,创建一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
Consumer
消费者:接收消息。
第一步:创建一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中获得一个Connection对象。
第三步:开启连接。调用Connection对象的start方法。
第四步:使用Connection对象创建一个Session对象。
第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
第六步:使用Session对象创建一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
订阅/发布
package com.neuedu.test;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
public class TestTopic {
@Test
public void TopicProduct()
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
Connection conn = connectionFactory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = session.createTopic("topic-test");
MessageProducer p = session.createProducer(t);
TextMessage msg = session.createTextMessage("hello topic11111");
p.send(msg);
p.close();
session.close();
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void TopicConsumer()
{
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
Connection conn = connectionFactory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = session.createTopic("topic-test");
MessageConsumer c = session.createConsumer(t);
c.setMessageListener(new MessageListener(){
@Override
public void onMessage(Message arg0) {
TextMessage m = (TextMessage)arg0;
try {
String tm = m.getText();
System.out.println(tm);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
try {
System.in.read();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
c.close();
session.close();
conn.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}