第五章----SpringBoot+RabbitMQ用死信队列和插件形式实现延迟队列

1. 死信队列之延迟队列

死信队列:用来保存处理失败或者过期的消息,确保消息不被丢失以便排查问题!

延迟队列:顾名思义就是消息在队列中存在一定时间后再被消费。比如下单后半小时没有支付的订单自动取消,比如预约某项功能时提前15分钟提醒,比如希望某一个功能在多长时间后执行等都可以使用延迟队列。

  • RabbitMQ本身是没有延迟队列功能的,但是可以通过死信队列的TTL和DLX模拟延迟队列功能。
  • Time To Live:可以在发送消息时设置过期时间,也可以设置整个队列的过期时间,如果两个同时设置已最早过期时间为准。
  • Dead Letter Exchanges:可以通过绑定队列的死信交换器来实现死信队列。
x-dead-letter-exchange:绑定死信交换器(其实也是普通交换器,与类型无关)
x-dead-letter-routing-key:绑定死信队列的路由键(可选)
x-message-ttl:绑定队列消息的过期时间(可选)
  • 死信队列设计思路
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

进入消息队列:
1. 消息被拒绝,并且requeue= false
2. 消息ttl过期
3. 队列达到最大的长度
死信队列
  • 做延迟队列需要创建一个没有消费者的队列,用了存储消息。然后创建一个真正的消费队列,用来做具体的业务逻辑。当带有TTL的消息到达绑定死信交换器的队列,因为没有消费者所以会一直等到消息过期,然后消息被投递到死信队列也就是真正的消费队列。

  • 新建配置类MQDelayConfig.java,创建支付交换器、支付队列绑定死信队列、他们的绑定关系。无消费者,暂时不知道怎么用注解创建。

  • 设置x-dead-letter-exchange、x-dead-letter-routing-key、x-message-ttl。

package com.fzb.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description 利用死信队列和过期时间模拟延迟队列,没有消费者,所以不能用注解形式
 * Time To Live(TTL)
 * 1. 可以在发送消息时设置过期时间(message.getMessageProperties().setExpiration("5000");)
 * 2. 也可以设置整个队列的过期时间(args.put("x-message-ttl",10000);)
 * 3. 如果两个同时设置已最早过期时间为准
 * Dead Letter Exchanges(DLX)
 * @Author jxb
 * @Date 2019-03-10 10:25:30
 */
@Component
public class MQDelayConfig {

    /**
     * @Description 定义支付交换器
     * @Author jxb
     * @Date 2019-04-02 14:39:31
     */
    @Bean
    private DirectExchange directPayExchange() {
        return new DirectExchange("direct.pay.exchange");
    }

    /**
     * @Description 定义支付队列 绑定死信队列(其实是绑定的交换器,然后通过交换器路由键绑定队列) 设置过期时间
     * @Author jxb
     * @Date 2019-04-02 14:40:24
     */
    @Bean
    private Queue directPayQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //声明死信交换器
        args.put("x-dead-letter-exchange", "direct.delay.exchange");
        //声明死信路由键
        args.put("x-dead-letter-routing-key", "DelayKey");
        //声明队列消息过期时间
        args.put("x-message-ttl", 10000);
        return new Queue("direct.pay.queue", true, false, false, args);
    }

    /**
     * @Description 定义支付绑定
     * @Author jxb
     * @Date 2019-04-02 14:46:10
     */
    @Bean
    private Binding bindingOrderDirect() {
        return BindingBuilder.bind(directPayQueue()).to(directPayExchange()).with("OrderPay");
    }
}
  • 带有过期时间且绑定死信交换器的队列
