Java 线程池


Java线程池是java cocurrent包下提供的类,使用非常方便。本文希望整理下Java 线程池相关的知识以及其实现原理。


类结构

Executors, ExecutorService, ThreadPoolExecutor等概念,各个类容易混淆,其实类关系整理如图。ExecutorService接口扩展了Executors接口,在基本的execute()方法上,提供了一些线程池管理方法、同步/异步获取线程执行结果的方法。


屏幕快照 2017-06-28 下午11.10.32.png

通常,我们使用线程池的模版代码如下

ExecutorService executorService = Executors.newFixedThreadPool(5,
                new ThreadFactory() {
                    private AtomicInteger count = new AtomicInteger(0);
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread("task thread-" + count.incrementAndGet());
                    }
                });
 for (int i = 0; i < 10; i++) {
      executorService.submit(new Task());
}

从类图结构就可以很好的理解了, Executors即为工厂方法类,提供多种类型线程池的构造。ExecutorService即为所有线程池的接口类。
我们使用的线程池可以分为两类ScheduledExecutorService、ThreadPoolExecutor。实际所有线程池均为其实现类或者wrapper类。所以分析线程池可以从这两类入手。


ThreadPoolExecutor

常用的代表有Executors.newFixedThreadPool(),newCachedThreadPool() 等,构造出来的都是设定了不同参数的ThreadPoolExecutor实例。
ThreadPoolExecutor的核心参数有:

  • corePoolSize
  • maxPoolSize
  • keepAliveTime
  • workingQueue: 同步队列,无界队列,有界队列

具体每个参数的定义可以看Java doc,文档对于何时创建线程、何时将任务排队、以及排队策略的说明都写的非常详细。本人经常忘记的一点就是线程创建的策略:

  1. alive线程数< corePoolSize,来一个新task则新建一个线程
  2. alive线程数>corePoolSize,则新task先排队,排队排不下了再尝试新建线程。
  3. 排队时队满了,尝试新建线程。线程数目到达maxPoolSize,根据定义的策略方式来处理后面继续到来的任务,比如直接拒绝任务。

从基本使用代码 executorService.submit(new Task());入手分析,threadPool到底做了什么?是如何让一个任务跑起来的?

submit

代码中可以看到,逻辑为将传入的runnable封装成一个runnableFuture,再调用execute方法执行该任务。runnableFuture可提供接口,将任务取消执行等。其他相关runnableFuture变种可以提供一些异步获取task执行返回结果的接口,具体需要看callable,future的说明。

 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

execute

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
        /* worker线程数 < corePoolSize,直接新建一个线程,且该task作为新建线程的第一个任务直接执行*/
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
        /*线程池running状态且队列中还能排下该task,则task进入队列。此时有二次检查,若二次检查不过,则task移除队列,成功移出后,调用reject策略。否则,task成功在排队队列中等待执行,若此时worker线程数=0,则新建个线程来处理该任务*/
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                /* 注意此时addWorker里第一个参数为null
                addWorker(null, false);
        }
       /*尝试直接线程,该task作为该新建线程的第一个任务*/
        else if (!addWorker(command, false))
            reject(command);
    }

看完有3个疑问:

  1. 线程数目>corePoolSize时且队列中有剩余槽位时,task在队列中等待执行,则何时会被worker线程执行?
    2.线程数目>corePoolSize且队列中无剩余槽位时,task直接由新建一个线程执行(�< maxPoolSize),那是否是造成了后提交的task是可能先执行的?

addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容