Rabbimtmq unack问题分析

一、问题现象

应用程序通过spring amqp 操作rabbitmq,有个手动返回ack的queue,应用程序已经使用 channel.basicAck 返回ack了,但是通过rabbitmq 管理控制台看到,仍然有大量消息处于 unack 状态。

1、rabbit mq日志

通过 /var/log/rabbitmq/rabbit@host.log 可以看到对应的错误信息
显示如下:

2018-10-03 19:15:15.653 [error] <0.8578.1> Channel error on connection <0.8298.1> (127.0.0.1:54528 -> 127.0.0.1:5672, vhost: '/', user: 'guest'), channel 3:
operation basic.ack caused a channel exception precondition_failed: unknown delivery tag 4
2、应用程序日志

应用程序同时伴有如下错误日志:

[ERROR] 2018-10-03 16:43:40.251 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=80)
[ERROR] 2018-10-03 16:43:41.256 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 3, class-id=60, method-id=80)

二、解决过程

通过日志可以发现,channel 已经被close掉了,把 cachingConnectionFactory 的 channelCacheSize 改为大于等于目前系统里 consumer 的个数就可以。

1、xml配置

如果使用application.xml配置 RabbitTemplate ,添加如下配置

<rabbit:connection-factory id="xxxx"
                           host="${payment.rabbitmq.host}" username="${payment.rabbitmq.username}" password="${payment.rabbitmq.password}"
                           port="${payment.rabbitmq.port}" virtual-host="${payment.rabbitmq.virtual_host}" channel-cache-size="期望的size"/>
2、手动使用代码控制
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setChannelCacheSize(期望的size);
3、Tips:

1、spring-rabbit 从 1.6.0.RELEASE 之后,把 默认的 channelCacheSize 从 1 改为了 25。
2、channelCacheSize 一定要大于等于目前的 consumer 个数,在 1.6.0.RELEASE 之前,如果只有 1 个 consumer,不会出现这个问题。
在 1.6.0.RELEASE(含) 以及以后,如果 consumer 个数小于等于25,不会出现问题。建议手动显式设置。
3、如果使用spring xml配置 consumer,则不会出现这个问题,因为spring 会自动检查配置,在 SimpleMessageListenerContainer 类里,如果发现 channelCacheSize 小于当前的 consumer 数量,则会修改 channelCacheSize。

三、原因分析

1、开启spring 日志,有看到如下信息:

[TRACE] 2018-10-03 17:09:45.485 - Returning cached Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1)
[DEBUG] 2018-10-03 16:56:13.248 - Closing cached Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,1)

2、根据 Closing cached Channel 找到对应的代码,看在哪里执行了channel 的close操作。
搜索一下,在spring-rabbit源码路径下执行命令: grep -irn 'Closing cached Channel' org
发现代码的关闭操作实际上是在 CachedChannelInvocationHandlerphysicalClose() 方法进行的。
3、通过分析代码调用链路,发现如下流程:
RabbitTemplate 每次执行 execute 方法的时候,在 finally 块里最终都会释放资源,如果目前 CachingConnectionFactory 内部的 channelCacheSize 小于 配置的数量,则进行 逻辑关闭(logicalClose),即日志中的 Returning cached Channel,逻辑关闭不会真正的关闭 channel。 如果大于当前数量,则尝试进行 物理关闭(physicalClose): 会真正的关闭掉channel。

4、RabbitTemplate.execute 核心代码
    private <T> T doExecute(ChannelCallback<T> action, ConnectionFactory connectionFactory) {
        Assert.notNull(action, "Callback object must not be null");
        RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(
                (connectionFactory != null ? connectionFactory : getConnectionFactory()), isChannelTransacted());
        Channel channel = resourceHolder.getChannel();
        if (this.confirmCallback != null || this.returnCallback != null) {
            addListener(channel);
        }
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Executing callback on RabbitMQ Channel: " + channel);
            }
            return action.doInRabbit(channel);
        }
        catch (Exception ex) {
            if (isChannelLocallyTransacted(channel)) {
                resourceHolder.rollbackAll();
            }
            throw convertRabbitAccessException(ex);
        }
        finally {
                      //释放资源
            ConnectionFactoryUtils.releaseResources(resourceHolder);
        }
    }
