1. 消息队列
简单理解就是存放消息的队列
优点
- 异步
比如:利用消息队列,可以将请求存入队列即响应给用户,不用等到系统执行完业务逻辑才能响应,即响应和业务逻辑执行不是同步阻塞的,这样响应快、体验好。 - 解耦
这里可以把消息队列想象成插排,电源端只负责供电,想用电的设备插在插排上就行了,不想用就拔掉即可,电源端和需要充电的设备互不影响。
消息队列就是这样,一端只负责将消息存入消息队列,然后需要消息的应用就订阅消息队列,从中取消息使用即可,即使不想用了,也只需要取消订阅即可。也就是消息提供者和消息使用者之间互不干扰实现解耦 - 削峰
常见于秒杀系统,用来限制请求数量,减轻服务器压力。比如建一个定长的消息队列,请求都存入这个队列,一旦超出定长就可以对后续的请求直接反馈给用户名额已满之类的信息。
2. 消息相关概念
2.1 消息代理(message broker)
一般指消息中间件的服务器,用来管理消息并保证将消息发送到正确目的地。
2.2 目的地(destination)
就是需要消息的应用,通信机制有两种:
1)队列(queue):点对点形式消息通信
详细队列中的消息是一次性的,只能被一个应用取出(取出后消息就被从队列中移除),所以,取消息的应用可以有多个,但是最终只有一个应用成功取到消息。
2)主题(topic):发布(publish)/订阅(subscribe)模式的消息通信
消息发布到队列中,所有订阅了队列的应用都可以接收到消息。
2.3 JMS 和 AMQP
JMS(Java Message Service):
是基于 JVM 的消息代理规范,例如 ActiveMQ
AMQP(Advanced Message Queuing Protocol):
高级消息队列协议,也是消息代理的一种规范,能够兼容 JMS,例如 RabbitMQ
3. 几种消息中间件的对比
ActiveMq | RabbitMq | RocketMq | Kafka | |
---|---|---|---|---|
单机吞吐量 | 每秒万级 | 每秒万级 | 10万级 | 10万级 |
时效性 | 毫秒级 | 微秒级 | 毫秒级 | 毫秒级 |
可用性 | 基于主从架构 | 基于主从架构 | 天然支持分布式 | 天然支持分布式 |
消息可靠性 | 较低概率丢失 | 经过配置几乎可以0丢失 | 经过配置几乎可以0丢失 | 经过配置几乎可以0丢失 |
功能 | 及其完备 | 基于erlang语言开发,并发能力强,延时很低 | 接口简单易用,功能完备,扩展性好,比如支持,普通消息(同步发送,异步发送,单向发送),定时延时消息,顺序消息,事物消息 | 功能较为简单,主要支持简单的mq功能,因为功能简单,在大数据以及日志采集方面用处广 |
数据存储 | 每个节点存储着所有的数据 | 每个节点存储着所有的数据 | 每个节点存储着一部分数据,并且每个节点都有数据备份,防数据丢失 | 每个节点存储着一部分数据,并且每个节点都有数据备份,防数据丢失 |
4. SpringBoot 整合 RabbitMQ
4.1 RabbitMQ 核心概念
1)Message
消息,有消息头和消息体组成,消息体不透明。消息头有几个重要属性:
routing-key(路由键):用于指定消息接受者
priority(优先级):当先消息相对于其他消息的优先级
delivery-mode:用于指定是否持久化存储
2)publisher
消息发布者,是我们系统中向交换器发送消息的应用
3)exchange
交换器,接收消息发送者发送的消息,并将消息路由给服务器中的消息队列,有4种类型:direct(默认,点对点),fanout(广播),topic(广播+模糊匹配),headers,只有第一种是点对点的。
4)queue
存储消息的队列
5)binding
交换器和队列的绑定关系
6)connection,channel
connection(连接):就是网络连接
channel(信道):多路复用连接中,独立的数据流通道,是连接中的虚拟连接,与连接是一对多的关系,用来节省网络连接开销
所有 AMQP 命令都是经过信道发送
7)consummer
消费者,就是从消息队列中取消息的客户端应用程序,就是我们系统中需要消息的应用
8)virtual host
虚拟主机,表示一批交换器、消息队列等。虚拟主机是共享身份认证和加密环境的独立服务器域。它是 AMQP 概念的基础,连接时必须指定,RabbitMQ 默认的是 /。
9)broker
消息服务器实体
5. 整合及使用
5.1 安装 RabbitMQ
用 docker 安装 RabbitMQ 比较方便
//拉镜像
docker pull rabbitmq
//启动容器
docker run -d -p 5672:5672 -p 15672:15672 --name myRabbit 镜像id
RabbitMQ 默认是不开启 web 界面的,如需开启,需执行如下命令:
//进入容器
docker exec -it 容器id /bin/bash
//开启功能
rabbitmq-plugins enable rabbitmq_management
5672:客户端访问 RbbitMQ 的端口
15672:web界面端口
默认账号密码都是 guest
5.2 整合到 SpringBoot
5.2.1 引入依赖,配置
引入 spring-boot-start-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
这个场景启动器又一入了消息和rabbitmq
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>5.2.5.RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.5.RELEASE</version>
<scope>compile</scope>
</dependency>
SpringBoot 默认消息中间件就是 RabbitMQ。
配置主机信息,相关配置都在 spring.rabbitmq 下
spring.rabbitmq.host= rabbitmq服务器IP
其它选项都已经有默认配置了,参考org.springframework.boot.autoconfigure.amqp.RabbitProperties。
另外,org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration 这个自动配置类为容器中注入了一些组件,如:
RabbitTemplate: 用于发送和接收消息
AmqpAdmin:rabbitMQ 的系统管理组件,用于交换器,消息队列等的操作
5.2.2 使用
RabbitTemplate 使用
监听消息队列
RabbitTemplate 可以主动的发送和获取消息,但如果想在消息队列有新的消息存入时就做出某些操作,就需要监听消息队列。两步:
1)在主配置类上加上 @EnableRabbit,开启 RabbitMQ 注解功能
2)@RabbitListener 标注在方法上,属性 queues 用于指明监听的队列,方法参数类型与推送的消息类型一致即可
如果不能在 web 界面操作交换器,队列,绑定等,需要借助 AmqpAdmin。
6. 补充
6.1 应答模式
消息队列中的消息,需要在使用过后删掉。rabbitmq 支持应答模式,也就是消息队列在接收到消费者的信号,确认了消息已经被处理才会删除队列中的消息。应答模式分为自动应答
和手动应答
。
6.1.1 自动应答
消息在发送出去之后,立即删除消息。
缺点:
如果发送过程中,connection、channel关闭,或者消费者挂掉,又或者消费者在收到消息后发成异常没有达到使用消息的目的,这些都无法组织消息队列删除消息,于是就产生了消息丢失。
6.1.2 手动应答
就是有消费者手动发送应答给消息队列,这样可以确保消费者收到消息并且正确执行,避免消息丢失。
6.1.2.1 java示例
public class Producer {
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/**
* 队列名
* 队列是否持久化
*
/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
工作进程Worker1沉睡1秒钟后接收消息
public class Worker1 {
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Worker1等待接收消息......");
//声明接收消息回调函数 和 取消消息消费时的回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
//沉睡1s
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg = new String(message.getBody(),"UTF-8");
System.out.println("接收到的消息:" + msg);
//手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费");
};
//false表示不自动应答,采用手动应答
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
public class Worker2 {
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Worker2等待接收消息......");
//声明接收消息回调函数 和 取消消息消费时的回调函数
DeliverCallback deliverCallback = (consumerTag, message) -> {
//沉睡1s
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg = new String(message.getBody(),"UTF-8");
System.out.println("接收到的消息:" + msg);
//手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费");
};
//false表示不自动应答,采用手动应答
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
6.1.2.2 SpringBoot 代码示例
1.application.yml里添加手动应答(最后一行):
server:
port: 8082
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /
listener:
simple:
retry:
####开启消费者重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
####开启手动ack
acknowledge-mode: manual
2.消费内容,手动应答:
@Component
public class FanoutSmsConsumer {
@RabbitListener(queues = "fanout_sms_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.out
.println(Thread.currentThread().getName() + ",邮件消费者获取生产者消息msg:" + new String(message.getBody(), "UTF-8")
+ ",messageId:" + message.getMessageProperties().getMessageId());
// 手动ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手动应答
channel.basicAck(deliveryTag, false);
}
}
3.因为application.yml里开启了手动应答,如果不手动应答,消息队列里会一直存在消息。
6.2 队列&消息持久化
rabbitmq 宕机重启后,队列和消息会丢失,所以需要根据需要做持久化。
6.2.1 队列持久化
方法1. java 代码
#声明持久化队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
方法2. 可以在web界面添加队列,然后选择持久化
6.2.2 消息持久化
设置生产者发送消息的持久化:
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN,这个配置是的消息可以被持久化到磁盘,但是如果在持计划过程中宕机,则依然会丢失消息,所以还需要进一步确认,也就是下面的发布确认
。
6.2.3 发布确认
发布确认有些类似与消费者的应答,是在消息队列将生产者发送的消息成功持久化到磁盘之后,发送确认消息给生产者,明确表示消息以持久化
- 单个确认
//开启发布确认
channel.confirmSelect();
//...发布消息,channel.basicPublish()
//等待发布确认
boolean flag = channel.waitForConfirms();
- 批量确认
同上,批量调用 channel.basicPublish() 之后,调用 channel.waitForConfirms();由于是发布一批之后再进行确认,效率高于单个发布确认,但是无法确认发布失败的消息是哪一个。 - 异步确认
须对 channel 绑定监听回调函数,
//开启发布确认
channel.confirmSelect();
/**
*重载的方法 channel.addConfirmListener(ConfirmCallback ackCallback) 和 channel.addConfirmListener(ConfirmCallback ackCallback,ConfirmCallback nackCallback)
*ConfirmCallback是一个函数式接口,抽象方法 handle(long deliveryTag, boolean multiple),可在回调中写在记得逻辑
*/
channel.addConfirmListener(ackCallback, nackCallback)
//...发布消息,channel.basicPublish()
//不须显示发布确认
6.3 分发机制
rabbitmq 默认是轮训分发消息,可在消费者端使用 channel.basicQos(1)
设置为不公平分发
另外可以通过预取值的设定,配置消费者的消费比例,上面的 1,代表的是不公平分发,但是大于 1 就代表预取值,例如:一个消费者设置的是 2,另一个消费者设置的是 4,那么就会进行 2:4 的比例分配给两个消费者,也就是发两条给第一个,然后再发给4条给第二个,在反过头来给一个发2条。。。但是也有个问题,例如消息队列中有很多消息了,但是第二个消费者消费的消息还没达到 4 条,那么就会消息阻塞(虽然有很多待处理消息,消费者1仍然无法获取消息)
6.4 交换机类型
- direct
“直接连接交换机”,直连类型,交换机根据路由键
匹配(精确匹配,一字不差才行),将消息发送给对应的队列,如果多个消费者通过同一个路由键监听同一个队列,则消息会轮训发送,也就是说同一条消息只有一个消费者可以接受到 - fanout
“无路由交换机”,广播模式,队列和交换机直接绑定,无需路由键(路由键为空字符转),生产者发送消息由交换机接收,然后分发给交换机的所有队列,同一条消息可以被所有订阅的消费者收到 - topic
“主题路由匹配交换机”,主题模式,是 fanout 的加强版,交换机可以根据路由键模糊匹配,分发给对应的队列(* 匹配一个单词,#匹配多个单词) - headers
6.5 死信队列
无法被消费的消息称为死信,对应的队列称为死信队列。
应用场景:为确保一些消息不被丢失,在消费异常后,可以推送到死信队列保存;例如订单支付未在指定时间内完成。
消费者 x-dead-letter-exchange,设置死信交换机
x-dead-letter-routing-key,设置死信路由
-
消息存活时间超时
生产者中设置,参考下图
- 队列满了
消费者 x-max-length 可设置队列最大长度 -
由于处理能力等,导致消息被拒
其中,requeue 为是否放到原队列。
以上三种情况,都可以将消息放到死信队列中。
6.6 延迟队列
和死信队列类似,如上面的死信队列结构图,没有消费者c1,如果设置了时限,则所有的消息都会延迟然后放到死信队列去执行,达到延迟效果。
实现方式:
- 直接在创建队列时,就设置延时时间(x-message-ttl),缺点是不灵活,不同的延迟时间就需要不同的队列
- 创建队列时不设置延时,在发送消息时设置延时,这样可以有生产者指定延时,比上种方法通用,通过 convertAndSend 中的 message 参数设置,缺陷是如果有两条消息如果第一条消息时间很长,第二条时间很多,时间短的消息并不能先执行。
-
上面两种方法都依赖死信队列,借助 rabbit_message_delay_exchange 插件,可以不用死信队列,它是在交换机实现的延迟,安装插件后创建 x-delay-exchange 类型的交换机(和direct,fannout并列的),用 CustomExchange 配置,无需死信队列