JMS是什么
JMS是JavaEE其中的一个模块
JavaEE是一套使用Java进行企业级应用开发,大家一致遵循的13个核心规范工业标准。JavaEE平台提供了一个机遇组件的方法来加快设计、开发、装配以及部署企业应用程序。
1、JDBC(数据库连接)
2、JNDI(Java的命名和目录接口)
3、EJB(快被spring全家桶干掉了)
4、RMI(远程方法调用)
5、Java IDL/CORBA(接口定义语言/公用对象请求代理程序体系结构)
6、JSP
7、Servlet
8、XML
9、JMS(Java消息服务)
10、JTA(Java事务的API)
11、JTS(Java事务服务)
12、JavaMail(不了解)
13、JAF(不了解)
JMS(Java消息服务)是两个应用程序之间进行异步通讯的API,它为标准协议和消息服务提供了一组通用接口(创建、发送、读取消息等等),用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行实时通信时,他们之间并不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦、削峰、异步的效果。
JMS的组成结构和特点
1、JMS provider:具体的中间件(activeMQ、kafka、rocketMQ等等MQ)
2、JMS producer:生产者
3、JMS consumer:消费者
4、JMS message:消息(消息头、消息属性、消息体)
重点说一下JMS message
消息分为:消息头、消息体、消息属性
消息头(常用的属性)
JMSDestination:目的地一般指的是队列或主题
JMSDeliveryMode:持久或非持久模式
JMSExpiration:消息过期时间(默认永不过期)
JMSPriority:优先级
JMSMessageID:消息的唯一标识
在消息头上设置一些属性:过期时间、目的地、是否重发、时间戳等;通过send方法发送消息(send里面还可以设置目的地、消息、持久化、过期时间等)
持久和非持久模式
持久:传递一条消息,无论JMS提供者(消息服务)是否出现故障,该条消息不会丢失,他在服务器恢复后会再次传递。
非持久:消息最多会传递一次,一旦服务器出现故障,该条消息永远丢失。优先级:JMS优先级划分为10个等级,0到9,默认等级为4。
JMS不要求MQ严格按照这10个优先级发送消息,但必须保证加急的消息要先与普通级的消息有限到达。(大白话:三条消息4、8、9,加急的消息8、9可能8的先到达,但无论是8还是9都会比4先到达)
消息体(前两种常用)
TextMessage:普通字符串消息
MapMessage:Map类型消息,key为String
BytesMessage:二进制数组
StreamMessage:流
ObectMessage:祖宗,可序列化
消息属性(消息体的增强)
如果需要除消息头字段以外的值,那么可以设置消息属性
有识别、去重、重点标注的用途
代码演示消息头、消息属性
主题消费者
public class JmsConsumer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
// destination目的地(queue队列、topic主题)
//Destination destination = session.createQueue(QUEUE_NAME);
Topic topic = session.createTopic(TOPIC_NAME);
// 6、创建消息消费者
MessageConsumer consumer = session.createConsumer(topic);
// 7、通过监听的方式来消费消息
consumer.setMessageListener(message -> {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听消费者消费消息" + textMessage.getText());
if(textMessage.getStringProperty("vip") != null){
// 打印消息体属性
System.out.println("======" + textMessage.getStringProperty("vip"));
}
} catch (JMSException e) {
e.printStackTrace();
}
}
// 接收MapMessage
if (null != message && message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("监听消费者消费消息" + mapMessage.getString("key"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 8、保持控制台不灭
System.in.read();
// 9、关闭资源
consumer.close();
session.close();
connection.close();
}
}
主题生产者
public class JmsProduce {
//为什么是tcp,看源码!
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
// destination目的地(queue队列、topic主题)
//Destination destination = session.createQueue(QUEUE_NAME);
Topic topic = session.createTopic(TOPIC_NAME);
// 6、创建消息生产者
MessageProducer producer = session.createProducer(topic);
// 7、通过使用producer产生三条消息发送到队列里面
for(int i = 0;i<3;i++){
// 逐一创建消息
TextMessage textMessage = session.createTextMessage("String类型的msg------------" + i);
// 为textMessage消息体设置属性
if(i == 1){
textMessage.setStringProperty("vip","我是vip");
}
// 通过producer发送给mq
producer.send(textMessage);
// 创建MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("key","Map类型的msg-----"+i);
producer.send(mapMessage);
}
// 8、关闭资源
producer.close();
session.close();
connection.close();
System.out.println("主题生产完成!");
}
}
JMS可靠性
- persistent:持久性
- 事务
- Acknowledge:签收
JMS可靠性--持久性
非持久化设置
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
服务器宕机,消息丢失
测试方法(队列):1、设置非持久化。2、启动生产者。3、观察AMQ页面。4、执行./activemq stop(停止AMQ服务)。5、执行./activemq start,启动成功后再观察AMQ页面。
执行结果:3030变为0000
持久化设置
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
服务器宕机,消息存在
测试方法(队列):同上
执行结果:3030变为3000,启动消费者后可以被正常消费
默认持久模式
持久化topci
与队列持久化方式不同,主题需要消费者持久化订阅,类似于微信公众号,只有关注了公众号,公众号才会给你发消息
消费者订阅
public class JmsConsumer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String TOPIC_NAME = "Topic";
public static void main(String[] args) throws JMSException, IOException {
// 描述:张三(z3)订阅名为TOPIC_NAME的主题
System.out.println("我是消费者张三");
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、设置订阅ID
connection.setClientID("z3");
// 4、创建会话session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(主题)
Topic topic = session.createTopic(TOPIC_NAME);
// 6、创建持久化订阅者
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "备注");
// 7、启动连接
connection.start();
Message message = topicSubscriber.receive();
while(message != null){
TextMessage textMessage = (TextMessage) message;
System.out.println("================收到持久化主题"+textMessage.getText());
// 轮询接收消息receive(long timeout)
message = topicSubscriber.receive();
}
// 8、关闭资源
session.close();
connection.close();
}
}
发布持久化主题
public class JmsProduce {
//为什么是tcp,看源码!
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String TOPIC_NAME = "Topic";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、创建会话session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建接收的对象(主题)
Topic topic = session.createTopic(TOPIC_NAME);
// 5、创建消息生产者
MessageProducer producer = session.createProducer(topic);
// 6、设置主题持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 7、启动连接
connection.start();
// 8、通过使用producer产生三条消息发送到队列里面
for (int i = 0; i < 3; i++) {
// 逐一创建消息
TextMessage textMessage = session.createTextMessage("String类型的msg------------" + i);
// 通过producer发送给mq
producer.send(textMessage);
}
// 9、关闭资源
producer.close();
session.close();
connection.close();
System.out.println("主题持久化完成!");
}
}
AMQ页面
更改订阅中message = topicSubscriber.receive(1000L);重新运行会发现。
思考一下,消费者进入离线状态,生产者再发布新消息后,启动该消费者,消息会不会被消费?
总结
- 一定要先运行一次消费者,将消费者向MQ中注册,类似于订阅了这个主题。
- 然后在运行生产者发送信息。
- 只要消费者订阅该主题,无论下次消费者是否在线,都会受到消息。离线的消费者只要上线,会将没收到的消息都接收下来。
JMS可靠性--事务(更注重于生产者)
事务:一组增删改从开始到结束
特性:ACID
原子性(atomicity)
一致性(consistency)
隔离性(isolation)
持久性(durability)
这里面写过
如果生产者设置事务,在session关闭之前要commit
public class QueueProducer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
// 设置开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 7、通过使用producer产生三条消息发送到队列里面
for(int i = 0;i<3;i++){
// 逐一创建消息
TextMessage textMessage = session.createTextMessage("msg------------" + i);
// 通过producer发送给mq
producer.send(textMessage);
}
// 8、事务提交
// session.commit();
// 9、关闭资源(先进后出,同jdbc)
producer.close();
session.close();
connection.close();
}
}
运行之后
就是因为没有session.commit();才会0000。
参照JDBC改造上面代码
public class QueueProducer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
// 设置开启事务
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、创建消息生产者
MessageProducer producer = session.createProducer(queue);
// 7、通过使用producer产生三条消息发送到队列里面
for (int i = 0; i < 3; i++) {
// 逐一创建消息
TextMessage textMessage = session.createTextMessage("msg------------" + i);
// 通过producer发送给mq
producer.send(textMessage);
}
// 8、事务提交
try {
session.commit();
} catch (Exception e) {
e.printStackTrace();
session.rollback();
} finally {
// 9、关闭资源(先进后出,同jdbc)
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
}
消费者开启事务
public class QueueConsumer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 两个参数,事务、签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听消费者消费消息" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 7、保持控制台不灭
System.in.read();
// 8、提交
session.commit();
// 9、关闭资源
consumer.close();
session.close();
connection.close();
}
}
JMS可靠性--签收(更注重于消费者)
非事务
自动签收(前面有例子)。Session.AUTO_ACKNOWLEDGE
手动签收(设置手动签收后,消息体接收到消息,必须设置acknowledge()代表已签收)。Session.CLIENT_ACKNOWLEDGE
重复签收(不会,没用过)
消费者非事务手动签收
public class QueueConsumer {
public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.106:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 1、创建连接工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
// 2、通过连接工厂,获取连接connection
Connection connection = activeMQConnectionFactory.createConnection();
// 3、启动连接
connection.start();
// 4、创建会话session
// 非事务、手动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 5、创建接收的对象(队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
// 6、创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(message -> {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("监听消费者消费消息" + textMessage.getText());
// 消息签收(必须签收,否则会造成重复消费)
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 7、保持控制台不灭
System.in.read();
// 8、提交
// session.commit();
// 9、关闭资源
consumer.close();
session.close();
connection.close();
}
}