第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费

消息队列目前流行的有KafKa、RabbitMQ、ActiveMQ等,它们的诞生无非不是为了解决消息的分布式消费,完成项目、服务之间的解耦动作。消息队列提供者与消费者之间完全采用异步通信方式,极力的提高了系统的响应能力,从而提高系统的网络请求吞吐量。
每一种的消息队列都有它在设计上的独一无二的优势,在实际的项目技术选型时根据项目的需求来确定。

免费教程专题

恒宇少年在博客整理三套免费学习教程专题,由于文章偏多特意添加了阅读指南,新文章以及之前的文章都会在专题内陆续填充,希望可以帮助大家解惑更多知识点。

本章目标

基于SpringBoot项目整合RabbitMQ消息队列,完成DirectExchange(路由键)分布式消息消费。

SpringBoot 企业级核心技术学习专题


专题 专题名称 专题描述
001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件
002 Spring Boot 核心技术章节源码 Spring Boot 核心技术简书每一篇文章码云对应源码
003 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解
004 Spring Cloud 核心技术章节源码 Spring Cloud 核心技术简书每一篇文章对应源码
005 QueryDSL 核心技术 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技术 全面讲解SpringDataJPA核心技术
007 SpringBoot核心技术学习目录 SpringBoot系统的学习目录,敬请关注点赞!!!

Exchange

RabbitMQ中有三种常用的转发方式,分别是:

DirectExchange:路由键方式转发消息。
FanoutExchange:广播方式转发消息。
TopicExchange:主题匹配方式转发消息。

我们本章先来讲解DirectExchange路由键方式,根据设置的路由键的值进行完全匹配时转发,下面我们来看一张图,形象的介绍了转发消息匹配流程,如下图所示:

DirectExchange

我们可以看到上图,当消息被提供者发送到RabbitMQ后,会根据配置队列的交换以及绑定实例进行转发消息,上图只会将消息转发路由键为KEY的队列消费者对应的实现方法逻辑中,从而完成消息的消费过程。

安装RabbitMQ

因为RabbitMQ是跨平台的分布式消息队列服务,可以部署在任意的操作系统上,下面我们分别介绍在不同的系统下该怎么去安装RabbitMQ服务。

我们本章采用的环境版本如下:

  • RabbitMQ Server 3.6.14
  • Erlang/OTP_X64 20.1

Windows下安装

我们先去RabbitMQ官方网站下载最新版的安装包,下载地址:https://www.rabbitmq.com/download.html,可以根据不同的操作系统选择下载。
我们在安装RabbitMQ服务端时需要Erlang环境的支持,所以我们需要先安装Erlang

  1. 我们通过Erlang官方网站http://www.erlang.org/downloads下载最新的安装包

  2. 我们访问RabiitmQ官方下载地址https://www.rabbitmq.com/download.html下载最新安装包。

因为是国外的网站所以下载比较慢,大家下载时会浪费时间,我已经将安装包分享到了百度网盘,下载地址:安装包下载地址,密码:pexf

  1. 运行安装Erlang

  2. 运行安装RabbitMQ

5.检查服务是否安装完成,RabbitMQ安装完成后会以服务的形式创建,并且随着开机启动,如下所示:

Rabbit服务

Mac OS X 安装

在Mac OS X中我们使用brew工具可以很简单的安装RabbitMQ服务端,步骤如下:

  1. brew更新到最新版本,执行:brew update
  2. 接下来我们安装Erlang,执行:brew install erlang
  3. 最后安装RabbitMQ,执行:brew install rabbitmq

我们通过上面的步骤安装后,RabbitMQ会被自动安装到/usr/local/Cellar/rabbitmq/目录下,下面我们进入cd sbin目录执行:

sudo ./rabbitmq-server

可以直接启动RabbitMQ服务。

Ubuntu 安装

Ubuntu操作系统中,我们可以直接使用APT仓库进行安装,我使用的系统版本是16.04,系统版本并不影响安装。

  1. 安装Erlang,执行命令:sudo apt-get install erlang
  2. 下面我们需要将RabbitMQ的安装源配置信息写入到系统的/etc/apt/sources.list.d配置文件内,执行如下命令:
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
  1. 下面我们更新APT本地仓库的安装包列表,执行命令:sudo apt-get update
  2. 最后安装RabbitMQ服务,执行命令:sudo apt-get install rabbitmq-server

