5、JMS

JMS是什么

JMS是JavaEE其中的一个模块

JavaEE是一套使用Java进行企业级应用开发,大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个机遇组件的方法来加快设计、开发、装配以及部署企业应用程序。
1、JDBC(数据库连接)
2、JNDI(Java的命名和目录接口)
3、EJB(快被spring全家桶干掉了)
4、RMI(远程方法调用)
5、Java IDL/CORBA(接口定义语言/公用对象请求代理程序体系结构)
6、JSP
7、Servlet
8、XML
9、JMS(Java消息服务)
10、JTA(Java事务的API)
11、JTS(Java事务服务)
12、JavaMail(不了解)
13、JAF(不了解)

JMS(Java消息服务)是两个应用程序之间进行异步通讯的API,它为标准协议和消息服务提供了一组通用接口(创建、发送、读取消息等等),用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行实时通信时,他们之间并不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦、削峰、异步的效果。


image.png

JMS的组成结构和特点

1、JMS provider:具体的中间件(activeMQ、kafka、rocketMQ等等MQ)
2、JMS producer:生产者
3、JMS consumer:消费者
4、JMS message:消息(消息头、消息属性、消息体)


重点说一下JMS message

消息分为:消息头、消息体、消息属性

消息头(常用的属性)
JMSDestination:目的地一般指的是队列或主题
JMSDeliveryMode:持久或非持久模式
JMSExpiration:消息过期时间(默认永不过期)
JMSPriority:优先级
JMSMessageID:消息的唯一标识
在消息头上设置一些属性:过期时间、目的地、是否重发、时间戳等;通过send方法发送消息(send里面还可以设置目的地、消息、持久化、过期时间等)

  • 持久和非持久模式
    持久:传递一条消息,无论JMS提供者(消息服务)是否出现故障,该条消息不会丢失,他在服务器恢复后会再次传递。
    非持久:消息最多会传递一次,一旦服务器出现故障,该条消息永远丢失。

  • 优先级:JMS优先级划分为10个等级,0到9,默认等级为4。
    JMS不要求MQ严格按照这10个优先级发送消息,但必须保证加急的消息要先与普通级的消息有限到达。(大白话:三条消息4、8、9,加急的消息8、9可能8的先到达,但无论是8还是9都会比4先到达)

消息体(前两种常用)
TextMessage:普通字符串消息
MapMessage:Map类型消息,key为String
BytesMessage:二进制数组
StreamMessage:流
ObectMessage:祖宗,可序列化

消息属性(消息体的增强)
如果需要除消息头字段以外的值,那么可以设置消息属性
有识别、去重、重点标注的用途

代码演示消息头、消息属性

主题消费者

public class JmsConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(topic);
        // 7、通过监听的方式来消费消息
        consumer.setMessageListener(message -> {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + textMessage.getText());
                    if(textMessage.getStringProperty("vip") != null){
                        // 打印消息体属性
                        System.out.println("======" + textMessage.getStringProperty("vip"));
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            // 接收MapMessage
            if (null != message && message instanceof MapMessage) {
                MapMessage mapMessage = (MapMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + mapMessage.getString("key"));
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 8、保持控制台不灭
        System.in.read();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

主题生产者

public class JmsProduce {
    //为什么是tcp,看源码!
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        // 7、通过使用producer产生三条消息发送到队列里面
        for(int i = 0;i<3;i++){
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("String类型的msg------------" + i);
            // 为textMessage消息体设置属性
            if(i == 1){
                textMessage.setStringProperty("vip","我是vip");
            }

            // 通过producer发送给mq
            producer.send(textMessage);



            // 创建MapMessage
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("key","Map类型的msg-----"+i);
            producer.send(mapMessage);
        }
        // 8、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("主题生产完成!");
    }
}
image.png

JMS可靠性

  • persistent:持久性
  • 事务
  • Acknowledge:签收

JMS可靠性--持久性

非持久化设置
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
服务器宕机,消息丢失
测试方法(队列):1、设置非持久化。2、启动生产者。3、观察AMQ页面。4、执行./activemq stop(停止AMQ服务)。5、执行./activemq start,启动成功后再观察AMQ页面。

执行结果:3030变为0000

持久化设置
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
服务器宕机,消息存在
测试方法(队列):同上

执行结果:3030变为3000,启动消费者后可以被正常消费

默认持久模式


持久化topci

与队列持久化方式不同,主题需要消费者持久化订阅,类似于微信公众号,只有关注了公众号,公众号才会给你发消息
消费者订阅

public class JmsConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "Topic";

    public static void main(String[] args) throws JMSException, IOException {
        // 描述:张三(z3)订阅名为TOPIC_NAME的主题
        System.out.println("我是消费者张三");
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、设置订阅ID
        connection.setClientID("z3");
        // 4、创建会话session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6、创建持久化订阅者
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "备注");
        // 7、启动连接
        connection.start();
        Message message = topicSubscriber.receive();

        while(message != null){
            TextMessage textMessage = (TextMessage) message;
            System.out.println("================收到持久化主题"+textMessage.getText());
            // 轮询接收消息receive(long timeout)
            message = topicSubscriber.receive();
        }
        // 8、关闭资源
        session.close();
        connection.close();
    }
}

