第四十六章:SpringBoot & RabbitMQ完成消息延迟消费

2018-3-1SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更多新特性请查看官方文档

本章目标

基于SpringBoot整合RabbitMQ完成消息延迟消费。

免费教程专题

恒宇少年在博客整理三套免费学习教程专题,由于文章偏多特意添加了阅读指南,新文章以及之前的文章都会在专题内陆续填充,希望可以帮助大家解惑更多知识点。

构建项目

注意前言

由于SpringBoot的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common模块内的配置可以被SpringBoot扫描到,否则不会自动创建队列,控制台会输出404的错误信息。

SpringBoot 企业级核心技术学习专题


专题 专题名称 专题描述
001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件
002 Spring Boot 核心技术章节源码 Spring Boot 核心技术简书每一篇文章码云对应源码
003 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解
004 Spring Cloud 核心技术章节源码 Spring Cloud 核心技术简书每一篇文章对应源码
005 QueryDSL 核心技术 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技术 全面讲解SpringDataJPA核心技术
007 SpringBoot核心技术学习目录 SpringBoot系统的学习目录,敬请关注点赞!!!

我们本章采用2.0.0.RELEASE版本的SpringBoot,添加相关的依赖如下所示:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
        <!--rabbbitMQ相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--spring boot tester-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--fast json依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
    </dependencies>
......

我们仍然采用多模块的方式来测试队列的Provider以及Consumer

队列公共模块

我们先来创建一个名为rabbitmq-common公共依赖模块(Create New Maven Module)
在公共模块内添加一个QueueEnum队列枚举配置,该枚举内配置队列的ExchangeQueueNameRouteKey等相关内容,如下所示:

package com.hengyu.rabbitmq.lazy.enums;

import lombok.Getter;

/**
 * 消息队列枚举配置
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:33
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知队列
     */
    MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
    /**
     * 消息通知ttl队列
     */
    MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}

可以看到MESSAGE_QUEUE队列配置跟我们之前章节的配置一样,而我们另外新创建了一个后缀为ttl的消息队列配置。我们采用的这种方式是RabbitMQ消息队列其中一种的延迟消费模块,通过配置队列消息过期后转发的形式。

这种模式比较简单,我们需要将消息先发送到ttl延迟队列内,当消息到达过期时间后会自动转发到ttl队列内配置的转发Exchange以及RouteKey绑定的队列内完成消息消费。

下面我们来模拟消息通知的延迟消费场景,先来创建一个名为MessageRabbitMqConfiguration的队列配置类,该配置类内添加消息通知队列配置以及消息通过延迟队列配置,如下所示:

/**
 * 消息通知 - 消息队列配置信息
 *
 * @author:恒宇少年 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:32
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Configuration
public class MessageRabbitMqConfiguration {
    /**
     * 消息中心实际消费队列交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心延迟消费交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心实际消费队列配置
     *
     * @return
     */
    @Bean
    public Queue messageQueue() {
        return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
    }


    /**
     * 消息中心TTL队列
     *
     * @return
     */
    @Bean
    Queue messageTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
                // 配置到期后转发的交换
                .withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
                // 配置到期后转发的路由键
                .withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
                .build();
    }

    /**
     * 消息中心实际消息交换与队列绑定
     *
     * @param messageDirect 消息中心交换配置
     * @param messageQueue  消息中心队列
     * @return
     */
    @Bean
    Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
        return BindingBuilder
                .bind(messageQueue)
                .to(messageDirect)
                .with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
    }

    /**
     * 消息中心TTL绑定实际消息中心实际消费交换机
     *
     * @param messageTtlQueue
     * @param messageTtlDirect
     * @return
     */
    @Bean
    public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
        return BindingBuilder
                .bind(messageTtlQueue)
                .to(messageTtlDirect)
                .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
    }
}

我们声明了消息通知队列的相关ExchangeQueueBinding等配置,将message.center.create队列通过路由键message.center.create绑定到了message.center.direct交换上。

除此之外,我们还添加了消息通知延迟队列ExchangeQueueBinding等配置,将message.center.create.ttl队列通过message.center.create.ttl路由键绑定到了message.center.topic.ttl交换上。

我们仔细来看看messageTtlQueue延迟队列的配置,跟messageQueue队列配置不同的地方这里多出了x-dead-letter-exchangex-dead-letter-routing-key两个参数,而这两个参数就是配置延迟队列过期后转发的ExchangeRouteKey,只要在创建队列时对应添加了这两个参数,在RabbitMQ管理平台看到的队列配置就不仅是单纯的Direct类型的队列类型,如下图所示:

队列类型差异

在上图内我们可以看到message.center.create.ttl队列多出了DLXDLK的配置,这就是RabbitMQ死信交换的标志。
满足死信交换的条件,在官方文档中表示:

Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false,
The TTL for the message expires; or
The queue length limit is exceeded.

  • 该消息被拒绝(basic.reject或 basic.nack),requeue = false
  • 消息的TTL过期
  • 队列长度限制已超出
    官方文档地址

我们需要满足上面的其中一种方式就可以了,我们采用满足第二个条件,采用过期的方式。

