ActiveMQ-API(四)

创建临时目标(临时队列/临时主题)
ActiveMQ通过Session的createTemporaryQueue和createTemporaryTopic方法创建临时目标。临时目标的生命周期持续到创建它的Connection关闭为止:只有该Connection所创建的消费者才可以从临时目标中接收消息,但任何生产者都可以向临时目标发送消息。一旦关闭创建此目标的Connection,临时目标随即被删除,其中的内容也将消失。
TemporaryQueue createTemporaryQueue ();
TemporaryTopic createTemporaryTopic();

发布订阅模式代码实现:

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publisher side of the publish/subscribe example.
 *
 * <p>Opens a connection to the broker at {@code tcp://localhost:61616}, creates a
 * non-transacted, auto-acknowledge session and a producer with no default destination,
 * and publishes a single text message to the topic {@code topic1}.
 *
 * <p>Call {@link #close()} when done so the broker connection is released.
 */
public class Publish {

    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;

    // 3: session
    private Session session;

    // 4: producer
    private MessageProducer messageProducer;

    /**
     * Connects to the broker and prepares the session and producer.
     *
     * @throws IllegalStateException if any step of the setup fails; the original
     *         failure is kept as the cause and any connection opened so far is closed
     */
    public Publish(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // No default destination: the target is supplied on every send() call.
            this.messageProducer = this.session.createProducer(null);
        } catch (Exception e) {
            // Do not leave a half-initialised object behind: a later sendMessage()
            // would only fail with an unrelated NullPointerException.
            closeQuietly();
            throw new IllegalStateException(
                    "Failed to set up the publisher for tcp://localhost:61616", e);
        }
    }

    /**
     * Publishes one text message to the topic {@code topic1}.
     *
     * @throws Exception if the topic or message cannot be created or the send fails
     */
    public void sendMessage() throws Exception{
        Destination destination = this.session.createTopic("topic1");
        TextMessage t = session.createTextMessage("我是内容");
        this.messageProducer.send(destination, t);
    }

    /**
     * Closes the broker connection. Per the JMS API, closing a connection also closes
     * the sessions and producers created from it. Does nothing if no connection is open.
     *
     * @throws Exception if the provider fails to close the connection
     */
    public void close() throws Exception {
        if (this.connection != null) {
            this.connection.close();
            this.connection = null;
        }
    }

    /** Best-effort cleanup used on the constructor's failure path. */
    private void closeQuietly() {
        try {
            close();
        } catch (Exception ignored) {
            // Secondary failure during cleanup; the original setup error is what gets reported.
        }
    }

    /** Sends a single message and then releases the connection. */
    public static void main(String[] args) throws Exception {
        Publish p = new Publish();
        try {
            p.sendMessage();
        } finally {
            p.close();
        }
    }

}

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * First subscriber of the publish/subscribe example.
 *
 * <p>The constructor connects to the broker at {@code tcp://localhost:61616} and creates
 * a consumer on the topic {@code topic1}; {@link #receiver()} then registers a listener
 * that prints every received text message.
 */
public class Consumer1 {

    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;

    // 3: session
    private Session session;

    // 4: consumer
    private MessageConsumer messageConsumer;

    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker and creates the consumer for {@code topic1}.
     *
     * @throws IllegalStateException if any step of the setup fails; the original
     *         failure is kept as the cause and any connection opened so far is closed
     */
    public Consumer1(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createTopic("topic1");
            this.messageConsumer = this.session.createConsumer(this.destination);
        } catch (Exception e) {
            // Fail fast instead of leaving null fields that would surface later as a
            // misleading NullPointerException in receiver().
            closeQuietly();
            throw new IllegalStateException(
                    "Failed to set up the consumer for tcp://localhost:61616", e);
        }
    }

    /**
     * Registers the message listener on the consumer. A failure to register is
     * printed to standard error and not rethrown.
     */
    public void receiver(){
        try {
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Best-effort cleanup used on the constructor's failure path. */
    private void closeQuietly() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (Exception ignored) {
                // Secondary failure during cleanup; the original setup error is what gets reported.
            }
            this.connection = null;
        }
    }

    /** Prints the body of each received text message; other message types are ignored. */
    class Listener implements MessageListener{

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    System.out.println("c1收到消息");
                    TextMessage m = (TextMessage) message;
                    System.out.println(m.getText());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /** Starts the subscriber; the connection is left open so messages keep arriving. */
    public static void main(String[] args) {
        Consumer1 c = new Consumer1();
        c.receiver();
    }

}

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Second subscriber of the publish/subscribe example.
 *
 * <p>The constructor connects to the broker at {@code tcp://localhost:61616} and opens a
 * session; {@link #receiver()} creates a consumer on the topic {@code topic1} and
 * registers a listener that prints every received text message.
 */
public class Consumer2 {

    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;

    // 3: session
    private Session session;

    // 4: consumer
    private MessageConsumer messageConsumer;

    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker and opens a non-transacted, auto-acknowledge session.
     *
     * @throws IllegalStateException if any step of the setup fails; the original
     *         failure is kept as the cause and any connection opened so far is closed
     */
    public Consumer2(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            // Fail fast instead of leaving a null session that would surface later as a
            // misleading NullPointerException in receiver().
            closeQuietly();
            throw new IllegalStateException(
                    "Failed to set up the consumer for tcp://localhost:61616", e);
        }
    }

    /**
     * Creates the consumer for {@code topic1} and registers the message listener.
     * A failure in any of these steps is printed to standard error and not rethrown.
     */
    public void receiver(){
        try {
            this.destination = this.session.createTopic("topic1");
            this.messageConsumer = this.session.createConsumer(this.destination);
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Best-effort cleanup used on the constructor's failure path. */
    private void closeQuietly() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (Exception ignored) {
                // Secondary failure during cleanup; the original setup error is what gets reported.
            }
            this.connection = null;
        }
    }

    /** Prints the body of each received text message; other message types are ignored. */
    class Listener implements MessageListener{

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    System.out.println("c2收到消息");
                    TextMessage m = (TextMessage) message;
                    System.out.println(m.getText());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /** Starts the subscriber; the connection is left open so messages keep arriving. */
    public static void main(String[] args) {
        Consumer2 c = new Consumer2();
        c.receiver();
    }

}

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Third subscriber of the publish/subscribe example.
 *
 * <p>The constructor connects to the broker at {@code tcp://localhost:61616} and opens a
 * session; {@link #receiver()} creates a consumer on the topic {@code topic1} and
 * registers a listener that prints every received text message.
 */
public class Consumer3 {

    // 1: connection factory
    private ConnectionFactory connectionFactory;
    // 2: connection
    private Connection connection;

    // 3: session
    private Session session;

    // 4: consumer
    private MessageConsumer messageConsumer;

    // 5: destination
    private Destination destination;

    /**
     * Connects to the broker and opens a non-transacted, auto-acknowledge session.
     *
     * @throws IllegalStateException if any step of the setup fails; the original
     *         failure is kept as the cause and any connection opened so far is closed
     */
    public Consumer3(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            // Fail fast instead of leaving a null session that would surface later as a
            // misleading NullPointerException in receiver().
            closeQuietly();
            throw new IllegalStateException(
                    "Failed to set up the consumer for tcp://localhost:61616", e);
        }
    }

    /**
     * Creates the consumer for {@code topic1} and registers the message listener.
     * A failure in any of these steps is printed to standard error and not rethrown.
     */
    public void receiver(){
        try {
            this.destination = this.session.createTopic("topic1");
            this.messageConsumer = this.session.createConsumer(this.destination);
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Best-effort cleanup used on the constructor's failure path. */
    private void closeQuietly() {
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (Exception ignored) {
                // Secondary failure during cleanup; the original setup error is what gets reported.
            }
            this.connection = null;
        }
    }

    /** Prints the body of each received text message; other message types are ignored. */
    class Listener implements MessageListener{

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    System.out.println("c3收到消息");
                    TextMessage m = (TextMessage) message;
                    System.out.println(m.getText());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /** Starts the subscriber; the connection is left open so messages keep arriving. */
    public static void main(String[] args) {
        Consumer3 c = new Consumer3();
        c.receiver();
    }

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,981评论 19 139
  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,515评论 0 11
  • 1、前言 之前我们通过两篇文章(架构设计:系统间通信(19)——MQ:消息协议(上)、架构设计:系统间通信(20)...
    境里婆娑阅读 1,914评论 0 4
  • 豆瓣评分:8.6 主要内容:日本著名作家东野圭吾的《解忧杂货店》,出版当年即获中央公论文艺奖。作品超越推理小说的范...
    采采二小乙阅读 481评论 0 1
  • 2014年春节回家,一到村口,就看到马路旁边农田里的一个坟头,鲜艳的花圈在一片萧瑟的寒冬里格外扎眼,显然,此人过世...
    蛋仔美美阅读 220评论 0 1