一.相关概念
1.1 RabbitMQ整体架构模型
1.2 核心概念
1.2.1 生产者和消费者
producer 生产消息的一方,生产者生产消息一般包含两个部分消息体和标签
customer 消费消息的一方,链接到RabbitMQ,并订阅到队列上,只消费消息体,消费标签在路由过程中将被丢弃 。
1.2.2 Broker
消息中间件的服务节点
1.2.3 队列
队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
1.2.4 交换器,路由键,绑定
Exchange
交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
RoutingKey
生产者将消息发给交换器的时候, 一般会指定一个 RoutingKey ,用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey)合使用才能最终生效。
Binding key
RabbitM 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一 绑定键 BindingKey ,这样 RabbitMQ 就知 何正确 将消息路由 队列了
说明:direct模式下,routing key和Binding key是同一个值,topic模式下不一定是同一个值(模糊匹配)
1.2.5 交换器类型
常用的交换器类型有fanout,direct, topic,headers 四种。
fanout
把发送到该交换器的消息分发到所有与该交换器绑定的队列中。广播模式
direct
把发送到该交换器的消息分发到routing key和Binding key完全匹配的队列中。
topic
把发送到该交换器的消息分发到routing key和Binding key匹配(通配符模糊匹配)的队列中
匹配规则
1.RoutingKey 为一个点号"."分隔的字符串(被点号"."分隔开的每段独立的字符串称为 个单词 ),如 "com.rabbitmq.client","java.util.coucurrent"
2.BindingKey与RoutingKey一样也是点号"."分隔的字符串;
3.BindingKey 中可以存在两种特殊字符串"*"和"#"用于做模糊匹配, *匹配一个单词,#匹配多个或0个单词
二.客户端
2.1 消息消费
2.1.1 推模式
2.1.2 拉模式
2.2 消费端的确认和拒绝
2.2.1 主动确认(默认)
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
2.2.2 手动确认
消费后需要手动调用方法确认消息是否消费成功,如下方法
//第二个参数设置自动/手动确认,true表示自动应答,false表示手动应答
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
/*
*消费成功,可以删除消息
*deliveryTag:该消息的index
*multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
/*
*消费失败
*deliveryTag:该消息的index
*multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息
*requeue:被拒绝的是否重新入队列 注意:如果设置为true ,则会添加在队列的末端
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
/*
* 绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
*/
void basicReject(deliveryTag, true);
三.RabbitMQ进阶
3.1 消息何去何从
mandatory immediate channel basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。 RabbitMQ 提供的备份交换器(Altemate Exchange) 可以将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定〉存储起来,而不用返回给客户端。
3.1.1 mandatory 参数
当mandatory 为ture时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory数设置为 false 时,出现上述情形,则消息直接被丢弃
3.1.2 immediate 参数
imrnediate 参数设为 true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配 所有队列都没有消费者时该消息会通过 Basic.Return 返回至生产者。
3.1.3 备份交换器
交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
//声明确认 Exchange 交换机的备份交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
ExchangeBuilder exchangeBuilder =
ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
//设置该交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
return (DirectExchange)exchangeBuilder.build();
}
对于备份交换器,总结了以下几种特殊情况:
1.如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
2.如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
3.如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
4.如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。
3.2 过期时间(TTL)
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
3.2.1 消息过期
rabbitTmplate.convertAndSend("exchange","routingKey",message,correlationData->{
correlationData.getMessageProperties().setExpiration(ttlTime)
return correlationData;
})
3.2.2 队列过期
args.put("x-message-ttl",5000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
两者的区别
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
3.3 死信队列
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列
3.3.1 死信来源
1.消息 TTL 过期
2.队列达到最大长度(队列满了,无法再添加数据到 mq 中)
3.消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
3.4 延时队列
3.4.1 延时队列概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX和TTL 模拟出延迟队列的功能。
3.4.2 使用场景
1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
。。。。
3.5 优先级队列
优先级队列 顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
3.5.1 设置优先级
1.控制台添加
2.队列中添加
Map<String, Object> params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);
3.在消息中添加
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
注意
要让优先级队列生效需要设置队列最大优先值,在发送消息时设置消息的优先级
3.6 持久化
RabbitMQ的持久化分为 个部分:交换器的持久化、队列的持久化和消息的持久化
1.交换器持久化
交换器的持久化是通过在声明交换器时将 durable 参数置为 true 实现的。如果交换器不设置持久化,那么在 RabbitMQ 重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。
2.队列持久化
队列的持久化是通过在声明队列时将 durable 参数置为 true 实现的,如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。正所谓"皮之不存,毛将焉附",队列都没有了,消息又能存在哪里呢?
3.消息持久化
通过将消息的投递模式(BasicPropert es 中的 deliveryMode 属性)设置为 即可实现消息的持久化
3.7 生产者确认
当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器 ,何谈持久化?
RabbitMQ 对这个问题,提供了两种解决方式:
1.通过事务机制实现:
2.通过发送方确认 publisher confirm 机制实现。
3.7.1 事务机制
1.客户端发送Tx.Select将信道置为事务模式;
2.Broker 回复Tx.Select-Ok确认己将信道置为事务模式:
3.在发送完消息之后,客户端发送 Tx.Commit 提交事务;
4.Broker回复Tx.Commit-Ok确认事务提交
3.7.2 发送方确认机制
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法
Channel channel=connection.createChannel();
channel.confirmSelect();
生产者将信道设置成 confirm(确认)模式,一旦信道进入 confirm模式,所有在该信道上面发布的消息都会被指派 个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 会发送一个确认(Basic.Ack) 给生产者(包含消息的唯一 ID) ,这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号此外 RabbitMQ 可以设置 channel basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都己经得到了处理
3.7.3 异步确认发送
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的
代码实现
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目 只要给到序列号
* 3.支持并发访问
*/
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
/**
* 确认收到消息的一个回调
* 1.消息序列号
* 2.true 可以确认小于等于当前序列号的消息
* false 确认当前序列号消息
*/
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
/**
* 添加一个异步确认的监听器
* 1.确认收到消息的回调
* 2.未收到消息的回调
*/
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 通过序列号与消息体进行一个关联
* 全部都是未确认的消息体
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}
3.8 RabbitMQ集群
最开始我们介绍了如何安装及运行 RabbitMQ 服务,不过这些是单机版的,无法满足目前真实应用的
要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ
服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞
吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是
解决实际问题的关键
3.8.1 集群搭建
#1.修改 3 台机器的主机名称
vim /etc/hostname
#2.配置各个节点的 hosts 文件,让各个节点都能互相识别对方
vim /etc/hosts
10.211.55.74 node1
10.211.55.75 node2
10.211.55.76 node3
#3.以确保各个节点的 cookie 文件使用的是同一个值在 node1 上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
#4.启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以
#下命令)
rabbitmq-server -detached
#5.在节点 2 执行rabbitmqctl stop_app(rabbitmqctl stop 会将 Erlang 虚拟机关闭,rabbitmqctl stop_app 只
#关闭 RabbitMQ 服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
#只启动应用服务
rabbitmqctl start_app
#6.在节点 3 执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app
#7.集群状态
rabbitmqctl cluster_status
#8.需要重新设置用户
#创建账号
rabbitmqctl add_user admin 123
#设置用户角色
rabbitmqctl set_user_tags admin administrator
#设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
3.8.2 镜像队列
如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,
但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在
一个短暂却会产生问题的时间窗。通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,
尽管如此,一般不希望遇到因单点故障导致的服务不可用
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中
的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。