大厂面试官:JDK 线程池中如何不超最大线程数快速消费任务?

01、线程池参数

我们这里讲解以 ThreadPoolExecutor#execute(Runnable runnable) 举例, 这里先说下线程池的一些参数

本篇只是说明上述问题, 不会对线程池做详细讲解

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {...}

corePoolSize

线程池中的核心线程数量, 如果没有全局设置池内线程的过期时间, 池内会维持此数量线程

maximumPoolSize

线程池中的最大线程数量, 当核心线程都在运行任务, 并且阻塞队列中任务数量已满, 此时会创建非核心线程

keepAliveTime & unit

线程池中线程过期时间以及时间单位

workQueue

存放线程池内任务的阻塞队列, 如 ArrayBlockingQueue、LinkedBlockingQueue...

threadFactory

创建线程池中线程的线程工厂, 可以在创建线程时初始化优先级、名称、守护状态...

handler

当线程池中全部线程都在运行, 阻塞队列也满的时候, 会将添加的任务执行拒绝策略, JDK 线程池中实现了四种拒绝策略, 默认 AbortPolicy, 抛出异常

02、线程池任务添加流程

相信大家在网上看到过许多类似的线程池执行流程图哈, 这里还是简要赘述下, 源码如下:

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    } else if (!addWorker(command, false))
        reject(command);
}

1、线程池提交任务首先判断当前线程数是否大于核心线程数, 否则创建核心线程执行任务

2、如果当前线程超过了核心线程数, 判断阻塞队列是否已满, 否则将任务添加到队列中

3、如果阻塞队列已满, 判断当前线程是否大于最大线程数, 否则创建非核心线程执行任务

4、如果当前线程大于或等于最大线程数, 执行拒绝策略

image

线程池任务提交流程

这道问题的意图就是要将第二步进行改写

如果当前线程大于核心线程数, 不将任务放入阻塞队列, 而是创建非核心线程执行任务

举例说明一下:

public static void main(String[] args) {
    ThreadPoolExecutor threadPoolExecutor =
            new ThreadPoolExecutor(1, 3, 60,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue(10));

    for (int i = 0; i < 7; i++) {
        threadPoolExecutor.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "-执行任务");
            LockSupport.park();
        });
    }
    threadPoolExecutor.shutdown();
    /**
     * pool-1-thread-1执行任务
     */
}

看到这段代码, 正常情况下只会有一个任务会被执行, 其余任务会被放置阻塞队列中

而我们需要做的就是, 发现池内线程大于核心线程数, 不放入阻塞队列, 而是创建非核心线程进行消费任务

本地代码实现参考 Dubbo 源码中 EagerThreadPoolExecutor, 确实能实现对应效果, 这里就不演示了, 一起看一下 Dubbo 如何做的

03、Dubbo 中实现的快速消费

Dubbo 中涉及到的类有两个, EagerThreadPoolExecutor[2]TaskQueue[3]

这里贴一下重点代码

3.1 TaskQueue

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {
  ...
    // 队列中持有线程池的引用
    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
    ...
        // 获取线程池中线程数
        int currentPoolThreadSize = executor.getPoolSize();
        // 如果有核心线程正在空闲, 将任务加入阻塞队列, 由核心线程进行处理任务
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

       /**
        *【重点】当前线程池线程数量小于最大线程数
        * 返回false, 根据线程池源码, 会创建非核心线程
        */
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // 如果当前线程池数量大于最大线程数, 任务加入阻塞队列
        return super.offer(runnable);
    }
}  

存在一个疑点, getSubmittedTaskCount() 是如何获取提交任务数量的?

这里就需要看一下 EagerThreadPoolExecutor 实现了, 也比较简单, 只是 重写了线程池的两个方法: afterExecute()、execute()

3.2 EagerThreadPoolExecutor

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}

EagerThreadPoolExecutor 继承了 ThreadPoolExecutor, 在 execute() 上做了个性化设计

并在线程池内新增了一个任务数量的字段, 是一个原子类, 添加任务时自增, 任务异常及结束时递减

这样就能保证 TaskQueue#offer(Runnable runnable) 做出逻辑处理

原文链接:
https://mp.weixin.qq.com/s/3txfaq9pbmIG4P3hBd6Cig

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354

推荐阅读更多精彩内容