spring-boot-starter-amqp 那些文档里没告诉你的事情

spring-boot 非常简便,但是文档不全、背后隐藏了大量配置细节、门槛比较高。这就需要阅读源码才能随心所欲得使用。下面配合源码来解释下官方文档中配置背后的原理。

启动配置路径:一个入口拉出一串螃蟹。配置都在这几个类里,之后还会提到。

  • RabbitAutoConfiguration
  • -> RabbitAnnotationDrivenConfiguration
  • -> EnableRabbitConfiguration
  • -> @EnableRabbit
  • -> RabbitBootstrapConfiguration

首先是AMQP三大件的申明,这段代码有啥用?

@SpringBootApplication
public class Application {
    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }
    ...
}
  • RabbitAdmin会读取所有的Queue/Exchange/Binding类型的Bean(包括它们的集合),并在启动时向Rabbit Server注册。
  • RabbitAdmin#afterPropertiesSet

    @Override
    public void afterPropertiesSet() {
        ...
        this.connectionFactory.addConnectionListener(new ConnectionListener() {
            @Override
            public void onCreate(Connection connection) {
                initialize();
            }
        })
        ...
    }
  • RabbitAdmin#initialize
public void initialize() {
    ...
    this.rabbitTemplate.execute(new ChannelCallback<Object>() {
        @Override
        public Object doInRabbit(Channel channel) throws Exception {
            declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
            declareQueues(channel, queues.toArray(new Queue[queues.size()]));
            declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
            return null;
        }
    });
    ...
}
  • 下一段是注册一个客户端接收端点, 适配任意带有处理逻辑的对象
   @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
  • SimpleMessageListenerContainer 是一个实现了org.springframework.context.Lifecycle 的自启动类,是客户端接收消息的入口。

    • AbstractMessageListenerContainer#start
    • -> SimpleMessageListenerContainer#doStart
    • -> AsyncMessageProcessingConsumer#run
    • -> SimpleMessageListenerContainer#receiveAndExecute
    • -> SimpleMessageListenerContainer#doReceiveAndExecute
    • ...
    • -> AbstractMessageListenerContainer#doInvokeListener
    • -> AbstractAdaptableMessageListener#onMessage
      ...
    • -> DelegatingInvocableHandler#invoke
    • -> InvocableHandlerMethod#invoke
  • InvocableHandlerMethod就是代理了刚才提供的业务逻辑的类了,我们之后再介绍。

  • Spring提供的客户端配置并不简便,一般我们用Rabbit的annotation来配置客户端代码,像这样:

@Component
@RabbitListener(queues = "${queue.name}")
public class Receiver {
    @RabbitHandler
    public void process(@Payload String body, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
        
    }
}
  • 处理这些Annotation的类位于
    • RabbitBootstrapConfiguration#rabbitListenerAnnotationProcessor

    • -> RabbitListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

    • 过程与例子里类似,也是构造幷启动了一个SimpleMessageListenerContainer

  • 这里有几个大的问题, RabbitHandler到底如何定义,可以接受哪些参数,返回值如何处理呢?
  • 首先一个RabbitListener内是可以定义多个RabbitHandler的,调用者通过payload类型来选择最合适的handler来处理
@Component
@RabbitListener(queues = "${queue.name}")
public class Receiver {
    @RabbitHandler
    public void process(@Payload String stringBody, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routingKey) {
        
    }
    
    @RabbitHandler
    public void process(@Payload Integer intBody) {
        
    }
    
    @RabbitHandler
    public void process(@Payload Long longBody) {
        
    }
}
  • 这个逻辑流程如下

    • DelegatingInvocableHandler#invoke
    • DelegatingInvocableHandler#getHandlerForPayload
    • DelegatingInvocableHandler#findHandlerForPayload
    • DelegatingInvocableHandler #matchHandlerMethod
  • 方法可以接受哪些参数?这个是由org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver的实现类处理的

  • argumentResolver.jpeg
  • 由实现类的名字可以看出,参数可以接受Header,Message, Payload, Principal等类型

  • 实现逻辑流程:

    • DelegatingInvocableHandler#invoke
    • InvocableHandlerMethod#invoke
    • InvocableHandlerMethod#getMethodArgumentValues
    • HandlerMethodArgumentResolverComposite#resolveArgument
    • HandlerMethodArgumentResolver#resolveArgument
  • 哪些Header可以用?参考org.springframework.amqp.support.AmqpHeaders

  • 非空的返回值会从客户端发起一个新的消息,可以配合@Sendto或者AmqpHeaders.REPLY_TO使用,这里不再展开。参考AbstractAdaptableMessageListener#handleResult

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容