第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费

在上一章第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费我们讲解到了RabbitMQ消息队列的DirectExchange路由键消息单个消费者消费,源码请访问SpringBoot对应章节源码下载查看,消息队列目的是完成消息的分布式消费,那么我们是否可以为一个Provider创建并绑定多个Consumer呢?

免费教程专题

恒宇少年在博客整理三套免费学习教程专题,由于文章偏多特意添加了阅读指南,新文章以及之前的文章都会在专题内陆续填充,希望可以帮助大家解惑更多知识点。

本章目标

基于SpringBoot平台整合RabbitMQ消息队列,完成一个Provider绑定多个Consumer进行消息消费。

SpringBoot 企业级核心技术学习专题


专题 专题名称 专题描述
001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件
002 Spring Boot 核心技术章节源码 Spring Boot 核心技术简书每一篇文章码云对应源码
003 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解
004 Spring Cloud 核心技术章节源码 Spring Cloud 核心技术简书每一篇文章对应源码
005 QueryDSL 核心技术 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技术 全面讲解SpringDataJPA核心技术
007 SpringBoot核心技术学习目录 SpringBoot系统的学习目录,敬请关注点赞!!!

构建项目

我们基于上一章的项目进行升级,我们先来将Chapter41项目Copy一份命名为Chapter42

构建 rabbitmq-consumer-node2

基于我们复制的Chapter42项目,创建一个Module子项目命名为rabbitmq-consumer-node2,用于消费者的第二个节点,接下来我们为rabbitmq-consumer-node2项目创建一个入口启动类RabbitmqConsumerNode2Application,代码如下所示:

/**
 * 消息队列消息消费者节点2入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerNode2Application
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerNode2Application.class);

    /**
     * rabbitmq消费者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerNode2Application.class,args);

        logger.info("【【【【【消息队列-消息消费者节点2启动成功.】】】】】");
    }
}

为了区分具体的消费者节点,我们在项目启动成功后打印了相关的日志信息,下面我们来编写application.properties配置文件信息,可以直接从rabbitmq-consumer子项目内复制内容,复制后需要修改server.port以及spring.application.name,如下所示:

#端口号
server.port=1112
#项目名称
spring.application.name=rabbitmq-consumer-node2


#rabbitmq相关配置
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true

因为我们是本地测试项目,所以需要修改对应的端口号,防止端口被占用。

创建用户注册消费者

复制rabbitmq-consumer子项目内的UserConsumer类到rabbitmq-consumer-node2子项目对应的package内,如下所示:

/**
 * 用户注册消息消费者
 * 分布式节点2
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    /**
     * logback
     */
    private Logger logger = LoggerFactory.getLogger(UserConsumer.class);

    @RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用户注册消费者【节点2】获取消息,用户编号:{}",userId);

        //...//自行业务逻辑处理
    }
}

为了区分具体的消费者输出内容,我们在上面UserConsumer消费者消费方法内打印了相关日志输出,下面我们同样把rabbitmq-consumer子项目内UserConsumer的消费方法写入相关日志,如下所示:

@RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用户注册消费者【节点1】获取消息,用户编号:{}",userId);

        //...//自行业务逻辑处理
    }

到目前为止我们的多节点RabbitMQ消费者已经编写完成,下面我们来模拟多个用户注册的场景,来查看用户注册消息是否被转发并唯一性的分配给不同的消费者节点。

运行测试

