5、JMS

JMS是什么

JMS是JavaEE其中的一个模块

JavaEE是一套使用Java进行企业级应用开发,大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个机遇组件的方法来加快设计、开发、装配以及部署企业应用程序。
1、JDBC(数据库连接)
2、JNDI(Java的命名和目录接口)
3、EJB(快被spring全家桶干掉了)
4、RMI(远程方法调用)
5、Java IDL/CORBA(接口定义语言/公用对象请求代理程序体系结构)
6、JSP
7、Servlet
8、XML
9、JMS(Java消息服务)
10、JTA(Java事务的API)
11、JTS(Java事务服务)
12、JavaMail(不了解)
13、JAF(不了解)

JMS(Java消息服务)是两个应用程序之间进行异步通讯的API,它为标准协议和消息服务提供了一组通用接口(创建、发送、读取消息等等),用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行实时通信时,他们之间并不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦、削峰、异步的效果。


image.png

JMS的组成结构和特点

1、JMS provider:具体的中间件(activeMQ、kafka、rocketMQ等等MQ)
2、JMS producer:生产者
3、JMS consumer:消费者
4、JMS message:消息(消息头、消息属性、消息体)


重点说一下JMS message

消息分为:消息头、消息体、消息属性

消息头(常用的属性)
JMSDestination:目的地一般指的是队列或主题
JMSDeliveryMode:持久或非持久模式
JMSExpiration:消息过期时间(默认永不过期)
JMSPriority:优先级
JMSMessageID:消息的唯一标识
在消息头上设置一些属性:过期时间、目的地、是否重发、时间戳等;通过send方法发送消息(send里面还可以设置目的地、消息、持久化、过期时间等)

  • 持久和非持久模式
    持久:传递一条消息,无论JMS提供者(消息服务)是否出现故障,该条消息不会丢失,他在服务器恢复后会再次传递。
    非持久:消息最多会传递一次,一旦服务器出现故障,该条消息永远丢失。

  • 优先级:JMS优先级划分为10个等级,0到9,默认等级为4。
    JMS不要求MQ严格按照这10个优先级发送消息,但必须保证加急的消息要先与普通级的消息有限到达。(大白话:三条消息4、8、9,加急的消息8、9可能8的先到达,但无论是8还是9都会比4先到达)

消息体(前两种常用)
TextMessage:普通字符串消息
MapMessage:Map类型消息,key为String
BytesMessage:二进制数组
StreamMessage:流
ObectMessage:祖宗,可序列化

消息属性(消息体的增强)
如果需要除消息头字段以外的值,那么可以设置消息属性
有识别、去重、重点标注的用途

代码演示消息头、消息属性

主题消费者

public class JmsConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(topic);
        // 7、通过监听的方式来消费消息
        consumer.setMessageListener(message -> {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + textMessage.getText());
                    if(textMessage.getStringProperty("vip") != null){
                        // 打印消息体属性
                        System.out.println("======" + textMessage.getStringProperty("vip"));
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            // 接收MapMessage
            if (null != message && message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + mapMessage.getString("key"));
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 8、保持控制台不灭
        System.in.read();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

主题生产者

public class JmsProduce {
    //为什么是tcp,看源码!
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        // 7、通过使用producer产生三条消息发送到队列里面
        for(int i = 0;i<3;i++){
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("String类型的msg------------" + i);
            // 为textMessage消息体设置属性
            if(i == 1){
                textMessage.setStringProperty("vip","我是vip");
            }

            // 通过producer发送给mq
            producer.send(textMessage);



            // 创建MapMessage
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("key","Map类型的msg-----"+i);
            producer.send(mapMessage);
        }
        // 8、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("主题生产完成!");
    }
}
image.png

JMS可靠性

  • persistent:持久性
  • 事务
  • Acknowledge:签收

JMS可靠性--持久性

非持久化设置
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
服务器宕机,消息丢失
测试方法(队列):1、设置非持久化。2、启动生产者。3、观察AMQ页面。4、执行./activemq stop(停止AMQ服务)。5、执行./activemq start,启动成功后再观察AMQ页面。

执行结果:3030变为0000

持久化设置
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
服务器宕机,消息存在
测试方法(队列):同上

执行结果:3030变为3000,启动消费者后可以被正常消费

默认持久模式


持久化topci

与队列持久化方式不同,主题需要消费者持久化订阅,类似于微信公众号,只有关注了公众号,公众号才会给你发消息
消费者订阅

public class JmsConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "Topic";

    public static void main(String[] args) throws JMSException, IOException {
        // 描述:张三(z3)订阅名为TOPIC_NAME的主题
        System.out.println("我是消费者张三");
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、设置订阅ID
        connection.setClientID("z3");
        // 4、创建会话session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6、创建持久化订阅者
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "备注");
        // 7、启动连接
        connection.start();
        Message message = topicSubscriber.receive();

        while(message != null){
            TextMessage textMessage = (TextMessage) message;
            System.out.println("================收到持久化主题"+textMessage.getText());
            // 轮询接收消息receive(long timeout)
            message = topicSubscriber.receive();
        }
        // 8、关闭资源
        session.close();
        connection.close();
    }
}

