ActiveMq是一个apche开源的,基于生产者(producer)-消费者(consumer)模型的消息中间件,通常用于系统间的消息传递。生产者产生消息,将消息发送至消息服务器;消费者通过监听消息服务器中指定的消息进行消费(获取并使用)。
它支持一对一(point-to-point)队列式的消息和一对多(publish/subscribe)广播式的消息。参见下图:
producer_consumer.PNG
- setp 1:下载ActiveMq,在远程或者本地,启动ActiveMq服务(需要jre环境)。
通过访问ip + 8161端口可以访问消息服务器的后台管理页面
- setp 2:引入jar包依赖
<dependencies>
<!--ActiveMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
<!--单元测试-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
- setp 3:创建生产者
@SuppressWarnings({"Duplicates", "UnusedAssignment"})
public class Producer {
@Test
public void produceMessage() {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
// 创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.25.128:61616");
// 获得连接
connection = factory.createConnection();
// 开启连接
connection.start();
// 创建会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建队列(点对点),指定队列名称;当消费者监听该队列,消息才能被消费
Queue raw_mq_queue = session.createQueue("RAW_MQ_QUEUE");
// 创建提供者
producer = session.createProducer(raw_mq_queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置消息持久化
// 创建消息,指定消息内容
// TextMessage textMessage = session.createTextMessage("the textMessage content");
TextMessage textMessage = session.createTextMessage();
textMessage.setText("message test");
// 发送消息
producer.send(textMessage);
} catch (JMSException e) {
e.printStackTrace();
} finally {
// 释放资源(producer,session,connection),略
}
}
}
- step 4 :创建消费者
@SuppressWarnings({"Duplicates", "UnusedAssignment"})
public class Consumer {
@Test
public void consumeMessage() {
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
// 创建连接工厂
String username = "admin";
String password = "admin";
String url = "tcp://192.168.25.128:61616";
ConnectionFactory factory = new ActiveMQConnectionFactory(url);
// 创建连接
connection = factory.createConnection(username, password);
// 开启连接
connection.start();
// 获得会话
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,Destination是Queue和Topic的父接口
Destination destination = session.createQueue("RAW_MQ_QUEUE");// 指定监听的队列
// 创建消费者
messageConsumer = session.createConsumer(destination);
// 设置消息监听
/**不使用匿名内部类也可以单独创建一个类,实现MessageListener接口,重写onMessage方法*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("consumer has recieved the message:" + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();// 只是为了不让消费线程死亡,可以持续监听消息。
} catch (Exception e) {
e.printStackTrace();
}finally {
// 释放资源
}
}
}
至此,简单的案例就完成了。