启用界面管理插件

RabbitMQ提供了界面管理的web插件,我们只需要启用指定的插件就可以了,下面我们来看看Windows操作系统下该怎么启动界面管理插件。
我们使用CMD进入RabbitMQ安装目录C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.14,然后我们进入sbin目录,可以看到目录内存在很多个bat脚本程序,我们找到rabbitmq-plugins.bat,这个脚本程序可以控制RabbitMQ插件启用禁用,我们执行如下脚本命令来启用界面管理插件:

rabbitmq-plugins.bat enable rabbitmq_management

命令行输出内容如下所示:

The following plugins have been enabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... started 6 plugins.

可以看到输出的内容RabbitMQ自动启动了6个插件,我们现在访问http://127.0.0.1:15672地址可以直接打开RabbitMQ的界面管理平台,而默认的用户名/密码分别为:guest/guest,通过该用户可以直接登录管理平台。

禁用界面管理插件

我们同样可以禁用RabbitMQ指定插件,执行如下命令:

rabbitmq-plugins.bat disable rabbitmq_management

命令创建输出内容则是相关停止插件的日志,如下:

The following plugins have been disabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... stopped 6 plugins.

这样我们再访问http://127.0.0.1:15672就会发现我们无法访问到界面。

构建项目

我们使用idea开发工具创建一个SpringBoot项目,添加依赖,pom.xml配置文件如下所示:

<dependencies>
        <!--rabbitmq依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--fastjson依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
        <!--测试依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

我们本章来模拟用户注册完成后,将注册用户的编号通过Provider模块发送到RabbitMQ,然后RabbitMQ根据配置的DirectExchange的路由键进行异步转发。

初始化用户表

下面我们先来创建所需要的用户基本信息表,建表SQL如下所示:

CREATE TABLE `user_info` (
  `UI_ID` int(11) DEFAULT NULL COMMENT '用户编号',
  `UI_USER_NAME` varchar(20) DEFAULT NULL COMMENT '用户名称',
  `UI_NAME` varchar(20) DEFAULT NULL COMMENT '真实姓名',
  `UI_AGE` int(11) DEFAULT NULL COMMENT '用户年龄',
  `UI_BALANCE` decimal(10,0) DEFAULT NULL COMMENT '用户余额'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户基本信息表';

构建 rabbitmq-provider 项目

基于我们上述的项目创建一个Maven子模块,命名为:rabbitmq-provider,因为是直接创建的Module项目,IDEA并没有给我创建SpringApplication启用类。

创建入口类

下面我们自行创建一个Provider项目启动入口程序,如下所示:

/**
 * 消息队列消息提供者启动入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:14
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqProviderApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqProviderApplication.class);

    /**
     * 消息队列提供者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqProviderApplication.class,args);

        logger.info("【【【【【消息队列-消息提供者启动成功.】】】】】");
    }
}
application.properties配置文件

下面我们在src/main/resource目录下创建application.properties并将对应RabbitMQ以及Druid的配置加入,如下所示:

#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true

#数据源配置
spring.datasource.druid.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.druid.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
spring.datasource.druid.username=root
spring.datasource.druid.password=123456

RabbitMQ内有个virtual-host即虚拟主机的概念,一个RabbitMQ服务可以配置多个虚拟主机,每一个虚拟机主机之间是相互隔离,相互独立的,授权用户到指定的virtual-host就可以发送消息到指定队列。

用户实体

本章数据库操作采用spring-data-jpa,相关文章请访问:第十三章:SpringBoot实战SpringDataJPA,我们基于user_info数据表对应创建实体,如下所示:

@Data
@Table(name = "user_info")
@Entity
public class UserEntity
    implements Serializable
{
    /**
     * 用户编号
     */
    @Id
    @GeneratedValue
    @Column(name = "UI_ID")
    private Long id;
    /**
     * 用户名称
     */
    @Column(name = "UI_USER_NAME")
    private String userName;
    /**
     * 姓名
     */
    @Column(name = "UI_NAME")
    private String name;
    /**
     * 年龄
     */
    @Column(name = "UI_AGE")
    private int age;
    /**
     * 余额
     */
    @Column(name = "UI_BALANCE")
    private BigDecimal balance;
}
用户数据接口

