springboot rabbitmq死信队列与延迟队列实战

在上一篇博文中提到,在消息确认消费的过程中,即消息处理过程中出现了异常,为避免消息重新归入队列又继续异常,也为了避免消息不归入队列而把消息丢弃掉,那么可以采用死信队列来处理该情况,当然这个也要结合实际场景,也不一定非要用死信队列,之前遇到过的场景就没采用死信队列,是这样的场景:同步订单后需要发送订单消息去处理,也没用死信队列,异常可以触发邮件告警,之后丢弃消息,而后处理完异常被丢弃的消息可以调用api触发再一次同步订单,故也没采用死信队列。

死信队列其实也是类似于普通的队列,有交换机、队列、路由等信息,只不过是叫做死信交换机和死信路由以及死信队列,相对特殊了一点,是在正常的队列中绑定了这个特殊的队列的交换机以及路由信息,这样一来正常的队列消息出现特殊的情况下(称为死信消息)可以把这个消息转向这个特殊的队列,即死信队列。

那么什么样的消息才会变成死信消息呢?

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
  • 消息过期;
  • 队列达到最大长度。
    对于第一点,可以针对消息消费过程中出现异常情况,把消息拒绝转向死信队列,对于第二点,可以利用起来变成延迟队列,其实延迟队列也是死信队列的另一种体现,场景是给消息设置时间,消息时间一到即变成死信消息,转向死信队列,故也称为延迟队列,延迟队列在实际生产环境中有更加广泛的应用。
一、死信队列
  • 死信队列流程图


    死信队列流程图.jpg
  • 死信队列代码:


    死信队列代码.png

(1)首先是创建正常的队列以及在这个队列中绑定特殊的队列的交换机以及路由信息,在rabbitmq中是通过在创建队列的过程中增加附加参数x-dead-letter-exchange、x-dead-letter-routing-key,这两个参数是死信交换机和死信路由,我们知道在directExchange交换机类型中,交换机和路由可以为消息指引到队列,那么有了这两个参数,就可以绑定死信队列了。

  • RabbitmqConfig配置类,创建两个队列,一个正常的普通队列directDeadPreQueue,以及死信队列deadQueue,普通队列创建的时候增加额外信息死信交换机x-dead-letter-exchange和死信路由x-dead-letter-routing-key。
@Slf4j
@Configuration
public class RabbitmqConfig {
    //演示死信队列,为directExchange消息模型队列绑定死信队列
    @Bean
    public Queue directDeadPreQueue() {
        //创建死信队列的组成成分map,用于存放组成成分的相关成员
        Map<String, Object> args = new <String, Object>HashMap(2);
        //设死信交换机
        args.put("x-dead-letter-exchange", RabbitMqConstants.DEAD_EXCHANGE);
        //死信队列的路由
        args.put("x-dead-letter-routing-key", RabbitMqConstants.DEAD_ROUTING_KEY);
        return new Queue(RabbitMqConstants.DIRECT_QUEUE_DEAD_PRE, true, false, false, args);
    }
    //交换机
    @Bean
    public DirectExchange directDeadPreExchange() {
        return new DirectExchange(RabbitMqConstants.DIRECT_EXCHANGE_DEAD_PRE, true, false);
    }
    //交换机路由绑定队列
    @Bean
    public Binding directDeadPreBinding() {
        return BindingBuilder.bind(directDeadPreQueue()).to(directDeadPreExchange()).with(RabbitMqConstants.DIRECT_ROUTING_KEY_DEAD_PRE);
    }
    //死信队列
    @Bean
    public Queue deadQueue() {
        return new Queue(RabbitMqConstants.DEAD_QUEUE, true);
    }
    //死信交换机
    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(RabbitMqConstants.DEAD_EXCHANGE, true, false);
    }
    //路由交换机绑定死信队列
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(RabbitMqConstants.DEAD_ROUTING_KEY);
    }
}
  • RabbitMqConstants常量值类
@Data
public class RabbitMqConstants {
    //演示死信队列,为directExchange消息模型队列绑定死信队列
    public static final String DIRECT_QUEUE_DEAD_PRE = "mq.direct.queue.dead.pre";
    public static final String DIRECT_EXCHANGE_DEAD_PRE = "mq.direct.exchange.dead.pre";
    public static final String DIRECT_ROUTING_KEY_DEAD_PRE = "mq.direct.routing.key.dead.pre";
    //死信队列
    public static final String DEAD_QUEUE = "mq.dead.queue";
    public static final String DEAD_EXCHANGE = "mq.dead.exchange";
    public static final String DEAD_ROUTING_KEY = "mq.dead.routing.key";
}

(2)创建完队列之后,启动项目,访问http://127.0.0.1:15672/,查看rabbitmq管理后台,可以看到普通队列新增了DLX、DLK的特性,即设置了死信交换机和死信路由。


普通队列.png
普通队列详情.png

(3)普通队列-生产者-OrdinaryPublisher

