SpringBoot整合RabbitMQ——交换机和队列的管理和绑定

本博文从某个角度来说,应该是rabbitMQ应用开发的知识点梳理,使用目前最广泛流行的SpringBoot来集成rabbitMQ的功能开发,在开发的过程中讲解rabbitMQ的各种特性、技能点以及开发过程中需要注意的地方

上一篇博文Centos7下RabbitMQ的搭建我们已经搭建好RabbitMQ的服务器,并且简单的介绍了MQ服务中主要的几个概念,各个概念的主要作用以及为了实现消息的发送和接收机制,各个概念之间的依存关系。也简单的介绍了消息生产和消费的业务流程图,大致了的了解了RabbitMQ的整体的概念模型,但是talk is cheap, show me the code,所以为了更加真实的了解如何使用代码来实现的,下面我们就SpringBoot来整合RabbitMQ的开发环境,并且简单了解具体的技术参数的运用

上一篇博文:

SpringBoot集成RabbitMQ

SpringBoot集成RabbitMQ非常简单,只需要在maven依赖中添加依赖即可

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

然后再application.yml文件中添加rabbitMQ的配置信息

    spring:
      rabbitmq:
        addresses: 192.168.56.105:5672
        username: root
        password: root
        virtual-host: /

这样一个简单的rabbitMQ的开发环境就搭建好了,下面我们开始编写交换机创建和队形注册rabbitMQ的相关功能,最好是能独立成一个简单通用的jar被其他项目复用,下面我们来新建个项目,有两个子module,其中一个是消费者,另外一个是生产者。具体的目录结构如下:
具体的项目可以参考我的gitee项目:Gitee-rabbitmq

[图片上传失败...(image-e8534f-1561823748958)]

其中common主要是存放消费者和生产者共同的代码,比如一些常量和通用配置类的信息

连接RabbitMQ

系统应该在一启动的时候就连接RabbitMQ,我们这里不采用SpringBoot默认提供的RabbitTemplate,这样不方便我们自定义扩展的属性,所以我们手动注册ConnectionFactory,并且提供RabbitAdmin来进行交换机、队列以及路由绑定的管理

    @Configuration
    public class RabbitMQConfig {
        /**
         * rabbitMQ服务器的地址
         */
        @Value("${spring.rabbitmq.addresses:192.168.56.105:5672}")
        private String addresses;
        /**
         * rabbitMQ用户名
         */
        @Value("${spring.rabbitmq.username:root}")
        private String username;
        /**
         * rabbitMQ密码
         */
        @Value("${spring.rabbitmq.password:root}")
        private String password;
        /**
         * rabbitMQ虚拟机 这里默认 /
         */
        @Value("${spring.rabbitmq.virtual-host:/}")
        private String virtualHost;
    
        /**
         * 注册rabbitMQ的Connection
         *
         * @return
         */
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
            cachingConnectionFactory.setAddresses(this.addresses);
            cachingConnectionFactory.setUsername(this.username);
            cachingConnectionFactory.setPassword(this.password);
            cachingConnectionFactory.setVirtualHost(this.virtualHost);
            return cachingConnectionFactory;
        }
    
        /**
         * 注册rabbitAdmin 方便管理
         *
         * @param connectionFactory
         * @return
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    }

交换机的管理

Exchange和Queueshi AMQP中的high-level层面的构建模块,应用程序应该提供对其的管理,我们首先来实现对交换机的管理

交换机的特性

  • exchangeName: 交换机的名称
  • type: 交换机的类型
  • durable: 是否持久化
  • autoDelete: 是否自动删除
  • internal: 是否内置交换机
  • argument: 结构化参数
    public class MqExchage {
        /**
         * 交换机的名称
         */
        @NotNull(message = "交换机名称不能为空")
        private String name;
    
        /**
         * 交换机的类型
         */
        @NotNull(message = "交换机类型不能为空")
        private ExchangeTypeEnum type;
        /**
         * 是否持久化
         * 持久化可以将交换机存盘,在服务器重启的时候不会丢失相关的信息
         */
        private boolean durable;
        /**
         * 是否自动删除
         * 自动删除的前提是至少有一个队列或者交换机与这个交互机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑
         */
        private boolean autoDelete;
    
        public MqExchage name(String name) {
            this.name = name;
            return this;
        }
    
    
        public MqExchage type(ExchangeTypeEnum type) {
            this.type = type;
            return this;
        }
    
        public MqExchage durable(boolean durable) {
            this.durable = durable;
            return this;
        }
    
        public MqExchage autoDelete(boolean autoDelete) {
            this.autoDelete = autoDelete;
            return this;
        }
    
        /**
         * 自定义属性参数
         * 比如:alternate-exchange
         */
        private Map<String, Object> arguments;
        public class MqExchage {
        /**
         * 交换机的名称
         */
        @NotNull(message = "交换机名称不能为空")
        private String name;
    
        /**
         * 交换机的类型
         */
        @NotNull(message = "交换机类型不能为空")
        private ExchangeTypeEnum type;
        /**
         * 是否持久化
         * 持久化可以将交换机存盘,在服务器重启的时候不会丢失相关的信息
         */
        private boolean durable;
        /**
         * 是否自动删除
         * 自动删除的前提是至少有一个队列或者交换机与这个交互机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑
         */
        private boolean autoDelete;
    
        public MqExchage name(String name) {
            this.name = name;
            return this;
        }
    
    
        public MqExchage type(ExchangeTypeEnum type) {
            this.type = type;
            return this;
        }
    
        public MqExchage durable(boolean durable) {
            this.durable = durable;
            return this;
        }
    
        public MqExchage autoDelete(boolean autoDelete) {
            this.autoDelete = autoDelete;
            return this;
        }
    
        /**
         * 自定义属性参数
         * 比如:alternate-exchange
         */
        private Map<String, Object> arguments;
            
    }

