RabbitMQ死信队列在SpringBoot中的使用

死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃。

# 概念:

  • 消息会变成死信消息的场景:

    1. 消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝签收,并且重新入队为false。
      1.1 有一种场景需要注意下:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack/reject
    2. 消息过期,过了TTL存活时间。
    3. 队列设置了x-max-length最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。
  • 代码编写流程是:

    1. 有一个(n个)正常业务的Exchange,比如为user-exchange
    2. 有一个(n个)正常业务的Queue,比如为user-queue。(因为该队列需要绑定死信交换机,所以需要加俩参数:死信交换机:x-dead-letter-exchange,死信消息路由键:x-dead-letter-routing-key
    3. 进行正常业务的交换机和队列绑定。
    4. 定义一个死信交换机,比如为common-dead-letter-exchange
    5. 将正常业务的队列绑定到死信交换机(队列设置了x-dead-letter-exchange即会自动绑定)。
    6. 定义死信队列user-dead-letter-queue用于接收死信消息,绑定死信交换机。
  • 业务流程是:

    1. 正常业务消息被投递到正常业务的Exchange,该Exchange根据路由键将消息路由到绑定的正常队列。
    2. 正常业务队列中的消息变成了死信消息之后,会被自动投递到该队列绑定的死信交换机上(并带上配置的路由键,如果没有指定死信消息的路由键,则默认继承该消息在正常业务时设定的路由键)。
    3. 死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列。
    4. 消息到达死信队列后,可监听该死信队列,处理死信消息。
  • 死信交换机死信队列也是普通的交换机和队列,只不过是我们人为的将某个交换机和队列来处理死信消息。

  • 流程图

图片.png

# 代码实现

  1. 配置
spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: deadletter-vh
    connection-timeout: 15000
    # 发送确认
    publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 签收模式为手动签收-那么需要在代码中手动ACK
        acknowledge-mode: manual

app:
  rabbitmq:
    # 队列定义
    queue:
      # 正常业务队列
      user: user-queue
      # 死信队列
      user-dead-letter: user-dead-letter-queue
    # 交换机定义
    exchange:
      # 正常业务交换机
      user: user-exchange
      # 死信交换机
      common-dead-letter: common-dead-letter-exchange
  1. 队列、交换机定义与绑定。
/**
 * 队列与交换机定义与绑定
 *
 * @author futao
 * @date 2020/4/7.
 */
@Configuration
public class Declare {

        /**
     * 用户队列
     *
     * @param userQueueName 用户队列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //声明该队列死信消息在交换机的 路由键
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                .build();
    }

    /**
     * 用户交换机
     *
     * @param userExchangeName 用户交换机名
     * @return
     */
    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    /**
     * 用户队列与交换机绑定
     *
     * @param userQueue    用户队列名
     * @param userExchange 用户交换机名
     * @return
     */
    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*")
                .noargs();
    }

    /**
     * 死信交换机
     *
     * @param commonDeadLetterExchange 通用死信交换机名
     * @return
     */
    @Bean
    public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return ExchangeBuilder
                .topicExchange(commonDeadLetterExchange)
                .durable(true)
                .build();
    }


   /**
     * 用户队列的死信消息 路由的队列
     * 用户队列user-queue的死信投递到死信交换机`common-dead-letter-exchange`后再投递到该队列
     * 用这个队列来接收user-queue的死信消息
     *
     * @return
     */
    @Bean
    public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) {
        return QueueBuilder
                .durable(userDeadLetterQueue)
                .build();
    }

    /**
     * 死信队列绑定死信交换机
     *
     * @param userDeadLetterQueue      user-queue对应的死信队列
     * @param commonDeadLetterExchange 通用死信交换机
     * @return
     */
    @Bean
    public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) {
        return BindingBuilder
                .bind(userDeadLetterQueue)
                .to(commonDeadLetterExchange)
                .with("user-dead-letter-routing-key")
                .noargs();
    }

}
  • 定义好之后启动程序,springboot会读取Spring容器中类型为Queue和Exchange的bean进行队列和交换机的初始化与绑定。当然也可以自己在RabbitMQ的管理后台进行手动创建与绑定。
  • 查看管理后台


    交换机

    队列

    队列与死信交换机

# 测试

  • 消息生产者
/**
 * @author futao
 * @date 2020/4/7.
 */
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send() {
        User user = User.builder()
                .userName("天文")
                .address("浙江杭州")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user);
    }
}

1. 场景1.1

消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝或者nack,并且重新入队为false。

nack()与reject()的区别是:reject()不支持批量拒绝,而nack()可以.

  • 消费者代码
