Maven配置
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.8.0</version>
</dependency>
生产者
public class CdProducer {
//默认连接用户名
private static final String USERNAME
= ActiveMQConnection.DEFAULT_USER;
//默认连接密码
private static final String PASSWORD
= ActiveMQConnection.DEFAULT_PASSWORD;
//默认连接地址
private static final String BROKEURL
= "failover://tcp://58.87.114.150:61616";
//发送的消息数量
private static final int SENDNUM = 10;
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session;
Destination destination;
MessageProducer messageProducer;
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("cd.queue,topic://cd.mark");
//destination = session.createTopic("VirtualTopic.vtgroup");
messageProducer = session.createProducer(destination);
for(int i=0;i<SENDNUM;i++){
String msg = "CdProducer "+i+" "+System.currentTimeMillis();
TextMessage message = session.createTextMessage(msg);
System.out.println("发送消息:"+msg);
messageProducer.send(message);
}
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
消费者 queue
public class CdConsumerQueueA {
private static final String USERNAME
= ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD
= ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String BROKEURL
="failover://tcp://58.87.114.150:61616";//默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(CdConsumerQueueA.USERNAME,
CdConsumerQueueA.PASSWORD, CdConsumerQueueA.BROKEURL);
try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个连接HelloWorld的消息队列
//destination = session.createTopic("HelloTopic8");
destination = session.createQueue("cd.queue");
//创建消息消费者
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("Accept msg : "
+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者 topic
public class CdConsumerTopicA {
private static final String USERNAME
= ActiveMQConnection.DEFAULT_USER;//默认连接用户名
private static final String PASSWORD
= ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String BROKEURL
= ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话 接受或者发送消息的线程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消费者
//实例化连接工厂
connectionFactory = new ActiveMQConnectionFactory(CdConsumerTopicA.USERNAME,
CdConsumerTopicA.PASSWORD, CdConsumerTopicA.BROKEURL);
try {
//通过连接工厂获取连接
connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建一个连接HelloWorld的消息队列
//destination = session.createQueue("Consumer.A.VirtualTopic.vtgroup");
destination = session.createTopic("cd.mark");
//创建消息消费者
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
System.out.println("Accept msg : "
+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}