1.1 Spring AMQP 介绍
Spring AMQP是基于AMQP的消息传递解决方案的Spring实现。Spring AMQP为我们提供了一个“模板”,作为发送和接收消息的高级抽象。
1.2 Spring AMQP核心组件
- RabbitAdmin:管控组件
- Spring AMQP声明
- RabbitTemplate:消息模版组件
- SimpleMessageListenerContainer:简单消息监听容器
- MessageListenerAdapter:消息适配器
- MessageConverter:消息转换器
1.3 RabbitAdmin介绍
RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可
注意:autoStartup必须要设置为true,否则Spring容器在启动时不会加载RabbitAdmin
RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明
然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作,例如:添加一个交换机、删除一个绑定、 清空一个队列里的消息等等
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
1.4 代码演示
添加依赖
SpringBoot 版本2.0.1以上的版本仅需添加<artifactId>spring-boot-starter-amqp</artifactId>jar包, spring-boot-starter-amqp已经包含amqp-client了 ,否则会jar包冲突。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitAdminConfig
@Configuration
@ComponentScan({"com.example.springamqp.*"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
//注入并配置ConnectionFactory
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("host:prot");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
//注入RabbitAdmin
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
RabbitAdmin功能演示
@SpringBootTest
class RabbitmqSpringApplicationTests {
@Autowired
RabbitAdmin rabbitAdmin;
@Test
void testAdmin() throws Exception {
// 创建一个Direct Exchange,不持久化,不自动删除,new DirectExchange(exchangeName, durable, autoDelete)
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
// 创建一个Queue,new Queue(queueName, durable)
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
// 建立绑定关系
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
// 另一种创建queue和exchenger的方式,在绑定时一步到位
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.topic.queue", false)) //直接创建队列
.to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系
.with("user.#")); //指定路由Key
rabbitAdmin.declareBinding(
BindingBuilder
.bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空队列数据
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
}
1.5 声明式配置
通过@Bean注解,注入到Spring中使用
java
@Bean
public TopicExchange exchange001() {
return new TopicExchange("topic001", true, false);
}
@Bean
public Queue queue001() {
return new Queue("queue001", true); //队列持久化
}
@Bean
public Binding binding001() {
return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");
}
```
#### 1.6 RabbitTemplate
* **消息模板组件**,我们在与SpringAMQP整合的时候进行发送消息的关键类,需要进行注入到Spring容器中,然后直接使用
* 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口 ReturnCallback等等。
* 在与Spring整合时需要实例化,但是在与SrpingBoot整合时,在配置文件里添加配置即可
###### 代码演示
```java
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
使用RabbitTemplate发送消息方式一
@Test
public void testSendMessage() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "信息描述..");
messageProperties.getHeaders().put("type", "自定义消息类型..");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
System.err.println("------添加额外的设置---------");
message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
return message;
}
});
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
}
使用RabbitTemplate发送消息方式二
@Test
public void testSendMessage4Text() throws Exception {
//1 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.send("topic002", "rabbit.abc", message);
}
1.7 SimpleMessageListenerContainer
简单消息监听容器,这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
监听队列(多个队列)、自动启动、自动声明功能
设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
设置消费者数量、最小最大数量、批量消费
设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
设置消费者标签生成策略、是否独占模式、消费者属性等
设置具体的监听器、消息转换器等等
注意:SimpleMessageListenerContainer可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等,很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特去实现的。所以可以看出SpringAMQP非常的强大
代码演示
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//监控队列,可以同时监控多个
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//设置当前消费者数量
container.setConcurrentConsumers(1);
//指的是消费者的最大并行度是多少
container.setMaxConcurrentConsumers(5);
//设置是否重回队列
container.setDefaultRequeueRejected(false);
//设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置消费端标签策略:就是在消费端生成自己的标签时可以指定一个生成策略
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//设置消息监听器
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.err.println("----------消费者: " + msg);
}
});
return container;
}
结束语
本文的实质为慕课网阿神老师的《RabbitMQ消息中间件技术精讲》中关于rabbitAdmin的讲课内容的总结
关于RabbitAdmin的更详细的介绍,建议移步"若汐缘"的
《RabbitMQ整合Spring AMQP(一)》,地址:https://www.jianshu.com/p/13d8c4a284b3