一. 引言
ActiveMQ扩展出:
API 接受发送
MQ 的高可用
MQ 的集群容错配置
MQ 的持久化
延时发送
签收机制
Spring/SpringBoot 整合
1、为什么使用MQ?
解决了耦合调用、异步模型、抵御洪峰流量,保护了主业务,削峰。解耦,削峰,异步!!!
如下的case,没有消息中间件之前,A、B、C、D四个人来问问题,老师一次只能处理一个人的问题,处理完A之后,才能处理B,如果每个人五分钟,那么D就要等待15分钟。添加消息中间件之后,要求学生按照老师的要求将问题都写在一张纸上,先交给班长,班长再交给老师,降低了老师和学生之间的耦合度(解耦)、本来冲向老师的流量都冲向了班长,保护了主业务(削峰)、班长收集好学生的问题之后,待老师处理完之后再送给学生返回(异步)。
2、微服务架构后会有哪些问题?
微服务架构后,链式调用是我们在写程序时候的一般流程,为了完成一个整体功能会将其拆分为多个函数(或子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。但是在大型分布式应用中,系统间的RPC交互繁杂,一个功能背后调用上百个接口并非不可能,从单机架构过度到分布式微服务架构的通例。这种架构会出现哪种问题???
- 1、系统之间耦合比较严重
- 2、面对大流量并发时,容易被冲垮
- 3、等待同步存在性能问题
问题一:每新增一个下游功能,都要对上游的相关接口进行改造:(不行!我作为厨师做了一个宫保鸡丁,我管你大堂经理给谁呢)
举个例子:加入系统A要发送数据给系统B和C,发送给每个系统的数据可能有差异,因此系统A对要发送给每个系统的数据进行了组装,然后逐一发送。当代码上线后又新增了一个需求:把数据也发送给D,新上了一个D系统也要接收A系统的数据。此时就需要修改A系统,让他感知到D的存在,同时把数据处理好再给D。在这个过程会看到,每接入一个下游系统,都要对A系统进行代码改造,开发联调的效率很低。整体架构图如下:
问题二:每个接口的吞吐能力是有限的,这个上限能力如果是堤坝,当大流量来临时,容易被冲垮。举个栗子秒杀业务:上游系统发起下单购买操作,我就是一下单操作,下游系统完成秒杀业务逻辑:(读取订单 > 库存检查 > 库存冻结 > 余额检查 > 余额冻结 > 订单生成 > 余额扣减 > 库存扣减 > 生成流水 > 余额解冻 > 库存解冻)
问题三:RPC接口基本上是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于链路种最慢的那个接口。比如A调用B/C/D都是50ms,但此时B又调用了B1,花费2000ms,那么直接就拖累了整个服务性能。
根据上述的几个问题,在设计系统时可以明确要达到的目标:
1、要做到系统解耦,当新的模块接进来,可以做到代码改动最小;能够解耦。
2、设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮;能够削峰。
3、强弱以来梳理能将非关键词调用链路的操作异步化提升整体系统的吞吐能力;能够异步。
3、ActiveMQ的定义
面向消息的中间件(message-oriented-middleware)MOM能够很好的解决以上问题。是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布系统的集成,通过提供消息传递和消息排队模型分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等功能。
大致过程如下:
发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接收者,在这个过程中,发送和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系。尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。
4、ActiveMQ的特点
一、采用异步处理模式
消息发送者可以发送一个消息而无需等待响应,消息发送者将消息发送到一条虚拟的通道(主题或队列)上;消息接收者则订阅或监听该通道。一条消息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出同步回应。整个过程都是异步的。
也就是说,一个系统跟另外一个系统之间进行通信的时候,假如希望系统A发送一个消息给系统B,让他处理。但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的“死活”了,接着系统B从MQ里消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统B的事儿,与系统A无关。
二、应用系统之间的解耦
发送者和接受者不必了解对方,只需要确认消息;
发送者和接受者不必同时在线。
二、ActiveMQ的安装
ActiveMQ 的官网 : http://activemq.apache.org
Linux三种查看后台进程的方法:
ps -ef|grep activemq|grep -v grep // grep -v grep 可以不让显示grep 本来的信息
netstat -anp|grep 61616 // activemq 的默认后台端口是61616
lsof -i:61616
安装步骤:
2.1官网下载
2.2/opt目录下面
2.3解压缩apache-activemq-5.15.9-bin.tar.gz
2.4在根目录下mkdir /myactiveMQ
2.5cp -r apache-activemq-5.15.9 /myactiveMQ/
2.6普通启动mq:./activemq start/stop
2.7activemq的默认进程端口是61616
2.8带运行日志的启动方式 ./activemq start > /myactivemq/myrunmq.log
2.9安装成功后访问:192.168.x.x:8161抵达ActiveMQ前台显示页面
安装过程中遇到的问题见:https://www.jianshu.com/p/a52c39859808
以上两个Destination
在点对点的消息传递时,目的地称为 队列 queue
在发布订阅消息传递中,目的地称为 主题 topic
三、ActiveMQ的初步使用
1、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息队列连接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.16.1</version>
</dependency>
1、创建生产者
public class producer {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
public static final String QUEUE_NAME = "queue01"; // 1对1 的队列
public static void main(String[] args) throws JMSException {
// 1 按照给定的url创建连接工程,这个构造器采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂连接 connection 和 启动
Connection connection = activeMQConnectionFactory.createConnection();
// 启动
connection.start();
// 3 创建会话 session
// 两个参数,第一个事务, 第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地 (两种 : 队列/主题 这里用队列)
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("MSG" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("消息发布到MQ完成");
}
}
public class consumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
public static final String QUEUE_NAME = "queue01"; // 1对1 的队列
public static void main(String[] args) throws JMSException {
// 1 按照给定的url创建连接工程,这个构造器采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂连接 connection 和 启动
Connection connection = activeMQConnectionFactory.createConnection();
// 启动
connection.start();
// 3 创建会话 session
// 两个参数,第一个事务, 第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地 (两种 : 队列/主题 这里用队列)
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建消息的生产者
MessageConsumer messageConsumer = session.createConsumer(queue);
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if (null != textMessage) {
System.out.println("消费者接受到消息" + textMessage.getText());
} else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
System.out.println("消息发布到MQ完成");
}
}
加了时间就是过时不候,不加就是不离不弃,NoWait盲猜是只限此时,如果取不到消息,直接返回null。
//TODO消费者消费成功前台页面显示
这里的一点经验: activemq 好像自带负载均衡,当先启动两个队列(Queue)的消费者时,在启动生产者发出消息,此时的消息平均的被两个消费者消费。 并且消费者不会消费已经被消费的消息(即为已经出队的消息)
四、JMS
1、JMS基本信息
Topic队列模式:
(1)生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
(2)生产者和消费者之间有时间上的相关性,订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
(3)生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求,持久订阅允许消费者消费它在未处于激活状态时发送的消息,一句话,好比我们的微信公众号订阅。
2、什么是javaEE?
JAVAEE 是一套使用Java 进行企业级开发的13 个核心规范工业标准 , 包括:
(1)JDBC(Java Database) 数据库连接
(2)JNDI(Java Naming and Directory Interfaces)Java的命名和目录接口
(3)EJB(Enterprise JavaBean)
(4)RMI(Remote Method Invoke)远程方法调用 一般使用TCP/IP 协议
(5)Java IDL(interface Description Language) 接口定义语言
(6)JSP (Java Server Pages)
(7)Servlet
(8)XML (Extensible Markup Language)可扩展标记语言
(9)JMS (Java Message Service)Java 消息服务
(10)JTA (Java Transaction API)Java 事务API
(11)JTS (Java Transaction Service)Java 事务服务
(12)JavaMail
(13)JAF (Java Bean Activation Framework)
3、什么是JMS(Java消息服务):
Java消息服务指的是两个用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发,在javaEE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务器组件关联起来以达到解耦、异步、削峰的目的。
4、JMS组成的四大元素
JMS provider:实现JMS 的消息中间件,也就是MQ服务器
JMS producer:消息生产者,创建和发送消息的客户端
JMS consumer:消息消费者,接收和处理消息的客户端
JMS message:JMS 消息,分为消息头、消息属性、消息体(重要的)
4.1、消息头( 5 个常用的消息头):
① JMSDestination:设置是队列还是主题
② JMSDeliveryMode:设置是持久还是非持久。面试题可能会问到:如何保证消息的可靠性,持久化?
一条持久化的消息:应该被传送"一次仅仅一次",这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。一条非持久消息:最多会传送一次,这意味着服务器出现故障,该消息将永远丢失。
③ JMSExpiration:过期时间,默认永不过期。
④ JMSPriority:优先级,默认是4级,有0~9 ,5-9 是紧急的,0-4 是普通的。
⑤ JMSMessageId:唯一的消息ID,由消息中间件产生。
4.2、消息体
封装具体的消息数据,发送和接收的消息类型必须一致
①TextMessage:普通字符串消息,包含一个String(常用)
②Mapmessage:Map 类型的消息,k->Stringv -> Java 基本类型常用)
③BytesMessage:二进制数组消息,包含一个byte[]
④ StreamMessage:Java 数据流消息,用标准流操作来顺序的填充读取
⑤ObjectMessage:对象消息,包含一个可序列化的Java 对象
4.3、消息属性
作用:识别、去重、重点标注
如果需要除消息头字段以外的值,那么可以使用消息属性,如下:可以设置Boolean、int、String...的属性,也是以k-v键值对存储的。消费者在接收时也直接get出来就行了。
比如说想给某个消息设置一个vip,只拿他自己...
5、JMS的可靠性
(1)持久性
// 在队列为目的地的时候持久化消息
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 队列为目的地的非持久化消息
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
持久化的消息:
服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息就会被消费。
非持久化的消息:
服务器宕机后消息永远丢失。 而当没有注明是否是持久化还是非持久化时,默认是持久化的消息
。
消息生产者:
public class producerTopic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
public static final String TOPIC_NAME = "topic-01"; // 1对1 的队列
public static void main(String[] args) throws JMSException {
// 1 按照给定的url创建连接工程,这个构造器采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂连接 connection 和 启动
Connection connection = activeMQConnectionFactory.createConnection();
// 3 创建会话 session
// 两个参数,第一个事务, 第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建消息生产者
Topic topic = session.createTopic(TOPIC_NAME);//类似于公众号的名字
MessageProducer messageProducer = session.createProducer(topic);
//设置为持久化
messageProducer.setDeliveryDelay(DeliveryMode.PERSISTENT);
// 5 发布订阅|要先进行持久化再启动了
connection.start();
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("msg--persist" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println("主题发布到MQ完成");
}
}
消息消费者:
public class consumerTopic {
public static final String ACTIVEMQ_URL = "tcp://192.168.1.115:61616";
public static final String TOPIC_NAME = "topic-01"; // 1对1 的队列
public static void main(String[] args) throws JMSException {
System.out.println("**********zs");
// 1 按照给定的url创建连接工程,这个构造器采用默认的用户名密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂连接 connection 和 启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("marry");//表明有一个叫张三的用户订阅
// 3 创建会话 session
// 两个参数,第一个事务, 第二个签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地 (两种 : 队列/主题 这里用队列)
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark...");
connection.start();
Message message = topicSubscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到持久化的" + textMessage.getText());
message = topicSubscriber.receive(5000L);
}
session.close();
connection.close();
}
}
持久化的消息:
服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息就会被消费。
非持久化的消息:
服务器宕机后消息永远丢失。 而当没有注明是否是持久化还是非持久化时,默认是持久化的消息
。
队列(queue)持久化:默认是持久化消息的,此模式保证这些消息只会被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久化消息传递至目标后,消息服务在向消费者传送它们之前不会丢失消息。
主题(topic)持久化:默认就是非持久化的,让主题的订阅支持化的意义在于:对于订阅了公众号的人来说,当用户手机关机,在开机后任就可以接受到关注公众号之前发送的消息。(这里不能看成是关注非关注)一定要先运行一次消费者,等于向MQ注册,我关注了这个公众号,如果我取关了,一个月后又关注了,这一个月的消息统统推送给你~
(2)事务
事务偏生产者,签收偏消费者
createSession的第一个参数为true 为开启事务,开启事务之后必须在将消息提交,才可以在队列中看到消息,要不然这个事务就是没结束。
事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。 对于消息消费者来说,开启事务的话,可以避免消息被多次消费,以及后台和服务器数据的不一致性。
举个栗子:
如果消息消费的 createSession 设置为 ture ,但是没有 commit ,此时就会造成非常严重的后果,那就是在后台看来消息已经被消费,但是对于服务器来说并没有接收到消息被消费,此时就有可能被多次消费。开启了事务注意一定要commit!!!生产事务开启,只有commit后才能将全部消息变为已消费。
(3)签收(俗称ack)
Session.AUTO_ACKNOWLEDGE 自动签收,默认
Session.CLIENT_ACKNOWLEDGE 手动签收
手动签收需要acknowledge
textMessage.acknowledge();
对于开启事务时,设置手动签收和自动签收没有多大的意义,都默认自动签收,也就是说事务的优先级更高一些。可能是提交事务时,底层代码自动调用了ack。关闭了事务,第二个签收参数就起效。如果没有签收,就会重复的接收数据。
(4)签收和事务的关系
在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。非事务性会话中,消息何时被确认取决于创建会话的应答模式(acknowledgement mode/是否签收)。
6、JMS小结
1、JMS点对点总结
点对点模型是基于队列的,生产者发消息到队列,消费者从对列接收到消息,队列的存在使得消息的异步传输成为可能。和我们平时给朋友发送短信类似。
①如果在Session关闭时有部分消息已被收到但还没有被签收,那当消费者下次连接到相同队列时,这些消息还会被再次接收。
②队列可以长久保存消息直到消费者收到消息。消费者不需要因为担心会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。
2、JMS发布订阅总结
JMS Pub/Sub 模型定义了如何向一个内容发布和订阅消息,这些节点被称为topic,主题可以被认为是消息的传输中介,发布者发布消息到主题,订阅者从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。
3、持久订阅
客户端先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息。
非持久订阅状态下,不能恢复或重新派送一个未签收的消息,持久订阅才能恢复或重新派送一个未签收的消息。
4、非持久订阅
非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能收到发送到某个主题的消息。如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远也不会收到。
一句话,非持久订阅下,先要订阅注册才能接受到发布,只给订阅者发布消息
五、ActiveMQ的broker
1、是什么
broker 就是实现了用代码形式启动 ActiveMQ ,将 MQ 内嵌到 Java 代码中,可以随时启动,节省资源,提高了可靠性。就是将 MQ 服务器作为了 Java 对象
2、怎么用
使用多个配置文件启动 activemq
//复制一份原来的配置文件
cp activemq.xml activemq02.xml
//以active02 启动mq 服务器
./activemq start xbean:file:/myactivemq/apache-activemq-5.15.9/conf/activemq02.xml
3、嵌入式Broker
①引入pom
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
②撸代码
public class EmbedBroker {
public static void main(String[] args) throws Exception {
// broker 服务
BrokerService brokerService = new BrokerService();
// 把小型 activemq 服务器嵌入到 java 代码
brokerService.setUseJmx(true);
// 原本的是 192.…… 是linux 上的服务器,而这里是本地windows 的小型mq 服务器
brokerService.addConnector("tcp://loaclhost:61616");
//开启
brokerService.start();
}
}
六、Spring整合ActiveMQ
1.pom
<!-- activeMQ jms 的支持 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency> <!-- pool 池化包相关的支持 -->
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.9</version>
</dependency>
<!-- aop 相关的支持 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
2.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://camel.apache.org/schema/spring"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<context:commponent-scan base-package="com.at.activemq"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.17.3:61616"></property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<!-- 队列目的地 -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="spring-active-queue"></constructor-arg>
</bean>
<!-- jms 的工具类 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsFactory"/>
<property name="defaultDestination" ref="destinationQueue"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
</beans>
3.生产者
@Service
public class SpringMQ_producer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
SpringMQ_producer producer = (SpringMQ_producer) ctx.getBean("springMQ_Producer");
producer.jmsTemplate.send((session) -> {
TextMessage textMessage = session.createTextMessage("spring 和 activemq 的整合");
return textMessage;
});
System.out.println(" *** send task over ***");
}
}
4.消费者
@Service
public class Spring_MQConsummer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
Spring_MQConsummer sm = (Spring_MQConsummer)ac.getBean("spring_MQConsummer");
String s = (String) sm.jmsTemplate.receiveAndConvert();
System.out.println(" *** 消费者消息"+s);
}
}
可以在spring 中设置监听器,不用启动消费者,就可以自动监听到消息,并处理,如下:
七、SpringBoot整合ActiveMQ
1、队列
①配置pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.boot.activemq</groupId>
<artifactId>boot_mq_produce</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compoler.source>1.8</maven.compoler.source>
<maven.compoler.target>1.8</maven.compoler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
②配置yml
server:
port:7777
spring:
activemq:
broker-url: tcp://192.168.111.136:61616 #自己的MQ服务器地址,用自己的
user: admin
password: admin
jms:
pub-sub-domain: false #false表示Queue true表示Topic,不写默认false
#自己定义的队列名称
myqueue: boot-activemq-queue
③配置bean
@Component
@EnableJms
public class ConfigBean {
//读取在配置文件中配置的myqueue,队列的名字不写死
@Value("${myqueue}")
private String myQueue;
@Bean
public Queue queue() {
return new ActiveMQQueue(myQueue);
}
}
④生产者代码
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(queue, "*********" + UUID.randomUUID().toString().substring(0, 6));
}
}
想要调一次,就往消息服务器上发一次消息,就要在config里面开启一个注解@EnableJms,代表开启了jms适配的注解
⑤启动类
@SpringBootApplication
public class MainApp_Produce {
public static void main(String[] args) {
SpringApplication.run(MainApp_Produce.class, args);
}
}
⑥要求每3s发送一条消息
修改生产Service,添加一个定时生产的方法
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg() {
jmsMessagingTemplate.convertAndSend(queue, "*********" + UUID.randomUUID().toString().substring(0, 6));
}
//每隔三秒定时投送一条消息
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled() {
jmsMessagingTemplate.convertAndSend(queue, "Scheduled*********" + UUID.randomUUID().toString().substring(0, 6));
}
}
修改主启动类,添加注解@EnableScheduling
@SpringBootApplication
@EnableScheduling //主启动类开启定时投送功能
public class MainApp_Produce {
public static void main(String[] args) {
SpringApplication.run(MainApp_Produce.class, args);
}
}
以上两种点一次发一次和间隔定投足以解决工作中的大部分问题,消费者微服务的配置大概与生产者相同,不需要config的配置,如下消费者业务代码
⑦消费者代码
@Component
public class Queue_Consumer {
//springboot不需要再编写监听器,只需要加一个注解JmsListener即可时刻收到生产者的消息
@JmsListener(destination = "${myqueue}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("**********收到消息" + textMessage.getText());
}
}
2、主题
①pom同上不变
②yml
server:
port:6666
spring:
activemq:
broker-url: tcp://192.168.111.136:61616 #自己的MQ服务器地址,用自己的
user: admin
password: admin
jms:
pub-sub-domain: true #false表示Queue true表示Topic,不写默认false
#自己定义的队列名称
myTopic: boot-activemq-topic
③启动类
@SpringBootApplication
@EnableScheduling
public class Main_Topic_Produce {
public static void main(String[] args) {
SpringApplication.run(Main_Topic_Produce.class, args);
}
}
④配置Bean
@Component
public class ConfigBean {
@Value("${myTopic}")
private String topicName;
@Bean
public Topic topic() {
return new ActiveMQTempTopic();
}
}
⑤生产者代码
@Component
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 3000)
public void produce() {
jmsMessagingTemplate.convertAndSend(topic, "主题消息:"+UUID.randomUUID().toString().substring(0, 6));
}
}
消费者微服务的配置大概和生产者相同,如下是消费者代码
⑥消费者代码
@Component
public class Topic_Consumer {
@JmsListener(destination = "${myTopic}")
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("消费者接收到主题:" + textMessage.getText());
}
}
八、ActiveMQ的传输协议
1、面试题
①默认的端口61616怎么修改
②生产上的链接协议如何配置的,TCP吗?
2、配置
设置协议的地方是activemq 的activemq.xml
配置文件
如上默认是使用 openwire 也就是 tcp 协议,默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互 。
3、有哪些协议
ActiveMQ 支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM
参考(https://activemq.apache.org/configuring-version-5-transports.html)
#使用tcp 的一些优化方案:
tcp://hostname:port?key=value
它的参数详情参考:http://activemq.apache.org/tcp-transport-reference
1.NIO协议和TCP协议类似,但是更偏底层操作,它允许开发人员对同一资源有更多的client调用和服务端有更多的负载。
2.适合NIO 使用的场景:
①当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO
②可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
3.NIO链接的URI形式:nio://hostname:port?key=value
4.Transport Connector配置示例,参考官网:http://activemq.apache.org/configuring-version-5-transports.html
4、NIO案例演示
先停掉activemq,然后在activemq.xml添加以下配置
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon nections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect ions=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn ections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect ions=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection s=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true"/> <!-- 这是添加的 -->
</transportConnectors>
使用 NIO 协议后,代码修改量极小,只需同时将消息生产者和消费者的 URL 修改即可:
//public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
public static final String ACTIVEMQ_URL = "nio://192.168.17.3:61618";
如果不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络的IO模型。(OpenWire,STOMP,AMQP......),所以为了首先提高单节点的网络吞吐性能,需要明确指定Active的网络IO模型,BIO中B其实就是Blocked(阻塞),BIO就是阻塞的IO,NIO就是非阻塞的IO。
5、NIO案例增强
URI 格式以 nio 开头,表示这个端口使用 tcp 协议为基础的NIO 网络 IO 模型,但这样设置它只支持 tcp 、 nio 的连接协议。如何让它支持多种协议?
Starting with version 5.13.0, ActiveMQ supports wire format protocol detection. OpenWire, STOMP, AMQP, and MQTT can be automatically detected. This allows one transport to be shared for all 4 types of clients.
配置方法: http://activemq.apache.org/auto
使用 auto 的方式就相当于四合一协议 : STOMP AMQP MQTT TCP NIO
<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>
<transportConnector name="auto" uri="auto://localhost:5671?auto.protocols=default,stomp"/>
将activemq.xml的配置该为如下:
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000
&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corelPoolSize=20
&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50"/>
此时再启动activemq
public static final String ACTIVEMQ_URL = "nio://192.168.17.3:61608";
其实就是一个端口适配多种协议,同样代码只需修改 URI ,对于 nio 和 tcp 的只需要修改如上URI即可,但不代表使用其他协议代码相同,因为底层配置不同,其他协议如果使用需要去修改代码
九、ActiveMQ的消息存储和持久化
1、什么是MQ的持久化
官网 : http://activemq.apache.org/persistence
为了避免意外宕机以后丢失消息,需要重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制有JDBC、AMQ、KahaDB和LevelDB,无论采用哪种方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置。如果有未发送成功的消息,则需要把消息发送出去。
2、几种MQ持久化的方式
①AMQ (了解)
AMQ是基于文件存储形式,写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用。
②KahaDB :
(1)KahaDB在5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力
(2)KahaDB的参数配置参考官网:https://activemq.apache.org/kahadb
(3)KahaDB 的存储原理
KahaDB 的属性配置 : http://activemq.apache.org/kahadb
KahaDB是默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用
一个事务日志
和仅仅用一个索引文件
来存储他所有的地址。KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
db-<数字>.log: 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档
db.data: 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-<x>.log 里的消息
db.free: 存储空闲页 ID 有时会被清除
db.redo: 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
lock: 顾名思义就是锁 ,表示获取到当前读取权限的broker。
(一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。)
四类文件 + 一把锁 ==>> KahaDB
③JDBC消息存储
(1)修改配置文件,默认 kahaDB
##修改之前:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
##修改之后:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
(2)在activemq 的lib 目录下添加 jdbc 的jar 包 ,如果用了druid还要导入druid的jar包
(3)改配置文件 : activemq.xml,配置位置如下</broker>之后,<import>之前,使其连接自己windows 上的数据库,并在本地创建名为activemq 的数据库
(4)让linux 上activemq 可以访问到 mysql,之后产生消息。ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock
activemq_msgs:
用于存储消息,Queue和Topic都存储在这个表中
activemq_acks:
用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
activemq_lock:
在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,此表用于记录当前哪一个broker作为Master Broker
如果新建数据库OK+上述配置OK+代码运行OK,上述三表会自动生成,万一情况,手动建表SQL如下:
-- auto-generated definition
create table ACTIVEMQ_ACKS
(
CONTAINER varchar(250) not null comment '消息的Destination',
SUB_DEST varchar(250) null comment '如果使用的是Static集群,这个字段会有集群其他系统的信息',
CLIENT_ID varchar(250) not null comment '每个订阅者都必须有一个唯一的客户端ID用以区分',
SUB_NAME varchar(250) not null comment '订阅者名称',
SELECTOR varchar(250) null comment '选择器,可以选择只消费满足条件的消息,条件可以用自定义属性实现,可支持多属性AND和OR操作',
LAST_ACKED_ID bigint null comment '记录消费过消息的ID',
PRIORITY bigint default 5 not null comment '优先级,默认5',
XID varchar(250) null,
primary key (CONTAINER, CLIENT_ID, SUB_NAME, PRIORITY)
)
comment '用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存';
create index ACTIVEMQ_ACKS_XIDX
on ACTIVEMQ_ACKS (XID);
-- auto-generated definition
create table ACTIVEMQ_LOCK
(
ID bigint not null
primary key,
TIME bigint null,
BROKER_NAME varchar(250) null
);
-- auto-generated definition
create table ACTIVEMQ_MSGS
(
ID bigint not null
primary key,
CONTAINER varchar(250) not null,
MSGID_PROD varchar(250) null,
MSGID_SEQ bigint null,
EXPIRATION bigint null,
MSG blob null,
PRIORITY bigint null,
XID varchar(250) null
);
create index ACTIVEMQ_MSGS_CIDX
on ACTIVEMQ_MSGS (CONTAINER);
create index ACTIVEMQ_MSGS_EIDX
on ACTIVEMQ_MSGS (EXPIRATION);
create index ACTIVEMQ_MSGS_MIDX
on ACTIVEMQ_MSGS (MSGID_PROD, MSGID_SEQ);
create index ACTIVEMQ_MSGS_PIDX
on ACTIVEMQ_MSGS (PRIORITY);
create index ACTIVEMQ_MSGS_XIDX
on ACTIVEMQ_MSGS (XID);
(5)小总结
在点对点类型中:当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中;当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中,而且点对点类型中一旦被consumer消费就从broker中删除。点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除 。
但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息。并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。
(6)开发中的坑
注意点1:要将使用到的jar文件房知道ActiveMQ安装目录下的lib中。包括mysql_jdbc和对应的连接池的jar包
注意点2:在jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时候,第一次启动ActiveMQ时,将会自动创建数据表,但是启动之后可以更改为false
下划线坑爹:java.lang.IllageStateException:BeanFactory not initialized or aleady closed:这是因为您的操作机器中有_符号,更改机器名后重启就行。
④LevalDB (了解)
LeavelDB : 希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快,它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引
题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB引擎,因为activemq 官网本身没有定论,LeavelDB之后又出了可复制的LeavelDB比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,默认就使用 KahaDB
⑤JDBC Message store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。
3、几种持久化方式总结
① jdbc效率低,kahaDB效率高,jdbc+Journal效率较高。
② 持久化消息主要指的是:MQ所在服务器宕机了消息不会丢试的机制。
③ 持久化机制演变的过程:
从最初的AMQ Message Store方案到ActiveMQ V4版本退出的High Performance Journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。ActiveMQ5.3版本又推出了对KahaDB的支持(5.4版本后被作为默认的持久化方案),后来ActiveMQ 5.8版本开始支持LevelDB,到现在5.9提供了标准的Zookeeper+LevelDB集群化方案。
十、ActiveMQ的多节点集群
1、引入消息队列后如何保证高可用?
2、是什么:
基于ZooKeeper和LevelDB搭建ActiveMQ集群。集群仅供主备方式的高可用集群功能,避免单点故障。
3、zookeeper+replicated+leveldb-store的主从集群(三种方式)
①基于shareFileSystem共享文件
②基于JDBC
③基于可复制的LevelDB
集群官网参考: http://activemq.apache.org/replicated-leveldb-store
这幅图的意思就是 当 Master 宕机后,zookeper 监测到没有心跳信号, 则认为 master 宕机了,然后选举机制会从剩下的 Slave 中选出一个作为新的 Master
6.在jetty.xml修改端口
7.hostname名字映射
vim /etc/hosts
8、activemq集群配置
首先三台服务器的路径
然后将broker的名字改为相同
最后进行三个节点的持久化配置
三个节点的持久化配置参考官网:https://activemq.apache.org/replicated-leveldb-store.html
修改持久化配置:
<persistenceAdapter>
<replicatedLevelDB
directory="{activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:63631"
zkAddress="localhost:2191,localhost:2192,localhost:2193"
zkPassword="123456"
sync="local_disk"
zkPath="/activemq/leveldb-stores"
hostname="wh-mq-server"
/>
</persistenceAdapter>
三台服务器都要修改如上的配置,只不过将端口号改了就好
9.修改各节点的消息端口
10、按顺序启动三个Activemq节点,首先是zk集群已经成功启动运行了的
主要说zookeper 复制三份后改配置文件,并让之自动生成 myid 文件,并将zk的端口改为之前表格中对应的端口 。修改conf 下的配置文件zoo.cfg
#配置如下
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2191 // 自行设置
server.1=192.168.17.3:2888:3888
server.2=192.168.17.3:2887:3887
server.3=192.168.17.3:286:3886
dataDir=/zk_server/data/log1 // 自行设置
修改完成后就可以启动zk了,批处理启动zk的脚本如下
#!/bin/sh
cd /zk_server/zk_01/bin
./zkServer.sh start
cd /zk_server/zk_02/bin
./zkServer.sh start
cd /zk_server/zk_03/bin
./zkServer.sh start
编写这个 zk_batch.sh 之后
#命令即可让它变为可执行脚本
chmod 700 zk_batch.sh
#启动了三个zk 的服务
./zk_batch.sh start 即可
同理可以写一个批处理关闭zk 服务的脚本和 批处理开启mq 服务 关闭 mq 服务的脚本。
#完成上述之后连接zk 的一个客户端
./zkCli.sh -server 127.0.0.1:2191
#如果要查看哪一个是master,可以使用如下命令
get /activemq/leveldb-stores/00000000003
此次验证表明 00000003 的节点状态是master (即为63631 的那个mq 服务) 而其余的(00000004 00000005) activemq 的节点是 slave。此时集群已经顺利搭建成功 !
测试可用性
如上所示:此次测试表明只有 8161 的端口可以使用 经测试只有 61 可以使用,也就是61 代表的就是master
测试集群可用性:
生产者和消费者都修改如下代码:
public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.17.3:61616,
tcp://192.168.17.3:61617,tcp://192.168.17.3:61618)?randomize=false";
public static final String QUEUE_NAME = "queue_cluster";
至此:activemq集群测试成功
十一、ActiveMQ的高级特性和大厂面试重点
1、引入消息队列后 如何保证高可用性
持久化、事务、签收、 以及带复制的 Leavel DB + zookeeper 主从集群搭建
2、 异步投递 Async send
(官网参考)https://activemq.apache.org/async-sends
对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息。activemq 支持同步异步发送的消息,默认异步。如果没有使用事务,且发送的是持久化消息,这样是同步的,每次发送都会阻塞一个生产者直到 broker 发回一个确认,这样做保证了消息的安全送达,但是会阻塞客户端,造成很大延时 。
在高性能要求下,可以使用异步提高producer的性能。但会消耗较多的client端内存,也不能完全保证消息发送成功。在useAsyncSend = true 情况下需要容忍消息丢失的可能。
三种开启的方式如下:(参考自官网)
-
1、 Configuring Async Send using a Connection URI(
url 后面加参数
)
You can use the Connection Configuration URI to configure async sends as follows
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
-
2、Configuring Async Send at the ConnectionFactory Level(
开启ActiveMQConnectionFactory 的Async 为true
)
You can enable this feature on the ActiveMQConnectionFactory object using the property.
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
-
3、Configuring Async Send at the Connection Level(
将ActiveMQConnection 设Async 为true
)
Configuring the dispatchAsync setting at this level overrides the settings at the connection factory level.You can enable this feature on the ActiveMQConnection object using the property.
((ActiveMQConnection)connection).setUseAsyncSend(true);
3、 异步发送如何确保发送成功
异步发送消息丢失的情况场景是: UseAsyncSend 为 true 使用 producer(send)持续发送消息,消息不会阻塞,生产者会认为所有的 send 消息均会被发送到 MQ ,如果MQ 突然宕机,此时生产者端尚未同步到 MQ 的消息均会丢失。
所以:正确的异步发送方法需要接收回调 。同步发送和异步发送的区别就在于——同步发送send 不阻塞就代表消息发送成功。异步发送需要接收回执并又客户端在判断一次是否发送
//其他代码不变
activeMQConnectionFactory.setUseAsyncSend(true);
……
for (int i = 1; i < 4 ; i++) {
textMessage = session.createTextMessage("msg--" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"-- orderr");
String msgid = textMessage.getJMSMessageID();
messageProducer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
// 发送成功怎么样
System.out.println(msgid+"has been successful send ");
}
@Override
public void onException(JMSException e) {
// 发送失败怎么样
System.out.println(msgid+" has been failure send ");
}
});
}
4、延迟投递和定时投递
第一步:如官网所说,设置schedulerSupport为true
第二步:修改代码如下
long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("delay msg--" + i);
// 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
messageProducer.send(textMessage);
}
完整代码:
package com.activemq.demo;
import org.apache.activemq.*;
import javax.jms.*;
import java.util.UUID;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
long delay = 10*1000;
long period = 5*1000;
int repeat = 3 ;
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
// 延迟的时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
// 重复投递的时间间隔
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 重复投递的次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
// 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。
messageProducer.send(textMessage);
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
messageProducer.close();
session.close();
connection.close();
}
}
}
5、activemq的消息重试机制
5.2、具体哪些情况会导致activemq的消息重发
- 1、客户端用了事务,并且调用rollback()。
- 2、客户端用了事务,但是未commit()。
- 3、客户端在CLIENT_ACK情况下,调用了session的recover()。
- 4、投递失败,标记有毒消息,DLQ死性队列。
5.3、RedeliveryPolicy(消息重试)属性说明
修改重试次数为3(默认是6)。更多的设置请参考官网文档。
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "dead01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 修改默认参数,设置消息消费重试3次
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消费者接收到的消息: " + textMessage.getText());
//session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
在spring中使用重发机制
5.3、请说说消息重发时间间隔和重发次数
- 处理失败指的是MessageListener的onMessage方法里抛出RuntimeException。
- Message头里有两个相关字段:Redelivered默认为false,redeliveryCounter默认为0。
- 消息先由broker发送给consumer,consumer调用listener,若是处理失败,本地redeliveryCounter++,给broker一个特定应答,broker端的message里redeliveryCounter++,延迟一点时间继续调用,默认1s。超过6次,则给broker另外一个特定应答,broker就直接发送消息到DLQ。内存
- 若是失败2次,consumer重启,则broker再推过来的消息里,redeliveryCounter=2,本地只能再重试4次即会进入DLQ。
- 重试的特定应答发送到broker,broker即会在内存将消息的redelivered设置为true,redeliveryCounter++,可是这两个字段都没有持久化,即没有修改存储中的消息记录。因此broker重启时这两个字段会被重置为默认值。
默认是:间隔每1秒钟重发一次,共计6次。6次的意思是,当消息重试机制被触发,第一次,第二次....第六次消费者都能接收到消息,但是当第七(失败以后1+6=7次)次发送消息的时候就会被发送到死信队列中,不在进行接收消息。
5.4、有毒消息Poison ACK 谈谈你的理解
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费者会个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。
6、死信队列
6.1是什么
官网文档: http://activemq.apache.org/redelivery-policy
死信队列:异常消息规避处理的集合,主要处理失败的消息。
在业务逻辑中,如果一个订单系统没有问题,则使用正常的业务队列,当出现问题,则加入死信队列 ,此时可以选择人工干预还是机器处理 。
6.2死信队列的配置(一般采用默认的)
死信队列默认是全部共享的,但是也可以设置独立的死信队列
7、幂等性,即为保证消息不被重复消费
幂等性,即为保证消息不被重复消费。在网络传输延迟中,会造成MQ的重试中,在重试中,可能会造成重复消费。
解决方案:
如果这个消息是做数据库的插入操作,给这个消息做一个唯一主键,就算是出现了重复消费和情况,就会导致主键冲突。实在不行,准备一个第三方的服务去做消费记录,以redis为例,给消息分配一个id,只要是消费过该消息,会以<id,message>以k-v形式写入redis,先去redis中查询有无消费记录即可。