队列消息提供者

我们再来创建一个名为rabbitmq-lazy-provider的模块(Create New Maven Module),并且在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

<!--添加公共模块依赖-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>

配置队列

resource下创建一个名为application.yml的配置文件,在该配置文件内添加如下配置信息:

spring:
  #rabbitmq消息队列配置信息
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /hengboy
    publisher-confirms: true

消息提供者类

接下来我们来创建名为MessageProvider消息提供者类,用来发送消息内容到消息通知延迟队列,代码如下所示:

/**
 * 消息通知 - 提供者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:40
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
public class MessageProvider {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
    /**
     * RabbitMQ 模版消息实现类
     */
    @Autowired
    private AmqpTemplate rabbitMqTemplate;

    /**
     * 发送延迟消息
     *
     * @param messageContent 消息内容
     * @param exchange       队列交换
     * @param routerKey      队列交换绑定的路由键
     * @param delayTimes     延迟时长,单位:毫秒
     */
    public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
        if (!StringUtils.isEmpty(exchange)) {
            logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
            // 执行发送消息到指定队列
            rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
                // 设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            });
        } else {
            logger.error("未找到队列消息:{},所属的交换机", exchange);
        }
    }
}

由于我们在 pom.xml配置文件内添加了RabbitMQ相关的依赖并且在上面application.yml文件内添加了对应的配置,SpringBoot为我们自动实例化了AmqpTemplate,该实例可以发送任何类型的消息到指定队列。
我们采用convertAndSend方法,将消息内容发送到指定ExchangeRouterKey队列,并且通过setExpiration方法设置过期时间,单位:毫秒。

编写发送测试

我们在test目录下创建一个测试类,如下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
    /**
     * 消息队列提供者
     */
    @Autowired
    private MessageProvider messageProvider;

    /**
     * 测试延迟消息消费
     */
    @Test
    public void testLazy() {
        // 测试延迟10秒
        messageProvider.sendMessage("测试延迟消费,写入时间:" + new Date(),
                QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
                QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
                10000);
    }
}

注意:@SpringBootTest注解内添加了classes入口类的配置,因为我们是模块创建的项目并不是默认创建的SpringBoot项目,这里需要配置入口程序类才可以运行测试。

在测试类我们注入了MessageProvider消息提供者,调用sendMessage方法发送消息到消息通知延迟队列,并且设置延迟的时间为10秒,这里衡量发送到指定队列的标准是要看MessageRabbitMqConfiguration配置类内的相关Binding配置,通过ExchangeRouterKey值进行发送到指定的队列。

到目前为止我们的rabbitmq-lazy-provider消息提供模块已经编写完成了,下面我们来看看消息消费者模块。

队列消息消费者

我们再来创建一个名为rabbitmq-lazy-consumer的模块(Create New Maven Module),同样需要在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

<!--添加公共模块依赖-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>

当然同样需要在resource下创建application.yml并添加消息队列的相关配置,代码就不贴出来了,可以直接从rabbitmq-lazy-provider模块中复制application.yml文件到当前模块内。

消息消费者类

接下来创建一个名为MessageConsumer的消费者类,该类需要监听消息通知队列,代码如下所示:

/**
 * 消息通知 - 消费者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午5:00
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @RabbitHandler
    public void handler(String content) {
        logger.info("消费内容:{}", content);
    }
}

@RabbitListener注解内配置了监听的队列,这里配置内容是QueueEnum枚举内的queueName属性值,当然如果你采用常量的方式在注解属性上是直接可以使用的,枚举不支持这种配置,这里只能把QueueName字符串配置到queues属性上了。
由于我们在消息发送时采用字符串的形式发送消息内容,这里在@RabbitHandler处理方法的参数内要保持数据类型一致!

消费者入口类

我们为消费者模块添加一个入口程序类,用于启动消费者,代码如下所示:

/**
 * 【第四十六章:SpringBoot & RabbitMQ完成消息延迟消费】
 * 队列消费者模块 - 入口程序类
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:55
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
    }
}

测试

我们的代码已经编写完毕,下面来测试下是否完成了我们预想的效果,步骤如下所示:

1. 启动消费者模块
2. 执行RabbitMqLazyProviderApplicationTests.testLazy()方法进行发送消息到通知延迟队列
3. 查看消费者模块控制台输出内容

我们可以在消费者模块控制台看到输出内容:

2018-03-04 10:10:34.765  INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer      : 消费内容:测试延迟消费,写入时间:Sun Mar 04 10:10:24 CST 2018

我们在提供者测试方法发送消息的时间为10:10:24,而真正消费的时间则为10:10:34,与我们预计的一样,消息延迟了10秒后去执行消费。

总结

终上所述我们完成了消息队列的延迟消费,采用死信方式,通过消息过期方式触发,在实际项目研发过程中,延迟消费还是很有必要的,可以省去一些定时任务的配置。

本章源码已经上传到码云:
SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter
SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter

作者个人 博客
使用开源框架 ApiBoot 助你成为Api接口服务架构师

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容