SpringBoot中一般我们创建RabbitMQ队列以及绑定关系,是通过@Bean的方式,但是RabbitMQ提供了
AmqpAdmin
对象,可以在代码中声明队列以及绑定关系。
读取配置文件的RabbitMQ的组件信息,然后动态的去创建关系。
配置文件格式:
/**
* RabbitMq的队列,交互机,绑定关系的对象
*
*/
@Data
public class RabbitModuleInfo {
/**
* 路由主键
*/
private String routingKey;
/**
* 队列信息
*/
private Queue queue;
/**
* 交换机信息
*/
private Exchange exchange;
/**
* 交换机的详细配置
*/
@Data
public static class Exchange {
/**
* 交互机名称
*/
private String name;
/**
* 交互机类型。
* 默认:直连型号
*/
private ExchangeTypeEnum type = ExchangeTypeEnum.DIRECT;
/**
* 是否持久化
* 默认true:当RabbitMq重启时,消息不丢失
*/
private boolean durable = true;
/**
* 当所有绑定队列都不在使用时,是否自动 删除交换器
* 默认值false:不自动删除,推荐使用。
*/
private boolean autoDelete = false;
/**
* 判断是否是延迟交互机
*/
private boolean delayed;
/**
* 交互机的额外参数
*/
private java.util.Map<String, Object> arguments;
}
/**
* 队列的详细信息
* 提供默认的配置参数
*/
@Data
public static class Queue {
/**
* 队列名
* 必填
*/
private String name;
/**
* 是否持久化
* 默认true:当RabbitMq重启时,消息不丢失
*/
private boolean durable = true;
/**
* 是否具有排他性
* 默认false:可以多个消息者消费同一个队列
*/
private boolean exclusive = false;
/**
* 当消费者客户端均断开连接,是否自动删除队列
* 默认值false:不自动删除,推荐使用,避免消费者断开后,队列中丢弃消息
*/
private boolean autoDelete = false;
/**
* 需要绑定的死信队列的交换机名称
*/
private String deadExchangeName;
/**
* 需要绑定的死信队列的路由key的名称
*/
private String deadRoutingKey;
/**
* 队列的额外参数
*/
private java.util.Map<String, Object> arguments;
}
}
@Data
@ConfigurationProperties(prefix = "my.rabbit")
public class SealRabbitProperty {
/**
* 组件信息(读取配置文件,自动创建队列信息)
*/
private List<RabbitModuleInfo> moduleInfos;
}
配置文件:
my:
rabbit:
module-infos:
# ocr队列
- routing-key:my.routing.xx
queue:
name:my.queue.xx
exchange:
name: my.exchage.xx
- routing-key: my.routing.yy
queue:
name:my.queue.yy
exchange:
name: my.exchage.yy
type: FANOUT
声明操作类(容器中单例bean创建完毕,执行回调方法):
@Bean
@ConditionalOnMissingBean
public DeclareRabbitModule declareRabbitModule(RegisterRabbitModule rabbitModule, SealRabbitProperty sealRabbitProperty) {
return new DeclareRabbitModule(rabbitModule, sealRabbitProperty);
}
@Slf4j
public class RegisterRabbitModule {
private AmqpAdmin amqpAdmin;
public void setAmqpAdmin(AmqpAdmin amqpAdmin) {
this.amqpAdmin = amqpAdmin;
}
/**
* 注册RabbitMq的组件信息
*
* @param queue 队列对象
* @param exchange 交换机对象
* @param binding 队列与交换机绑定关系对象
*/
public void registerModule(Queue queue, Exchange exchange, Binding binding) {
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(binding);
}
}
@Slf4j
public class DeclareRabbitModule implements SmartInitializingSingleton {
/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
private AmqpAdmin amqpAdmin;
private SealRabbitProperty sealRabbitProperty;
public DeclareRabbitModule(AmqpAdmin amqpAdmin, SealRabbitProperty sealRabbitProperty) {
this.amqpAdmin = amqpAdmin;
this.sealRabbitProperty = sealRabbitProperty;
}
@Override
public void afterSingletonsInstantiated() {
log.info("动态创建MQ配置信息...");
declareModule();
}
private void declareModule() {
//获取组件信息
List<RabbitModuleInfo> moduleInfos = sealRabbitProperty.getModuleInfos();
if (CollectionUtils.isEmpty(moduleInfos)) {
log.warn("配置文件中不含有组件信息,不进行注册!");
return;
}
//注册组件信息
for (RabbitModuleInfo moduleInfo : moduleInfos) {
//数据校验
declareValidate(moduleInfo);
//获取队列
Queue queue = tranSealQueue(moduleInfo.getQueue());
//获取交换机
Exchange exchange = tranSealExchange(moduleInfo.getExchange());
//绑定关系
Binding binding = new Binding(queue.getName(), Binding.DestinationType.QUEUE,
exchange.getName(), moduleInfo.getRoutingKey(), null);
//创建队列
amqpAdmin.declareQueue(queue);
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(binding);
}
}
/**
* 声明模块数据校验
*
* @param moduleInfo 配置文件的模块信息
*/
public void declareValidate(RabbitModuleInfo moduleInfo) {
//判断关键参数是否存在且合法
if (moduleInfo.getRoutingKey() == null) {
throw new RabbitDeclareModuleException("RoutingKey 不存在!");
}
if (moduleInfo.getExchange() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置exchange!", moduleInfo.getRoutingKey()));
}
if (moduleInfo.getExchange().getName() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置exchange的name属性!", moduleInfo.getRoutingKey()));
}
if (moduleInfo.getQueue() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置queue!", moduleInfo.getRoutingKey()));
}
if (moduleInfo.getQueue().getName() == null) {
throw new RabbitDeclareModuleException(String.format("my-rabbit的配置文件中,routingKey:[%s]未配置queue的name属性!", moduleInfo.getRoutingKey()));
}
}
/**
* 队列的对象转换
*
* @param queue 自定义的队列信息
* @return RabbitMq的Queue对象
*/
private Queue tranSealQueue(RabbitModuleInfo.Queue queue) {
Map<String, Object> arguments = queue.getArguments();
//判断是否需要绑定死信队列
if (queue.getDeadExchangeName() != null && queue.getDeadRoutingKey() != null) {
//设置响应参数
if (queue.getArguments() == null) {
arguments = new HashMap<>(2);
}
arguments.put(DEAD_LETTER_QUEUE_KEY, queue.getDeadExchangeName());
arguments.put(DEAD_LETTER_ROUTING_KEY, queue.getDeadRoutingKey());
}
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
}
/**
* 将配置信息转换交换机信息
*
* @param exchangeInfo 自定义交换机信息
* @return RabbitMq的Exchange的信息
*/
private Exchange tranSealExchange(RabbitModuleInfo.Exchange exchangeInfo) {
AbstractExchange exchange = null;
//判断类型
switch (exchangeInfo.getType()) {
//直连模式
case DIRECT:
exchange = new DirectExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
//广播模式:
case FANOUT:
exchange = new FanoutExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
//通配符模式
case TOPIC:
exchange = new TopicExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
case HEADERS:
exchange = new HeadersExchange(exchangeInfo.getName(), exchangeInfo.isDurable(), exchangeInfo.isAutoDelete(), exchangeInfo.getArguments());
break;
}
//设置延迟队列
exchange.setDelayed(exchangeInfo.isDelayed());
return exchange;
}
}
在项目启动后,便会去读取配置文件的信息,然后去创建RabbitMQ的组件信息,实现了配置集中管理。