一.P2P开发
1.TestProvider.java提供者
package nz.study.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestProvider {
@Test
public void testProvider(){
//用缺省的用户名,密码和url,创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
try{
//通过连接工厂创建连接对象
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
//通过连接对象创建会话对象,第一个参数为书否开启事务
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//通过session创建队列对象
Queue queue = session.createQueue("helloqueue");
MessageProducer producer = session.createProducer(queue);
sendMessage(session,producer);
//提交信息
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
//发送消息的方法
private void sendMessage(Session session, MessageProducer producer) {
try{
for (int i = 0; i < 20; i++) {
TextMessage msg = session.createTextMessage("hello nz1903");
System.out.println("active mq 发送消息" + i);
producer.send(msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.TestConsumer.java消费者
package nz.study.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestConsumer {
@Test
public void testConsumer(){
//用缺省的用户名,密码和url,创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
try{
//通过连接工厂创建连接对象
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
//通过连接对象创建会话对象,第一个参数为书否开启事务
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
//通过session创建队列对象
Queue queue = session.createQueue("helloqueue");
MessageConsumer consumer = session.createConsumer(queue);
while (true){
TextMessage msg = (TextMessage)consumer.receive(10000);
if(msg != null){
System.out.println("接收到的信息为" + msg.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
二.PubSub开发(publish/Subscribe发布订阅开发)
1.TestTopicProvider.java发布消息
package nz.study.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestTopicProvider {
@Test
public void testTopicProvider(){
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
try{
//通过连接工厂创建连接对象
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("hellotopic");
MessageProducer producer = session.createProducer(topic);
sendMessage(session,producer);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
private void sendMessage(Session session, MessageProducer producer) {
try{
for (int i = 0; i < 20; i++) {
TextMessage msg = session.createTextMessage("activeMQ 发送消息" + i);
System.out.println("activeMQ发送消息" + i);
producer.send(msg);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
2.TestTopicConsumer.java消息订阅
package nz.study.test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import javax.jms.*;
public class TestTopicConsumer {
@Test
public void testTopicConsumer(){
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
try{
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("hellotopic");
MessageConsumer consumer = session.createConsumer(topic);
while (true){
/**
* receive()
* 无参代表长连接
* 带参代表时间到了之后就结束
*/
TextMessage msg = (TextMessage) consumer.receive(10000);
if(msg != null){
System.out.println("接收到的消息:" + msg.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
三.总结
1.P2P的模式相当于一个消息生产者一个消费者,PubSub中可以有一个消息发布者多个消息订阅者,开发与P2P几乎一样,只是将createQueue(创建消息队列)改为了createTopic(创建主题);生产消息改为了发布消息;接收消息改为了订阅消息.
2.P2P模式时需要先启动生产者Provider类,再去启动消费者Consumer类,也就是先有生产者生产消息,再由消费者去消费消息.
3.PubSub模式是先启动订阅消息Consumer类,再去启动发布消息Provider类,Consumer需要去订阅发布的消息,如果先发布消息,再去订阅,则无法捕获到发布的消息,所以需要先启动Consumer类去订阅,然后去发布消息.