ThreadPoolExecutor中的Worker

首先看ThreadPoolExecutor的核心方法execute

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//workerCountOf(c) 就是当前线程数
            if (addWorker(command, true))//步骤1
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//步骤2
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//这里注意为传入的task为null
        }
        else if (!addWorker(command, false))//步骤3
            reject(command);
    }

这个方法很简明,就是线程池的基本原理:
1.线程池数量小于核心线程池数量,则通过addWorker(稍后分析该方法)将任务加入线程池中,如果成功则返回。
2.如果步骤1返回失败,则看线程池是否在running状态,如果在则把任务送进等待队列。如果这一步成功,再检查一次线程池状态,如果线程池不是running状态并且当前任务从队列移除成功,则执行拒绝策略,否则如果worker数量等于0的话,则相当于新建一个线程。如果没有这个调用,当你把coreSize设置为0时,往线程池里添加任务,任务会被放在任务队列了,永远得不到执行。
3.如果addWoker失败,即超过了最大线程池数量,则执行拒绝策略。

BTW,线程池中的有个ctl变量,这个变量的定义是

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

道格老爷子用的这个变量的前3位定义线程池的状态,后29位作为worker的数量。

我们可以看出,整个流程的核心方法就是addWoker,在看它的代码前,先来看一看Worker这个类,该类定义如下

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

先看它的成员变量和构造方法:
Worker的变量中有thread和runnable,那么大概可以猜出Worker就是一个Thread的包装类,负责运行任务。
构造方法中,就使用线程池的ThreadFactory来new了线程。

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); //不希望在runWorker之前中断
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

看到这个类也继承了AQS,看一下实现的方法,使用的互斥模式,也就是很正常的并发访问控制。state=0代表解锁状态,state=1代表上锁状态。但是要注意的是这个并不是像ReentrantLock一样的重入锁。因为当执行interruptIdleWorkers时(shutdown等会调用),会获取Worker的锁,而我们不希望这时候Worker能获取锁中断线程,因为会增大线程管理和中断控制的难度。
再看为什么初始化Worker时要setState(-1),就是要避免开始执行之前的Worker不会被中断。那么什么时候会中断Worker的线程?就是调用shutdownNow时(shutdownNow不像shutdown需要提前获取worker的锁才能中断线程),里面会调用interruptIfStarted方法,判断了state>=0才会被中断(见下interruptIfStarted方法)。

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {//state=-1不会被中断
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

ok,这个类也实现了Runnable接口,看下它的run方法,一个while循环,先运行自己传进来的任务,如果传进来的任务为null,则从队列里面取任务运行。代码中看到如果要走出这个循环的话,要么Worker的线程被中断,要么getTask=null。顺便一提的是,Worker本身不运行run,而是里面thread通过start运行这个方法。再进去看下getTask方法。

public void run() {
            runWorker(this);
     }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 因为Worker初始化state=-1,这里先设置为0,否则获取不了锁
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//getTask是从队列取出任务,取到就继续运行。
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();//线程池shutdownNow后中断worker
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//运行自己的业务方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask方法就是不断从队列里面取任务。如果是核心线程,就一直取任务;如果是非核心线程,在keepAliveTime没取到任务就返回null。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            //...省略代码
            int wc = workerCountOf(c);
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  //看到如果当前线程数大于coreSize,则启动从队列取任务时采用超时的方法取。

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {//可以看到如果线程池只有一个线程,那么这个线程就算是非核心线程,也不会销毁的。
                if (compareAndDecrementWorkerCount(c))
                    return null;//如果当前线程数大于coreSize,并且队列是空,返回null
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://这里终于看到keepAliveTime的作用了。
                    //如果是非核心线程,在keepAliveTime时间内没有任务进来,那么根据上面的runWorker方法,取到的值是null,那么就跳出循环,线程自动销毁。
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

  分析完Worker后,又回到addWorker方法。每一步的注释如下

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 省略代码...
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))//这里就是看是否是添加的核心线程池数的任务还是核心线程之外的任务。
                    return false;
            // 省略代码...
            }
        }
        //经过一系列校验后
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//这个就是线程池中的核心内部类。
            final Thread t = w.thread;//取出new好的线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());//线程池的状态

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {//再次检查线程池状态
                        if (t.isAlive()) //如果线程池SHUTDOWN或者RUNNING,但是线程被启动了,抛异常
                            throw new IllegalThreadStateException();
                        workers.add(w);//缓存worker
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//更新池中最大线程数量
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {如果添加成功
                    t.start();//最终调用runWorker的地方。
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);//如果添加失败就会尝试关闭线程池。
        }
        return workerStarted;
    }

总结:首先大致分析了线程池运行的基本流程,简单来说execute就是一直往队列中扔任务,创建好的Worker不断从中取任务运行。这里要注意的是,线程池在初始化时并没有将核心线程数的线程一起初始化,而是来一个任务,创建一个线程。还有就是当线程数超过核心线程数并且开始销毁多出核心线程数的线程时,有可能销毁的是在小于核心线程数时创建出来的旧线程。
  然后是内部类Worker,这里Worker就是thread和task的一个包装类,它的职能就是控制中断和任务的运行。Worker是一个集成了AQS,实现了Runnable方法的内部类。Worker创建好后,通过new好的线程来运行任务。核心Worker通过while不断从队列中取出任务,任务队列为空线程就阻塞;非核心Worker也是通过while不断取任务,只是有个取任务时keepAliveTime的超时时间,在时间之内取不到的任务的话线程就跳出循环,自动销毁了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,204评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,091评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,548评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,657评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,689评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,554评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,302评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,216评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,661评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,851评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,977评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,697评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,306评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,898评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,019评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,138评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,927评论 2 355

推荐阅读更多精彩内容