Java如何让线程池满后再放队列

背景

最近收到一道面试题:我们知道JDK的线程池在线程数达到corePoolSize之后,先判断队列,再判断maximumPoolSize。如果想反过来,即先判断maximumPoolSize再判断队列,怎么办?

建议往下浏览之前先思考一下解决方案,如果自己面对这道面试题,该如何作答?


方案一

由于线程池的行为是定义在JDK相关代码中,我们想改变其默认行为,很自然的一种想法便是:继承自JDK的线程池类java.util.concurrent.ThreadPoolExecutor,然后改写其execute方法,将判断队列与maximumPoolSize的逻辑顺序调整一下,以达到目的

原来的逻辑如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 创建新线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 代码运行到此处,说明线程池数量达到了corePoolSize
    if (isRunning(c) && workQueue.offer(command)) {
        // 将任务成功入队
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 代码运行到此处,说明入队失败
    else if (!addWorker(command, false))
        // 创建新线程失败则执行拒绝策略
        reject(command);
}

但是仔细阅读代码会发现,execute中涉及到的一些关键方法如workerCountOfaddWorker等是私有的,关键变量如ctlcorePoolSize也是私有的,即无法通过简单继承ThreadPoolExecutor改写其execute方法的核心逻辑达到目的。

那考虑的一个变种是,定义一个MyThreadPoolExecutor,把ThreadPoolExecutor的代码照搬过来,只改写其中execute方法,改写后的逻辑如下:

public void execute(Runnable command) {
    if (command == null)
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 先判断maximumPoolSize
    if (workerCountOf(c) < maximumPoolSize) {
        if (addWorker(command, false))
            return;
        c = ctl.get();
    }
    // 再判断队列
    else if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (isRunning(c) && !workQueue.offer(command))
        reject(command);
}

改写之后,发现reject方法也得重写,原因是RejectedExecutionHandler#rejectedExecution第二个入参是ThreadPoolExecutor,不能传this

// java.util.concurrent.ThreadPoolExecutor#reject

final void reject(Runnable command) {
     handler.rejectedExecution(command, this);
}
// java.util.concurrent.RejectedExecutionHandler

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

这样,连RejectedExecutionHandler也要改写一下

由于RejectedExecutionHandler的改造并非面试题核心逻辑,所以此处省略,明白要表达的意思即可

但这样做之后,与三方框架的兼容就很难了--->有不少三方框架入参是需要ThreadPoolExecutor,而不是自定义的MyThreadPoolExecutor,后续的使用会是个问题

评价:自定义MyThreadPoolExecutor需要代码大篇幅的拷贝,麻烦不说,兼容性还是个问题,从实战出发考虑,可行性很低

方案二

那有没有什么方案能够既省事,又能兼顾兼容性?

两步走:

  1. 自定义Queue,改写offer逻辑
  2. 自定义线程池类,继承自ThreadPoolExecutor,改写核心逻辑
自定义Queue
public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;
    
    // 自定义的线程池类,继承自ThreadPoolExecutor
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    // offer方法的含义是:将任务提交到队列中,返回值为true/false,分别代表提交成功/提交失败
    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }
        // 线程池的当前线程数
        int currentPoolThreadSize = executor.getPoolSize();
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            // 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            // 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程
            return false;
        }

        // 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中
        return super.offer(runnable);
    }

    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}
自定义线程池类
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * 定义一个成员变量,用于记录当前线程池中已提交的任务数量
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }


    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        // ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        // 将池中已提交的任务数 + 1
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

核心逻辑:当提交任务给EagerThreadPoolExecutor,执行submittedTaskCount.incrementAndGet();将池中已提交的任务数 + 1,然后就调用父类的execute方法

// 代码运行到此处,说明线程数 >= corePoolSize, 此时workQueue为自定义的TaskQueue
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

核心逻辑:当执行workQueue.offer(command),走到自定义的TaskQueue#offer逻辑,而offer方法的返回值决定着是否创建更多的线程:返回true,代表入队成功,不创建线程;返回false,代表入队失败,需要创建线程

// 线程池的当前线程数
int currentPoolThreadSize = executor.getPoolSize();
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
    // 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理
    return super.offer(runnable);
}

