【RabbitMQ-13】惊!线上的RabbitMQ消费者自己kill自己,导致消息大量堆积

在很早之前,项目出现了一个问题,即有一个消费队列。其中数据处理的过程中出现了OOM,然后导致了RabbitMQ消费者全部挂掉,最终导致消息大量堆积。

在排查这个问题的时候,我深入了解了下RabbitMq源码,最终定位并解决了这个问题。

版本信息:SpringBoot 2.0.4.RELEASE
因为SpringBoot版本不同,RabbitMq源码有一些改动。所以要确定版本号。

1. 问题定位

某个消息出现error级别的异常后,对应的消费者会关闭,且返回unack。消息会回到Mq并转发给下一个消费者去消费。

当然,这个消息在下一个消费者中很大程度上也会出现error异常。故一个error级别的异常,会将整个队列所有的消息者全关闭

1.1 源码分析

每创建一个消息者,则在线程池中获取一个线程。去执行AsyncMessageProcessingConsumer

创建消费者.png

然后消费者将“死循环”式的监听消费消息。
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer

消息队列是死循环去消费数据.png

当出现error异常时,如代码所示,会stop掉消费者。

stop消费者.png

2. 解决方案

2.1 方式一:监听事件,重启消费者

在上图可知,当消费者被终止后,会发送Event事件,那么监听事件后重启消费者即可。

@Slf4j
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    /**
     * 重启消费者失败的消费者方法
     */
    private Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer;

    /**
     * 开启自动重启的回调方法
     * true:表示将要重启消费者
     */
    private Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction;


    public void setRestartConsumerFailEventConsumer(Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer) {
        this.restartConsumerFailEventConsumer = restartConsumerFailEventConsumer;
    }

    public void setFailedEventListenerBooleanFunction(Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction) {
        this.failedEventListenerBooleanFunction = failedEventListenerBooleanFunction;
    }

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("消费者失败事件发生:{}", event);
        //判断是否需要进行重试
        Boolean restart = false;
        if (failedEventListenerBooleanFunction != null) {
            restart = failedEventListenerBooleanFunction.apply(event);
        }
        if (restart) {
            log.error(String.format("Stopping container from aborted consumer. Reason::%s.",
                    event.getReason()), event.getThrowable());
            //获取到容器
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            // 重启
            try {
                restart(container);
                log.info("重启队列{}的监听成功!", queueNames);
            } catch (Exception e) {
                log.error(String.format("重启队列%s的监听失败!", queueNames), e);
                //发布事件
                if (restartConsumerFailEventConsumer != null) {
                    RestartConsumerFailEvent restartConsumerFailEvent = new RestartConsumerFailEvent(event.getSource());
                    restartConsumerFailEvent.setThrowable(e);
                    restartConsumerFailEventConsumer.accept(restartConsumerFailEvent);
                }
            }
        }

    }

    /**
     * 重启消费者
     *
     * @param container 容器对象
     */
    private void restart(SimpleMessageListenerContainer container) {
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            log.error("", e);
        }
        Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
        //重启
        container.start();
    }

}

缺点是:触发error异常的消息依旧没有被消费掉,依旧可能会将重启的消费者给kill掉。

2.2 方式二:捕获Error异常

首先实现拦截器,当出现Error异常后,捕获处理异常,并向外抛出事件通知。

@Slf4j
public class ErrorHandlerInterceptor implements MethodInterceptor, Serializable {

    private ApplicationContext applicationContext;

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * 当抛出Error异常后,处理方案。
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Object proceed = null;
        try {
            proceed = invocation.proceed();
        } catch (Error e) {
            log.error("", e);
            //遇到Error异常后的处理方案
            MessageErrorEvent messageErrorEvent = new MessageErrorEvent(applicationContext, RabbitUtil.getMessage(invocation), e);
            applicationContext.publishEvent(messageErrorEvent);
        }
        return proceed;
    }
}

事件信息:

public class MessageErrorEvent extends ApplicationContextEvent {

    /**
     * 消息对象
     */
    private Message message;

    /**
     * Error异常信息
     */
    private Error error;

    /**
     * Create a new ContextStartedEvent.
     *
     * @param source the {@code ApplicationContext} that the event is raised for
     *               (must not be {@code null})
     */
    public MessageErrorEvent(ApplicationContext source) {
        super(source);
    }

    public MessageErrorEvent(ApplicationContext source, Message message, Error error) {
        super(source);
        this.message = message;
        this.error = error;
    }

    public Message getMessage() {
        return message;
    }

    public void setMessage(Message message) {
        this.message = message;
    }

    public Error getError() {
        return error;
    }

    public void setError(Error error) {
        this.error = error;
    }
}

关键点:将拦截器设置到SimpleRabbitListenerContainerFactory中:

@EnableRabbit
@Configuration
public class RabbitConfiguration {

    @Autowired
    private ObjectProvider<ErrorHandlerInterceptor> errorHandlerInterceptorObjectProvider;

    @Bean(name = "sealListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory(CachingConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(250);
        /* 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。 */
        factory.setDefaultRequeueRejected(true);
        //慢消息触发事件通知
        List<Advice> adviceList = new ArrayList<>();
  
        ErrorHandlerInterceptor errorHandlerInterceptor = errorHandlerInterceptorObjectProvider.getIfAvailable();
        if (errorHandlerInterceptor != null) {
            adviceList.add(errorHandlerInterceptor);
        }
        //加入拦截器配置
        List<MessageInterceptor> messageInterceptors = messageInterceptorsObjectProvider.getIfAvailable();
        if (!CollectionUtils.isEmpty(messageInterceptors)) {
            for (MessageInterceptor messageInterceptor : messageInterceptors) {
                if (messageInterceptor != null) {
                    adviceList.add(messageInterceptor);
                }
            }
        }
        factory.setAdviceChain(adviceList.toArray(new Advice[adviceList.size()]));
        //自动确认
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }
}

由此,可以将通过责任链的方式,进行AOP处理,使得消费者不对外抛出Error级别的异常。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容