RocketMQ 异常分析 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a whil

现象:

在对 RMQ 做集群压测时,偶现 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a whil 异常,对系统正确率有一定影响,所以决定一探究竟。

全局搜索代码

首先,clone 了一波代码,全局搜了一下,在 BrokerFastFailure 这个类里的 cleanExpiredRequestInQueue 方法看到了:

    void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
        while (true) {
            try {
                if (!blockingQueue.isEmpty()) {
                    final Runnable runnable = blockingQueue.peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }

                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    if (behind >= maxWaitTimeMillsInQueue) {//
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                        }
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }

这段代码逻辑:

  1. 如果当前时间减去这个请求的创建时间 > 配置的最大等待时间。就删除掉这个请求,并抛出这个异常。在调用 returnResponse 方法时,会直接调用 netty 的 writeAndFlush 方法写回数据。

而这个方法会被 4 个地方调用:

楼主在每个调用后面设置了默认值:
发送时:最大 200ms
pull 消息时: 最大 5 秒
心跳: 31 秒
事务: 3 秒。

而我们的系统都是在 send 消息时,报错的,看来,是队列里请求等待超过 200 毫秒了。

再回头看看 cleanExpiredRequest 这个方法。

这个方法被一个 10ms 间隔的定时任务执行的。因此,最多也就超过 210 毫秒,就会抛异常。而这个方法还有另外一个逻辑:

 while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {// 如果写不进去.
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }

如果操作系统 PageCache 很忙,也抛出 PCBUSY 异常,忙的意思是:往 MQ 文件里写是需要一把锁的,如果上次上锁的时间截止到现在,超过了 1 秒,就表示忙,就不等了,就抛出异常。

明显,200ms 的队列排队超时,更容易被触发。

那有没有可能,把这个队列对应的线程池搞的大一点呢?

我们看看源码:
sendThreadPoolQueueCapacity = 10000;
sendMessageThreadPoolNums = 1;

发现这个队列的大小是 10000。线程池的大小是 1(max 和 core 都是 1).

为什么是1?

注释是这么说的:

thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.

意思是,4.0.x 版本后使用自旋锁,所以默认值是1.

这个锁说的是 PutMessageLock,在 RMQ 里,有 2 种实现:

  1. PutMessageSpinLock 自旋锁。
  2. PutMessageReentrantLock JDK 非公平重入锁。

默认使用自旋锁,但是,自旋锁适合在并发低的时候使用,说白了就是乐观锁,乐观锁适合并发低的时候使用,悲观锁适合在并发高的时候使用。

为什么使用一个线程?因为多个线程没有效果,往 PageCache 里写数据是上锁的,最耗时的就在这部分,开启多个线程在这里抢锁是没有意义的。而且,在高并发下,多个线程自旋抢锁,CPU 可能会爆炸。

那用重入锁呢?大部分时候,MQ 的压力都没那么大,使用自旋锁,能够减少 CPU 上下文切换,提高性能。这是一个权衡。如果你把线程池搞了多个 线程,那就使用重入锁吧。但多线程确实没意义。

所以,线程池这部分,我们最好不要修改,也就是说,你即使增加线程数,也解决不了这个问题。另外,如果不是 1,在使用绝对顺序消息时,是无法保证消息的顺序的。相当于多个线程处理一个队列的消息,顺序一定会错乱。这个千万要注意。

根本的原因还是 PageCache 刷盘的时候,会有毛刺,超时超过 200ms,就会偶发这种现象。

总结

总结一下:
这个错误是因为请求在队列里待的太久了,如果是 send 请求,就是 200 多毫秒。因此,会被 Server 端认为这是过期的请求。

相关参数:

  1. waitTimeMillsInSendQueue 这个最有效,默认值是 200,可稍微改的大一点。例如 250,300,切不可过大,过大会导致积压过多请求,而且大部分都是无效的。更会引起后面的请求都无效。

  2. sendMessageThreadPoolNums 发送消息处理线程数,可以改的大一点,但治标不治本。

  3. brokerFastFailureEnable 这个参数用于控制是否进行扫描,就是每 10ms 执行 cleanExpiredRequest 那个定时任务,可以修改为 false,但不建议这么做。

最后,这个错误的根本原因猜测是因为操作系统 IO 抖动,因为看到网上很多 tps 很低,也会出现这个情况。具体还需要详细的测试和排查。

通常,大家会使用重试策略,解决这个异常,但是可能会引起消息重复,所以,如果对消息重复敏感,做幂等是必须的。

如果 CPU 和 磁盘负载很高,出现这种问题就很正常了,建议增加 Broker 服务器,分担压力。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容

  • ReentrantLock 介绍 一个可重入的互斥锁,它具有与使用{synchronized}方法和语句访问的隐式...
    tomas家的小拨浪鼓阅读 4,056评论 1 4
  • 引用自多线程编程指南应用程序里面多个线程的存在引发了多个执行线程安全访问资源的潜在问题。两个线程同时修改同一资源有...
    Mitchell阅读 1,990评论 1 7
  • 1.写出synchronized的使用方式 synchronized的三种应用方式 synchronized关键字...
    wuyuan0127阅读 303评论 0 1
  • 在一个方法内部定义的变量都存储在栈中,当这个函数运行结束后,其对应的栈就会被回收,此时,在其方法体中定义的变量将不...
    Y了个J阅读 4,417评论 1 14
  • 线程池ThreadPoolExecutor corepoolsize:核心池的大小,默认情况下,在创建了线程池之后...
    irckwk1阅读 725评论 0 0