本博文从某个角度来说,应该是rabbitMQ应用开发的知识点梳理,使用目前最广泛流行的SpringBoot来集成rabbitMQ的功能开发,在开发的过程中讲解rabbitMQ的各种特性、技能点以及开发过程中需要注意的地方
上一篇博文Centos7下RabbitMQ的搭建我们已经搭建好RabbitMQ的服务器,并且简单的介绍了MQ服务中主要的几个概念,各个概念的主要作用以及为了实现消息的发送和接收机制,各个概念之间的依存关系。也简单的介绍了消息生产和消费的业务流程图,大致了的了解了RabbitMQ的整体的概念模型,但是talk is cheap, show me the code
,所以为了更加真实的了解如何使用代码来实现的,下面我们就SpringBoot来整合RabbitMQ的开发环境,并且简单了解具体的技术参数的运用
上一篇博文:
SpringBoot集成RabbitMQ
SpringBoot集成RabbitMQ非常简单,只需要在maven依赖中添加依赖即可
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后再application.yml文件中添加rabbitMQ的配置信息
spring:
rabbitmq:
addresses: 192.168.56.105:5672
username: root
password: root
virtual-host: /
这样一个简单的rabbitMQ的开发环境就搭建好了,下面我们开始编写交换机创建和队形注册rabbitMQ的相关功能,最好是能独立成一个简单通用的jar被其他项目复用,下面我们来新建个项目,有两个子module,其中一个是消费者,另外一个是生产者。具体的目录结构如下:
具体的项目可以参考我的gitee项目:Gitee-rabbitmq
[图片上传失败...(image-e8534f-1561823748958)]
其中common主要是存放消费者和生产者共同的代码,比如一些常量和通用配置类的信息
连接RabbitMQ
系统应该在一启动的时候就连接RabbitMQ,我们这里不采用SpringBoot默认提供的RabbitTemplate,这样不方便我们自定义扩展的属性,所以我们手动注册ConnectionFactory,并且提供RabbitAdmin来进行交换机、队列以及路由绑定的管理
@Configuration
public class RabbitMQConfig {
/**
* rabbitMQ服务器的地址
*/
@Value("${spring.rabbitmq.addresses:192.168.56.105:5672}")
private String addresses;
/**
* rabbitMQ用户名
*/
@Value("${spring.rabbitmq.username:root}")
private String username;
/**
* rabbitMQ密码
*/
@Value("${spring.rabbitmq.password:root}")
private String password;
/**
* rabbitMQ虚拟机 这里默认 /
*/
@Value("${spring.rabbitmq.virtual-host:/}")
private String virtualHost;
/**
* 注册rabbitMQ的Connection
*
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(this.addresses);
cachingConnectionFactory.setUsername(this.username);
cachingConnectionFactory.setPassword(this.password);
cachingConnectionFactory.setVirtualHost(this.virtualHost);
return cachingConnectionFactory;
}
/**
* 注册rabbitAdmin 方便管理
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
交换机的管理
Exchange和Queueshi AMQP中的high-level层面的构建模块,应用程序应该提供对其的管理,我们首先来实现对交换机的管理
交换机的特性
- exchangeName: 交换机的名称
- type: 交换机的类型
- durable: 是否持久化
- autoDelete: 是否自动删除
- internal: 是否内置交换机
- argument: 结构化参数
public class MqExchage {
/**
* 交换机的名称
*/
@NotNull(message = "交换机名称不能为空")
private String name;
/**
* 交换机的类型
*/
@NotNull(message = "交换机类型不能为空")
private ExchangeTypeEnum type;
/**
* 是否持久化
* 持久化可以将交换机存盘,在服务器重启的时候不会丢失相关的信息
*/
private boolean durable;
/**
* 是否自动删除
* 自动删除的前提是至少有一个队列或者交换机与这个交互机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑
*/
private boolean autoDelete;
public MqExchage name(String name) {
this.name = name;
return this;
}
public MqExchage type(ExchangeTypeEnum type) {
this.type = type;
return this;
}
public MqExchage durable(boolean durable) {
this.durable = durable;
return this;
}
public MqExchage autoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
return this;
}
/**
* 自定义属性参数
* 比如:alternate-exchange
*/
private Map<String, Object> arguments;
public class MqExchage {
/**
* 交换机的名称
*/
@NotNull(message = "交换机名称不能为空")
private String name;
/**
* 交换机的类型
*/
@NotNull(message = "交换机类型不能为空")
private ExchangeTypeEnum type;
/**
* 是否持久化
* 持久化可以将交换机存盘,在服务器重启的时候不会丢失相关的信息
*/
private boolean durable;
/**
* 是否自动删除
* 自动删除的前提是至少有一个队列或者交换机与这个交互机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑
*/
private boolean autoDelete;
public MqExchage name(String name) {
this.name = name;
return this;
}
public MqExchage type(ExchangeTypeEnum type) {
this.type = type;
return this;
}
public MqExchage durable(boolean durable) {
this.durable = durable;
return this;
}
public MqExchage autoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
return this;
}
/**
* 自定义属性参数
* 比如:alternate-exchange
*/
private Map<String, Object> arguments;
}
交换机的注册和销毁
//通用的Declare
public abstract class AbstractDeclare {
public final Log logger = LogFactory.getLog(this.getClass());
@Autowired
RabbitAdmin rabbitAdmin;
/**
* 自定义的校验
*
* @param object
*/
public abstract void DefinedValidate(Object object);
/**
* 通用校验
* 1. 校验字段是否是非空
*
* @param object
*/
public void validate(Object object) {
Result result = ValidateUtils.validate(object);
if (!ResultEnum.success().equals(result.getCode())) {
RabbitMQExceptionUtils.throwRabbitMQException(result.getMsg());
}
this.DefinedValidate(object);
}
}
@Component
public class AmExchangeDeclare extends AbstractDeclare {
@Autowired
RabbitAdmin rabbitAdmin;
/**
* 向rabbitMQ服务器注册指定的交换机以及交换机的类型
*
* @param mqExchage
* @return
*/
public Exchange declareExchange(MqExchage mqExchage) {
this.logger.info("declare exchange is :" + mqExchage.toString());
Exchange exchange = null;
super.validate(mqExchage);
exchange = this.initExchange(mqExchage);
this.rabbitAdmin.declareExchange(exchange);
this.logger.info("declare exchange success");
return exchange;
}
/**
* 从RabbitMQ服务端上删除指定的交换机
*
* @param exchangeName
* @return
*/
public boolean deleteExchange(String exchangeName) {
this.logger.info("delete exchange is : " + exchangeName);
if (StringUtils.isEmpty(exchangeName)) {
throw new RabbitMQException("the parameter exchangeName couldn't not be null");
}
return this.rabbitAdmin.deleteExchange(exchangeName);
}
/**
* 根据不同类型初始化不同类型的交换机
*
* @param mqExchage
* @return
*/
private Exchange initExchange(MqExchage mqExchage) {
ExchangeTypeEnum exchangeTypeEnum = mqExchage.getType();
switch (exchangeTypeEnum) {
case DIRECT:
return new DirectExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
case TOPIC:
return new TopicExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
case FANOUT:
return new FanoutExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
case HEADERS:
return new HeadersExchange(mqExchage.getName(), mqExchage.isDurable(), mqExchage.isAutoDelete(), mqExchage.getArguments());
default:
return null;
}
}
/**
* 自定义校验规则
*
* @param object
*/
@Override
public void DefinedValidate(Object object) {
}
}
队列的管理
队列是存储消息的载体,默认没有指定交换机的哈,队列的创建都是绑定RabbitMQ提供的默认的交换机,队列的创建和维护与交换机比较类似,见如下的代码:
Queue的管理
public class MqQueue {
/**
* 队列名称
*/
@NotNull(message = "队列名称不能为空")
private String name;
/**
* 是否持久化
* 持久化会存盘,服务器重启时不会丢失相关信息
*/
private boolean durable;
/**
* 是否排他
* 如果是排他,则该队列对首次声明他的连接有效,并在连接断开时自动删除
* 注意:
* 1. 同一个连接的其他的Channel是可以连接该排他队列的
* 2. 首次是说其他连接就不同创建同名的排他队列
* 适用于一个客户端同时发送和读取消息
*/
private boolean exclusive;
/**
* 是否自动删除
* 自动删除的前提是至少有一个队列或者交换机与这个交互机绑定,之后所有与这个交换机绑定的队列或者交换机都与此解绑
*/
private boolean autoDelete;
/**
* 结构化参数
* x-message-ttl、x-expires等
*/
private Map<String, Object> arguments;
public MqQueue name(String name) {
this.name = name;
return this;
}
public MqQueue durable(boolean durable) {
this.durable = durable;
return this;
}
public MqQueue exclusive(boolean exclusive) {
this.exclusive = exclusive;
return this;
}
public MqQueue autoDelete(boolean autoDelete) {
this.autoDelete = autoDelete;
return this;
}
public MqQueue arguments(Map<String, Object> arguments) {
this.arguments = arguments;
return this;
}
}
@Component
public class AmQueueDeclare extends AbstractDeclare {
/**
* 声明队列
* 向rabbitMQ服务器声明一个队列
*
* @param mqQueue
* @return
*/
public Queue declareQueue(MqQueue mqQueue) {
this.logger.info("the parameter queue is : " + mqQueue.toString());
super.validate(mqQueue);
Queue queue = new Queue(mqQueue.getName());
BeanUtils.copyProperties(mqQueue, queue);
this.logger.info("declare queue is : " + queue.toString());
super.rabbitAdmin.declareQueue(queue);
this.logger.info("declare queue success");
return queue;
}
/**
* 清空队列中的消息
*
* @param queueName
* @return 清楚队列中的消息的个数
*/
public int purgeQueue(String queueName) {
if (StringUtils.isEmpty(queueName)) {
RabbitMQExceptionUtils.throwRabbitMQException();
}
this.logger.info("purge queue is : " + queueName);
return super.rabbitAdmin.purgeQueue(queueName);
}
/**
* 判断指定的队列是否存在
* 1. 如果存在则返回该队列
* 2. 如果不存在则返回null
*
* @param queueName
* @return true 存在, false 不存在
*/
public boolean isQueueExist(String queueName) {
if (StringUtils.isEmpty(queueName)) {
RabbitMQExceptionUtils.throwRabbitMQException();
}
this.logger.info("isQueueExist queue is : " + queueName);
String isExist = super.rabbitAdmin.getRabbitTemplate().execute((channel -> {
try {
AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive(queueName);
return declareOk.getQueue();
} catch (Exception e) {
if (this.logger.isDebugEnabled()) {
RabbitMQExceptionUtils.throwRabbitMQException(e.getMessage());
}
return null;
}
}));
this.logger.info("the queue " + queueName + " is exist : " + isExist);
return StringUtils.isEmpty(isExist) ? Boolean.FALSE : Boolean.TRUE;
}
/**
* 从rabbitMQ服务器中删除指定的队列
*
* @param queueName
* @return
*/
public boolean deleteQueue(String queueName) {
this.logger.info("delete queue is :" + queueName);
if (StringUtils.isEmpty(queueName)) {
RabbitMQExceptionUtils.throwRabbitMQException();
}
return super.rabbitAdmin.deleteQueue(queueName);
}
/**
* 从rabbitMQ服务器中删除指定的队列
*
* @param queueName 队列名称
* @param unused 队列是否在使用,如果设置为true则该队列只能在没有被使用的情况下才能删除
* @param empty 队列是否为空,如果设置为true则该队列只能在该队列没有消息时才会被删除
*/
public void deleteQueue(String queueName, boolean unused, boolean empty) {
this.logger.info("delete queue is : { queueName : '" + queueName
+ "' , unused: '" + unused + "' , empty:'" + empty + "'}");
if (StringUtils.isEmpty(queueName)) {
RabbitMQExceptionUtils.throwRabbitMQException();
}
super.rabbitAdmin.deleteQueue(queueName, unused, empty);
}
/**
* 自定义的校验
*
* @param object
*/
@Override
public void DefinedValidate(Object object) {
}
}
Bing绑定
上一篇博文中我们详细介绍了交换机和队列之间的关系:生产者将消息投递给交换机,然后由交换器路由到一个或多个队列中,在这里我们需要重点理解下,交换机是如何路由到对应的队列?也可以理解为:交换机如何和队列进行绑定。
在理解前,我们需要知道两个概念:RoutingKey和BindingKey
生产者将消息发送给交换机,一般会指定一个RoutingKey,通过这个指定这个消息的路由规则,而这个RoutingKey需要与交换机的类型和Bindingkey联合使用才能生效。
RoutingKey是决定消息的流向的,生产者将消息发送给交换机时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。
BindingKey可以是相同的,但是在Fanout类型的交换机时不生效的
上面说的可能比较晦涩,我们打个比方应该就比较容易理解了
还是之前我们的邮局投递邮件,RoutingKey相当于邮件上的地址,而BindingKey则相当于邮件的目的地,当地址和目的地是相匹配的时候,收件人(队列)才能收到邮件,如果不一致的话,则邮件可能会送不到,或者返回给发件人或者邮件丢失等
@Component
public class AmBindDeclare extends AbstractDeclare {
@Autowired
RabbitAdmin rabbitAdmin;
/**
* 队列与交换机进行绑定
*
* @param queueName 队列名称
* @param exchangeName 交换机名称
* @param routingKey 路由键
* @return
*/
public boolean queueBind(String queueName, String exchangeName, String routingKey) {
return this.bing(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
}
/**
* 交换机和交换机进行绑定
*
* @param destExchangeName 目标交换机名称
* @param exchangeName 交换机名称
* @param routingKey 路由键
* @return
*/
public boolean exchangeBind(String destExchangeName, String exchangeName, String routingKey) {
return this.bing(destExchangeName, Binding.DestinationType.EXCHANGE, exchangeName, routingKey, null);
}
/**
* bind绑定
*
* @param destName 目标名称(可以是队列 也可以是交换机)
* @param type 绑定的类型 交换机 / 队列
* @param exchangeName 交换机的名称
* @param routingKey 路由键
* @param map 结构参数
* @return
*/
private boolean bing(String destName, Binding.DestinationType type, String exchangeName, String routingKey, Map<String, Object> map) {
this.logger.info("bind parameter is destName: " + destName + ", type: " + type.name()
+ ", exchangeName: " + exchangeName + ", routingKey: " + routingKey + ", map: " + map.toString());
Binding binding = new Binding(destName, Binding.DestinationType.QUEUE, exchangeName, routingKey, map);
try {
this.rabbitAdmin.declareBinding(binding);
} catch (Exception e) {
if (this.logger.isDebugEnabled()) {
RabbitMQExceptionUtils.throwRabbitMQException(e.getMessage());
}
return Boolean.FALSE;
}
return Boolean.TRUE;
}
@Override
public void DefinedValidate(Object object) {
}
}