交换机的注册和销毁

    //通用的Declare
    public abstract class AbstractDeclare {
    
        public final Log logger = LogFactory.getLog(this.getClass());
        @Autowired
        RabbitAdmin rabbitAdmin;
    
        /**
         * 自定义的校验
         *
         * @param object
         */
        public abstract void DefinedValidate(Object object);
    
        /**
         * 通用校验
         * 1. 校验字段是否是非空
         *
         * @param object
         */
        public void validate(Object object) {
            Result result = ValidateUtils.validate(object);
            if (!ResultEnum.success().equals(result.getCode())) {
                RabbitMQExceptionUtils.throwRabbitMQException(result.getMsg());
            }
    
            this.DefinedValidate(object);
        }
    }
   @Component
    public class AmExchangeDeclare extends AbstractDeclare {
    
        @Autowired
        RabbitAdmin rabbitAdmin;
    
        /**
         * 向rabbitMQ服务器注册指定的交换机以及交换机的类型
         *
         * @param mqExchage
         * @return
         */
        public Exchange declareExchange(MqExchage mqExchage) {
            this.logger.info("declare exchange is :" + mqExchage.toString());
    
            Exchange exchange = null;
    
            super.validate(mqExchage);
            exchange = this.initExchange(mqExchage);
            this.rabbitAdmin.declareExchange(exchange);
    
            this.logger.info("declare exchange success");
            return exchange;
        }
    
        /**
         * 从RabbitMQ服务端上删除指定的交换机
         *
         * @param exchangeName
         * @return
         */
        public boolean deleteExchange(String exchangeName) {
            this.logger.info("delete exchange is : " + exchangeName);
    
            if (StringUtils.isEmpty(exchangeName)) {
                throw new RabbitMQException("the parameter exchangeName couldn't not be null");
            }
    
            return this.rabbitAdmin.deleteExchange(exchangeName);
        }
    
        /**
         * 根据不同类型初始化不同类型的交换机
         *
         * @param mqExchage
         * @return
         */
        private Exchange initExchange(MqExchage mqExchage) {
            ExchangeTypeEnum exchangeTypeEnum = mqExchage.getType();
            switch (exchangeTypeEnum) {
                case DIRECT:
                    return new DirectExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
                case TOPIC:
                    return new TopicExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
                case FANOUT:
                    return new FanoutExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
                case HEADERS:
                    return new HeadersExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
                default:
                    return null;
            }
        }
    
        /**
         * 自定义校验规则
         *
         * @param object
         */
        @Override
        public void DefinedValidate(Object object) {
    
        }
    }
    

队列的管理

队列是存储消息的载体,默认没有指定交换机的哈,队列的创建都是绑定RabbitMQ提供的默认的交换机,队列的创建和维护与交换机比较类似,见如下的代码:

