RabbitMQ整合Spring AMQP实战——RabbitAdmin

1.1 Spring AMQP 介绍

Spring AMQP是基于AMQP的消息传递解决方案的Spring实现。Spring AMQP为我们提供了一个“模板”,作为发送和接收消息的高级抽象。

1.2 Spring AMQP核心组件

  • RabbitAdmin:管控组件
  • Spring AMQP声明
  • RabbitTemplate:消息模版组件
  • SimpleMessageListenerContainer:简单消息监听容器
  • MessageListenerAdapter:消息适配器
  • MessageConverter:消息转换器

1.3 RabbitAdmin介绍

  • RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可

  • 注意:autoStartup必须要设置为true,否则Spring容器在启动时不会加载RabbitAdmin

  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明

  • 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作,例如:添加一个交换机、删除一个绑定、 清空一个队列里的消息等等

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  rabbitAdmin.setAutoStartup(true);
  return rabbitAdmin;
}

1.4 代码演示

添加依赖

SpringBoot 版本2.0.1以上的版本仅需添加<artifactId>spring-boot-starter-amqp</artifactId>jar包, spring-boot-starter-amqp已经包含amqp-client了 ,否则会jar包冲突。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitAdminConfig
@Configuration
@ComponentScan({"com.example.springamqp.*"})
public class RabbitMQConfig {
    
    @Bean
    public ConnectionFactory connectionFactory() {
        //注入并配置ConnectionFactory
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("host:prot");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
    
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        //注入RabbitAdmin
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
RabbitAdmin功能演示
@SpringBootTest
class RabbitmqSpringApplicationTests {

    @Autowired
    RabbitAdmin rabbitAdmin;

    @Test
    void testAdmin() throws Exception {
        // 创建一个Direct Exchange,不持久化,不自动删除,new DirectExchange(exchangeName, durable, autoDelete)
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
        // 创建一个Queue,new Queue(queueName, durable)
        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
             // 建立绑定关系
        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct", "direct", new HashMap<>()));
      
            // 另一种创建queue和exchenger的方式,在绑定时一步到位
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.topic.queue", false))     //直接创建队列
                        .to(new TopicExchange("test.topic", false, false))  //直接创建交换机 建立关联关系
                        .with("user.#"));   //指定路由Key

        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.fanout.queue", false))
                        .to(new FanoutExchange("test.fanout", false, false)));
        
        //清空队列数据
        rabbitAdmin.purgeQueue("test.topic.queue", false);
    }
}

1.5 声明式配置

通过@Bean注解,注入到Spring中使用

java
     @Bean  
    public TopicExchange exchange001() {  
        return new TopicExchange("topic001", true, false);  
    }  

    @Bean  
    public Queue queue001() {  
        return new Queue("queue001", true); //队列持久化  
    }  
    
    @Bean  
    public Binding binding001() {  
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
    }  
    ```
#### 1.6 RabbitTemplate

* **消息模板组件**,我们在与SpringAMQP整合的时候进行发送消息的关键类,需要进行注入到Spring容器中,然后直接使用

* 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口 ReturnCallback等等。

* 在与Spring整合时需要实例化,但是在与SrpingBoot整合时,在配置文件里添加配置即可

###### 代码演示

```java
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  return rabbitTemplate;
}

使用RabbitTemplate发送消息方式一

@Test
    public void testSendMessage() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        
        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("------添加额外的设置---------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    
    rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
        rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
    }

使用RabbitTemplate发送消息方式二

@Test
    public void testSendMessage4Text() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq 消息1234".getBytes(), messageProperties);
        
        rabbitTemplate.send("topic001", "spring.abc", message);
        rabbitTemplate.send("topic002", "rabbit.abc", message);
    }

1.7 SimpleMessageListenerContainer

  • 简单消息监听容器,这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足

  • 监听队列(多个队列)、自动启动、自动声明功能

  • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等

  • 设置消费者数量、最小最大数量、批量消费

  • 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数

  • 设置消费者标签生成策略、是否独占模式、消费者属性等

  • 设置具体的监听器、消息转换器等等

注意:SimpleMessageListenerContainer可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等,很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特去实现的。所以可以看出SpringAMQP非常的强大

代码演示
        @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //监控队列,可以同时监控多个
        container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
        //设置当前消费者数量
        container.setConcurrentConsumers(1);
        //指的是消费者的最大并行度是多少
        container.setMaxConcurrentConsumers(5);
        //设置是否重回队列
        container.setDefaultRequeueRejected(false);
        //设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //设置消费端标签策略:就是在消费端生成自己的标签时可以指定一个生成策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        
        //设置消息监听器
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.err.println("----------消费者: " + msg);
            }
        });
        return container;
    }
结束语

本文的实质为慕课网阿神老师的《RabbitMQ消息中间件技术精讲》中关于rabbitAdmin的讲课内容的总结
关于RabbitAdmin的更详细的介绍,建议移步"若汐缘"的
《RabbitMQ整合Spring AMQP(一)》,地址:https://www.jianshu.com/p/13d8c4a284b3

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容