本文全部手敲-禁止转载内容
1. 体系结构与工作流程
1.1 概念及工作流程
1.1.1 概念
-
RabbitMQ Broker
Broker
可以简单看做是一个RabbitMQ 服务节点,或者RabbitMQ 服务实例,RabbitMQ Broker也可以看做一台服器
(除了在单机上安装多个RabbitMQ 用于调试外的情况外)
-
Producer
消息生产角色(寄件人)
-
Consumer
消费消息的角色(收件人)
1.1.2 RabbitMQ 模型架构
1.1.3 消息运转流程
1.2 Broker内组件介绍
1.2.1 队列 (Queue)
多个消费者可以订阅同一个Queue
RabbitMQ 不支持队列广播消息给所有消费者
Queue 会平均分摊(Round-Robin轮询)消息给所有消费者
1.2.2 交换机 (Exchange)
我们可以暂时理解成生产者将消息投递进了队列,但实际上,这并没有发生,生产者的消息中间还经手了邮递员 (交换机) 的投递,交换机根据你的地址 路由键(RoutingKey) ,并匹配上队列的地址 绑定键(BindingKey) ,之后再进行投递
当然也有不按照地址投递,做专项挂号信投递的邮递员 (交换机) ,直接按照他自己的规则来投递消息
-
fanout 交换机
以广播的方式转发消息到与其绑定的所有的队列
-
direct交换机
只转发消息到交换机自己的路由键和与队列的绑定键完全一致的队列中
下图中,如果发送消息的路由键(RoutingKey)为warning,消息将会投递到队列1和队列2中,如果路由键(RoutingKey)为info或debug,消息将会投递到队列2
[
...(image-edace0-1601114004338)]
-
topic交换机
转发消息到交换机自己的路由键和与队列的绑定键匹配规则吻合的消息队列中,支持模糊匹配
(其中
*
可以匹配一个单词,#
可以匹配多个或零个单词)路由键为 'com.rabbitmq.client' 的消息会同时投递到Queue1和Queue2
路由键为 'com.hidden.client' 的消息会同时投递到Queue2
路由键为 'com.hidden.demo' 的消息会同时投递到Queue2
-
路由键为 'com.rabbitmq.demo' 的消息会同时投递到Queue1
- 路由键为 'java.util.lang' 的消息会被丢弃或返回给生产者
-
headers交换机
会转发消息头中的属性和 Binding 创建指定的属性一致的消息到队列中
相比其他交换机,性能低下,而且不实用
1.2.3 路由键 (RoutingKey)
生产者将消息发给交换机的时候,一般会指定一个 RoutingKey ,用来指定这个消息的路由规则,而这个 RoutingKey 需要和交换器的类型和 绑定键(Binding) 联合使用才能生效
1.2.4 绑定 (Binding)
RabbitMQ 中用来绑定队列和消费者的概念,在绑定的时候一般会指定一个 绑定键(BindingKey)
1.3 RabbitMQ的管理
1.3.1 管理的方式
- 插件 rabbitmq_management web管理页面
rabbitmq_management 提供的 HTTP API
rabbitmqctl CLI 命令行工具
1.3.2 管理的维度
-
多租户 (vhost)
通过后缀地址将不同应用在Broker内分割成不同的虚拟环境沙箱
如下eshop和chat两个项目连接到RabbitMQ后,是看不到对方应用所声明的队列和交换机的:
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="" cid="n123" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit;">"amqp://userName:password@ipAddress:portNumber/eshop"
"amqp://userName:password@ipAddress:portNumber/chat"</pre> -
用户及权限
查看应用,集群信息和重启关闭应用,创建维护集群
查看服务端健康状态和运行信息
2. 消息可靠性保障
2.1 与Broker建立连接
客户端通过ConnectionFactory
类配置好账号地址等访问信息后,获取Connection
,并由Connection
来开启channel(信道)
,后面所有的消息发送消费都是有channel
具体来执行的
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="java" cid="n140" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: Monaco, Consolas, "Andale Mono", "DejaVu Sans Mono", monospace; margin-top: 0px; margin-bottom: 20px; font-size: 0.9rem; display: block; break-inside: avoid; text-align: left; white-space: normal; background: rgb(51, 51, 51); position: relative !important; padding: 10px 10px 10px 30px; width: inherit; color: rgb(184, 191, 198); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/vhost");
Connection conn = factory.newConnection();
//Connection接口被用来创建一个Channel:
Channel channel = conn.createChannel();
//在创建之后,Channel可以用来发送或者接收消息了。</pre>
2.2 消息的发送和消费
2.2.1 消息的发送
- RabbitMQ 原生API:
public interface Channel extends ShutdownNotifier {
...
public void basicPublish(String exchange,
String routingKey,
boolean mandatory,
BasicProperties props, byte[] body) throws IOException
....
}
2.2.2 消息的消费
2.2.2.1 推模式消费
- RabbitMQ 原生API:
public interface Channel extends ShutdownNotifier {
...
public void basicPublish(String exchange,
String routingKey,
boolean mandatory,
BasicProperties props, byte[] body) throws IOException
....
}
2.2.2.2 拉模式消费
- RabbitMQ 原生API:
public interface Channel extends ShutdownNotifier {
....
public GetResponse basicGet(String queue, boolean autoAck) throws IOException;
....
}
2.3 生产者保障消息到达Broker
2.3.1 消息发送失败处理机制
上一节的内容介绍了,如果消息在Broker中没有投递目标,如何退回或者进入备份交换机,但是如果消息压根没有进入在Broker,消息在发送到Broker的过程中就丢失了,如何处理这种情况?
RabbitMQ提供了两种解决方案
2.3.1.1 事务机制
事务机制可以在生产者将消息提交到RabbitMQ失败的时候,抛出异常,可以让生产者处理发送失败的消息,而不至于直接丢了消息
.....
try {
channel.txSelect(); // 声明事务
// 发送消息
channel.basicPublish("", _queueName,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
channel.txCommit(); // 提交事务
} catch (Exception e) {
channel.txRollback();
//在这里处理发送失败的消息(记录发送失败的消息或重新发送)
} finally {
channel.close();
conn.close();
}
.....
2.3.1.2 发送确认机制
发送确认机制和事务机制相似,但更为轻量,因为确认消息是异步返回的,第一条消息在等确认消息时,第二条消息可以继续发送,效率更高
无论失败还是成功发送,Broker都会返回确认消息
Broker不能保证返回的时间
-
如果消息开启持久化,消息会在刷入磁盘后再返回确认消息
// 开启发送方确认模式
channel.confirmSelect();
String message = "test";
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {
System.out.println("消息发送成功" );
}
2.3.1.3 发送确认和事务的弊端
事务性能低下,会拉低MQ整体QPS
确认机制异步返回,会提升客户端性能,缩短发送消息时间,Broker则还是串行同步等待,没有性能提高
为了提高客户端性能同时提高Broker性能,只能
批量确认
或异步确认
批量确认,若遇到发送失败的情况,需要批量重发,会拉低QPS
//异步确认demo演示
public void AsyncSendConfirmDemo() {
//开启消息确认
channel.confirmSelect();
//消息发送后,无论成功失败,Broker 一定会返回结果
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple)
throws IOException {
//删除本地该条发送记录
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag boolean multiple) throws IOException {
//删除本地该条发送记录
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
//注意这里处理消总重发的场景
// 譬如:重发三次后,记录失败消息,停止重发
}
});
//while(true)模拟一直发送消息的场景
while (true) {
//从信道获取发送消息的序号
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
ConfirmConfig.msg_10B.getBytes());
//将消息序号放入集合中保存
confirmSet.add(nextSeqNo);
}
}
- 事务机制, 普通发送确认, 批量确认, 异步确认 QPS对比
2.3.2 消息投递失败处理机制
-
投递失败的情况
没有消费者
消息转发器没有绑定队列
2.3.2.1 消息的退回
basicPublish
api中的参数 mandatory
设置为 true
时,交换机若无法根据自己类型和路由键找到相应的队列时,
消息就会被退回到生产者设置的回调里
这一过程并非RabbitMQ回调了生产者,而是AMQP协议,也就是发送消息的一个部分
- RabbitMQ 原生API:
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("replyText:"+replyText + "exchange:"+exchange +
"routingKey:"+routingKey + "message:"+new String(body));
}
});
2.3.2.2 备份交换机
消息退回需要在代码中添加退回消息监听的逻辑,如果不想侵入代码,可以使用备份交换机(alternate-exchange),将没有投递对象的消息,转发到备份交换机中
备份交换机和普通交换机没有什么不同
如果备份交换机不存在,没有绑定队列,没有匹配的队列,客户端和Broker都不会报错,消息会丢失
//创建主交换机,并指定myAe交换机为其备份交换机
Map<string, object> args = new HashMap<>();
args.Add("alternate-exchage", "myAe");
channel.ExchangeDeclare(EXCHANGE_NAME, "topic", true, false, args);
//创建主队列和主交换机绑定
channel.QueueDeclare(QUEUE_NAME, true, false, false, null);
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY, null);
//创建备份交换机和备份队列
channel.ExchangeDeclare("myAe", "fanout", true, false, null);
channel.QueueDeclare("unroutedQueue", true, false, false, null);
channel.QueueBind("unroutedQueue", "myAe", null);
2.4 消费者保障消息被正确消费
2.4.1 消费者消费确认-ACK
收到消费者客户端确认再删除当前消息
2.4.1.1 确认消息(acknowledgement)
消费者在订阅队列时,可以指定autoACK参数,当参数为false,Broker会等待消费者调用下方的basicAck方法,显示的返回应答后再删除队列里的这条消息,反之再推送消息给消费者后,会自动从队列删除推送的消息
若Broker没有等到消费者返回确认或拒绝消息,Broker会认为当前消费者已经宕机,会将该消费者状态设置为不可消费状态,并不在向该消费者推送消息,直到该消费者重启后重新上线
public interface Channel extends ShutdownNotifier {
....
//确认消费成功
public void basicAck(long deliveryTag, boolean multiple) throws IOException;
....
}
2.4.1.2 拒绝消息(reject)
- 消费者在消费时,可以调用basicReject或basicNack拒绝一条或多条消息,被拒绝的消息,如果设置了参数requeue=true,则会重新回到队列中,发给下一个消费者,反之则会直接从队列中移除,调用basicRecover方法,消息重新加入队列,并会分配给和之前不同的消费者
public interface Channel extends ShutdownNotifier {
....
//拒绝消息
public void basicReject(long deliveryTag, boolean requeue) throws IOException;
//批量拒绝消息
public void basicNack(long deliveryTag,
boolean multiple, boolean requeue) throws IOException;
....
}
//重新加入队列,并分配给和之前不同的消费者
public RecoverOk basicRecover() throws IOException;
2.4.1.3 分布式ID与重复消息
-
开源解决方案
美团leaf
百度uid-generator
滴滴tinyid
-
分布式ID实现
雪花算法
基于mysql自增主键或计REDIS计数API
UUID
雪花算法:
3. 高可用
3.1 持久化
持久化是指,将发送到Broker中的消息,保存到磁盘上,即使Broker关机或宕机,重启后,消息数据依然存在,,RabbitMQ持久化默认是关闭的,需要通过设置开启
3.1.1 持久化的实现
-
交换器持久化
声明交换器时指定参数durable = true
-
队列持久化
声明队列时指定参数为durable = true
-
消息持久化
BasicProperties中deliveryMode=2 时,消息会被持久化,如果消息开启了持久化,发送确认机制会在消息刷盘后,再返回
//创建消息,将消息内容转为json 设置成消息体
Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
// 设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
发送该消息
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
rabbitTemplate.convertAndSend(exchageName, routingKey, message);
队列不持久化,消息就没办法持久化
消息全部持久化会影响Broker的QPS,选择重要的业务消息持久化
3.1.2 持久化的数据安全
- 持久化不能保证数据安全,如果消息在刷盘之前Broker宕机,消息将丢失
3.2 集群
3.2.1 什么是集群
集群就是多个Broker组成的一个应用架构,但是队列和消息只存放在一台机器上
-
什么是元数据
队列元数据(队列的名称和属性),队列宿主的信息,指向队列宿主的指针
交换器的元数据(交换器的名称及属性)
绑定关系元数据(交换器和队列,交换器和交换器之间的绑定关系)
vhost元数据(为其中的队列,交换器,绑定关系提供命名空间及安全属性)
-
集群的存储的数据
-
所有节点上的数据
- 元数据
-
队列宿主节点上的数据
- 队列数据和消息数据
-
-
为什么队列只存在一台机器上
- 存储空间。如果每个集群节点都拥有所有Queue的完全数据拷贝,集群内冗余数据会非常多
性能。需要将消息复制到每一个集群节点,对于持久化消息,网络和磁盘同步复制的开销都会明显增加。
-
集群的节点工作模式
集群的节点分为硬盘节点(disc节点)和内存节点
集群中至少需要一个硬盘节点,单机的RabbitMQ只有硬盘节点
如果集群中的磁盘节点全部故障,只能基于现有的交换器,队列发送消息,什么都不能操作,直到磁盘节点恢复
-
集群可以容灾吗
集群故障可以容忍单节点的故障,但是如果是队列的宿主节点故障,消息将会丢失
3.2.2 镜像队列
镜像队列会将集群中的队列宿主上队列镜像到其他节点上,当队列宿主故障时,镜像队列节点会重新选举出新的队列宿主机器,来确保集群队列的数据安全
3.2.4 集群的负载均衡
集群每个节点都可以连接客户端,但是访问的请求并不是平分到每一个节点上,所以需要单独的负载均衡来将流量平分到每一台节点上
集群的每个节点,客户端都可以连接,多个节点之间并不会将访问的压力进行负载均衡
-
常见集群负载均衡方案
Haproxy 做集群前置负载均衡
Haproxy + Keepalived
LVS+ Keepalived
4. 常用特性
4.1 死信队列
死信交换机(DLX Dead-Letter-Exchange),本质是普通的交换机,当消息在一个队列中变成死信后,将会被转发到另一个交换机,这个交换机就是死信交换机,与死信交换机绑定的队列,就是死信队列,当消息变成死信需要满足以下任意条件:
消息被拒绝 (channel.basicReject或channel.basicNack拒绝),并设置了requeue参数为false
消息过期
队列达最大长度
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//指定死信发送的Exchange
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
4.2 延时消息
RabbitMQ没有延时消息功能
通过设置队列过期时间,让过期的消息,进入死信队列可以实现延时消息的效果
//定义队列时,设置队列失效时间
//设置死信队列见上一小节
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);
4.3 RPC实现
在发送消息时,指定消息的唯一ID和回调队列,消费者消费后,会向回调队列发送包含该ID的消费回复
//创建一个回调队列
String callbackQueueName = channel.queueDeclare.getQueue();
//为消息创建全局唯一ID
String corrID = UUID.randomUUID.toString();
//在基础属性中指定回调队列名称,和唯一ID
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName)
.correlationID(corrID).build();
channel.basicPublish ("","rpc_queue" ,props,message.getBytes());
5. 生产中常见问题解决思路
5.1 消息积压
-
设置消费者最大积压消息数
//平均分发消息给消费者,均衡利用资源
channel.basicQos(2);
-
增加消费者数量
//通过更多的消费者数量来减少消费积压
-
临时增加拉模式消费,将消息转发到其他RabbitMQ集群中
//同样增加了消费者的数量,但是侵入式代码改动较多
channel.basicGet();
-
shovel转移积压消息到其他集群
//多个集群分散压力减少积压消费
-
调整积压消息队列优先级
//将资源优先处理容易积压消息的队列
//声明队列时指定队列优先级
Map<String,Object> args = new HAshMap<String,Object>();
args.put("x-max-priority",10);
channel.queue.Declare("queue.priority",true,false,false,args);
////发送消息时,指定消息优先级
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties-Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish ("exchange一priority","rk一priority",
properties,("messages").getBytes());
//持续增长的消息积压会耗尽硬件资源,触发内存磁盘预警和流控,拒绝掉一些请求,优先保障RabbitMQ存活下来,隔绝系统之间层与层的压力,熔断流量,保证系统整体可用
5.2 QPS低
不使用事务机制
使用异步的消息发送确认
避免持久化过多没有必要的消息
对于持久化队列,开启惰性队列属性,优先将队列的数据存放在磁盘上,必要时再加载到内存
开启Erlang的HiPE功能
-
单个队列有性能瓶颈,将所有对Broker的操作封装成多个队列
//经常触发流控机制,拒绝掉一些请求,Connection和Channel经常处于flow状态,但是队列处于running状态,拼命的写
5.3 实现顺序消息
RabbitMQ不能保证顺序消费
-
通过GUID(Globally Unique Identifier)类似概念,在业务系统做好去重
- 生成分布式ID作为消息ID,将消息消费的记录保存,每次消费前,判断当前消息ID是否已消费