activeMQ-03Java实现通讯(队列模式QUEUE)

首先看activeMQ通讯中的点对点模式——队列模式。特点:
1)每一个消息只能有一个消费者,一对一的关系
2)生产者和消费者没有时间上的相关性
3)消息被消费后队列中不会再存储,不会再次被消费掉。

  • 简易图如下:


    activemq.png
  • 具体实现看如下代码:
    生产者:
package com.jjclub.activeMQ_01;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

//生产者01
public class Producer01 {
    //服务器地址
    private static String url = "tcp://localhost:61616";
    //队列名称
    private static String queueName="queue1";
    public static void main(String[] args) {
        //创建activemq连接工场
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = null;
        Session session =null;
        MessageProducer messageProducer =null;
        try {
            //创建连接connection
            connection = activeMQConnectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建消息目的地队列(队列名称)
            Queue queue = session.createQueue(queueName);
            //创建生产者
            messageProducer = session.createProducer(queue);
            //创建消息体(文本消息内容)
            TextMessage textMessage = session.createTextMessage("hello");
            //发送消息到队列中
            messageProducer.send(textMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            try {
                //关闭消息
                messageProducer.close();
                session.close();
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }   
    }
}

消费者:
1)receive方式

package com.jjclub.activeMQ_01;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

//消费者
public class Consumer01 {
        //服务器地址
        private static String url = "tcp://localhost:61616";
        //队列名称
        private static String queueName="queue1";
        
        public static void main(String[] args) {
            //创建activemq连接工场
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = null;
            Session session =null;
            MessageConsumer messageConsumer =null;
            try {
                //创建连接connection
                connection = activeMQConnectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建消息目的地队列(队列名称)
                Queue queue = session.createQueue(queueName);
                //创建消费者
                messageConsumer = session.createConsumer(queue);
                while(true) {
                    // messageConsumer.receive();此方法会一直等待消息,不会中止进程
                    // messageConsumer.receive(4000L);等待4s后,若无消息,则中止进程,不再等待
                    Message message = messageConsumer.receive();
                    if(message!=null) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("消费的消息是"+textMessage);
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }finally {
                try {
                    //关闭消息
                    messageConsumer.close();
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
}

2)监听方式

package com.jjclub.activeMQ_01;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

//消费者
public class Consumer01_1 {
        //服务器地址
        private static String url = "tcp://localhost:61616";
        //队列名称
        private static String queueName="queue1";
        
        public static void main(String[] args) {
            //创建activemq连接工场
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = null;
            Session session =null;
            MessageConsumer messageConsumer =null;
            try {
                //创建连接connection
                connection = activeMQConnectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建连接session;第一个参数为事务,第二个参数为签发机制。暂时选择默认,后续说明;
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建消息目的地队列(队列名称)
                Queue queue = session.createQueue(queueName);
                //创建消费者
                messageConsumer = session.createConsumer(queue);
                //此方式在进程不停止情况下,会一直监听消息,一旦有消息就执行消费
                messageConsumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        if(message!=null) {
                            TextMessage textMessage = (TextMessage)message;
                            System.out.println("消费消息为"+textMessage);
                        }
                    }
                });
                //保证控制台不灭,进程不停止。
                System.in.read();
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally {
                try {
                    //关闭消息
                    messageConsumer.close();
                    session.close();
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 简介 ActiveMQ 特点 ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高...
    预流阅读 11,133评论 4 21
  • 什么是activeMQ activeMQ是一种开源的,实现了JMS1.1规范的,面向消息(MOM)的中间件,为应用...
    赵铁柱啊阅读 5,974评论 1 6
  • 概述 ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持...
    闽越布衣阅读 12,464评论 0 11
  • 1 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,...
    Bobby0322阅读 13,711评论 0 24
  • 以诗之名 文/静怡的兰苑 我想为你写首诗 这个念头放在心里很久了 可是我的笔尖太轻了 轻的不足以承载这份厚重 但我...
    静怡的兰苑阅读 8,138评论 59 66