我们打开上一章编写的UserTester测试类,为了模拟多用户注册请求,我们对应的创建一个内部线程类BatchRabbitTester,在线程类内编写注册请求代码,如下所示:

 /**
     * 批量添加用户线程测试类
     * run方法发送用户注册请求
     */
    class BatchRabbitTester implements Runnable
    {
        private int index;
        public BatchRabbitTester() { }

        public BatchRabbitTester(int index) {
            this.index = index;
        }


        @Override
        public void run() {
            try {
                mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                        .param("userName","yuqiyu" + index)
                        .param("name","恒宇少年" + index)
                        .param("age","23")
                )
                        .andDo(MockMvcResultHandlers.log())
                        .andReturn();
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }

为了区分每一个注册信息是否都已经写入到数据库,我们为BatchRabbitTester添加了一个有参的构造方法,将for循环的i值对应的传递为index的值。下面我们来编写对应的批量注册的测试方法,如下所示:

 /**
     * 测试用户批量添加
     * @throws Exception
     */
    @Test
    public void testBatchUserAdd() throws Exception
    {
        for (int i = 0 ; i < 10 ; i++) {
            //创建用户注册线程
            Thread thread = new Thread(new BatchRabbitTester(i));
            //启动线程
            thread.start();
        }
        //等待线程执行完成
        Thread.sleep(2000);
    }

我们循环10次来测试用户注册请求,每一次都会创建一个线程去完成发送注册请求逻辑,在方法底部添加了sleep方法,目的是为了阻塞测试用例的结束,因为我们测试用户完成方法后会自动停止,不会去等待其他线程执行完成,所以这里我们阻塞测试主线程来完成发送注册线程请求逻辑。

执行批量注册测试方法

我们在执行测试批量注册用户消息之前,先把rabbitmq-consumerrabbitmq-consumer-node2两个消费者子项目启动,项目启动完成后可以看到控制台输出启动成功日志,如下所示:

rabbitmq-consumer:
2017-12-10 17:10:36.961  INFO 15644 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.405 seconds (JVM running for 3.39)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】

rabbitmq-consumer-node2:
2017-12-10 17:11:31.679  INFO 13812 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1112 (http)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : Started RabbitmqConsumerNode2Application in 2.419 seconds (JVM running for 3.129)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】

接下来我们来运行testBatchUserAdd方法,查看测试控制台输出内容如下所示:

2017-12-10 17:15:02.619  INFO 14456 --- [       Thread-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#528df369:0/SimpleConnection@39b6ba57 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60936]
 回调id:194b5e67-6913-474a-b2ac-6e938e1e85e8
消息发送成功
 回调id:e88ce59c-3eb9-433c-9e25-9429e7076fbe
消息发送成功
 回调id:3e5b8382-6f63-450f-a641-e3d8eee255b2
消息发送成功
 回调id:39103357-6c80-4561-acb7-79b32d6171c9
消息发送成功
 回调id:9795d227-b54e-4cde-9993-a5b880fcfe39
消息发送成功
 回调id:e9b8b828-f069-455f-a366-380bf10a5909
消息发送成功
 回调id:6b5b4a9c-5e7f-4c53-9eef-98e06f8be867
消息发送成功
 回调id:619a42f3-cb94-4434-9c75-1e28a04ce350
消息发送成功
 回调id:6b720465-b64a-4ed9-9d8c-3e4dafa4faed
消息发送成功
 回调id:b4296f7f-98cc-423b-a4ef-0fc31d22cb08
消息发送成功

可以看到确实已经成功的发送了10条用户注册消息到RabbitMQ服务端,那么是否已经正确的成功的将消息转发到消费者监听方法了呢?我们来打开rabbitmq-consumer子项目的启动控制台查看日志输出内容如下所示:

2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】
2017-12-10 17:15:02.695  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:20
2017-12-10 17:15:02.718  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:22
2017-12-10 17:15:02.726  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:26
2017-12-10 17:15:02.729  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:21
2017-12-10 17:15:02.789  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:28

可以看到成功的接受了5条对应用户注册消息内容,不过这里具体接受的条数并不是固定的,这也是RabbitMQ消息转发权重内部问题。
下面我们打开rabbitmq-consumer-node2子项目控制台查看日志输出内容如下所示:

2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】
2017-12-10 17:15:02.708  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:25
2017-12-10 17:15:02.717  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:23
2017-12-10 17:15:02.719  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:24
2017-12-10 17:15:02.727  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:27
2017-12-10 17:15:02.790  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:29

同样获得了5条用户注册消息,不过并没有任何规律可言,编号也不是顺序的。

所以多节点时消息具体分发到哪个节点并不是固定的,完全是RabbitMQ分发机制来控制。

总结

本章完成了基于SpringBoot平台整合RabbitMQ单个Provider对应绑定多个Consumer来进行多节点分布式消费者消息消费,实际生产项目部署时完全可以将消费节点分开到不同的服务器,只要消费节点可以访问到RabbitMQ服务端,可以正常通讯,就可以完成消息消费。

本章源码已经上传到码云:
SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter
SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter

作者个人 博客
使用开源框架 ApiBoot 助你成为Api接口服务架构师

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容