springboot中@RabbitListener的配置分析,多线程扩容以及prefetch

rabbitmq组件几个比较关键的类,ConnectionFactory,RabbitTemplate,SimpleRabbitListenerContainerFactoryConfigurer,SimpleRabbitListenerContainerFactory
,MessageListenerContainer,SimpleMessageListenerContainer

本文只说明Simple的容器

@RabbitListener的concurrency可以决定初始和最大开几个消费者线程,固定值直接设置concurrency="2"  如果是动态扩容则设置concurrency="2-10" ,2为最小开2个线程,最大开10个线程。是否需要动态调整线程,如果默认不配置containerFactory参数,则由系统决定,系统的默认值preFetch为250,consecutiveActiveTrigger为10次,startConsumerMinInterval为10秒,这些值默认控制着消费者是否动态增加,具体代码对应startConsumerMinInterval和consecutiveActiveTrigger图,代表的意思:如果一个消费者连续消费了10次并且距离上次增加新的消费者的间隔为大于10秒,则可以增加新的消费者线程,如果增加了新的消费者线程,但是rabbitmq队列没有待消费的消息,则过段时间会触发多余的消费者线程销毁,需要合理的配置preFetch大小

比如如果preFetch为默认的250的容量,表示每个消费者本地队列最大持有250个消息,消费者线程配置为2-5,当前消息队列的消息为200,初始开启了两个线程,每个消费者本地队列各持有100个消息,此时rabbitmq队列没有待消费的消息,如果当前有一个消费者满足动态增加新的消费者的条件,则增加的消费者本地消息队列接收不到rabbitmq的消息,过一会就会销毁,因为200个消息已经被之前的消费者缓存到了本地。如果 preFetch设置为5,则两个消费者共持有10个消息,当满足开辟新的消费者条件时,此时rabbitmq里面还有待消费的消息,新开辟的消费者可以最多从rabbitmq里面缓存5个消息。

看网上一些博主说的刚开始创建两个消费者,如果一个消费者的prefetch没有满,第二个消费者就不会去消费,这种是错误的说法,mq会根据当前待消费的消息数量,均匀的分发给现有的消费者。动态扩展消费者的默认时间间隔和消费次数配置不支持yml配置文件配置,可自己实现 SimpleRabbitListenerContainerFactory来配置,具体配置的源码解读看下文。

一般 preFetch设置大点用来处理耗时短的消息,来减少和mq交互时的网络请求。

1.从自动配置类作为入口RabbitAutoConfiguration

1-1

这里引入了RabbitAnnotationDrivenConfiguration,这个类主要配置了SimpleRabbitListenerContainerFactoryConfigurer,SimpleRabbitListenerContainerFactory并且开启@EnableRabbit注解

1-2
1-3

RabbitBootstrapConfiguration注入了RabbitListenerAnnotationBeanPostProcessor,从此进入springboot的生命周期

1-4

在后置处理函数中解析@RabbitListener注解的内容

1-5

上图的红框最终都会走到下面的函数,这个函数将endpoint和factory组装成AmqpListenerEndpointDescriptor,最终组装为MessageListenerContainer消息监听容器,遍历容器集合调用start方法

1-6

AbstractMessageListenerContainer是MessageListenerContainer的抽象模板类实现,最终会调用SimpleMessageListenerContainer,MessageListenerContainer的start方法会在AbstractMessageListenerContainer调用。这

1-7
1-8
1-9

最终的doStart()会调用到SimpleMessageListenerContainer中,在这里完成消费者初始化和开线程消费,下面三张图是消费者创建的关键源码,每个消费者有自己的本地缓存队列,队列大小为prefetchCount,这个值默认为250,可以在yml文件配置,这个值决定了消费者能缓存的消息的总大小

rabbitmq. listener. simple. prefetch=5  (需要改成yml格式)

AsyncMessageProcessingConsumer类会开启消费消息的循环

mainLoop
consecutiveActiveTrigger
startConsumerMinInterval

自定义SimpleRabbitListenerContainerFactory来实现动态扩容相关的参数解析

1.先初始化一个RabbitListenerEndpointRegistrar,见下图

2-1

下图中权重的endpointDescriptors为上面图6所在的代码注册

2-2
2-3
2-4
2-5
2-6

上面的6张图是factory创建MessageListenerContainer的主要流程和相关的参数配置,如果要更灵活的自定义参数,可以创建自己的SimpleRabbitListenerContainerFactory

示例:定义线程动态扩展的条件,@RabbitListener注解中的factory要设置为自己配置的这个对象

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容