RabbitMQ可靠消息和死信队列

一、消息确认机制

1.1 概念

保证消息不丢失,可靠抵达,可以使用的方式如下

  • 使用事务消息:通过事务保证消息不丢失,但是性能下降250倍
  • 确认机制:publisher: confirmCallback 确认模式publisher: returnCallback 为投递到queue退回模式consumer: ack机制
    image.png


1.2. 具体实现

2.1 从publisher到exchange

原理:消息只要被broker接受就会执行confirmCallback,如果是cluster模式,需要所有的broker接受到才会调用confirmCallback。

yml配置

spring.rabbitmq.publisher-confirm-type=correlated

收信后回调操作:

    /**
     * 设置消息接受的回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 设置消息抵达exchange的回调方法
             * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]");
            }
        });

    }

CorrelationData:用来表示当前消息的唯一性,可以在发送消息时进行设置 rabbitTemplate.convertAndSend("test-exchange", "test.java", car, new CorrelationData(UUID.randomUUID().toString()));

2.2 从exchange到queue

原理:我们要保证exchange中的消息要投递到目标queue中,需要开启return退回模式。

yml配置

spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

exchange发送到queue失败后回调操作:

    /**
     * 设置消息接受的回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * @param message 投递失败的消息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 这个消息发送给那个交换机
             * @param routingKey 消息使用的是哪个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message[" + message + "]-->replyCode[" + replyCode + "]-->replyText[" + replyText + "]-->exchange[" + exchange + "]-->routingKey[" + routingKey + "]");
            }
        });

    }
  • 可以在回调方法中将发送失败的信息存储到MySQL中

2.3 从queue到consumer

原理:
消费者获取到消息,成功处理,可以回复Ack给Broker

  • basic.ack用于肯定回复;broker将移除此消息
  • basic.nack用于否定回复;可以指定broker是否丢弃消息,可以批量
  • basic.reject用于否定回复;同上,但不能批量

默认情况下Broker的消息发送给queue后会自动Ack,删除该消息。但是如果无法确定此消息是否被处理完成或成功处理。我们可以手动开启ack模式:

  • 消息处理成功,ack(),接受下一个消息,broker删除该消息
  • 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错后ack
  • 消息一直没有调用ack/nack方法,broker认为此消息被其他consumer处理,不会投递给别人,此时consumer断开,消息不会被broker删除,会投递给别人。

配置

spring.rabbitmq.listener.simple.acknowledge-mode=manual


二、延时队列和死信队列

2.1 延时队列

消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。

  • 对队列设置TTL:队列没有消费者连接的过期时间。
  • 对消息设置TTL:超时后该消息就是死信。

如果队列和消息都设置了TTL,那么会取最小的。通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者效果相同。


2.2 死信队列

死信情况包括如下三种:

  • 消息被consumer拒收(unack或reject)且requeue是false;
  • 消息TTL超时未消费;
  • 队列长度满了,排在前面的消息会被丢弃或发送到死信exchange中。
  • 超过最大重试次数

DLE(Dead Letter Exchange)
死信exchange是一种普通的exchange,只是所有的死信都会自动从队列中发送到该exchange中。


2.3 应用场景

场景:比如未支付订单,超时一定时间后,系统自动取消订单并释放占有的物品。

解决方案:

  • Spring的schedule定时或xxl中间件定时任务:消耗系统内存、增加数据库压力、存在较大的时间误差;
  • RabbitMQ的消息TTL和死信Exchange结合

如果订单超时时间为1小时,即为消息设置1小时的TTL时间且为队列设置DLE和死信路由键,消息超时过期后发送到DLE,根据死信路由键路由到死信队列中,从死信队列中获得的消息就是超时订单。

实现一:
给队列中的所有消息设定TTL时间,到达队列的时间时开始计时。

image.png

        // 创建信息过期时间为1m的队列
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 指定死信交换机的名字
        arguments.put("x-dead-letter-routing-key", "order.release.order"); // 指定死信的路由键
        arguments.put("x-message-ttl", 10000); // TTL时间设为1m
        Queue delayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        amqpAdmin.declareQueue(delayQueue);

实现二:
给发布者发送的每条消息设施TTL时间,缺点是前一个过期处理之后才会处理后续的过期数据,存在过期数据处理不及时的情况。

image.png

    @Test
    public void sendOrderCreate() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("60000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        Order book = new Order(1, "book");
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", book, messagePostProcessor, new CorrelationData(UUID.randomUUID().toString()));
    }

注意点:如果创建队列成功后,代码中修改队列的属性,是不会覆盖原有属性的,需要删除后再次创建队列。

2.4 实现

2.4.1 流程

①定义交换机(普通信息和死信共用)、延时队列(参数指定死信交换器、死信路由和延时时间)和死信队列。
②绑定交换器和延时队列与死信队列。
③发送消息到交换机,路由到延时队列。
④不消费延时队列信息,信息过期后发送到死信交换器,交换器通过死信路由发送到死信队列。
⑤消费死信队列获得死信信息。

2.4.2 代码

依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置类

@Configuration
public class RabbitMqConfig {

    @Resource
    private AmqpAdmin amqpAdmin;

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入并使用指定的消息转换器(将序列化传输转变为json传输)
     *
     * @return
     */
