RabbitMQ丢消息的解决方案

3种丢消息的场景

  • 发送消息到交换机或队列时,丢消息(设置2个回调)
  • 消息到MQ软件,MQ因宕机而要重启,丢消息(交换机、队列、消息的durable属性,要设置为持久化,Spring的RabbitTemplate默认就是将这3个都持久化的,一般不需要去改)
  • 消费者没有正常消费消息,丢消息(默认消费方是阅后即焚的,所以消息从队列出队给消费方后,队列中就没有这个消息了,消费方没有正常去消费,消息就丢失了)

发送消息到交换机或队列时,丢消息

  • 开启发送消息后的回调配置
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
  • 设置RabbitTemplate的2个回调

  • setConfirmCallback,消息发送给交换机后回调,成功时ack参数为true,失败则为false

  • setReturnCallback,消息从交换机投递给队列失败时,才会回调,所以要在这个回调时,记录日志

@Slf4j
@Configuration
// 实现ApplicationContextAware接口,可以从已有的spring上下文取得已实例化的bean
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate实例
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        
        // 设置confirm callback,投递消息到交换机成功或失败,都会回调此方法
        // 注:如果投递成功,方法的ack参数为true,失败则为false
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息投递完成,ack = {}, cause = {}", ack, cause);
            }
        });
        
        // 设置return callback,从交换机投递到队列失败时,才会回调该方法
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                // 记录日志
                log.info("消息投递失败,replyCode = {},replyText = {},exchange = {},routingKey = {},message = {}", replyCode, replyText, exchange, routingKey, message.toString());
            }
        });
    }
}

MQ软件重启,丢消息

  • 在Java代码中,创建交换机、队列时,就会设置为持久化,属性名为durable,在RabbitMQ的后台中看,有一个大写的D,就是设置了持久化的了,一般我们都会设置为持久化,保证MQ重启不丢消息

消费者没有正常消费消息,丢消息

默认消费者收到消息后,MQ就会将消息从队列中删除,也就是阅后即焚,我们需要设置MQ的确认模式,一般我们可以设置为auto自动或manual手动,以下以手动为例

手动模式

  • 在消费者的application.yml文件中,配置以下内容
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 确认模式有3种,manual、auto、none
  • 确认模式
    • manual:手动ack,需要在业务代码结束后,调用api发送ack。
    • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
    • 即,none是失败后什么都不处理,auto是类似事务机制,出现异常时返回nack,消息回滚到MQ,没有异常则返回ack,消息才从MQ中删除。manual是手动自己判断业务是否正常执行,成功则手动返回ack
@Component
@Slf4j
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(Message msg, Channel channel) throws Exception {
        System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
        // 模拟异常
        System.out.println(1 / 0);
        
        // 业务执行正常,才回复ack
        // 参数一:deliveryTag,也就是消息的标识,从msg中获取
        // 参数二:multiple,如果MQ是集群,true则是需要通知集群中的所有MQ
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
        
        log.debug("消息处理完成!");
    }
}

自动模式

  • 将消费者的确认模式,修改为auto
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 确定模式:auto,为自动ack
  • 重新运行消费者,会发现消费者消费出现异常,然后将消息归还给MQ,然后消费者的Listener监听队列又有消息,又从队列中拿出来消息,导致出现的无限死循环!!

重试次数

  • 因此不能无限重试,我们应该限制重试的次数,以及重试完毕后的失败策略(例如重试了3、5次后,还是失败,则将消息投递到一个特定的错误消息交换机,然后再投递到错误消息队列)

  • 配置spring的retry机制,当消费者消费出现异常时,进行本地重试,而不是无限制的requeue重新入队到MQ队列中,其中enabled属性为开启失败重试,max-attempts为最大重试次数

  • 本地重试:也就是消息消费过程中,出现异常,不会将消息requeue到队列,而是在消费者本地进行重试,就不会出现频繁requeue,给MQ造成不必要的压力

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初始的失败等待时长为1秒 2  4  8  16  32
          multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 最后,重试达到最大次数后,Spring会返回ack,消息会被丢弃,所以我们还需要配置失败策略

失败策略

  • 默认,重试达到最大重试次数后,消息会丢失,这个是Spring的内部机制决定的,默认的失败处理策略是丢弃消息,我们可以配置策略的实现,策略接口为MessageRecoverer,有3种策略实现,分别是:

    • RejectAndDontRequeueRecoverer,到达最大重试次数后,直接reject,丢弃消息,默认就是这种
    • ImmediateRequeueMessageRecoverer,到达最大重试次数后,返回nack,消息重新requeue入队
    • RepublishMessageRecoverer,到达最大重试次数后,将失败消息投递到指定的交换机
  • 比较优雅的方式是选用RepublishMessageRecoverer,例如使用这种策略方式,当到达到最大重试次数后,将消息投递到一个错误消息交换机,然后交换机再投递到一个专门存放错误消息的错误队列,后续人工再集中处理

  • 在消费方,定义处理错误的交换机和队列

// 错误消息交换机
@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}

// 错误消息队列
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}

// 绑定错误交换机和错误消息队列
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder
    // 队列
    .bind(errorQueue)
    // 交换机
    .to(errorMessageExchange)
    // 设置routingKey
    .with("error");
}
  • 定义失败策略,指定错误消息交换机和routingKey
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
  • 此时,再次启动生产者,发送消息,消费者重试3次失败后,执行失败策略,将消息投递到了error错误队列,在RabiitMQ的控制台中,点击get message按钮,就能获取到消息内容,以及错误消息的堆栈

完整代码

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容