创建UserRepository用户数据操作接口,并继承JpaRepository获得spring-data-jpa相关的接口定义方法。如下所示:

/**
 * 用户数据接口定义
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:35
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface UserRepository
    extends JpaRepository<UserEntity,Long>
{
}
用户业务逻辑实现

本章只是简单完成了数据的添加,代码如下所示:

/**
 * 用户业务逻辑实现类
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:37
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Service
@Transactional(rollbackFor = Exception.class)
public class UserService
{
    @Autowired
    private UserRepository userRepository;
    /**
     * 消息队列业务逻辑实现
     */
    @Autowired
    private QueueMessageService queueMessageService;

    /**
     * 保存用户
     * 并写入消息队列
     * @param userEntity
     * @return
     */
    public Long save(UserEntity userEntity) throws Exception
    {
        /**
         * 保存用户
         */
        userRepository.save(userEntity);
        /**
         * 将消息写入消息队列
         */
        queueMessageService.send(userEntity.getId(), ExchangeEnum.USER_REGISTER, QueueEnum.USER_REGISTER);

        return userEntity.getId();
    }

在上面业务逻辑实现类内出现了一个名为QueueMessageService消息队列实现类,该类是我们定义的用于发送消息到消息队列的统一入口,在下面我们会详细讲解。

用户控制器

创建一个名为UserController的控制器类,对应编写一个添加用户的请求方法,如下所示:

/**
 * 用户控制器
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:41
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@RestController
@RequestMapping(value = "/user")
public class UserController
{
    /**
     * 用户业务逻辑
     */
    @Autowired
    private UserService userService;

    /**
     * 保存用户基本信息
     * @param userEntity
     * @return
     */
    @RequestMapping(value = "/save")
    public UserEntity save(UserEntity userEntity) throws Exception
    {
        userService.save(userEntity);
        return userEntity;
    }
}

到这我们添加用户的流程已经编写完成了,那么我们就来看下消息队列QueueMessageService接口的定义以及实现类的定义。

消息队列方法定义接口

创建一个名为QueueMessageService的接口并且继承了RabbitTemplate.ConfirmCallback接口,而RabbitTemplate.ConfirmCallback接口是用来回调消息发送成功后的方法,当一个消息被成功写入到RabbitMQ服务端时,就会自动的回调RabbitTemplate.ConfirmCallback接口内的confirm方法完成通知,QueueMessageService接口如下所示:

/**
 * 消息队列业务
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:50
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface QueueMessageService
    extends RabbitTemplate.ConfirmCallback
{
    /**
     * 发送消息到rabbitmq消息队列
     * @param message 消息内容
     * @param exchangeEnum 交换配置枚举
     * @param queueEnum 队列配置枚举
     * @throws Exception
     */
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception;
}

接下来我们需要实现该接口内的所有方法,并做出一些业务逻辑的处理。

消息队列业务实现

创建名为QueueMessageServiceSupport实体类实现QueueMessageService接口,并实现接口内的所有方法,如下所示:

/**
 * 消息队列业务逻辑实现
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:52
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
public class QueueMessageServiceSupport
    implements QueueMessageService
{
    /**
     * 消息队列模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception {
        //设置回调为当前类对象
        rabbitTemplate.setConfirmCallback(this);
        //构建回调id为uuid
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //发送消息到消息队列
        rabbitTemplate.convertAndSend(exchangeEnum.getValue(),queueEnum.getRoutingKey(),message,correlationId);
    }

    /**
     * 消息回调确认方法
     * @param correlationData 请求数据对象
     * @param ack 是否发送成功
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    }
}

convertAndSend方法用于将Object类型的消息转换后发送到RabbitMQ服务端,发送是的消息类型要与消息消费者方法参数保持一致。

confirm方法内,我们仅仅打印了消息发送时的id,根据ack参数输出消息发送状态。

在上面代码中我们注入了RabbitTemplate消息队列模板实例,而通过该实例我们可以将消息发送到RabbitMQ服务端。那么这个实例具体在什么地方定义的呢?我们带着这个疑问来创建下面的模块,我们需要将RabbitMQ相关的配置抽取出来作为一个单独的Module存在。

构建 rabbitmq-common 项目

该模块项目很简单,只是添加RabbitMQ相关的配置信息,由于Module是一个子模块所以继承了parent所有的依赖,当然我们用到的RabbitMQ相关依赖也不例外。

配置rabbitmq

在创建配置类之前,我们先来定义两个枚举,分别存放了队列的交换信息、队列路由信息,

  • ExchangeEnum (存放了队列交换配置信息)
/**
 * rabbitmq交换配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:13:56
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum ExchangeEnum
{
    /**
     * 用户注册交换配置枚举
     */
    USER_REGISTER("user.register.topic.exchange")
    ;
    private String value;

    ExchangeEnum(String value) {
        this.value = value;
    }
}
  • QueueEnum (存放了队列信息以及队列的路由配置信息)