//    @Bean
//    public Jackson2JsonMessageConverter converter(){
//        return new Jackson2JsonMessageConverter();
//    }

    /**
     * 设置消息接受的回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 设置消息抵达exchange的回调方法
             * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]");
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * @param message 投递失败的消息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 这个消息发送给那个交换机
             * @param routingKey 消息使用的是哪个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message[" + message + "]-->replyCode[" + replyCode + "]-->replyText[" + replyText + "]-->exchange[" + exchange + "]-->routingKey[" + routingKey + "]");
            }
        });
    }


    /**
     * 定义交换机,参数说明
     * 1. name 交换机名称
     * 2. durable 是否持久化,如果持久化,mq重启后交换机还在
     * 3. autoDelete 自动删除,交换机没有绑定队列则删除,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
     * 4. arguments 参数,可以设置一个队列的扩展参数,比如设置存活时间
     */
    @PostConstruct
    public void createExchange() {
        // 创建交换机(同时作为订单信息交换机和死信交换机)
        DirectExchange directExchange = new DirectExchange("order-event-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
    }

    /**
     * 定义队列,参数说明
     * 1. queue 队列名称
     * 2. durable 是否持久化,如果持久化,mq重启后队列还在
     * 3. exclusive 是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除(如果将此参数设置true可用于临时队列的创建)
     * 4. autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
     * 5. arguments 参数,可以设置一个队列的扩展参数,比如设置存活时间
     */
    @PostConstruct
    public void createQueue() {
        // 创建信息过期时间为1m的队列
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 指定死信交换机的名字
        arguments.put("x-dead-letter-routing-key", "order.release.order"); // 指定死信的路由键
        arguments.put("x-message-ttl", 10000); // TTL时间设为1m
        Queue delayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        amqpAdmin.declareQueue(delayQueue);

        // 创建死信队列
        Queue releaseQueue = new Queue("order.release.order.queue", true, false, false);
        amqpAdmin.declareQueue(releaseQueue);
    }

    /**
     * 定义队列,参数说明
     * 1. destination 绑定的队列或者交换机的名字
     * 2. destinationType 需要绑定的类型
     * 3. exchange 交换机的名字
     * 4. routingKey 路由键
     * 5. arguments 参数,可以设置一个队列的扩展参数
     */
    @PostConstruct
    public void createBinding() {
        // 交换机order-event-exchange与订单创建队列order.delay.queue绑定,路由键为order.create.order
        Binding createBinding = new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
        amqpAdmin.declareBinding(createBinding);

        // 交换机order-event-exchange与死信队列order.release.order.queue绑定,路由键为order.release.order
        Binding releaseBinding = new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
        amqpAdmin.declareBinding(releaseBinding);
    }

}

监听死信队列

@Service
public class RabbitListenerService {

    @RabbitListener(queues = "order.release.order.queue")
    public void consumer1(Message message, Order order, Channel channel) throws IOException {
        System.out.println(order);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,false);
    }

}

