重新审视JDK线程池

JDK 线程池使用过程中,很多人都知道有一些关键参数需要配置,

​    public ThreadPoolExecutor(int corePoolSize, 

​                              int maximumPoolSize, 

​                              long keepAliveTime, 

​                              TimeUnit unit, 

​                              BlockingQueue<Runnable> workQueue, 

​                              RejectedExecutionHandler handler)  

也大致知道线程池的大致原理,但是不一定能解释某些现象。

有个系统,设计大致是这样的:服务A 发送消息到 MQ(具体来说是 kafka),消费端调用服务 B 实际执行消费操作。公司的中间件对 kafka 做了一层封装,能够动态配置一些参数,动态重建 consumer 应用新的配置,其中有一个参数就是并行度 parallelCount ,含义是:对于同一个 partition 分配多少个线程并行处理消息。

系统设计之初,就考虑了使用这个配置来动态调整整个系统的承载能力,因为流量弹性比较高,少的时候一天没有调用量,多的时候可能需要在较短时间内处理几十万到上百万的消息,处理时间甚至可能需要根据下游系统性能调整。所以这个参数的动态调整至关重要。

Snipaste_2019-07-15_21-14-06.png

但是实际上线之后,一次大批量调用,观察到并行度调整似乎没有达到预期效果,默认 parallelCount = 1,如果业务能保证不依赖消息顺序,则可以调整并行度提高吞吐量

有一天线上收到告警,消息积压。于是赶紧调整并行度,

n=1,

n=2,

一切有序进行中,消息处理速度整体不断增加

n=8,

n=9,

n=10

n=16

但是观察线上监控,似乎处理能力不再增加了?这是怎么回事?

我记得下游系统 actual service 配置了最大 64 线程,这还差很多呢,怎么就不线性增长了?

Snipaste_2019-07-15_20-07-45.png

下游系统响应变慢?

开始的时候猜想,是不是下游系统处理能力不够了?请求的响应速度变慢,所以请求堆积起来了?

由于下游系统是个外部的 HTTP 服务,所以无从得知,但是从历史经验来看,远远达不到这个系统的瓶颈,因为这个系统其实有很多的外部调用方,我们的请求量不见得算很大。

而且从 actual service 的内部打点来看,实际执行 HTTP 调用的地方 TP99 并没有变慢,和平时一样。

下游系统限流?

这是有可能的,因为下游系统这个HTTP服务本身有对各个接入放有限流,但是查了文档,当前调用量还远没有达到限流阈值。

系统内部分析

那么问题只会出现在系统内部了

查看监控

consumer 应用内部,当时的线程堆栈采样可以看出来,kafka consumer 端线程数量确实有 16 个

这样也就排除了 consumer 并行度调整不生效的问题。

actual service 内部,查看当时的线程堆栈采样,对应 consumer 并行度 16 的时候,service 内部用于处理任务的专用线程池,thread-count == 10

起初很奇怪,细想一下终于明白了。JDK 线程池确实是这个逻辑

image

简单来说,就是 threadCount > coreSize ,先开始排队,队列满再扩充线程池

//java.util.concurrent.ThreadPoolExecutor#execute
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

JDK 代码注释中有解释,代码短小精悍。

优化方案

知道问题所在了,那么怎么解决呢?

初步想,有两种方案。

方案一

直接把 coreSize 设置到一个足够大的值,比如 64,或者干脆配置一个 fixed size 的线程池

优点:简单直接,能解决问题

缺点:请求量低的时候大量线程闲置,浪费系统资源

方案二

这也是本篇的精髓所在了,改造 JDK 线程池。

既然缺陷在于先排队后扩容,延迟了扩容的时机,那就改成先扩容后排队,这样就能确保在一定空间下处理能力线性增长了。

怎么做呢?分析上面的代码,第二个 if 语句,isRunning(c) && workQueue.offer(command) 如果入队成功了就不会创建线程,所以只要重载 Queue,判断当前 threadCount > coreSize && threadCount < maxCount 的时候返回 false,就可以了,等到 threadCount > maxSize 的时候再实际执行入队操作。

其实这就是 tomcat 线程池的做法,细节上需要注意:queue 需要感知到 threadPool 当前的 count,需要做一些改造。

看源码:tomcat 8.0.30 版本

//org.apache.tomcat.util.threads.TaskQueue#offer
    @Override
    public boolean offer(Runnable o) {
      //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }

创建的时候持有 Pool 的引用

// org.apache.catalina.core.StandardThreadExecutor#startInternal
    @Override
    protected void startInternal() throws LifecycleException {

        taskqueue = new TaskQueue(maxQueueSize);
        TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
        executor.setThreadRenewalDelay(threadRenewalDelay);
        if (prestartminSpareThreads) {
            executor.prestartAllCoreThreads();
        }
        taskqueue.setParent(executor);

        setState(LifecycleState.STARTING);
    }

剥离开 tomcat 的一些不相关的参数,简单改造一下就可以用了。

感谢 tomcat ,随便一看都是宝藏

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容

  • Java8张图 11、字符串不变性 12、equals()方法、hashCode()方法的区别 13、...
    Miley_MOJIE阅读 3,697评论 0 11
  • 线程 操作系统线程理论 线程概念的引入背景 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有...
    go以恒阅读 1,635评论 0 6
  • 一、线程池简介 在实际开发中,如果每个请求到达就创建一个新线程,开销是相当大的。服务器在创建和销毁线程上花费的时间...
    不知名的蛋挞阅读 572评论 0 7
  •   一个任务通常就是一个程序,每个运行中的程序就是一个进程。当一个程序运行时,内部可能包含了多个顺序执行流,每个顺...
    OmaiMoon阅读 1,664评论 0 12
  • 一 基础篇 1.1 Java基础 面向对象的特征抽象:将一类对象的共同特征总结出来构建类的过程。继承:对已有类的一...
    essential_note阅读 690评论 0 0