队列
  • 生产者,为消息设置过期时间setExpiration("15000");
    /**
     * @Description 支付队列、绑定死信队列,测试消息延迟功能
     * @Author jxb
     * @Date 2019-04-02 14:07:25
     */
    @RequestMapping(value = "/directDelayMQ", method = {RequestMethod.GET})
    public List<User> directDelayMQ() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
            rabbitTemplate.convertAndSend("direct.pay.exchange", "OrderPay", user,
                    message -> {
                        // 设置5秒过期
                        message.getMessageProperties().setExpiration("15000");
                        return message;
                    },
                    correlationData);
            System.out.println(user.getName() + ":" + sdf.format(new Date()));
        }
        return users;
    }
  • 消费者,声明真正消费的队列、交换器、绑定
    /**
     * @Description 延迟队列
     * @Author jxb
     * @Date 2019-04-04 16:34:28
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.delay.queue"), exchange = @Exchange(value = "direct.delay.exchange"), key = {"DelayKey"})})
    public void getDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 模拟执行任务
        System.out.println("这是延迟队列消费:" + user.getName() + ":" + sdf.format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

测试结果,因为消息配置的是15秒后到期,而队列配置了10秒到期,所以最终按照时间短的计算。

延迟队列

思考: 如果先放入一条A消息过期时间是10秒,再放入一个b消息过期时间是5秒,那延迟队列是否可以先消费b消息?

答案是否定的,因为队列就会遵循先进先出的规则,b消息会等a消息过期后,一起消费,这就是所谓的队列阻塞。由这个问题我们引出插件形式来实现延迟队列

2. 用rabbitmq-delayed-message-exchange插件实现延迟队列

下载插件地址

  • 强烈建议安装erlang20+版本和RabbitMQ3.7+版本,另插件版本要和RabbitMQ版本一致。

  • 解压成.ez的文件,上传到RabbitMQ安装目录的plugins文件夹下,停止服务器,开启插件,启动服务器。

1. 查看yum 安装的软件路径
   查找安装包:rpm -qa|grep rabbitmq
   查找位置: rpm -ql rabbitmq-server-3.6.15-1.el6.noarch
   卸载yum安装:yum remove rabbitmq-server-3.6.15-1.el6.noarch
2. 上传到plugins文件夹
3. 停止服务器
   service rabbitmq-server stop
4. 开启插件
   rabbitmq-plugins enable rabbitmq_delayed_message_exchange
   (关闭插件)
   rabbitmq-plugins disable rabbitmq_delayed_message_exchange
5. 启动服务器
   service rabbitmq-server start
6. 查看插件
   rabbitmq-plugins list
  • 生产者,设置Header属性x-delay过期时间
    /**
     * @Description 插件延迟队列功能
     * @Author jxb
     * @Date 2019-04-02 14:07:25
     */
    @RequestMapping(value = "/directPluginDelayMQ", method = {RequestMethod.GET})
    public List<User> directPluginDelayMQ() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
            rabbitTemplate.convertAndSend("direct.plugin.delay.exchange", "PluginDelayKey", user,
                    message -> {
                        // 设置5秒过期
                        message.getMessageProperties().setHeader("x-delay",5000);
                        return message;
                    },
                    correlationData);
            System.out.println(user.getName() + ":" + sdf.format(new Date()));
        }
        return users;
    }
  • 消费者,设置x-delayed-message类型的交换器,增加参数x-delayed-type为direct
    /**
     * @Description 插件延迟队列
     * @Author jxb
     * @Date 2019-04-04 16:34:28
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.plugin.delay.queue"), exchange = @Exchange(value = "direct.plugin.delay.exchange",type = "x-delayed-message",arguments = {@Argument(name="x-delayed-type",value = "direct")}), key = {"PluginDelayKey"})})
    public void getPDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 模拟执行任务
        System.out.println("这是插件延迟队列消费:" + user.getName() + ":" + sdf.format(new Date()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
  • 插件形式交换器
交换器

注:用代码是创建一个:CustomExchange自定义交换器,类型一定要设置成:x-delayed-message

注:如果配置了发送回调ReturnCallback,插件延迟队列则会回调该方法,因为发送方确实没有投递到队列上,只是在交换器上暂存,等过期时间到了 才会发往队列。

消息暂存在交换器
  • SpringBoot集成RabbitMQ常用配置(非本系列用)
#rabbitmq
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=fzb_host
#消费者数量
spring.rabbitmq.listener.simple.concurrency=10
#最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
#消费者消费失败,自动重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用发送重试 队列满了发不进去时启动重试
spring.rabbitmq.template.retry.enabled=true 
#1秒钟后重试一次
spring.rabbitmq.template.retry.initial-interval=1000 
#最大重试次数 3次
spring.rabbitmq.template.retry.max-attempts=3
#最大间隔 10秒钟
spring.rabbitmq.template.retry.max-interval=10000
#等待间隔 的倍数。如果为2  第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
spring.rabbitmq.template.retry.multiplier=1.0

做一个有趣的人,让生活更好玩一些

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354