JMS消息队列--待续

JMS(消息中间件)

1 消息中间件

消息中间件适用于需要可靠的数据传送的分布式环境。采用消息中间件机制的系统中,不同的对象之间通过传递消息来激活对方的事件,完成相应的操作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。消息中间件能在不同平台之间通信,它常被用来屏蔽掉各种平台及协议之间的特性,实现应用程序之间的协同,其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发,这也是它比远程过程调用更进一步的原因。

1572359464129.png

MQ采用异步调动的方式,解除耦合(程序不启动也能运行),提高了运行性能(不用担心处理时间,处理时间的长短不影响程序性能,

常用的消息中间件:

  • ActiveMQ

    是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和JSEE1.4规范的JMS Provider实现。我们在本次课程中介绍ActiveMQ的使用。

  • RabbitMQ

    AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。

  • ZeroMQ

    史上最快的消息队列系统

  • Kafka

    Apache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,适合处理海量数据。

2 简介

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS本身只定义了一系列的接口规范,是一种与厂家无关的API,用来访问消息收发系统。它类似于JDBC,JDBC是可以用来访问许多不同关系数据库API_(:з」∠) _ ,而JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持JMS。

JMS定义了五种不同的消息格式,以及调用的消息类型,允许你发送并接收一些不同类型的数据,提供现有消息格式的一些级别的兼容性。

  • TextMessage -- 一个字符串对象(常见,也最简单)
  • MapMessage -- 一套键值对(传多个信息)
  • ObjectMessage -- 一个序列化的Java对象(传递实现了可序列化接口的类)
  • BytesMessage -- 一个字节的数据流(文件、音频)
  • StreamMessage -- Java原始值的数据流

2.1 JMS消息传递类型

对于消息的传递有两种类型:

  • 一种是点对点模式,即一个生产者和一个消费者一一对应。

    一个或多个生产者把产品放在队列上,(一个)消费者在队列的另一边接收数据

  • 一种是发布/订阅模式,即一个生产者产生的消息进行发送后,可以由多个消费者进行接收。

    一个或多个生产者产生在Topic(主题),所有消费者都能同时(广播)接收到该Topic(主题)

3 ActiveMQ

3.1 安装ActiveMQ

  1. 官网下载ActiveMQ

  2. sftp上传到linux

  3. tar zxvf tar.gz文件(解压缩)

  4. chmod 777 解压后的文件夹 (给该文件夹赋予最高权限,任何用户都可以访问)

  5. cd 文件夹/bin

  6. chmod 755 activemq

  7. ./activemq start 启动

  8. 访问连接 http://IP地址:8161

9.
activemq首页.png

​ 访问红线部分,输入默认账号密码admin

3.2 入门Demo (点对点模式)

  1. 引入依赖

    <!-- 使用哪个版本的activemq就使用哪个版本的client -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.15.10</version>
    </dependency>
    
  2. 编写一个生产者

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 点对点关系
     */
    public class QueueProducer {
    
        public static void main(String[] args) throws JMSException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.234.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session(会话对象)
            /**
             * boolean:是否启动事务,如果为true,代表commit的时候才提交事务
             * int:消息的确认方式 AUTO_ACKNOWLEDGE = 1(自动确认); CLIENT_ACKNOWLEDGE = 2(客户端手动确认);
             *      DUPS_OK_ACKNOWLEDGE = 3(自动批量确认); SESSION_TRANSACTED = 0(事务提交并确认)
             */
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象,传入队列的名字
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者对象
            MessageProducer producer = session.createProducer(queue);
            //7.创建消息对象(文本消息)
            TextMessage textMessage = session.createTextMessage("欢迎使用activeMQ");
            //8.生产者发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
    
        }
    }
    
  3. 运行并查看结果

    activemq第一个demo.png
  1. 编写一个消费者

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    public class QueueConsumer {
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.234.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象,传入队列的名字
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费者对象
            MessageConsumer consumer = session.createConsumer(queue);
            //7.设置监听,当有消息产生的时候就消费消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("提取的消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入,目的是让消费者程序不终止执行
            System.in.read();
    
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    
  2. 运行后发现消息被消费,以及消费者数+1

    activemq消费者.png
  3. 其他细节

    • 此时消费者由于System.in.read(); 就没有被终止程序,也就是一直处于监听状态
    • 如果此时再次有生产者生产时,处于监听状态的消费者会立马消费
    • 当一个队列有多个消费者时,如果有消息产生,只会有一个消费者得到消息
  4. 点对点小结

    • 当消息只需要消费一次时,可以使用点对点的方式(比如搜索服务)

3.3 发布/订阅模式

  1. 编写生产者

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * 发布订阅模式的生产者
     */
    public class TopicProducer {
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.234.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象,传入主题的名字
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者对象
            MessageProducer producer = session.createProducer(topic);
            //7.创建消息对象(文本消息)
            TextMessage textMessage = session.createTextMessage("欢迎使用activeMQ");
            //8.生产者发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }
    
  2. 运行后查看结果

    activemq发布订阅-生产.png
  3. 编写消费者

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    public class TopicConsumer {
    
        public static void main(String[] args) throws JMSException, IOException {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.234.129:61616");
            //2.创建连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象,传入队列的名字
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费者对象
            MessageConsumer consumer = session.createConsumer(topic);
            //7.设置监听,当有消息产生的时候就消费消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("提取的消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入,目的是让消费者程序不终止执行
            System.in.read();
    
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    
  4. 运行但发现之前发送的主题没有被消费,说明发布/订阅模式下

    • 当有主题发送出来时,就会向在场的消费者进行广播
    • 新进的消费者不能收到以前发布的主题

4 Spring整合JMS

4.1 点对点模式

  • 先做生产者
  1. 导入依赖包

    <spring-version>5.1.5.RELEASE</spring-version>
    
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <!--
     这个版本的Spring需要使用JMS 2.0版本,但spring-jms的依赖没有自动导入JMS 2.0
        可以手动除去jms的错误导入避免冲突
    -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.15.10</version>
        <exclusions>
            <exclusion>
                <artifactId>spring-context</artifactId>
                <groupId>org.springframework</groupId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.geronimo.specs</groupId>
                <artifactId>geronimo-jms_1.1_spec</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- 导入正确的jms -->
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0.1</version>
    </dependency>
    
  2. 编写配置文件applicationContext-jms.xml(此处先编写生产者的配置文件)

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
        <context:component-scan base-package="spring.activemq.demo"></context:component-scan>
    
        <!-- 配置产生Connection的ConnectionFactory,由对应的JMS服务厂商提供 -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.234.129:61616"/>
        </bean>
    
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
    
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
    
        <!-- 这个是队列目的地,点对点的文本信息 -->
        <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue_text"/>
        </bean>
    
        <!-- 这个是发布/订阅模式的文本信息 -->
        <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic_text"/>
        </bean>
    </beans>
    
  3. 编写生产者,里面要写生产的逻辑

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    @Component
    public class QueueProducer {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Autowired
        private Destination queueTextDestination;
    
        /**
         * 发送文本信息
         * @param text
         */
        public void sendTextMessage(final String text) {
            jmsTemplate.send(queueTextDestination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(text);
                }
            });
        }
    }
    
  4. 编写测试类,测试运行效果

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import spring.activemq.demo.QueueProducer;
    
    //需要导入配置文件,让spring框架能使用配置文件中的配置
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-jms.xml")
    public class TestQueue {
    
        @Autowired
        private QueueProducer queueProducer;
    
        @Test
        public void testSend() {
            queueProducer.sendTextMessage("spring JMS 点对点");
        }
    }
    
    
  • 编写消费者
  1. 自定义监听方法

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    public class MyMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收到消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    
    
  2. 修改配置文件,加入消费者

    但要注意加入的消费者在导入配置文件后会自动启动,可以生产者和消费者分成两个不同的配置文件

    这里为了方便就不另建配置文件和测试类

    <!-- 设置监听类 -->
    <bean id="myMessageListener" class="spring.activemq.demo.MyMessageListener"></bean>
    
    <!--
        消息监听器,需要设置connectionFactory 目的类型 监听类
        此处没有id,因为加上该配置后,在启动该配置时候自动装载启动
    -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueTextDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>
    
    
  3. 编写测试类测试消费者

    @Test
    public void textGetQueue() {
        try {
            //添加暂停语句,
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    

4.2 发布/订阅模式

  • 编写生产者
  1. 编写配置文件,此处为了方便沿用点对点模式的配置类,但要先注释掉点对点模式的消费者监听器

    (个人认为一个消费者可以类比为一个监听器了)

    <!--<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="queueTextDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>-->
    
    
  2. 编写生产者类

    对比于点对点模式的生产者,只是目的类型有变化,可以看出点对点模式和发布/订阅模式的connectionFactory是一样的(至少可以从配置类文件可以看出)

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;
    
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    @Component
    public class TopicProducer {
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @Autowired
        private Destination topicTextDestination;
    
        /**
         * 发送文本信息
         * @param text
         */
        public void sendTextMessage(final String text) {
            jmsTemplate.send(topicTextDestination, new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(text);
                }
            });
        }
    }
    
    
  3. 编写测试类

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import spring.activemq.demo.TopicProducer;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath:applicationContext-jms.xml")
    public class TestTopic {
    
        @Autowired
        private TopicProducer topicProducer;
    
        @Test
        public void testSend() {
            topicProducer.sendTextMessage("spring JMS 发布/订阅");
        }
    }
    
    

    运行后可以通过浏览器查看结果

  • 编写消费者
  1. 修改配置文件,加上消费者的监听器

    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="topicTextDestination"/>
        <property name="messageListener" ref="myMessageListener"/>
    </bean>
    
    

    从配置文件上看,两种模式的不同也只是目标类型上的不同,和生产者的情况有点类似

  2. 添加测试类方法

    @Test
    public void textGetQueue() {
        try {
            //添加暂停语句,
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 226,130评论 6 524
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 97,210评论 3 410
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 173,639评论 0 370
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,755评论 1 304
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,694评论 6 404
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 54,160评论 1 317
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 42,359评论 3 433
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 41,436评论 0 282
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,990评论 1 328
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,981评论 3 351
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 42,089评论 1 359
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,669评论 5 353
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 43,363评论 3 342
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,762评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,950评论 1 278
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,689评论 3 384
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 47,124评论 2 368

推荐阅读更多精彩内容