<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
一.队列模式:
public class AppProducer {
// 队列服务器的地址
private static final String url = "tcp://你的服务器地址:61616";
// 消息队列的名字
private static final String queueName="queue-test";
public static void main(String[] args) throws JMSException {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建连接Connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列模式目标
//队列模式和主题模式的区别在这里
Destination destination = session.createQueue(queueName);
//6.创建生产者
MessageProducer producer = session.createProducer(destination);
for (int i=0; i<100; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
//8.发布消息
producer.send(textMessage);
System.out.println("消息发送成功" + textMessage.getText());
}
//9.关闭连接
connection.close();
}
}
public class AppConsumer {
// 队列服务器的地址
private static final String url = "tcp://你的服务器地址:61616";
// 消息队列的名字
private static final String queueName="queue-test";
public static void main(String[] args) throws JMSException {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建连接Connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列模式目标
//队列模式和主题模式的区别在这里
Destination destination = session.createQueue(queueName);
//6.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.关闭连接,消息监听是异步过程,不能关闭连接
// connection.close();
}
}
二.主题模式
public class AppProducer {
// 队列服务器的地址
private static final String url = "tcp://你的服务器:61616";
// 消息队列的名字
private static final String topicName="topic-test";
public static void main(String[] args) throws JMSException {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建连接Connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建目标
Destination destination = session.createTopic(topicName);
//6.创建生产者
MessageProducer producer = session.createProducer(destination);
for (int i=0; i<100; i++) {
//7.创建消息
TextMessage textMessage = session.createTextMessage("test" + i);
//8.发布消息
producer.send(textMessage);
System.out.println("消息发送成功" + textMessage.getText());
}
//9.关闭连接
connection.close();
}
}
public class AppConsumer {
// 队列服务器的地址
private static final String url = "tcp://你的服务器:61616";
// 消息队列的名字
private static final String topicName="topic-test";
public static void main(String[] args) throws JMSException {
//1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2.创建连接Connection
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建目标
Destination destination = session.createTopic(topicName);
//6.创建消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建一个监听器
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//8.关闭连接,消息监听是异步过程,不能关闭连接
// connection.close();
}
}