if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
    // 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程
    return false;
}

// 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中
return super.offer(runnable);

核心逻辑:当前线程数小于最大线程数就返回false,代表入队失败,需要创建线程

因此,总结起来就是:自定义的EagerThreadPoolExecutor依赖自定义的TaskQueue的offer返回值来决定是否创建更多的线程,达到先判断maximumPoolSize再判断队列的目的

评价:该方案不需要修改JDK线程池的核心逻辑,尽最大可能避免因更改核心流程考虑不周而引入的BUG。另一方面,扩展Queue的手段,也是JDK提供的一个能够让用户在不干涉核心流程的情况下,达到安全扩展线程池能力的方式

题外话

有朋友或许会有疑问,这道面试题是面试官天马行空想像出来的吗?是否有实际的场景跟需要呢?

可以从至少两个开源框架上找到答案

Dubbo 2.6.2及以上

其实上边的方案二,代码来自于Dubbo源码,
相关git issue在此: Extension: Eager Thread Pool

Tomcat

Tomcat自定义的线程池类名与JDK的相同,都叫ThreadPoolExecutor,只是包不同,且Tomcat的ThreadPoolExecutor继承自JDK的ThreadPoolExecutor

Tomcat自定义的队列也叫TaskQueue

Tomcat的ThreadPoolExecutor与TaskQueue核心逻辑、思想与方案二贴的代码几乎一致。实际上,是carryxyh(Dubbo EagerThreadPoolExecutor作者)借鉴的Tomcat设计,关于这一点Dubbo github issue上作者本人也有提及

JDK线程池与Tomcat线程池方案谁最好?

笔者认为,没有哪种方案最好,技术没有银弹,只是在不同视角进行的trade off,在某种场景下最好的方案在另一个场景中可能却导致糟糕的后果。可以从另一个角度考虑:如果有一种放之四海皆准,从各个角度考虑都优于其他技术的存在,那么它的出现必将完全取代它的竞品。而从现实看,显然, JDK线程线与Tomcat线程池都各有场景与发展,并没有出现一方取代另一方的情况,因此不存在哪种方案最好的说法

如果线上环境要使用线程池,哪一种更合适?

线程数与CPU核数、任务类型的关系就不细说了。简单而言,如果不能忍受延迟,期望应用能尽快地为用户提供服务,那么Tomcat线程池可能更适合你;相反,如果你能容忍一些延迟来换取性能上的提升,那么JDK线程池可能会更合适一些

方案一的代码乃笔者随手而敲,未经过任何生产环境的检验跟锤炼,可能存在潜在的BUG,强烈不建议生产环境使用。如果确实有需要,请使用方案二,有知名框架背书,且实现更为安全与优雅,乃首先之姿


最后,感谢这位朋友的面试题,也感谢孤独烟(人称烟哥)分享面试题让大家参与讨论,以及飞奔的普朗克(人称何总)提供的思路,才有了本篇的内容分享,希望大家都能有所收获

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容

  • 线程池不仅在项目中是非常常用的一项技术而且在面试中基本上也是必问的知识点,接下来跟着我一起来巩固一下线程池的相关知...
    不学无数的程序员阅读 3,154评论 0 21
  • 第6章介绍了任务执行框架, 它不仅能简化任务与线程的生命周期管理, 而且还提供一 种简单灵活的方式将任务的提交与任...
    好好学习Sun阅读 1,174评论 0 2
  • 昨晚手机被张小帅拿去查资料,我边看书边等他,打算等他用完之后就来简书更新,谁知,居然一不小心睡着了。 等张小帅把手...
    葛芳阅读 414评论 6 7
  • 王仙娥 古稀老人王仙娥 十五年来素生活 八十万元救儿命 企盼母爱胜病魔 (古稀老人,为救儿子给儿子移肾用去八十万元...
    旖旎i阅读 385评论 3 9
  • 说起来,钟书阁是离我们最近的书店,却是第一次去。 据说重庆钟书阁成了网红打卡地,春节很多人排一两个小时才能进去,不...
    不舍札记阅读 436评论 0 0