Queue的管理

    public class MqQueue {
        /**
         * 队列名称
         */
        @NotNull(message = "队列名称不能为空")
        private String name;
        /**
         * 是否持久化
         * 持久化会存盘,服务器重启时不会丢失相关信息
         */
        private boolean durable;
        /**
         * 是否排他
         * 如果是排他,则该队列对首次声明他的连接有效,并在连接断开时自动删除
         * 注意:
         * 1. 同一个连接的其他的Channel是可以连接该排他队列的
         * 2. 首次是说其他连接就不同创建同名的排他队列
         * 适用于一个客户端同时发送和读取消息
         */
        private boolean exclusive;
        /**
         * 是否自动删除
         * 自动删除的前提是至少有一个队列或者交换机与这个交互机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑
         */
        private boolean autoDelete;
        /**
         * 结构化参数
         * x-message-ttl、x-expires等
         */
        private Map<String, Object> arguments;
    
        public MqQueue name(String name) {
            this.name = name;
            return this;
        }
    
        public MqQueue durable(boolean durable) {
            this.durable = durable;
            return this;
        }
    
        public MqQueue exclusive(boolean exclusive) {
            this.exclusive = exclusive;
            return this;
        }
    
        public MqQueue autoDelete(boolean autoDelete) {
            this.autoDelete = autoDelete;
            return this;
        }
    
        public MqQueue arguments(Map<String, Object> arguments) {
            this.arguments = arguments;
            return this;
        }
    }
    @Component
    public class AmQueueDeclare extends AbstractDeclare {
    
        /**
         * 声明队列
         * 向rabbitMQ服务器声明一个队列
         *
         * @param mqQueue
         * @return
         */
        public Queue declareQueue(MqQueue mqQueue) {
            this.logger.info("the parameter queue is : " + mqQueue.toString());
    
            super.validate(mqQueue);
    
            Queue queue = new Queue(mqQueue.getName());
            BeanUtils.copyProperties(mqQueue, queue);
    
            this.logger.info("declare queue is : " + queue.toString());
    
            super.rabbitAdmin.declareQueue(queue);
    
            this.logger.info("declare queue success");
            return queue;
        }
    
        /**
         * 清空队列中的消息
         *
         * @param queueName
         * @return 清楚队列中的消息的个数
         */
        public int purgeQueue(String queueName) {
            if (StringUtils.isEmpty(queueName)) {
                RabbitMQExceptionUtils.throwRabbitMQException();
            }
            this.logger.info("purge queue is : " + queueName);
            return super.rabbitAdmin.purgeQueue(queueName);
        }
    
        /**
         * 判断指定的队列是否存在
         * 1. 如果存在则返回该队列
         * 2. 如果不存在则返回null
         *
         * @param queueName
         * @return true 存在, false 不存在
         */
        public boolean isQueueExist(String queueName) {
            if (StringUtils.isEmpty(queueName)) {
                RabbitMQExceptionUtils.throwRabbitMQException();
            }
    
            this.logger.info("isQueueExist queue is : " + queueName);
    
            String isExist = super.rabbitAdmin.getRabbitTemplate().execute((channel -> {
                try {
                    AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(queueName);
                    return declareOk.getQueue();
                } catch (Exception e) {
                    if (this.logger.isDebugEnabled()) {
                        RabbitMQExceptionUtils.throwRabbitMQException(e.getMessage());
                    }
                    return null;
                }
            }));
    
            this.logger.info("the queue " + queueName + " is exist : " + isExist);
            return StringUtils.isEmpty(isExist) ? Boolean.FALSE : Boolean.TRUE;
        }
    
        /**
         * 从rabbitMQ服务器中删除指定的队列
         *
         * @param queueName
         * @return
         */
        public boolean deleteQueue(String queueName) {
            this.logger.info("delete queue is :" + queueName);
    
            if (StringUtils.isEmpty(queueName)) {
                RabbitMQExceptionUtils.throwRabbitMQException();
            }
    
            return super.rabbitAdmin.deleteQueue(queueName);
        }
    
        /**
         * 从rabbitMQ服务器中删除指定的队列
         *
         * @param queueName 队列名称
         * @param unused    队列是否在使用,如果设置为true则该队列只能在没有被使用的情况下才能删除
         * @param empty     队列是否为空,如果设置为true则该队列只能在该队列没有消息时才会被删除
         */
        public void deleteQueue(String queueName, boolean unused, boolean empty) {
            this.logger.info("delete queue is : { queueName : '" + queueName
                    + "' , unused: '" + unused + "' , empty:'" + empty + "'}");
    
            if (StringUtils.isEmpty(queueName)) {
                RabbitMQExceptionUtils.throwRabbitMQException();
            }
    
            super.rabbitAdmin.deleteQueue(queueName, unused, empty);
        }
    
        /**
         * 自定义的校验
         *
         * @param object
         */
        @Override
        public void DefinedValidate(Object object) {
    
        }
    }