public class OrdinaryPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMsg(Order order) {
        try {
            //设置交换机、路由键,发送消息
            rabbitTemplate.convertAndSend(RabbitMqConstants.DIRECT_EXCHANGE_DEAD_PRE, RabbitMqConstants.DIRECT_ROUTING_KEY_DEAD_PRE, order);
            log.info("普通队列-生产者,发送消息:{}", order);
        } catch (Exception e) {
            log.error("普通队列-生产者,发送消息异常,消息:{},异常:", order, e);
        }
    }
}

(4)普通队列-消费者-OrdinaryConsumer,在监听到消息并且处理消息过程中故意抛除以0的异常,这样一来消息就被拒绝了。

public class OrdinaryConsumer {
    @RabbitListener(queues = RabbitMqConstants.DIRECT_QUEUE_DEAD_PRE, containerFactory = "singleListenerContainerManual")
    public void consumeMsg(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
        try {
            log.info("普通队列-消费者,监听到消息:{},准备处理业务逻辑。", order);
            int i = 1 / 0;
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("普通队列-消费者,监听到消息:{},发生异常,消息不再归入队列中,转向死信队列,异常e:", order, e);
            //channel.basicNack(tag, false, false);
            channel.basicReject(tag, false);
        }
    }
}

(5)执行test方法

    @Test
    public void testDeadPrePublish() {
        Order order = new Order();
        order.setOrdernum("1234567");
        ordinaryPublisher.sendMsg(order);
    }
test方法.png
test结果.png

这个时候由于我们的代码注释掉了监听死信队列,故打开rabbitmq管理后台可以看到死信队列中存着死信消息:


死信队列.png
死信队列详情.png

可以有两种种方式来解决这个问题,当然问题的根本是把异常处理完,然后消息重新消费。我们假设现在已经把1/0这个异常代码修复了即注释掉了。

  • 方式一,在rabbitmq管理后台取出该消息并且在rabbitmq管理后台发送该消息到普通队列中
    手工取出异常消息:{"ordernum":"1234567"}


    手工取出异常消息.png

    发送异常消息到原先的普通队列mq.direct.queue.dead.pre进行再次消费


    发送异常消息到原先的普通队列.png
  • 方式二,在代码中监听死信队列
    在代码中监听该死信队列,动态配置一个配置值,标识是否修复了异常信息,如果修复了,那么发送消息到原先的普通队列进行消费,如果未修复,那么消息重新归入死信队列,直到修复了异常。
    监听死信队列代码DeadQueueConsumer:
public class DeadQueueConsumer {
    @Autowired
    private OrdinaryPublisher ordinaryPublisher;
    //为方便演示,写死在这里,实际可以用配置中心apollo或者阿里云naocs动态刷新该值,即修复bug之后刷新该值为true
    private Boolean dynamicRepairSign = false;
    //可以注释掉监听,在rabbitmq管理后台取出该消息,等到异常处理完之后把该消息丢回原先的队列进行处理。
    @RabbitListener(queues = RabbitMqConstants.DEAD_QUEUE, containerFactory = "singleListenerContainerManual")
    public void consumeMsg(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
        log.info("死信队列监听到消息:{}", order);
        if (dynamicRepairSign) {
            //修复完异常之后发送消息到原先队列进行消费
            ordinaryPublisher.sendMsg(order);
            channel.basicAck(tag, false);
        } else {
            channel.basicReject(tag, true);
        }
    }
}

这样的话也可以不用像方式一那样去rabbitmq管理后台取出死信队列中的消息,然后再把消息手工发送到原先的队列中,当然死信队列也不是一定要用到,要视实际场景而定。

二、延迟队列
  • 延迟队列流程图:


    延迟队列流程图.jpg
  • 延迟队列代码:


    延迟队列代码.png
  • 前面提到,延迟队列是死信队列的特殊情况,因为消息设置了TTL时间,消息过期了变成死信,继而可以利用该特性来做我们想做的事情,监听死信队列,在死信队列中对死信消息做业务处理,所以也称之为延迟队列。

  • 由于其特性,消息在某段时间过后进行消费处理,延迟队列在实际开发中应用很广泛,比如12306或者其他电商等平台,下单之后,30分钟内如果未付款,那么自动取消该订单并且释放相应的库存,类似于这样的场景,用延迟队列是个很好的选择,可能有人会说,那么也可以通过用定时器的方式来实现该功能,但是定时器是轮询地去查数据库,如果在订单量很大或者像12306这样,每隔一段时间去查询一次数据库,还有多少订单未付款并且到了30分钟,这样的话会给系统数据库造成很大的压力,有可能还会压垮系统奔溃掉。

  • 死信队列是在普通的队列中新增两个附加参数,即死信交换机和死信路由,那么延迟队列其实实现起来也很简单,由于消息过期不消费也会变成死信,那么在发送消息的时候设置消息过期时间,同时不对该普通队列进行监听消费,那么该消息不就一定会过期变成死信消息了,继而最后消息被转向了延迟队列中。