发布持久化主题

public class JmsProduce {
    //为什么是tcp,看源码!
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "Topic";

    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、创建会话session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4、创建接收的对象(主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        // 5、创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        // 6、设置主题持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 7、启动连接
        connection.start();
        // 8、通过使用producer产生三条消息发送到队列里面
        for (int i = 0; i < 3; i++) {
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("String类型的msg------------" + i);

            // 通过producer发送给mq
            producer.send(textMessage);

        }
        // 9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("主题持久化完成!");
    }
}

AMQ页面

image.png

更改订阅中message = topicSubscriber.receive(1000L);重新运行会发现。
image.png

image.png

思考一下,消费者进入离线状态,生产者再发布新消息后,启动该消费者,消息会不会被消费?

总结

  • 一定要先运行一次消费者,将消费者向MQ中注册,类似于订阅了这个主题。
  • 然后在运行生产者发送信息。
  • 只要消费者订阅该主题,无论下次消费者是否在线,都会受到消息。离线的消费者只要上线,会将没收到的消息都接收下来。

JMS可靠性--事务(更注重于生产者)

事务:一组增删改从开始到结束
特性:ACID
原子性(atomicity)
一致性(consistency)
隔离性(isolation)
持久性(durability)
这里面写过

如果生产者设置事务,在session关闭之前要commit

public class QueueProducer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        // 设置开启事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 7、通过使用producer产生三条消息发送到队列里面
        for(int i = 0;i<3;i++){
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("msg------------" + i);
            // 通过producer发送给mq
            producer.send(textMessage);
        }
        // 8、事务提交
//        session.commit();
        // 9、关闭资源(先进后出,同jdbc)
        producer.close();
        session.close();
        connection.close();
    }
}

运行之后


image.png

就是因为没有session.commit();才会0000。

参照JDBC改造上面代码

public class QueueProducer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        // 设置开启事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 7、通过使用producer产生三条消息发送到队列里面
        for (int i = 0; i < 3; i++) {
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("msg------------" + i);
            // 通过producer发送给mq
            producer.send(textMessage);
        }
        // 8、事务提交
        try {
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
            session.rollback();
        } finally {
            // 9、关闭资源(先进后出,同jdbc)
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

消费者开启事务

public class QueueConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(message -> {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 7、保持控制台不灭
        System.in.read();
        // 8、提交
        session.commit();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

JMS可靠性--签收(更注重于消费者)

非事务
自动签收(前面有例子)。Session.AUTO_ACKNOWLEDGE
手动签收(设置手动签收后,消息体接收到消息,必须设置acknowledge()代表已签收)。Session.CLIENT_ACKNOWLEDGE
重复签收(不会,没用过)

消费者非事务手动签收

public class QueueConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 非事务、手动签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(message -> {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + textMessage.getText());
                    // 消息签收(必须签收,否则会造成重复消费)
                    textMessage.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 7、保持控制台不灭
        System.in.read();
        // 8、提交
//        session.commit();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 理论上来说,我们需要保证消息中间件上的消息,只有被消费者确认过以后才会被签收,相当于我们寄一个快递出去,收件人没有...
    Mrsunup阅读 4,249评论 0 0
  • JMS(Java Message Service) API规范 模式点对点发布订阅 点对点 组成消息队列(Queu...
    宠辱不惊的咸鱼阅读 2,662评论 0 0
  • 一、 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能...
    步积阅读 57,377评论 10 138
  • 一、JMS简介 1.1 JMS是什么 JMS Java Message Service Java消息服务。是JAV...
    这一刻_776b阅读 4,001评论 0 0
  • 这些日子是不是过的有点丧? 自己手头上事儿越积越多,每天都过得麻木辛苦又孤独。 别太难过了,其实有很多美好的事情,...
    94美酱阅读 2,166评论 0 0

友情链接更多精彩内容