1.TTL(Time To Live)
1)消息的过期时间---设置方式
1.通过队列属性设置消息过期时间:所有队列中的消息超过时间未被消费时,都会过期。
2.设置单条消息的过期时间:在发送消息的时候指定消息属性。
如果同时指定了 Message TTL 和 Queue TTL,则小的那个时间生效。
2.死信队列
1)简单介绍
消息在某些情况下会变成死信(Dead Letter)。队列在创建的时候可以指定一个死信交换机 DLX(Dead Letter Exchange)。死信交换机绑定的队列被称为死信队列 DLQ(Dead Letter Queue),DLX 实际上也是普通的交换机,DLQ 也是普通的队列(例如替补球员也是普通球员)。
2)什么情况下消息会变成死信?
1.消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue== false 2.消息过期 3.队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX。
3)死信队列如何使用?
1.声明原交换机(GP_ORI_USE_EXCHANGE)、原队列(GP_ORI_USE_QUEUE),相互绑定。队列中的消息 10 秒钟过期,因为没有消费者,会变成死信。并指定原队列的死信交换机(GP_DEAD_LETTER_EXCHANGE)。
2.声 明 死 信 交 换 机 ( GP_DEAD_LETTER_EXCHANGE ) 、 死信队列 (GP_DEAD_LETTER_QUEUE),相互绑定
3.最终消费者监听死信队列。
4.生产者发送消息。
3.延迟队列
1)应用场景
我们在实际业务中有一些需要延时发送消息的场景,例如: 1.家里有一台智能热水器,需要在 30 分钟后启动2. 未付款的订单,15 分钟后关闭
2)实现方案
RabbitMQ 本身不支持延迟队列,总的来说有三种实现方案:1.先存储到数据库,用定时任务扫描 2.利用 RabbitMQ 的死信队列(Dead Letter Queue)实现 3.利用 rabbitmq-delayed-message-exchange 插件
3)方案2分析-TTL+DLX 的实现
1.步骤分析:基于消息 TTL,我们来看一下利用死信队列(DLQ)实现延迟队列的步骤:1)创建一个交换机2)创建一个队列,与上述交换机绑定,并且通过属性指定队列的死信交换机。3)创建一个死信交换机 4)创建一个死信队列4)将死信交换机绑定到死信队列 5)消费者监听死信队列
2.消息的流转流程:生产者——原交换机——原队列(超过 TTL 之后)——死信交换机——死信队列——最终消费者
3.使用死信队列实现延时消息的缺点:1) 如果统一用队列来设置消息的 TTL,当梯度非常多的情况下,比如 1 分钟,2分钟,5 分钟,10 分钟,20 分钟,30 分钟......需要创建很多交换机和队列来路由消息。 2)如果单独设置消息的 TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递(比如第一条消息过期 TTL 是 30min,第二条消息 TTL 是 10min。10 分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递)。3)可能存在一定的时间误差。
4)方案3分析-基于延迟队列插件的实现(Linux)
1.插件的安装:省率
2.插件使用:
1)通过声明一个 x-delayed-message 类型的 Exchange 来使用 delayed-messaging特性。x-delayed-message 是插件提供的类型,并不是 rabbitmq 本身的(区别于 direct、topic、fanout、headers)。
2)生产者:消息属性中指定 x-delay 参数。
4.服务端流控
1)遇到的问题
当 RabbitMQ 生产 MQ 消息的速度远大于消费消息的速度时,会产生大量的消息堆积,占用系统资源,导致机器的性能下降。我们想要控制服务端接收的消息的数量,应该怎么做呢?
2)不能真正地实现服务端限流的方式-设置队列俩个控制长度的属性
3)方式-内存控制
1.RabbitMQ 会在启动时检测机器的物理内存数值。默认当 MQ 占用 40% 以上内存时,MQ 会主动抛出一个内存警告并阻塞所有连接(Connections)。可以通过修改rabbitmq.config 文件来调整内存阈值,默认值是 0.4
2.用命令动态设置,如果设置成 0,则所有的消息都不能发布。
5.消费端限流
1)消息过多导致问题:默认情况下,如果不进行配置,RabbitMQ 会尽可能快速地把队列中的消息发送到消费者。因为消费者会在本地缓存消息,如果消息数量过多,可能会导致 OOM 或者影响其他进程的正常运行。
2)消费端的流量限制的使用原因:在消费者处理消息的能力有限,例如消费者数量太少,或者单条消息的处理时间过长的情况下,如果我们希望在一定数量的消息消费完之前,不再推送消息过来,就要用到消费端的流量限制措施。
3)消费端的流量限制的方式:可以基于 Consumer 或者 channel 设置 prefetch count 的值,含义为 Consumer端的最大的 unacked messages 数目。当超过这个数值的消息未被确认,RabbitMQ 会停止投递新的消息给该消费者。
4)具体项目中使用:SimpleMessageListenerContainer---》container.setPrefetchCount(2); Spring Boot 配置:----》spring.rabbitmq.listener.simple.prefetch=2
5)举例:channel 的 prefetch count 设置为 5。当消费者有 5 条消息没有给 Broker 发送 ACK后,RabbitMQ 不再给这个消费者投递消息。