Bing绑定

上一篇博文中我们详细介绍了交换机和队列之间的关系:生产者将消息投递给交换机,然后由交换器路由到一个或多个队列中,在这里我们需要重点理解下,交换机是如何路由到对应的队列?也可以理解为:交换机如何和队列进行绑定。

在理解前,我们需要知道两个概念:RoutingKey和BindingKey

生产者将消息发送给交换机,一般会指定一个RoutingKey,通过这个指定这个消息的路由规则,而这个RoutingKey需要与交换机的类型和Bindingkey联合使用才能生效。

RoutingKey是决定消息的流向的,生产者将消息发送给交换机时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。

BindingKey可以是相同的,但是在Fanout类型的交换机时不生效的

上面说的可能比较晦涩,我们打个比方应该就比较容易理解了

还是之前我们的邮局投递邮件,RoutingKey相当于邮件上的地址,而BindingKey则相当于邮件的目的地,当地址和目的地是相匹配的时候,收件人(队列)才能收到邮件,如果不一致的话,则邮件可能会送不到,或者返回给发件人或者邮件丢失等

    @Component
    public class AmBindDeclare extends AbstractDeclare {
    
    
        @Autowired
        RabbitAdmin rabbitAdmin;
    
        /**
         * 队列与交换机进行绑定
         *
         * @param queueName    队列名称
         * @param exchangeName 交换机名称
         * @param routingKey   路由键
         * @return
         */
        public boolean queueBind(String queueName, String exchangeName, String routingKey) {
            return this.bing(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
        }
    
        /**
         * 交换机和交换机进行绑定
         *
         * @param destExchangeName 目标交换机名称
         * @param exchangeName     交换机名称
         * @param routingKey       路由键
         * @return
         */
        public boolean exchangeBind(String destExchangeName, String exchangeName, String routingKey) {
            return this.bing(destExchangeName, Binding.DestinationType.EXCHANGE, exchangeName, routingKey, null);
        }
    
        /**
         * bind绑定
         *
         * @param destName     目标名称(可以是队列 也可以是交换机)
         * @param type         绑定的类型 交换机 / 队列
         * @param exchangeName 交换机的名称
         * @param routingKey   路由键
         * @param map          结构参数
         * @return
         */
        private boolean bing(String destName, Binding.DestinationType type, String exchangeName, String routingKey, Map<String, Object> map) {
            this.logger.info("bind parameter is destName: " + destName + ", type: " + type.name()
                    + ", exchangeName: " + exchangeName + ", routingKey: " + routingKey + ", map: " + map.toString());
    
            Binding binding = new Binding(destName, Binding.DestinationType.QUEUE, exchangeName, routingKey, map);
            try {
                this.rabbitAdmin.declareBinding(binding);
            } catch (Exception e) {
                if (this.logger.isDebugEnabled()) {
                    RabbitMQExceptionUtils.throwRabbitMQException(e.getMessage());
                }
                return Boolean.FALSE;
            }
    
            return Boolean.TRUE;
        }
    
        @Override
        public void DefinedValidate(Object object) {
    
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,904评论 2 11
  • 介绍 RabbitMQ MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开...
    蒲韧如丝阅读 4,967评论 2 10
  • 概述 RabbitMQ是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量地使用 。 Rabbit...
    Tian_Peng阅读 1,670评论 1 4
  • 暮色昏沉, 炉火黯淡。 早春未销的雪窗外泛着莹光, 椅上的人沉然入睡。 一切都是少车时, 花还在开, 树叶也还在枝...
    文狐狸尾巴阅读 393评论 2 5
  • 一直听说简书很好,可以在上面写些东西,也总想来试试,却坚持不下来,就这样把简书下载下来又删了。前两天鬼使神差又下载...
    涌哥阅读 165评论 0 0