rabbitmq中消费者可使用@RabbitListener标注的方法进行处理,这里介绍下@RabbitListener解析过程。
解析类
RabbitListenerAnnotationBeanPostProcessor类用于解析RabbitListener注解,该类实现了BeanPostProcessor,Ordered,BeanFactoryAware,BeanClassLoaderAware,EnvironmentAware说明该类具有以下特性:
- 是一个Bean后处理,@RabbitListener标注的类(或方法所在类)必须可被Bean实例化,也就是要么在@Bean方法中手动创建实例,或者用@@Component标注。
- 可设定Bean初始化优先级
- 注入BeanFactory,Environment,可以获取Bean工厂中的Bean实例和环境变量
- 单例实例化后处理
- 解析入口方法:postProcessAfterInitialization
查找RabbitListener标注的类或方法
- Bean工厂类中的Bean都是通过动态代理创建的代理类,要获取目标类,先使用AopUtils
Class<?> targetClass = AopUtils.getTargetClass(bean);
- 获取目标类中RabbitListener注解
有2种方式配置RabbitListener,一种使用RabbitListener标注类且使用RabbitHandler标注方法,另一种使用RabbitListener标注方法,两种模式同时存在也可以。使用RabbitHandler标注的方法和RabbitListener标注的方法都会被当做消费者的消息监听方法
//查找RabbitListener标注的类
Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
@Override
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
//查找RabbitListener标注的方法
Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
if (listenerAnnotations.size() > 0) {
methods.add(new ListenerMethod(method,listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
}
//存在RabbitListener标注的类
if (hasClassLevelListeners) {
//查找RabbitHandler标注的方法
RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
if (rabbitHandler != null) {
multiMethods.add(method);
}
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);//查找范围是除了Object类之外的方法
处理@RabbitListener标注的方法
@RabbitListener中可使用SpEL表达式
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,Object adminTarget, String beanName) {
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
//Id
endpoint.setId(getEndpointId(rabbitListener));
//解析队列
endpoint.setQueueNames(resolveQueues(rabbitListener));
//独占队列
endpoint.setExclusive(rabbitListener.exclusive());
//分组
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
//优先级
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
endpoint.setPriority(Integer.valueOf(priority));
}
//查找rabbitAdmin
String rabbitAdmin = resolve(rabbitListener.admin());
if (StringUtils.hasText(rabbitAdmin)) {
endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
}
//查找containerFactory
String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
}
//将endpoint关联到containerFactory中
this.registrar.registerEndpoint(endpoint, factory);
}
private String[] resolveQueues(RabbitListener rabbitListener) {
String[] queues = rabbitListener.queues();
QueueBinding[] bindings = rabbitListener.bindings();
//queues和bindings都配置会抛出异常
if (queues.length > 0 && bindings.length > 0) {
throw new BeanInitializationException("@RabbitListener can have 'queues' or 'bindings' but not both");
}
List<String> result = new ArrayList<String>();
if (queues.length > 0) {
for (int i = 0; i < queues.length; i++) {
//队列名称可为SpEL表达式,如#{myQueue},注意不能带.,如my.queue(可能会报错)
Object resolvedValue = resolveExpression(queues[i]);
//解析队列名称,resolvedValue可为String[],Queue,String,最终都会解析为String,存放到result中
resolveAsString(resolvedValue, result);
}
}else {
return registerBeansForDeclaration(rabbitListener);
}
return result.toArray(new String[result.size()]);
}
private String[] registerBeansForDeclaration(RabbitListener rabbitListener) {
List<String> queues = new ArrayList<String>();
if (this.beanFactory instanceof ConfigurableBeanFactory) {
for (QueueBinding binding : rabbitListener.bindings()) {
//声明队列
String queueName = declareQueue(binding);
queues.add(queueName);
//声明Exchange和Binding
declareExchangeAndBinding(binding, queueName);
}
}
return queues.toArray(new String[queues.size()]);
}
//具体声明队列和Exchange及Binding的代码可以去看源码
处理@RabbitListener标注的类和@RabbitHandler标注的方法
解析原理同上,只是解析信息封装在MultiMethodRabbitListenerEndpoint中,上面的是MethodRabbitListenerEndpoint
这里要说明下注解中均可以使用SpEL表达式,SpEL表达式可以解析BeanFactory和Environment中(spring.rabbitmq.emptyStringArguments(多个,分隔))的数据,如我们自己创建的Queue、Exchaneg、Binding等Bean这里都可以通过表达式引用(定义好Bean名称),还比如我们使用@Queue、@Exchange、@binding配置的属性,也可以通过表达式引用,这些配置值可以写在配置文件中。
- beanFactory:DefaultListableBeanFactory实例,Bean工厂
- resolver:StandardBeanExpressionResolver,通过SpEL表达式解析Bean实例
- expressionContext:BeanExpressionContext,封装了beanFactory,Bean实例上下文,提供SpEL表达式解析
- environment:StandardServletEnvironment