在很早之前,项目出现了一个问题,即有一个消费队列。其中数据处理的过程中出现了OOM,然后导致了RabbitMQ消费者全部挂掉,最终导致消息大量堆积。
在排查这个问题的时候,我深入了解了下RabbitMq源码,最终定位并解决了这个问题。
版本信息:SpringBoot 2.0.4.RELEASE
因为SpringBoot版本不同,RabbitMq源码有一些改动。所以要确定版本号。
1. 问题定位
某个消息出现error级别的异常后,对应的消费者会关闭,且返回unack。消息会回到Mq并转发给下一个消费者去消费。
当然,这个消息在下一个消费者中很大程度上也会出现error异常。故一个error级别的异常,会将整个队列所有的消息者全关闭。
1.1 源码分析
每创建一个消息者,则在线程池中获取一个线程。去执行AsyncMessageProcessingConsumer
然后消费者将“死循环”式的监听消费消息。
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer
当出现error异常时,如代码所示,会stop掉消费者。
2. 解决方案
2.1 方式一:监听事件,重启消费者
在上图可知,当消费者被终止后,会发送Event事件,那么监听事件后重启消费者即可。
@Slf4j
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
/**
* 重启消费者失败的消费者方法
*/
private Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer;
/**
* 开启自动重启的回调方法
* true:表示将要重启消费者
*/
private Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction;
public void setRestartConsumerFailEventConsumer(Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer) {
this.restartConsumerFailEventConsumer = restartConsumerFailEventConsumer;
}
public void setFailedEventListenerBooleanFunction(Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction) {
this.failedEventListenerBooleanFunction = failedEventListenerBooleanFunction;
}
@Override
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
log.error("消费者失败事件发生:{}", event);
//判断是否需要进行重试
Boolean restart = false;
if (failedEventListenerBooleanFunction != null) {
restart = failedEventListenerBooleanFunction.apply(event);
}
if (restart) {
log.error(String.format("Stopping container from aborted consumer. Reason::%s.",
event.getReason()), event.getThrowable());
//获取到容器
SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
String queueNames = Arrays.toString(container.getQueueNames());
// 重启
try {
restart(container);
log.info("重启队列{}的监听成功!", queueNames);
} catch (Exception e) {
log.error(String.format("重启队列%s的监听失败!", queueNames), e);
//发布事件
if (restartConsumerFailEventConsumer != null) {
RestartConsumerFailEvent restartConsumerFailEvent = new RestartConsumerFailEvent(event.getSource());
restartConsumerFailEvent.setThrowable(e);
restartConsumerFailEventConsumer.accept(restartConsumerFailEvent);
}
}
}
}
/**
* 重启消费者
*
* @param container 容器对象
*/
private void restart(SimpleMessageListenerContainer container) {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
log.error("", e);
}
Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
//重启
container.start();
}
}
缺点是:触发error异常的消息依旧没有被消费掉,依旧可能会将重启的消费者给kill掉。
2.2 方式二:捕获Error异常
首先实现拦截器,当出现Error异常后,捕获处理异常,并向外抛出事件通知。
@Slf4j
public class ErrorHandlerInterceptor implements MethodInterceptor, Serializable {
private ApplicationContext applicationContext;
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
/**
* 当抛出Error异常后,处理方案。
*/
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Object proceed = null;
try {
proceed = invocation.proceed();
} catch (Error e) {
log.error("", e);
//遇到Error异常后的处理方案
MessageErrorEvent messageErrorEvent = new MessageErrorEvent(applicationContext, RabbitUtil.getMessage(invocation), e);
applicationContext.publishEvent(messageErrorEvent);
}
return proceed;
}
}
事件信息:
public class MessageErrorEvent extends ApplicationContextEvent {
/**
* 消息对象
*/
private Message message;
/**
* Error异常信息
*/
private Error error;
/**
* Create a new ContextStartedEvent.
*
* @param source the {@code ApplicationContext} that the event is raised for
* (must not be {@code null})
*/
public MessageErrorEvent(ApplicationContext source) {
super(source);
}
public MessageErrorEvent(ApplicationContext source, Message message, Error error) {
super(source);
this.message = message;
this.error = error;
}
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
public Error getError() {
return error;
}
public void setError(Error error) {
this.error = error;
}
}
关键点:将拦截器设置到SimpleRabbitListenerContainerFactory中:
@EnableRabbit
@Configuration
public class RabbitConfiguration {
@Autowired
private ObjectProvider<ErrorHandlerInterceptor> errorHandlerInterceptorObjectProvider;
@Bean(name = "sealListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainerFactory(CachingConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(250);
/* 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。 */
factory.setDefaultRequeueRejected(true);
//慢消息触发事件通知
List<Advice> adviceList = new ArrayList<>();
ErrorHandlerInterceptor errorHandlerInterceptor = errorHandlerInterceptorObjectProvider.getIfAvailable();
if (errorHandlerInterceptor != null) {
adviceList.add(errorHandlerInterceptor);
}
//加入拦截器配置
List<MessageInterceptor> messageInterceptors = messageInterceptorsObjectProvider.getIfAvailable();
if (!CollectionUtils.isEmpty(messageInterceptors)) {
for (MessageInterceptor messageInterceptor : messageInterceptors) {
if (messageInterceptor != null) {
adviceList.add(messageInterceptor);
}
}
}
factory.setAdviceChain(adviceList.toArray(new Advice[adviceList.size()]));
//自动确认
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
}
由此,可以将通过责任链的方式,进行AOP处理,使得消费者不对外抛出Error级别的异常。