发送信息

    @Test
    public void sendOrderCreate() {
        Order book = new Order(1, "book");
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order",book, new CorrelationData(UUID.randomUUID().toString()));
    }


三、 硬件

image.png

3.1 内存

默认如果rabbitmq使用超过物理内存的40%,会报警并阻塞所有队列。
可以通过配置文件或输入命令来改变默认的配置
命令方式:

  1. 如果使用内存超过90MB则报警:rabbitmqctl set_vm_memory_high_watermark absolute 90MB
  2. 如果使用内存超过物理内存的40%则报警:rabbitmqctl set_vm_memory_high_watermark 0.4

3.2 磁盘

磁盘剩余空间低于阈值时,同样会阻塞生产者,避免因非持久化的消息持续换页导致服务器磁盘耗尽而崩溃。

  1. 如果磁盘空间剩余小于100GB则报警:rabbitmqctl set_disk_free_limit 100GB
  2. 如果磁盘空间剩余小于内存的1.5倍,则报警:rabbitmqctl set_disk_free_limit memory_limit 1.5

3.3 内存换页

在某个broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入到磁盘中,其中持久化的消息本身就在磁盘胡中有一个副本,所以在转移过程中持久化的消息会先从内存中清除掉。

默认情况下,内存到达的阈值是50%时进行换页,即在默认情况下物理内存使用超过0.4*0.5=0.2时,进行换页。
可以通过设置vm_memory_high_watermark_paging_ratio进行调整。

官网配置


四、附

常用配置

spring:
  rabbitmq:
    host: 192.168.32.207
    port: 5672
    # addresses: 192.168.32.207:5672,... # 配置集群的地址
    username: guest
    password: guest
    virtual-host: /transaction_demo
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        #retry:
          #enabled: true # 开启重试
          #max-attempts: 10 # 最大重试次数
          #initial-interval: 2000m # 重试间隔时间

消费者retry重试消费消息后要放入死信队列,就不能将acknowledge-mode设为manual,否则消息一直是unack状态。

完整配置

spring:
  rabbitmq:
    host: 192.168.32.207
    # addresses: # 配置集群的地址
    port: 5672
    username: guest
    password: guest
    virtual-host: /transaction_demo
    publisher-confirm-type: correlated
    publisher-returns: true
    requested-heartbeat: # 指定心态超时,单位秒,0为不指定:默认60s
    connection-timeout: # 连接超时,单位毫秒,0表示无穷大,不超时
    cache:
      channel:
        size: # 缓存中保存的channel数量
        checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创新一个新channel
      connection:
        size: # 缓存的链接数,只有是CONNECTION模式时生效
        mode: # 连接工厂缓存模式:CHANNEL和CONNECTION
    template:
      mandatory: true
      receive-timeout: # receive()操作的超时时间
      reply-timeout: # sendAndReceive()操作的超时时间
      retry:
        enabled: # 发送重试是否可用
        max-attempts: #最大重试次数
        initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
        multiplier: # 应用于上一重试间隔的乘数
        max-interval: # 最大重试时间间隔
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启重试
          max-attempts: 10 # 最大重试次数
          initial-interval: 2000ms # 重试间隔时间
          multiplier: # 应用于上一重试间隔的乘数
          max-interval: # 最大重试时间间隔
          stateless: # 重试是有状态or无状态
        auto-startup: # 是否启动时自动启动容器
        concurrency: # 最小的消费者数量
        max-concurrency: # 最大的消费者数量
        prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
        transaction-size: # 指定一个事务处理的消息数量,最好小于等于prefetch的数量
        default-requeue-rejected: # 决定被拒绝的消息是否重新入队,默认是true(与参数acknowledge-mode有关系)
        idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒


面试题

RabbitMQ为什么需要信道,为什么不是TCP直接通信:

  1. TCP的创建和销毁开销大,创建要三次握手,销毁要四次分手。每个线程都开一个TCP连接,造成底层操作系统处理繁忙;
  2. 信道的原理是一条线程一个信道,多条线程多条信道同用一条TCP连接,一条TCP连接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能瓶颈。


五、RabbitMQ集群搭建

......待续

©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容