(1)首先是创建正常的队列以及在这个队列中绑定特殊的队列的交换机以及路由信息,像死信队列一样在创建队列的过程中增加附加参数x-dead-letter-exchange、x-dead-letter-routing-key,死信交换机和死信路由,另外为了实现延迟队列,需要再增加额外的参数,消息过期时间TTL,x-message-ttl参数,最后死信交换机和死信路由通过绑定关系绑定延迟队列,结合消息的TTL达到延迟的消费的作用。

  • RabbitmqConfig配置类创建普通队列和延迟队列,普通队列的消息设置了TTL时间为30s
@Slf4j
@Configuration
public class RabbitmqConfig {
    //延迟队列
    @Bean
    public Queue delayQueuePre() {
        //创建延迟队列的组成成分map,用于存放组成成分的相关成员
        Map<String, Object> args = new <String, Object>HashMap(16);
        //设置消息过期之后的死信交换机(真正消费的交换机)
        args.put("x-dead-letter-exchange", RabbitMqConstants.DELAY_EXCHANGE);
        //设置消息过期之后死信队列的路由(真正消费的路由)
        args.put("x-dead-letter-routing-key", RabbitMqConstants.DELAY_ROUTING_KEY);
        //设定消息的TTL,单位为ms,在这里指的是30s
        args.put("x-message-ttl", 30000);
        return new Queue(RabbitMqConstants.DELAY_QUEUE_PRE, true,false,false, args);
    }
    //直连传输directExchange消息模型-交换机
    @Bean
    public DirectExchange delayExchangePre() {
        return new DirectExchange(RabbitMqConstants.DELAY_EXCHANGE_PRE, true, false);
    }
    //直连传输directExchange消息模型-路由交换机绑定队列
    @Bean
    public Binding delayBindingPre() {
        return BindingBuilder.bind(delayQueuePre()).to(delayExchangePre()).with(RabbitMqConstants.DELAY_ROUTING_KEY_PRE);
    }
    //延迟队列(真正处理消息的队列)
    @Bean
    public Queue delayQueue() {
        return new Queue(RabbitMqConstants.DELAY_QUEUE, true);
    }
    //死信交换机(真正处理消息的交换机)
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(RabbitMqConstants.DELAY_EXCHANGE, true, false);
    }
    //死信交换机、死信路由绑定延迟队列
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RabbitMqConstants.DELAY_ROUTING_KEY);
    }
}
  • RabbitMqConstants常量值
@Data
public class RabbitMqConstants {
    //演示延迟队列,为directExchange消息模型队列绑定延迟队列
    public static final String DELAY_QUEUE_PRE = "mq.direct.queue.delay.pre";
    public static final String DELAY_EXCHANGE_PRE = "mq.direct.exchange.delay.pre";
    public static final String DELAY_ROUTING_KEY_PRE = "mq.routing.key.delay.pre";
    //延迟队列
    public static final String DELAY_QUEUE = "mq.delay.queue";
    public static final String DELAY_EXCHANGE = "mq.delay.exchange";
    public static final String DELAY_ROUTING_KEY = "mq.delay.routing.key";
}

(2)创建完队列之后,启动项目,访问http://127.0.0.1:15672/,查看rabbitmq管理后台

普通队列、延迟队列.png
普通队列详情绑定延迟队列.png
延迟队列详情.png

(3)发送消息到普通队列中,并且不对该队列进行监听消费消息,让该消息自动达到过期时间转向延迟队列中

  • 普通队列-生产者-DelayQueuePrePublisher
public class DelayQueuePrePublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发送消息
     */
    public void sendMsg(Order order) {
        try {
            //设置延迟队列交换机、延迟队列路由键,消息实体并且发送消息
            rabbitTemplate.convertAndSend(RabbitMqConstants.DELAY_EXCHANGE_PRE, RabbitMqConstants.DELAY_ROUTING_KEY_PRE, order);
            log.info("延迟队列消息发送成功,消息:{},发送时间:{}", order, LocalDateTime.now());
        } catch (Exception e) {
            log.error("延迟队列消息发送异常,消息:{},异常e:", order, e);
        }
    }
}

(4)监听延迟队列,当普通队列中的消息30秒过期了之后变成死信消息,会转向被该队列监听到

  • 延迟队列-消费者-DelayQueueConsumer
public class DelayQueueConsumer {
    @RabbitListener(queues = RabbitMqConstants.DELAY_QUEUE, containerFactory = "singleListenerContainerAuto")
    public void consumeMsg(Order order) {
        try {
            log.info("延迟队列-30s时间到达后,真正消费消息的队列,监听消息:{},当前时间:{}", order, LocalDateTime.now());
        } catch (Exception e) {
            log.error("延迟队列-30s时间到达后,真正消费消息的队列,监听消息:{},处理发生异常e:", order, e);
        }
    }
}

(5)运行test方法:

    @Test
    public void testDelayPublish() {
        Order order = new Order();
        order.setOrdernum("1234567");
        delayQueuePrePublisher.sendMsg(order);
    }

从打印日志可以看出,发送消息时间是:2021-03-02 01:25:14.976,消费消息时间是:2021-03-02 01:25:44.992,相差是30秒(4毫秒误差就不计啦),从而实现了延迟队列的功能。


发送消息时间.png
消费消息时间.png

参考资料:
《rabbitmq实战指南》
《分布式中间件实战》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容