SpringBoot中RabbitMQ动态创建队列以及绑定关系(AmqpAdmin)

SpringBoot中一般我们创建RabbitMQ队列以及绑定关系,是通过@Bean的方式,但是RabbitMQ提供了AmqpAdmin对象,可以在代码中声明队列以及绑定关系。

读取配置文件的RabbitMQ的组件信息,然后动态的去创建关系。

配置文件格式:

/**
 * RabbitMq的队列,交互机,绑定关系的对象
 *
 */
@Data
public class RabbitModuleInfo {

    /**
     * 路由主键
     */
    private String routingKey;

    /**
     * 队列信息
     */
    private Queue queue;

    /**
     * 交换机信息
     */
    private Exchange exchange;

    /**
     * 交换机的详细配置
     */
    @Data
    public static class Exchange {

        /**
         * 交互机名称
         */
        private String name;

        /**
         * 交互机类型。
         * 默认:直连型号
         */
        private ExchangeTypeEnum type = ExchangeTypeEnum.DIRECT;

        /**
         * 是否持久化
         * 默认true:当RabbitMq重启时,消息不丢失
         */
        private boolean durable = true;

        /**
         * 当所有绑定队列都不在使用时,是否自动 删除交换器
         * 默认值false:不自动删除,推荐使用。
         */
        private boolean autoDelete = false;

        /**
         * 判断是否是延迟交互机
         */
        private boolean delayed;

        /**
         * 交互机的额外参数
         */
        private java.util.Map<String, Object> arguments;

    }


    /**
     * 队列的详细信息
     * 提供默认的配置参数
     */
    @Data
    public static class Queue {

        /**
         * 队列名
         * 必填
         */
        private String name;

        /**
         * 是否持久化
         * 默认true:当RabbitMq重启时,消息不丢失
         */
        private boolean durable = true;

        /**
         * 是否具有排他性
         * 默认false:可以多个消息者消费同一个队列
         */
        private boolean exclusive = false;

        /**
         * 当消费者客户端均断开连接,是否自动删除队列
         * 默认值false:不自动删除,推荐使用,避免消费者断开后,队列中丢弃消息
         */
        private boolean autoDelete = false;

        /**
         * 需要绑定的死信队列的交换机名称
         */
        private String deadExchangeName;

        /**
         * 需要绑定的死信队列的路由key的名称
         */
        private String deadRoutingKey;

        /**
         * 队列的额外参数
         */
        private java.util.Map<String, Object> arguments;

    }


}
@Data
@ConfigurationProperties(prefix = "my.rabbit")
public class SealRabbitProperty {


    /**
     * 组件信息(读取配置文件,自动创建队列信息)
     */
    private List<RabbitModuleInfo> moduleInfos;

}

配置文件:

my:
  rabbit:
    module-infos:
      # ocr队列
      - routing-key:my.routing.xx
        queue:
          name:my.queue.xx
        exchange:
          name: my.exchage.xx
      - routing-key: my.routing.yy
        queue:
          name:my.queue.yy
        exchange:
          name: my.exchage.yy
          type: FANOUT

声明操作类(容器中单例bean创建完毕,执行回调方法):

    @Bean
    @ConditionalOnMissingBean
    public DeclareRabbitModule declareRabbitModule(RegisterRabbitModule rabbitModule, SealRabbitProperty sealRabbitProperty) {
        return new DeclareRabbitModule(rabbitModule, sealRabbitProperty);
    }
@Slf4j
public class RegisterRabbitModule {

    private AmqpAdmin amqpAdmin;

    public void setAmqpAdmin(AmqpAdmin amqpAdmin) {
        this.amqpAdmin = amqpAdmin;
    }


    /**
     * 注册RabbitMq的组件信息
     *
     * @param queue    队列对象
     * @param exchange 交换机对象
     * @param binding  队列与交换机绑定关系对象
     */
    public void registerModule(Queue queue, Exchange exchange, Binding binding) {
        amqpAdmin.declareQueue(queue);
        amqpAdmin.declareExchange(exchange);
        amqpAdmin.declareBinding(binding);
    }


}

@Slf4j
public class DeclareRabbitModule implements SmartInitializingSingleton {

