RabbitMQ-消息可靠性&延迟消息

前置文章:
RabbitMQ-基础使用(Spring AMQP)

零、本文纲要

  • 一、MQ常见问题
  • 二、消息可靠性
    1、消息丢失可能发生的节点
    2、生产者确认机制
    3、消息持久化
    4、消费者确认消息
    5、失败重试机制
  • 三、死信交换机
    1、死信
    2、死信交换机
    3、TTL
    4、死信交换机&TTL代码实现
  • 四、延迟消息
    1、延迟队列
    2、应用场景
    3、延迟队列插件安装
    4、延迟队列代码实现
    5、修改延时消息报异常的处理逻辑
  • 补充:Docker安装RabbitMQ(挂载插件数据卷)

tips:Ctrl + F快速定位所需内容阅读吧。

一、MQ常见问题

  • ① 消息可靠性

确保发送的消息至少被消费一次;

  • ② 延迟消息

实现消息的延迟投递;

  • ③ 消息堆积

处理消息无法及时消费的问题;

  • ④ 高可用

避免单点MQ故障导致整体不可用;

二、消息可靠性

1、消息丢失可能发生的节点

消息丢失.png
  • ① 发送时丢失

Ⅰ 生产者发送的消息未送达exchange;
Ⅱ 消息到达exchange后未到达queue。

  • ② MQ宕机,queue将消息丢失
  • ③ consumer接收到消息后未消费就宕机

2、生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

  • publisher-confirm,发送者确认
    Ⅰ 消息成功投递到交换机,返回ack;
    Ⅱ 消息未投递到交换机,返回nack。
  • publisher-return,发送者回执
    消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。

  • ① application.yml配置

publish-confirm-type:开启publisher-confirm,这里支持两种类型:
simple:同步等待confirm结果,直到超时;
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback;
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback;
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息。

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-returns: true # 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    template:
      mandatory: true # 定义消息路由失败时的策略。true:则调用ReturnCallback;false:则直接丢弃消息。
  • ② 给RabbitTemplate配置ReturnCallback

注意:每个RabbitTemplate只能配置一个ReturnCallback。

Spring的bean默认为单例,让CommonConfig实现ApplicationContextAware接口,就是为了在Spring准备好容器后给rabbitTemplate对象设置ReturnCallback。

@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware { // Spring容器准备好后通知该类
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
        rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey) -> {
            // 记录日志
            log.info("发送消息到队列失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyTest, exchange, routingKey, message.toString());
            // 重发消息代码
        });
    }
}

另外,此处省略了重发消息的代码实现,具体可以根据业务需求编写。

  • ③ 代码实现ConfirmCallback

Ⅰ 获取CorrelationData对象,设置全局唯一ID,区分不同消息;
Ⅱ 设置ConfirmCallback的函数实现,消息发送到MQ成功和异常的处理函数;
注意:此处省略了重发消息的代码实现,具体可以根据业务需求编写。

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 路由键
    String routingKey = "simple";
    // 消息体
    String message = "hello, spring amqp!";
    // 消息ID,封装到
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 添加callback
    correlationData.getFuture().addCallback(
            result -> {
                // 判断结果
                if (result.isAck()){
                    // ack,消息发送成功
                    log.debug("消息发送到交换机成功,ID:{}", correlationData.getId());
                }else {
                    // nack,消息发送失败
                    log.error("消息发送到交换机失败,ID:{},原因{}", correlationData.getId(), result.getReason());
                    // 重发消息
                }
            },
            ex -> {
                // 记录日志
                log.error("消息发送异常,ID:{},原因{}", correlationData.getId(), ex.getMessage());
                // 重发消息
            }
    );
    // 发送消息,此处记得添加correlationData
    rabbitTemplate.convertAndSend("amq.topic", routingKey, message, correlationData);
}
  • ④ 处理消息确认的情形

Ⅰ publisher-comfirm:
消息成功发送到exchange,返回ack;
消息发送失败,没有到达交换机,返回nack;
消息发送过程中出现异常,没有收到回执。

Ⅱ 消息成功发送到exchange,但没有路由到queue,调用ReturnCallback。

3、消息持久化

  • ① 交换机持久化
@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.exchange", true, false);
}

注意:其实直接new也是持久化,默认走如下方法:

默认也是持久化.png
  • ② 消息队列持久化
@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable持久化
    return QueueBuilder.durable("simple.queue").build();
}

注意:直接new Queue("simple.queue");也是持久化的,如下:

消息队列持久化.png
  • ③ 消息体持久化
@Test
public void testDurableMsg(){
    // 路由键
    String routingKey = "simple";
    // 消息体
    String message = "Hello, durable.";
    // 消息持久化
    Message msg = MessageBuilder
            .withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
            .build();
    // 发送消息
    rabbitTemplate.convertAndSend("simple.exchange", routingKey, msg);
}

注意:直接发送普通消息,默认也是持久化的,如下:

public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;

4、消费者确认消息

  • ① 消息确认模式

Ⅰ manual:手动ack,需要在业务代码结束后,调用api发送ack;
Ⅱ auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack;
Ⅲ none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。

  • ② 配置文件配置
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack

5、失败重试机制

  • ① 默认重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环。

  • ② Spring的retry机制

Spring机制重试次数耗尽后,消息会被reject,丢弃。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        retry:
          enabled: true # 开启消费失败重试
          initial-interval: 1000 # 初始失败等待时间为1秒
          multiplier: 3 # 下次失败等待时间的倍数,下次等待时长last*multiplier
          max-attempts: 3 # 最大重试次数
          stateless: true # 无状态;false则为有状态。业务中包含事务则改为false。
  • ③ 失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要由MessageRecoverer接口来处理。

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认就是这种方式;
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队;
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。