6、代码流程如下:
image.png
7、logicalClose 代码如下:

通过代码可以看到,close 只是把它放回到 channel pool list里,并没有做真正的关闭操作。

        private void logicalClose(ChannelProxy proxy) throws Exception {
            if (target == null) {
                return;
            }
            if (this.target != null && !this.target.isOpen()) {
                synchronized (targetMonitor) {
                    if (this.target != null && !this.target.isOpen()) {
                        if (this.channelList.contains(proxy)) {
                            this.channelList.remove(proxy);
                        }
                        this.target = null;
                        return;
                    }
                }
            }
            // Allow for multiple close calls...
            if (!this.channelList.contains(proxy)) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Returning cached Channel: " + this.target);
                }
                this.channelList.addLast(proxy);
            }
        }
8、physicalClose 代码,真实的关闭 channel, 会通过 rabbit 包下的 channelN 真正的关闭掉这个channel(向rabbitmq server发送消息)
        private void physicalClose() throws Exception {
            if (logger.isDebugEnabled()) {
                logger.debug("Closing cached Channel: " + this.target);
            }
            if (this.target == null) {
                return;
            }
            try {
                if (CachingConnectionFactory.this.active &&
                        (CachingConnectionFactory.this.publisherConfirms ||
                                CachingConnectionFactory.this.publisherReturns)) {
                    ExecutorService executorService = (getExecutorService() != null
                            ? getExecutorService()
                            : CachingConnectionFactory.this.deferredCloseExecutor);
                    final Channel channel = CachedChannelInvocationHandler.this.target;
                    executorService.execute(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                if (CachingConnectionFactory.this.publisherConfirms) {
                                    channel.waitForConfirmsOrDie(5000);
                                }
                                else {
                                    Thread.sleep(5000);
                                }
                            }
                            catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                            catch (Exception e) {}
                            finally {
                                try {
                                    if (channel.isOpen()) {
                                        channel.close(); //真实关闭掉这个channel
                                    }
                                }
                                catch (IOException e) {}
                                catch (AlreadyClosedException e) {}
                            }
                        }

                    });
                }
                else {
                    this.target.close();
                }
            }
            catch (AlreadyClosedException e) {
                if (logger.isTraceEnabled()) {
                    logger.trace(this.target + " is already closed");
                }
            }
            finally {
                this.target = null;
            }
        }

    }

四、总结

归根结底,是因为channel 被关闭了,但是仍然通过这个channel 回复 ack,在basicAck的时候,会把当前的 channelNumber 一起发送给rabbitmq server。
有兴趣的可以深入了解下 rabbitmq server的实现原理。
代码在 https://github.com/rabbitmq/rabbitmq-server
rabbitmq 是erlang 开发的,上面的错误日志在

collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
    case queue:out(Q) of
        {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
         QTail} ->
            if CurrentDeliveryTag == DeliveryTag ->
                    {[UnackedMsg | ToAcc],
                     case PrefixAcc of
                         [] -> QTail;
                         _  -> queue:join(
                                 queue:from_list(lists:reverse(PrefixAcc)),
                                 QTail)
                     end};
               Multiple ->
                    collect_acks([UnackedMsg | ToAcc], PrefixAcc,
                                 QTail, DeliveryTag, Multiple);
               true ->
                    collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
                                 QTail, DeliveryTag, Multiple)
            end;
        {empty, _} ->
            precondition_failed("unknown delivery tag ~w", [DeliveryTag])
    end.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,864评论 2 11
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 6,560评论 2 39
  • % rabbitMQ learn% qijun% 19/01/2018 mq 的一些概念 mq: mq 是一个m...
    c7d122ec46c0阅读 2,065评论 0 21
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,733评论 6 342