/**
 * 队列配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:05
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum QueueEnum
{
    /**
     * 用户注册枚举
     */
    USER_REGISTER("user.register.queue","user.register")
    ;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}

创建名为UserRegisterQueueConfiguration的实体类用于配置本章用到的用户注册队列信息,如果你得项目中使用多个队列,建议每一个业务逻辑创建一个配置类,分开维护,这样不容易出错。配置信息如下:

/**
 * 用户注册消息队列配置
 * ========================
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:16:58
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Configuration
public class UserRegisterQueueConfiguration {
    /**
     * 配置路由交换对象实例
     * @return
     */
    @Bean
    public DirectExchange userRegisterDirectExchange()
    {
        return new DirectExchange(ExchangeEnum.USER_REGISTER.getValue());
    }

    /**
     * 配置用户注册队列对象实例
     * 并设置持久化队列
     * @return
     */
    @Bean
    public Queue userRegisterQueue()
    {
        return new Queue(QueueEnum.USER_REGISTER.getName(),true);
    }

    /**
     * 将用户注册队列绑定到路由交换配置上并设置指定路由键进行转发
     * @return
     */
    @Bean
    public Binding userRegisterBinding()
    {
        return BindingBuilder.bind(userRegisterQueue()).to(userRegisterDirectExchange()).with(QueueEnum.USER_REGISTER.getRoutingKey());
    }
}

该配置类大致分为如下三部分:

  • 配置交换实例
    配置DirectExchange实例对象,为交换设置一个名称,引用ExchangeEnum枚举配置的交换名称,消息提供者与消息消费者的交换名称必须一致才具备的第一步的通讯基础。

  • 配置队列实例
    配置Queue实例对象,为消息队列设置一个名称,引用QueueEnum枚举配置的队列名称,当然队列的名称同样也是提供者与消费者之间的通讯基础。

  • 绑定队列实例到交换实例
    配置Binding实例对象,消息绑定的目的就是将Queue实例绑定到Exchange上,并且通过设置的路由Key进行消息转发,配置了路由Key后,只有符合该路由配置的消息才会被转发到绑定交换上的消息队列。

我们的rabbitmq-common模块已经编写完成。

添加 rabbitmq-provider 依赖 rabbitmq-common

下面我们回到rabbitmq-provider模块,修改pom.xml配置文件,如下所示:

<dependencies>
        <!--添加common模块依赖-->
        <dependency>
            <groupId>com.hengyu</groupId>
            <artifactId>rabbitmq-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <!--mysql依赖-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--druid数据源依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.5</version>
        </dependency>
        <!--data jpa依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
    </dependencies>

可以看到我们将rabbitmq-common模块添加到了rabbitmq-provider模块的pom配置文件内,完成了模块之间的相互依赖,这样我们rabbitmq-provider就自动添加了对应的消息队列配置。

构建rabbitmq-consumer

我们再来创建一个rabbitmq-consumer队列消息消费者模块,用于接受消费用户注册消息。

创建入口类

同样我们先来创建一个SpringApplication入口启动类,如下所示:

/**
 * 消息队列消息消费者入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerApplication.class);

    /**
     * rabbitmq消费者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerApplication.class,args);

        logger.info("【【【【【消息队列-消息消费者启动成功.】】】】】");
    }
}
application.properties配置文件

配置文件的消息队列配置信息要与rabbitmq-provider配置文件一致,如下所示:

spring.application.name=rabbitmq-consumer
#启动端口
server.port=1111
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true

我们修改了程序启动的端口号,为了我们下面进行测试的时候不出现端口占用的情况。

如果RabbitMQ配置信息与rabbitmq-provider不一致,就不会收到消费消息。

用户注册消息消费者

创建名为UserConsumer类,用于完成消息监听,并且实现消息消费,如下所示:

/**
 * 用户注册消息消费者
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    @RabbitHandler
    public void execute(Long userId)
    {
        System.out.println("用户:" + userId+",完成了注册");

        //...//自行业务逻辑处理
    }
}

在消息消费者类内,有两个陌生的注解:

  • @RabbitListener
    RabbitMQ队列消息监听注解,该注解配置监听queues内的队列名称列表,可以配置多个。队列名称对应本章rabbitmq-common模块内QueueEnum枚举name属性。
  • @RabbitHandler
    RabbitMQ消息处理方法,该方法的参数要与rabbitmq-provider发送消息时的类型保持一致,否则无法自动调用消费方法,也就无法完成消息的消费。

运行测试

我们接下来在rabbitmq-provider模块src/test/java下创建一个测试用例,访问用户注册控制器请求路径,如下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class UserTester
{
    /**
     * 模拟mvc测试对象
     */
    private MockMvc mockMvc;

    /**
     * web项目上下文
     */
    @Autowired
    private WebApplicationContext webApplicationContext;

    /**
     * 所有测试方法执行之前执行该方法
     */
    @Before
    public void before() {
        //获取mockmvc对象实例
        mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
    }

    /**
     * 测试添加用户
     * @throws Exception
     */
    @Test
    public void testUserAdd() throws Exception
    {
        mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                .param("userName","yuqiyu")
                .param("name","恒宇少年")
                .param("age","23")
        )
                .andDo(MockMvcResultHandlers.log())
                .andReturn();
    }
}

调用测试用例时会自动将参数保存到数据库,并且将用户编号发送到RabbitMQ服务端,而RabbitMQ根据交换配置以及队列配置转发消息到消费者实例。

启动 rabbitmq-consumer

我们先来把rabbitmq-consumer项目启动,控制台输出启动日志如下所示:

.....
51.194  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'rabbitConnectionFactory' has been autodetected for JMX exposure
2017-12-03 16:58:51.196  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'rabbitConnectionFactory': registering with JMX server as MBean [org.springframework.amqp.rabbit.connection:name=rabbitConnectionFactory,type=CachingConnectionFactory]
2017-12-03 16:58:51.216  INFO 2340 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2017-12-03 16:58:51.237  INFO 2340 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#443ff8ef:0/SimpleConnection@4369ac5c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
2017-12-03 16:58:51.287  INFO 2340 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.354 seconds (JVM running for 3.026)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】

该部分启动日志就是我们配置的RabbitMQ初始化信息,我们可以看到项目启动时会自动与配置的RabbitMQ进行关联:

[delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
运行测试用例

接下来我们执行rabbitmq-provider项目的测试用例,来查看控制台的输出内容如下所示:

......
 回调id:e08f6d82-57bc-4c3f-9899-31c4b990c5be
消息发送成功
......

已经可以正常的将消息发送到RabbitMQ服务端,并且接收到了回调通知,那么我们的rabbitmq-consumer项目是不是已经执行了消息的消费呢?我们打开rabbitmq-consumer控制台查看输出内容如下所示:

用户:2,完成了注册

看以看到已经可以成功的执行UserConsumer消息监听类内的监听方法逻辑,到这里消息队列路由一对一的方式已经讲解完了。

总结

本章主要讲解了RabbitMQ在不同操作系统下的安装方式,以及通过三个子模块形象的展示了消息的分布式处理,整体流程:rabbitmq-provider -> RabbitMQ服务端 -> rabbitmq-consumer,消息的转发是非常快的,RabbitMQ在收到消息后就会检索当前服务端是否存在该消息的消费者,如果存在将会马上将消息转发。

本章源码已经上传到码云:
SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter
SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter

作者个人 博客
使用开源框架 ApiBoot 助你成为Api接口服务架构师

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,376评论 51 785
  • 1.RabbitMQ概述 简介: MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法...
    梁朋举阅读 49,302评论 0 47
  • RabbitMQ 原理介绍及安装部署 标签:RabbitMQ 安装 简介 RabbitMQ 是一个用 Erlang...
    神仙CGod阅读 8,550评论 0 60
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,343评论 2 34