覆盖原有策略实现:
Ⅰ 自定义异常消息交换机、队列、绑定:

@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}

@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}

@Bean
public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

Ⅱ 覆盖原有策略实现:

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}

三、死信交换机

1、死信

  • ① 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false;
  • ② 消息是一个过期消息,超时无人消费;
  • ③ 要投递的队列消息堆积满了,最早的消息可能成为死信。

2、死信交换机

如果队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey。

3、TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信。

TTL的两种情形:
Ⅰ 消息所在的队列设置了存活时间;
消息本身设置了存活时间。

4、死信交换机&TTL代码实现

  • ① 使用注解的形式声明一组死信交换机、队列,并绑定,如下:
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dl.queue", durable = "true"),
        exchange = @Exchange(name = "dl.direct"),
        key = "dl"
))
public void listenDlQueue(String msg){
    log.info("接收到dl.queue的延迟消息:{}", msg);
}
  • ② 给队列设置超时时间,在声明队列时配置x-message-ttl属性:
@Bean
public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
}

@Bean
public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化
            .ttl(10000) // 设置队列的超时时间为10s
            .deadLetterExchange("dl.direct") // 指定死信交换机
            .deadLetterRoutingKey("dl") // 指定死信RoutingKey
            .build();
}

@Bean
public Binding ttlBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("tl");
}
  • ③ 给TTL队列发送消息
@Test
public void testTTLMsg(){
    // 路由键
    String routingKey = "tl";
    // 消息体
    String message = "Hello, TTL.";
    // 消息持久化
    Message msg = MessageBuilder
            .withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
            .setExpiration("5000") // 5s
            .build();
    // 发送消息
    rabbitTemplate.convertAndSend("ttl.direct", routingKey, msg);
}

注意:此处我们设置了queue的超时时间,以及msg的超时时间,最后MQ会以其中较短的时间来实现

四、延迟消息

1、延迟队列

利用TTL结合死信交换机,实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

2、应用场景

  • ① 延迟发送短信;
  • ② 用户下单,如果用户在15 分钟内未支付,则自动取消;
  • ③ 预约工作会议,20分钟后自动通知所有参会人员。

3、延迟队列插件安装

  • ① TTL+死信队列

详见死信交换机章节内容。

  • ② 延迟队列插件

插件安装指南:Scheduling Messages with RabbitMQ
官方插件社区:Community Plugins — RabbitMQ
插件下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

Ⅰ 下载插件
Ⅱ 将插件上传到数据卷目录/var/lib/docker/volumes/mq-plugins/_data
数据卷地址查看指令:docker volume inspect mq-plugins
Ⅲ 安装插件
进入容器内部:docker exec -it mq bash
启动插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

启动插件.png

4、延迟队列代码实现

  • ① 基于@RabbitListener的实现

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayQueue(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}
  • ② 基于Java代码的实现

delayed() // 设置delay属性为true

@Bean
public DirectExchange delayedExchange(){
    return ExchangeBuilder
            .directExchange("delay.direct") // 指定交换机类型及名称
            .delayed() // 设置delay属性为true
            .durable(true) // 持久化
            .build();
}

@Bean
public Queue delayedQueue(){
    return new Queue("delay.queue");
}

@Bean
public Binding delayedBinding(){
    return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
}
  • ③ 向延迟队列发送消息
@Test
public void testDelayedMsg(){
    // 路由键
    String routingKey = "delay";
    // 消息体
    String message = "Hello, Delay.";
    // 消息延迟设置
    Message msg = MessageBuilder
            .withBody(message.getBytes(StandardCharsets.UTF_8)) //消息体
            .setHeader("x-delay", 10000)
            .build();
    // 发送消息
    rabbitTemplate.convertAndSend("delay.direct", routingKey, msg);
}
向延迟队列发送消息.png

5、修改延时消息报异常的处理逻辑

  • ① ReturnCallback处理逻辑

添加延时时间属性值的判断,该属性大于0则是延迟消息,不报错误提示。

// 判断是否是延迟消息
if (message.getMessageProperties().getReceivedDelay() > 0) {
    // 是一个延迟消息,忽略这个错误提示
    return;
}
  • ② 测试
修改延时消息报异常的处理逻辑.png
  • ③ ReturnCallback完整代码
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware { // Spring容器准备好后通知该类

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback:每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置。
        rabbitTemplate.setReturnCallback((message, replyCode, replyTest, exchange, routingKey) -> {
            // 判断是否是延迟消息
            if (message.getMessageProperties().getReceivedDelay() > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.info("发送消息到队列失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyTest, exchange, routingKey, message.toString());
            // 重发消息代码
        });
    }

}

补充:Docker安装RabbitMQ(挂载插件数据卷)

  • ① 拉取镜像
docker pull rabbitmq:3.8-management
  • ② 运行容器

-e RABBITMQ_DEFAULT_USER=test设置用户名为test;
-e RABBITMQ_DEFAULT_PASS=123321设置密码为123456;
-v mq-plugins:/plugins挂载数据卷;
--name mq容器名mq;
--hostname mq1主机名mq1;
-p 15672:15672管理界面端口(此处前面的端口是我们设置的,后面的是需要被映射的,下同);
-p 5672:5672MQ端口(内部使用);

docker run \
 -e RABBITMQ_DEFAULT_USER=test \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

五、结尾

以上即为RabbitMQ-消息可靠性&延迟消息的全部内容,感谢阅读。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容