    /**
     * 死信队列 交换机标识符
     */
    public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    /**
     * 死信队列交换机绑定键标识符
     */
    public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";


    private AmqpAdmin amqpAdmin;

    private SealRabbitProperty sealRabbitProperty;


    public DeclareRabbitModule(AmqpAdmin amqpAdmin, SealRabbitProperty sealRabbitProperty) {
        this.amqpAdmin = amqpAdmin;
        this.sealRabbitProperty = sealRabbitProperty;
    }

    @Override
    public void afterSingletonsInstantiated() {
        log.info("动态创建MQ配置信息...");
        declareModule();
    }

    private void declareModule() {
        //获取组件信息
        List<RabbitModuleInfo> moduleInfos = sealRabbitProperty.getModuleInfos();
        if (CollectionUtils.isEmpty(moduleInfos)) {
            log.warn("配置文件中不含有组件信息,不进行注册!");
            return;
        }
        //注册组件信息
        for (RabbitModuleInfo moduleInfo : moduleInfos) {
            //数据校验
            declareValidate(moduleInfo);
            //获取队列
            Queue queue = tranSealQueue(moduleInfo.getQueue());
            //获取交换机
            Exchange exchange = tranSealExchange(moduleInfo.getExchange());
            //绑定关系
            Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE,
                    exchange.getName(), moduleInfo.getRoutingKey(), null);
            //创建队列
            amqpAdmin.declareQueue(queue);
            amqpAdmin.declareExchange(exchange);
            amqpAdmin.declareBinding(binding);
        }
    }


    /**
     * 声明模块数据校验
     *
     * @param moduleInfo 配置文件的模块信息
     */
    public void declareValidate(RabbitModuleInfo moduleInfo) {
        //判断关键参数是否存在且合法
        if (moduleInfo.getRoutingKey() == null) {
            throw new RabbitDeclareModuleException("RoutingKey 不存在!");
        }

        if (moduleInfo.getExchange() == null) {
            throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置exchange!", moduleInfo.getRoutingKey()));
        }
        if (moduleInfo.getExchange().getName() == null) {
            throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置exchange的name属性!", moduleInfo.getRoutingKey()));
        }
        if (moduleInfo.getQueue() == null) {
            throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置queue!", moduleInfo.getRoutingKey()));
        }
        if (moduleInfo.getQueue().getName() == null) {
            throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置queue的name属性!", moduleInfo.getRoutingKey()));
        }
    }


    /**
     * 队列的对象转换
     *
     * @param queue 自定义的队列信息
     * @return RabbitMq的Queue对象
     */
    private Queue tranSealQueue(RabbitModuleInfo.Queue queue) {
        Map<String, Object> arguments = queue.getArguments();
        //判断是否需要绑定死信队列
        if (queue.getDeadExchangeName() != null && queue.getDeadRoutingKey() != null) {
            //设置响应参数
            if (queue.getArguments() == null) {
                arguments = new HashMap<>(2);
            }
            arguments.put(DEAD_LETTER_QUEUE_KEY, queue.getDeadExchangeName());
            arguments.put(DEAD_LETTER_ROUTING_KEY, queue.getDeadRoutingKey());
        }
        return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }


    /**
     * 将配置信息转换交换机信息
     *
     * @param exchangeInfo 自定义交换机信息
     * @return RabbitMq的Exchange的信息
     */
    private Exchange tranSealExchange(RabbitModuleInfo.Exchange exchangeInfo) {
        AbstractExchange exchange = null;
        //判断类型
        switch (exchangeInfo.getType()) {
            //直连模式
            case DIRECT:
                exchange = new DirectExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
                break;
            //广播模式:
            case FANOUT:
                exchange = new FanoutExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
                break;
            //通配符模式
            case TOPIC:
                exchange = new TopicExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
                break;
            case HEADERS:
                exchange = new HeadersExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
                break;
        }
        //设置延迟队列
        exchange.setDelayed(exchangeInfo.isDelayed());
        return exchange;
    }

}

在项目启动后,便会去读取配置文件的信息,然后去创建RabbitMQ的组件信息,实现了配置集中管理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容