1.2-ActiveMQ

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>

一.队列模式:

public class AppProducer {
//    队列服务器的地址
    private static final String url = "tcp://你的服务器地址:61616";
//    消息队列的名字
    private static final String queueName="queue-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建连接Connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列模式目标
        //队列模式和主题模式的区别在这里
        Destination destination = session.createQueue(queueName);
        //6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i=0; i<100; i++) {
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            //8.发布消息
            producer.send(textMessage);
            System.out.println("消息发送成功" + textMessage.getText());
        }
        //9.关闭连接
        connection.close();
    }
}
public class AppConsumer {
    //    队列服务器的地址
    private static final String url = "tcp://你的服务器地址:61616";
    //    消息队列的名字
    private static final String queueName="queue-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建连接Connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列模式目标
        //队列模式和主题模式的区别在这里
        Destination destination = session.createQueue(queueName);
        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //8.关闭连接,消息监听是异步过程,不能关闭连接
//        connection.close();
    }
}

二.主题模式

public class AppProducer {
//    队列服务器的地址
    private static final String url = "tcp://你的服务器:61616";
//    消息队列的名字
    private static final String topicName="topic-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建连接Connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建目标
        Destination destination = session.createTopic(topicName);
        //6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        for (int i=0; i<100; i++) {
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("test" + i);
            //8.发布消息
            producer.send(textMessage);
            System.out.println("消息发送成功" + textMessage.getText());
        }
        //9.关闭连接
        connection.close();
    }
}
public class AppConsumer {
    //    队列服务器的地址
    private static final String url = "tcp://你的服务器:61616";
    //    消息队列的名字
    private static final String topicName="topic-test";

    public static void main(String[] args) throws JMSException {
        //1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建连接Connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建会话,这里是自动应答模式,第一个参数是是否在实务中处理
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建目标
        Destination destination = session.createTopic(topicName);
        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接收消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

        //8.关闭连接,消息监听是异步过程,不能关闭连接
//        connection.close();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。