创建临时目标(Temporary Destination)
ActiveMQ通过Session的createTemporaryQueue和createTemporaryTopic方法创建临时目标。临时目标的生命周期持续到创建它的Connection关闭为止:只有由该Connection创建的消费者才可以从临时目标中接收消息,但任何生产者都可以向临时目标发送消息。一旦关闭了创建该目标的Connection,临时目标随之被关闭,其中的内容也将消失。
TemporaryQueue createTemporaryQueue();
TemporaryTopic createTemporaryTopic();
发布订阅模式代码实现:
package bhz.mq.pb;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publish/subscribe demo publisher: sends one text message to the topic "topic1"
 * on the broker at tcp://localhost:61616 and then releases the connection.
 */
public class Publish {
    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;
    // 3: session
    private Session session;
    // 4: producer
    private MessageProducer messageProducer;

    /**
     * Connects to the broker as user "bhz", starts the connection and creates a
     * non-transacted, auto-acknowledge session together with a producer that is
     * not bound to a destination (the destination is supplied on each send).
     * Any failure is printed and leaves the remaining fields unset.
     */
    public Publish() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // null destination: the target is passed to send() instead
            this.messageProducer = this.session.createProducer(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a single text message to the topic "topic1".
     *
     * @throws Exception if the topic or message cannot be created or the send fails
     */
    public void sendMessage() throws Exception {
        Destination destination = this.session.createTopic("topic1");
        TextMessage t = session.createTextMessage("我是内容");
        this.messageProducer.send(destination, t);
    }

    /**
     * Closes the connection opened by the constructor, if one was established.
     * Safe to call when the constructor failed before a connection existed.
     *
     * @throws Exception if the provider fails to close the connection
     */
    public void close() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    /** Sends one message and always closes the connection afterwards. */
    public static void main(String[] args) throws Exception {
        Publish p = new Publish();
        try {
            p.sendMessage();
        } finally {
            // previously the connection was never closed, leaking broker resources
            p.close();
        }
    }
}
package bhz.mq.pb;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publish/subscribe demo subscriber #1: listens on the topic "topic1" and prints
 * every text message it receives.
 */
public class Consumer1 {
    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;
    // 3: session
    private Session session;
    // 4: consumer
    private MessageConsumer messageConsumer;
    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker at tcp://localhost:61616 as user "bhz", starts the
     * connection, opens a non-transacted, auto-acknowledge session and creates a
     * consumer on the topic "topic1". Any failure is printed and leaves the
     * remaining fields unset.
     */
    public Consumer1() {
        try {
            connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz", "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic("topic1");
            messageConsumer = session.createConsumer(destination);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Registers a {@link Listener} on the consumer; failures are printed. */
    public void receiver() {
        try {
            MessageListener handler = new Listener();
            messageConsumer.setMessageListener(handler);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Prints a receipt marker and the body of each text message; other message types are ignored. */
    class Listener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                return;
            }
            try {
                System.out.println("c1收到消息");
                TextMessage textMessage = (TextMessage) message;
                String body = textMessage.getText();
                System.out.println(body);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Creates the subscriber and starts listening. */
    public static void main(String[] args) {
        Consumer1 subscriber = new Consumer1();
        subscriber.receiver();
    }
}
package bhz.mq.pb;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publish/subscribe demo subscriber #2: listens on the topic "topic1" and prints
 * every text message it receives.
 */
public class Consumer2 {
    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;
    // 3: session
    private Session session;
    // 4: consumer
    private MessageConsumer messageConsumer;
    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker at tcp://localhost:61616 as user "bhz", starts the
     * connection and opens a non-transacted, auto-acknowledge session. Any
     * failure is printed and leaves the remaining fields unset.
     */
    public Consumer2() {
        try {
            connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz", "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a consumer on the topic "topic1" and registers a {@link Listener}
     * on it; failures are printed.
     */
    public void receiver() {
        try {
            destination = session.createTopic("topic1");
            messageConsumer = session.createConsumer(destination);
            MessageListener handler = new Listener();
            messageConsumer.setMessageListener(handler);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Prints a receipt marker and the body of each text message; other message types are ignored. */
    class Listener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                return;
            }
            try {
                System.out.println("c2收到消息");
                TextMessage textMessage = (TextMessage) message;
                String body = textMessage.getText();
                System.out.println(body);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Creates the subscriber and starts listening. */
    public static void main(String[] args) {
        Consumer2 subscriber = new Consumer2();
        subscriber.receiver();
    }
}
package bhz.mq.pb;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Publish/subscribe demo subscriber #3: listens on the topic "topic1" and prints
 * every text message it receives.
 */
public class Consumer3 {
    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;
    // 3: session
    private Session session;
    // 4: consumer
    private MessageConsumer messageConsumer;
    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker at tcp://localhost:61616 as user "bhz", starts the
     * connection and opens a non-transacted, auto-acknowledge session. Any
     * failure is printed and leaves the remaining fields unset.
     */
    public Consumer3() {
        try {
            connectionFactory = new ActiveMQConnectionFactory("bhz", "bhz", "tcp://localhost:61616");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a consumer on the topic "topic1" and registers a {@link Listener}
     * on it; failures are printed.
     */
    public void receiver() {
        try {
            destination = session.createTopic("topic1");
            messageConsumer = session.createConsumer(destination);
            MessageListener handler = new Listener();
            messageConsumer.setMessageListener(handler);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Prints a receipt marker and the body of each text message; other message types are ignored. */
    class Listener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                return;
            }
            try {
                System.out.println("c3收到消息");
                TextMessage textMessage = (TextMessage) message;
                String body = textMessage.getText();
                System.out.println(body);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Creates the subscriber and starts listening. */
    public static void main(String[] args) {
        Consumer3 subscriber = new Consumer3();
        subscriber.receiver();
    }
}