发布持久化主题

public class JmsProduce {
    //为什么是tcp,看源码!
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String TOPIC_NAME = "Topic";

    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、创建会话session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4、创建接收的对象(主题)
        Topic topic = session.createTopic(TOPIC_NAME);
        // 5、创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        // 6、设置主题持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        // 7、启动连接
        connection.start();
        // 8、通过使用producer产生三条消息发送到队列里面
        for (int i = 0; i < 3; i++) {
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("String类型的msg------------" + i);

            // 通过producer发送给mq
            producer.send(textMessage);

        }
        // 9、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("主题持久化完成!");
    }
}

AMQ页面

image.png

更改订阅中message = topicSubscriber.receive(1000L);重新运行会发现。
image.png

image.png

思考一下,消费者进入离线状态,生产者再发布新消息后,启动该消费者,消息会不会被消费?

总结

  • 一定要先运行一次消费者,将消费者向MQ中注册,类似于订阅了这个主题。
  • 然后在运行生产者发送信息。
  • 只要消费者订阅该主题,无论下次消费者是否在线,都会受到消息。离线的消费者只要上线,会将没收到的消息都接收下来。

JMS可靠性--事务(更注重于生产者)

事务:一组增删改从开始到结束
特性:ACID
原子性(atomicity)
一致性(consistency)
隔离性(isolation)
持久性(durability)
这里面写过

如果生产者设置事务,在session关闭之前要commit

public class QueueProducer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        // 设置开启事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 7、通过使用producer产生三条消息发送到队列里面
        for(int i = 0;i<3;i++){
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("msg------------" + i);
            // 通过producer发送给mq
            producer.send(textMessage);
        }
        // 8、事务提交
//        session.commit();
        // 9、关闭资源(先进后出,同jdbc)
        producer.close();
        session.close();
        connection.close();
    }
}

运行之后


image.png

就是因为没有session.commit();才会0000。

参照JDBC改造上面代码

public class QueueProducer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        // 设置开启事务
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 7、通过使用producer产生三条消息发送到队列里面
        for (int i = 0; i < 3; i++) {
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("msg------------" + i);
            // 通过producer发送给mq
            producer.send(textMessage);
        }
        // 8、事务提交
        try {
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
            session.rollback();
        } finally {
            // 9、关闭资源(先进后出,同jdbc)
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

消费者开启事务

public class QueueConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(message -> {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 7、保持控制台不灭
        System.in.read();
        // 8、提交
        session.commit();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

JMS可靠性--签收(更注重于消费者)

非事务
自动签收(前面有例子)。Session.AUTO_ACKNOWLEDGE
手动签收(设置手动签收后,消息体接收到消息,必须设置acknowledge()代表已签收)。Session.CLIENT_ACKNOWLEDGE
重复签收(不会,没用过)

消费者非事务手动签收

public class QueueConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 非事务、手动签收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(message -> {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + textMessage.getText());
                    // 消息签收(必须签收,否则会造成重复消费)
                    textMessage.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 7、保持控制台不灭
        System.in.read();
        // 8、提交
//        session.commit();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容

  • 理论上来说,我们需要保证消息中间件上的消息,只有被消费者确认过以后才会被签收,相当于我们寄一个快递出去,收件人没有...
    Mrsunup阅读 816评论 0 0
  • JMS(Java Message Service) API规范 模式点对点发布订阅 点对点 组成消息队列(Queu...
    宠辱不惊的咸鱼阅读 295评论 0 0
  • 一、 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能...
    步积阅读 56,937评论 10 138
  • 一、JMS简介 1.1 JMS是什么 JMS Java Message Service Java消息服务。是JAV...
    这一刻_776b阅读 616评论 0 0
  • 这些日子是不是过的有点丧? 自己手头上事儿越积越多,每天都过得麻木辛苦又孤独。 别太难过了,其实有很多美好的事情,...
    94美酱阅读 256评论 0 0