Springboot与RabbitMQ上手之TTL+DLX实现延迟队列(六)

前言

目的主要是学习RabbitMQ的TTL+DLX实现延迟队列,大概会简单介绍学习为主:毕竟还是要来演示Springboot整合RabbitMQ注解的方式来使用。

一. TTL

TTL,Time to Live的简称,即过期时间。RabbitMQ 可以对消息和队列设置TTL。
消息的 TTL 指的是消息的存活时间,RabbitMQ 支持消息、队列两种方式设置 TTL,分别如下:
消息设置 TTL:对消息的设置是在发送时进行 TTL 设置,通过 x-message-ttl 或expiration 字段设置,单位为毫秒,代表消息的过期时间,每条消息的 TTL 可不同。
队列设置 TTL:对队列的设置是在消息入队列时计算,通过 x-expires 设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。
注意:如果以上两种方式都做了设置,消息的 TTL 则以两者之中最小的那个为准。

二.死信队列

DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信( dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
1). 消息被拒绝(Basic. Reject/Basic.Nack),并且设置requeue参数为false;
2). 消息过期;
3). 队列达到最大长度。

2.1 DLX 交换机

DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息以进行相应的处理
####### 2.2 DLX声明
声明步骤如下:
第一步:声明正常的交换机,队列,路由键
第二步:在正常的队列里设置过期时间,绑定死信队列交换机名称以及设置发送给死信队列交换机路由键
第三步:声明死信交换机,队列,路由键
上面的做法是为了设置一个过期的正常队列路由到死信队列上面,让任意的队列去监听消费它。


image.png
    /* 设置队列过期时间*/
    @Bean
    public Exchange queueTimeOutExchange(){
        return new DirectExchange("queueTimeOut.exchange.test",true,false);
    }

    // 声明过期的队列并定义队列名称
    @Bean
    public Queue queueTimeOutQueue(){
        // 消息过期 特殊的args
        Map<String,Object> args  = new HashMap<>(16);
        args.put("x-message-ttl",20000);
        // 绑定死信交换机名称
        args.put("x-dead-letter-exchange", "dead.exchange.test");
        // 置发送给死信交换机的routingKey
        args.put("x-dead-letter-routing-key", "key.queue.dead");
        // 设置队列可以存储的最大消息数量
        args.put("x-max-length", 10);
        return new Queue("queueTimeOut.queue.test"
                ,true,false,false,args);
    }

    @Bean
    public Binding queueTimeOutBinding(){
        return new Binding("queueTimeOut.queue.test",
                Binding.DestinationType.QUEUE,
                "queueTimeOut.exchange.test",
                "key.queuetime.out",null);
    }

    /* 死信队列*/
    @Bean
    public Exchange deadExchange(){
        return new DirectExchange("dead.exchange.test",true,false);
    }

    // 声明过期的队列并定义队列名称
    @Bean
    public Queue deadQueue(){

        return new Queue("dead.queue.test"
                ,true,false,false);
    }

    @Bean
    public Binding deadBinding(){
        return new Binding("dead.queue.test",
                Binding.DestinationType.QUEUE,
                "dead.exchange.test",
                "key.queue.dead",null);
    }
image.png

上图标记的队列就是TTL+DLX。



上面标记之前为这个队列设置的特性

三,实现简单实例

3.1 生产端

service

    // 设置 死信队列
    public void deadMessag() throws JsonProcessingException;

impl

    /**
     * 測試死信隊列
     */
    @Override
    public void deadMessag() throws JsonProcessingException {
        /* 使用MessageProperties传递的对象转换成message*/
        MessageProperties messageProperties = new MessageProperties();
        OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
        orderMessageDTO.setProductId(100);
        orderMessageDTO.setPrice(new BigDecimal("20"));
        orderMessageDTO.setOrderId(1);
        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
        Message message = new Message(messageToSend.getBytes(),messageProperties);
        // 发送端确认是否确认消费
        CorrelationData correlationData = new CorrelationData();
        // 唯一ID
        correlationData.setId(orderMessageDTO.getOrderId().toString());
        rabbitTemplate.convertAndSend("dead.exchange.test","key.queue.dead",message,correlationData);
    }

controller

@RestController
@Slf4j
@RequestMapping("/api")
public class SendController {
    @GetMapping("/deadQueue")
    public void deadQueue() throws JsonProcessingException {
        directService.deadMessag();
    }
}
3.2 消费端

service

    // 监听死信队列, 死信队列可以在任何的队列上被指定,实际上就是设置某个队列的属性
    public void deadQueueListenter(Message message, Channel channel) throws IOException;

impl

   /**
     * 监听死信队列
     */
    @RabbitListener(
            containerFactory = "rabbitListenerContainerFactory",
            bindings = {
                    @QueueBinding(
                            value = @Queue(name = "dead.queue.test"),
                            exchange = @Exchange(name = "dead.exchange.test",
                                    type = ExchangeTypes.DIRECT),
                            key = "key.queue.dead"
                    )
            }
    )
    @Override
    public void deadQueueListenter(@Payload Message message, Channel channel) throws IOException {
        log.info("========direct死信队列,业务场景超时===========");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                String message = new String(body, "UTF-8");
                log.info("[x] Received '" + message + "'");
                // 手动 false ack
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        //. 设置 Channel 消费者绑定队列
        channel.basicConsume("dead.queue.test",false,consumer);
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容