线程池ThreadPoolExecutor

前面我们了解了一些线程相关的东西,在Android的开发生涯中谈到线程总是不可避免的要提到线程池这个东西,也许我们线程池用的挺多的,但大部分都只局限于调用API,对于线程池内部实现不甚了解。今天就让我们继续走进源码世界,看看线程池到底是什么样的一个东西。

ThreadPoolExecutor继承了抽象类AbstractExecutorService,AbstractExecutorService实现了接口ExecutorService,ExecutorService继承了接口Executor。所以在分析源码的时候我们从简入繁,从Executor开始一直往下到ThreadPoolExecutor,最终完成此次源码之旅。

Executor以及ExecutorService

接口Executor比较简单,只定义了一个执行方法execute(Runnable command),Runnable将在此方法中得到执行。

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     *在将来的某个时候执行给定的命令。
     *该命令可以在新线程、池化线程或调用线程中执行,具体由{@code Executor}实现决定。
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService在Executor的基础上添加了其它方法定义,大体上可以分为3个模块:
1、线程池关闭相关接口
2、任务提交
3、任务执行

public interface ExecutorService extends Executor {

    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService

抽象类AbstractExecutorService实现了ExecutorService任务提交和任务执行两个模块的接口,我们选取几个具有代表性的方法来看看具体实现:

public abstract class AbstractExecutorService implements ExecutorService {
    ...
    /**
     * Returns a {@code RunnableFuture} for the given runnable and default
     * value.
     *为给定的runnable和默认值返回{@code RunnableFuture}。
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        //将runnable和默认值value封装成FutureTask对象
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        //执行任务ftask ,方法的具体实现在ThreadPoolExecutor类中
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
            //遍历执行
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            //遍历查找没成功执行的任务
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    //在出现中断或超时时,等待完成或中止任务。
                    try { f.get(); }
                    catch (CancellationException ignore) {}
                    catch (ExecutionException ignore) {}
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

    private static <T> void cancelAll(ArrayList<Future<T>> futures) {
        cancelAll(futures, 0);
    }

    /** Cancels all futures with index at least j. */
    private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
        //遍历取消任务
        for (int size = futures.size(); j < size; j++)
            futures.get(j).cancel(true);
    }
    ....
}

从AbstractExecutorService的部分源码来看,里面方法的实现相对来说比较简单,都是围绕Runnable、Callable生成FutureTask然后执行FutureTask的逻辑来展开。此处比较重要的东西就是FutureTask类了!所以FutureTask的代码很值得一看,下面我们就以AbstractExecutorService中提到的FutureTask类的接口为切入点,来了解FutureTask的内部实现:

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        //赋值callable,并设置任务状态为新任务
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
         //用Runnable创建并赋值callable,并设置任务状态为新任务
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }


    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        //中断线程
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     *删除并通知所有等待的线程,调用done()方法,并将callable置空。
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //任务执行(Callable执行call();Runnable执行run()。)
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

从上面FutureTask的源码来看,FutureTask的主要任务是二次封装Callable和Runnable,主要是添加任务相应的新添加、执行中、取消、执行完成等状态相关逻辑,以便之后对任务进度的控制。

ThreadPoolExecutor

最后我们再来看看ThreadPoolExecutor线程池的部分源码:

public class ThreadPoolExecutor extends AbstractExecutorService {
    //主池控制状态ctl是一个原子整数,包含两个概念字段workerCount,指示线程运行状态的有效数量,指示是否运行、关闭等
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    /**
     * 用于保存任务并将任务交给工作线程的队列。
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Set containing all worker threads in pool. Accessed only when holding mainLock.
     * 包含池中的所有工作线程的集合。只有在持有主锁时才能访问。
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize 核心线程数,长期保留在池中的线程数(即使它们是空闲的)
     *                除非设置了{@code allowCoreThreadTimeOut},
     * @param maximumPoolSize  池中允许的最大线程数
     * @param keepAliveTime 当线程的数量大于核心线程数时,多余的空闲线程在终止之前等待新任务的最大时间。
     * @param unit 参数{@code keepAliveTime}的时间单位
     * @param workQueue 用于在执行任务之前保存任务的队列。
     *                 这个队列将只包含由{@code execute}方法提交的{@code Runnable}任务。
     * @param threadFactory 创建新线程时使用的工厂
     * @param handler 当达到了线程边界和队列容量后导致执行被阻塞时,需要使用的handler
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        /*分三步进行:
          1. 如果运行的线程小于corePoolSize,则尝试使用给定的命令作为第一个任务来启动新线程。
          对addWorker的调用会自动地检查runState和workerCount,从而通过返回false来防止在不应该添加线程的情况下添加错误警报。
          2. 如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加一个线程(因为现有的线程在最后一次检查后死亡),
          或者池在进入这个方法后关闭。因此,我们重新检查状态,如果有必要,如果停止,则回滚排队;如果没有,则启动新线程。
          3.如果无法对任务排队,则尝试添加新线程。如果它失败了,我们知道我们被关闭或饱和,因此拒绝任务。*/
        //获取当前线程数量
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {//运行的线程数小于核心线程数
            if (addWorker(command, true))//任务添加到执行队列中
                return;
            //添加执行队列失败,重新获取当前线程数量
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//线程池正在运行状态且任务插入缓存队列成功
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//线程池不在运行状态,将任务从缓存队列中移除
                reject(command);//抛出异常
            else if (workerCountOf(recheck) == 0)//线程池正在的线程数量为0
                addWorker(null, false);//添加空任务
        }
        else if (!addWorker(command, false))//任务插入缓存队列失败,直接创建新任务线程执行任务
            reject(command);//创建新任务线程执行任务失败,抛出异常
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取当前线程数量
            int c = ctl.get();
             //获取线程池当前运行状态
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.仅在必要时检查队列是否为空。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
               //获取当前工作线程数量
                int wc = workerCountOf(c);
                //当前工作线程数超出默认线程数,或者超出指定的核心线程数、最大线程数时,返回失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //工作线程数加1,跳出死循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                //CAS由于workerCount更改而失败;重试内循环
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //firstTask封装成Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //线程锁,避免并发运行导致异常
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.锁上后再检查。
                    // Back out on ThreadFactory failure or if shut down before lock acquired.
                    //退出ThreadFactory故障,或者在获取锁之前关闭。
                    int rs = runStateOf(ctl.get())
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable.预先检查t是否可以启动
                            throw new IllegalThreadStateException();
                        //Worker添加到工作集合中
                        workers.add(w);
                        //记录当前最大线程数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                       //worker标志添加成功
                        workerAdded = true;
                    }
                } finally {
                    //释放锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //添加成功后启动线程运行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //任务线程启动失败的后续处理
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    private void addWorkerFailed(Worker w) {
        //获取锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //工作线程集合移除Worker 
            if (w != null)
                workers.remove(w);
            //当前工作线程数减一
            decrementWorkerCount();
            //尝试结束线程池
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Attempts to CAS-increment the workerCount field of ctl.
     *增加ctl的workerCount字段。
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            //任务执行代码最终还是在ThreadPoolExecutor类中
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

    final void runWorker(Worker w) {
         //获取w对应的线程和任务
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        //置空w的任务
        w.firstTask = null;
        w.unlock(); // allow interrupts.允许打断
        boolean completedAbruptly = true;
        try {
            //开启循环,执行Worker中的任务和缓存队列workQueue中的任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //如果线程池停止了,确保线程被中断;如果没有,请确保线程没有中断。
                //在第二种情况下需要重新检查,以处理清除中断时出现的shutdownNow
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //任务执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //任务技术自增
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //当 getTask()返回null时,说明此线程可以回收了
            completedAbruptly = false;
        } finally {
            //后续线程回收处理
            processWorkerExit(w, completedAbruptly);
        }
    }

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

    /**
     * 执行阻塞或定时等待任务,取决于当前的配置设置。如果在下面的情况下,这个worker必须退出,并返回null:
     * 1。超过maximumPoolSize(调用setMaximumPoolSize设置)数量的worker。
     * 2。线程池停了。
     * 3。线程池关闭,队列为空。
     * 4。这个worker超时等待一个任务,超时的worker在超时等待之前和之后都可能终止
     *   (即{@code allowCoreThreadTimeOut || workerCount > corePoolSize}),
     *  如果队列不是空的,那么这个worker不是池中的最后一个线程。
     *
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.仅在必要时检查队列是否为空。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //获取工作线程数
            int wc = workerCountOf(c);

            // Are workers subject to culling?      workers 会被回收淘汰吗?
            //允许核心线程被回收||工作线程数>核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //工作线程数超出最大值,工作线程数超出核心线程数,等待超时,缓存队列为空,这些情况下返回null
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //worker线程如果可以被回收,则获取任务时需要设置超时
                //worker线程如果不可以被回收,且workQueue为空则一直等待
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //返回任务
                if (r != null)
                    return r;
                //等待超时
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}

从实现在AbstractExecutorService 类中的submit(Runnable task)方法到实现在ThreadPoolExecutor类中的execute(Runnable command)方法,然后是ThreadPoolExecutor类中的addWorker(Runnable firstTask, boolean core)方法,最后到ThreadPoolExecutor类中runWorker(Worker w)方法,任务从提交到运行的所有逻辑都包含在其中。下面我们总结一下各个方法各自做了什么,最终达到理解线程池整个架构实现:
1、AbstractExecutorService 类中的submit(Runnable task)方法:
此方法将task封装成FutureTask类,然后执行execute(Runnable command)方法并返回,此方法比较简单没有什么复杂的东西。需要注意的是FutureTask类对Runnable 的二次封装主要是添加了任务的各种状态(包括新添加、执行中、取消、执行完成等状态),方便管理。
2、ThreadPoolExecutor类中的execute(Runnable command)方法:
这个方法的第一个作用是确认是否需要创建新任务线程Worker:判断当前线程数量是否达到核心线程数量,没达到则创建新的任务线程Worker来执行任务;达到了则将任务添加到缓存队列workQueue中等待闲置线程执行,当添加缓存队列workQueue失败时,直接创建新Worker执行任务。
另一个作用是线程池关闭或饱和,抛出异常通知。
3、ThreadPoolExecutor类中的addWorker(Runnable firstTask, boolean core)方法:
这个方法的主要任务是将Runnable 任务封装到Worker(包含了线程和任务)中,并触发了线程的运行。
4、ThreadPoolExecutor类中runWorker(Worker w)方法:
步骤3中Worker执行后最终都会来到runWorker方法,这个方法的主要作用是拿到Worker 中的线程,然后用这个线程来执行Worker以及缓存队列workQueue中的任务。线程的复用此方法中得到体现。
另外从这个方法的实现中我们还可以知道:任务线程Worker 分两种,一种是可回收的,另一种是不可回收的,这两者的主要区分原则就是当前工作线程数是否达到默认设置的核心线程数。所以当没那么多任务的时候闲置线程就可以被回收了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,820评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,648评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,324评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,714评论 1 297
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,724评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,328评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,897评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,804评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,345评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,431评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,561评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,238评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,928评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,417评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,528评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,983评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,573评论 2 359