/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * 正常用户队列消息监听消费者
     *
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user, Message message, Channel channel) {
        log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user));
        try {
            //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("消息拒绝签收失败", e);
        }
    }

    /**
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user, Message message, Channel channel) {
        log.info("接收到死信消息:[{}]", JSON.toJSONString(user));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("死信队列消息签收失败", e);
        }
    }
}
  • 可以看到,正常消息被NACK之后最后到了死信队列,且路由键发生了变化。
    死信消息

    1. 场景1.2

消费者设置了自动签收,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack/reject

  • application.yml中需要更改一些配置
spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 自动签收
        acknowledge-mode: auto
        retry:
          enabled: true
          # 第一次尝试时间间隔
          initial-interval: 10S
          # 两次尝试之间的最长持续时间。
          max-interval: 10S
          # 最大重试次数(=第一次正常投递1+重试次数4)
          max-attempts: 5
          # 上一次重试时间的乘数
          multiplier: 1.0
  • 消费者代码
/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Configuration
public class AutoAckConsumer {

    /**
     * 正常用户队列消息监听消费者
     *
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user) {
        log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user));
        throw new RuntimeException("模拟发生异常");
    }

    /**
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user) {
        log.info("接收到死信消息并自动签收:[{}]", JSON.toJSONString(user));
    }
}
  • 测试结果:


    image.png

    image.png
  • 从测试结果可以看出,消息如果未被正常消费,则进行重试,如果最终还未被正常消费,则会被投递到死信队列。

initial-interval,max-interval这两个参数啥作用不知道,现在测试的结果是一直都会取最短的那个时间作为下次投递时间...

2. 测试场景 2

消息过期,过了TTL存活时间。

  • 需要修改队列定义,设置队列消息的过期时间x-message-ttl.
    /**
     * 用户队列
     *
     * @param userQueueName 用户队列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //声明该队列死信消息在交换机的 路由键
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //该队列的消息的过期时间-超过这个时间还未被消费则路由到死信队列
                .withArgument("x-message-ttl", 5000)
                .build();
    }
  • user-queue的消费者注释,使消息无法被消费,直到消息在队列中的时间达到设定的存活时间。

    ttl

  • 根据日志可以看到,消息在5S后会被投递到死信队列。


    image.png
  • 注意:可以给队列设置消息过期时间,那么所有投递到这个队列的消息都自动具有这个属性。还可以在消息投递之前,给每条消息设定指定的过期时间。(当两者都设置了,则默认取较短的值)

下面测试给每条消息设置指定的过期时间:

  • 修改消息生产者:
/**
 * @author futao
 * @date 2020/4/7.
 */
@Slf4j
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send(String exp) {
        User user = User.builder()
                .userName("天文")
                .address("浙江杭州")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        log.info("消息投递...指定的存活时长为:[{}]ms", exp);
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties messageProperties = message.getMessageProperties();
                //为每条消息设定过期时间
                messageProperties.setExpiration(exp);
                return message;
            }
        });
    }
}
image.png
  • 从测试结果可以看出,每条消息都在指定的时间投递到了死信队列。

【坑】重点注意!!!:RabbitMQ对于消息过期的检测:只会检测最近将要被消费的那条消息是否到达了过期时间,不会检测非末端消息是否过期。造成的问题是:非末端消息已经过期了,但是因为末端消息还未过期,非末端消息处于阻塞状态,所以非末端消息不会被检测到已经过期。使业务产生与预期严重不一致的结果。

  • 对上面的问题进行测试:(第一条消息的过期时间设置成10S,第二条消息设置成5S)


    image.png
  • 从测试结果可以看出,id为1的消息存活时长为10S,id为2的消息存活时间为5S。但是只有当第一条消息(id=1)过期之后,id=2的消息到达队列末端,才会被检测到已经过期。

3. 测试场景3

队列设置了x-max-length最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。

  • 修改队列定义
  /**
     * 用户队列
     *
     * @param userQueueName 用户队列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //声明该队列死信消息在交换机的 路由键
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //队列最大消息数量
                .withArgument("x-max-length", 2)
                .build();
    }
image.png
  • 向队列中投递消息


    image.png
  • 从结果可以看出,当投递第3条消息的时候,RabbitMQ会把在最靠经被消费那一端的消息移出队列,并投递到死信队列。


    image.png

    队列中将始终保持最多两个消息。

# 其他:

# 相关:

SpringBoot RabbitMQ实现消息可靠投递

# TODO:

  • 消费端限流保护
  • 延迟队列
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351