RabbitMQ

削峰填谷、异步解耦

模型 、重要概念

rabbitMq模型

producer ->channel -> exchange ->queue ->channel -> consumer

Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费者
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
channel : 连接通道

生产者发送消息流程:

1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:

1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、ack回复

避免消息重复消费

两种方式:消息全局 ID 或者写个唯一标识(如时间戳、UUID 等) :
1.每次消费消息之前根据消息 id 去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局 ID 作为数据库表的主键,防止重复)
2.或者利用 Redis 的 setnx 命令:给消息分配一个全局 ID,消费该消息时,先去 Redis 中查询有没消费记录,无则以键值对形式写入 Redis ,有则不消费该消息。(有可能redis不稳定会出现数据丢失问题)

消息丢失及处理方案

1、producer生产者丢失消息
生产者发送消息由于网络等原因并没有发送到RabbitMq
解决方案:
开启 confirm 模式
开启事务模式
2、broker消息中间件自身丢失消息
RabbitMq收到生产者的消息后还没有来得及持久化到磁盘,又或者创建队列没有持久化以及消息并没有设置为持久化,在Mq故障宕机后都会有消息丢失的情况
解决方案:
1.创建队列queue的时候设置队列持久化
2.mq配置deliveryMode == 2 消息持久化
重点:必须同时设置队列持久化和消息持久化,再结合生产者的confrim模式,才能保证消息准确投递到broker并保证进入磁盘。
3、consumer消费者丢失消息
消费者自动ack配置情况下,业务代码异常或者其他故障消息并没有处理完成也会自动ack。RabbitMq消息ack后就会丢弃,这就导致异常情况下的消息丢失了。
解决方案:
手动ack
关闭RabbitMq自动ack,业务代码成功消费了消息手动调用Mq ack,让Mq丢弃消息;如果业务代码异常则直接nack,让Mq重新推送消息进行处理。当然,在要求比较高的情况下也可以异常数据进入死信队列,保证数据的完整性。

死信队列

如果配置了死信队列,当消息消费发生异常时(下面),rabbitMQ会将消息投入死信队列中;如果没有配置,该消息将会被丢弃。

  • 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间超过设置的生存时间(TTL)时间。
  • 消息队列的消息数量已经超过最大队列长度。
    配置死信队列
    1.配置业务队列,绑定到业务交换机上
    2.为业务队列配置死信交换机和路由key
    3.为死信交换机配置死信队列
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容