一:execute相关逻辑
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* 在将来的某个时间执行给定的任务。该任务可能执行在一个新的线程里或者一个已经缓存的线程里。
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
* 如果任务不能提交以供执行,要么因为此执行器已关闭,要么因为容量已满,
* 该任务将被当前的RejectedExecutionHandler执行。
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 分3步进行:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果当前运行中的线程数小于corePoolSize,尝试使用给定的指令作为其第一个任务来启动一个新的线程,
* 对addWorker的调用以原子方式检查runState和workerCount,
* 因此,通过返回false来警告,可以防止在不应该添加线程时错误添加。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 如果一个任务可以被成功的排队,然后,我们仍然需要再次检查是否应该添加一个线程
* (因为自上次检查以来,已有线程已死亡),或者自进入此方法以来,池是否已关闭。
* 因此,我们重新检查状态,如有必要,在停止排队时回滚排队,或者在一个线程都没有时启动一个新线程。
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 如果我们不能将任务排队,将尝试添加一个新的线程。
* 如果添加失败,我们知道我们已经被关闭或饱和,因此拒绝这个任务。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//第一步:当前线程数量小于corePoolSize,则调用addWorker()直接创建新的线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//第二步:到这里就说明线程数量大于等于corePoolSize了
if (isRunning(c) && workQueue.offer(command)) {
//当前状态是运行中,并且成功将任务加入workQueue队列。
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//再次检测状态,如果不在运行中了,则移除刚添加的任务,并执行拒绝处理
//如果说offer进去了,然后shutdown或shutdownNow了,就要将这个任务移除。
reject(command);
else if (workerCountOf(recheck) == 0)
/**
* 情况1: 是RUNNING状态,但是worker数量为0。
* 第一步判断当前线程数大于corePoolSize,但是执行到这里的时候核心核心线程全部超时终止了,
* 这时候如果不创建一个新的worker,那这个任务将一直停留在queue中,直到下次有提交任务触发创建worker。
*
* 情况2:SHUTDOWN状态,但是remove(command)失败。
* 这个command还存在于任务队列中,SHUTDOWN是允许任务队列中已存在的任务被执行的,但是没有worker线程了,
* 就需要添加一个新的worker来将这个任务队列里的任务执行掉。
* 下面的addWorker逻辑其实也有这种状态的处理逻辑。
*
* 注意,这里addWorker初始任务参数为null,因为该任务已经加到队列中了,
* 如果将command作为新的线程初始任务,加上队列里的任务,就会重复执行两次了。
* */
addWorker(null, false);
}
else if (!addWorker(command, false))
//第三步:直接创建新的线程,如果失败,执行拒绝处理
reject(command);
}
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
* 根据当前池状态和给定边界(核心或最大值)检查是否可以添加新的worker。
* 如果可以,则相应的调整worker计数,并且如果可能的话,一个新的worker将创建并启动,
* 将firstTask作为其第一个任务来运行。如果pool已关闭或者有资格关闭则该方法返回false。
* 当线程工厂在请求创建一个线程失败时也将返回false。如果线程创建失败,
* 或者是由于线程工厂返回null,或者是由于异常(通常是Thread.start()的OutOfMemoryError),
* 我们将完全回滚。
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
* 新线程应该首先运行的任务(如果没有则为null)。
* 当线程少于corePoolSize时(在这种情况下,我们总是启动一个线程),
* 或者当队列已满时(在这种情况下,我们必须绕过队列),
* 将使用初始的第一个任务(在方法execute()中)创建工作线程以绕过队列。
* 最初空闲线程通常是通过prestartCoreThread创建的,或用于替换其他正在死亡的workers线程
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* 如果为true,则使用corePoolSize作为绑定,否则使用maximumPoolSize。
* (这里使用boolean标志而不是值来确保在检查其他池状态后读取新值)
*
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//这里使用了标签逻辑,类似于goto
retry:
//这个for循环是用来检查状态和更新计数器的
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//获取当前状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
/**
* 这里和execute方法第二大步里的一种情况是对应的。
* 就是说当前状态是SHUTDOWN,但是任务队列不为空,可是没有可用的worker线程了,
* firstTask也为空,表示本次创建worker不是为了执行新提交的任务,
* 而是要将将任务队列里遗留的任务给处理掉,这时候是不能return掉的。
* */
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//如果当前线程数大于等于限制数量,则不创建
return false;
if (compareAndIncrementWorkerCount(c))
//通过CAS操作来增加当前线程数,如果成功则跳出这两个for循环,继续往后执行了。
break retry;
c = ctl.get(); // Re-read ctl 重新获取ctl
if (runStateOf(c) != rs)
//新获取的状态和前面获取的rs不一致了,再次从头循环判断
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//compareAndIncrementWorkerCount 的CAS失败因为worker数量改变了,则再次执行内部循环
}
}
//开始创建worker
boolean workerStarted = false;//worker是否启动成功
boolean workerAdded = false;//worker是否添加成功
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock. 在保持锁的同时重新检查
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//在ThreadFactory出现故障或在获取锁之前关闭时退出。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果是RUNNING状态 或者 SHUTDOWN状态但是firstTask为null(要关闭,但是没有worker,但是任务队列不为空)
if (t.isAlive()) // precheck that t is startable
//如果新创建的已经被启动了,则抛出异常
throw new IllegalThreadStateException();
workers.add(w);//将新创建的worker加到workers集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//当前个数大于largestPoolSize,则更新之
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果线程启动失败,则之后失败处理逻辑
addWorkerFailed(w);
}
return workerStarted;
}
/**
* Removes this task from the executor's internal queue if it is
* present, thus causing it not to be run if it has not already
* started.
* 如果存在此任务,则将其从执行器的内部队列中移除,从而导致尚未启动的任务无法运行。
*
* <p>This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue. For
* example, a task entered using {@code submit} might be
* converted into a form that maintains {@code Future} status.
* However, in such cases, method {@link #purge} may be used to
* remove those Futures that have been cancelled.
* 此方法可作为取消方案的一部分使用。它可能无法删除在放入内部队列之前已转换为其他形式的任务。
* 例如,使用submit输入的任务可能会转换为保持Future状态的形式。
* 然而,在这种情况下,可以使用方法purge删除已取消的Futures。
*
* @param task the task to remove
* @return {@code true} if the task was removed
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); //尝试终止,终止的主要逻辑,后面再细看。
return removed;
}
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
* 为给定命令调用被拒绝的执行处理程序。
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
}
总结:
- 总体步骤代码注释已经很清晰了。
- 需要格外注意的是shutdown状态,但是任务队列不为空,
且没有worker线程了,这种情况下是允许创建新的worker的。
二: runWorker 任务、线程执行逻辑
execute创建的worker是一个Runnable,其run方法被委托给了runWorker方法。
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
* worker运行循环。重复从队列中获取任务并执行它们,同时处理许多问题:
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
* 我们可以从一个初始任务开始,在这种情况下,我们不需要得到第一个任务。
* 否则,只要池在运行,我们就可以从getTask获取任务。
* 如果返回null,则worker将由于池状态或配置参数的更改而退出。
* 其他退出源于外部代码中的异常抛出,在这种情况下completedAbruptly保持不变,
* 这通常会导致processWorkerExit替换此线程。
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and then we
* ensure that unless pool is stopping, this thread does not have
* its interrupt set.
* 在运行任何任务之前,都会获取锁以防止任务执行时发生其他池中断,
* 然后我们会确保除非池停止,否则该线程不会设置其中断。
*
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
* 每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,
* 会导致线程在不处理该任务的情况下死亡(使用CompletedThroughtly true中断循环)。
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to afterExecute.
* We separately handle RuntimeException, Error (both of which the
* specs guarantee that we trap) and arbitrary Throwables.
* Because we cannot rethrow Throwables within Runnable.run, we
* wrap them within Errors on the way out (to the thread's
* UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
* 假设beforeExecute正常完成,我们运行任务,收集其抛出的任何异常以发送给afterExecute。
* 我们分别处理RuntimeException、Error(这两个规范都保证我们可以捕获)和任意的Throwable。
* 因为我们无法在Runnable.run中重新抛出Throwable,所以在退出时将它们包装在Error中(到线程的UncaughtExceptionHandler)。
* 任何抛出的异常都会保守地导致线程死亡。
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
* task.run 完成后,我们调用afterExecute,其也会抛出一个异常,其也会导致线程死亡。
* 根据JLS第14.20节,即使是task.run抛出的异常,此异常也将生效。
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
* 异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler具有尽可能准确的信息,
* 我们可以提供有关用户代码遇到的任何问题的信息。
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//state初始值为-1, unlock之后,state就从-1变成0了,就可以中断了
w.unlock(); // allow interrupts
boolean completedAbruptly = true;//用户任务是否抛出异常
try {
while (task != null || (task = getTask()) != null) {
//执行任务的第一步,先获取锁,这个很重要。锁定状态表示该worker正在执行任务。
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果pool正在停止,确保线程已经被中断;否则,确保线程没有被中断。
// 这需要在第二种情况下进行重新检查,以在清除中断的同时处理与shutdownNow的竞争
// 啥意思呢? 就是说可能执行到这里的时候发现被shutdownNow,状态已经为STOP了,要重新中断线程。
// 那为什么shutdownNow了,还能走到这里呢?总的来说是因为shutdownNow和runWorker在两个线程里几乎同时执行。
// 原因1: shutdownNow只是中断已启动的线程,如果新的worker还未第一次unlock,state还是-1,就不会被中断。
// 原因2: 在shutdownNow的drainQueue之前,getTask已经拿到了任务。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && //该方法是会clear掉中断状态的
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//重新中断线程
wt.interrupt();
try {
//先执行钩子方法beforeExecute
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务,并捕获各种异常
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//用户任务执行完后,再次执行钩子方法afterExecute
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;//顺利执行到这里代表没有抛出异常
} finally {
//如果该worker没有再获取到task,则执行退出逻辑。
processWorkerExit(w, completedAbruptly);
}
}
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 根据当前的配置设置,执行任务的阻塞或定时等待,如果由于以下任何原因必须退出此工作进程,则返回null:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 线程数超过了maximumPoolSize。
*
* 2. The pool is stopped.
* pool已停止。
*
* 3. The pool is shutdown and the queue is empty.
* pool被关闭并且queue是空的。
*
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
* 此worker在等待任务时超时,超时的worker在超时等待前后都会终止,
* 如果队列非空,则此工作线程不是池中的最后一个线程。
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
* 如果worker必须退出则返回null,这种情况下workerCount将递减
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? 上次执行poll是否超时
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//rs>=SHUTDOWN && workQueue为空,执行了shutdown,并且队列已经处理完,则返回null,让线程(worker)消亡。
//rs=STOP 表示执行了shutdownNow,返回null,让线程(worker)消亡
decrementWorkerCount();//减少当前worker数量并返回null
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling? worker是否会被淘汰
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//(当前线程数>最大线程数 || (允许淘汰线程 && 已超时)) && (当前线程数>1 || 队列为空)
if (compareAndDecrementWorkerCount(c)) //减少当前worker数量并返回null
return null;
continue;
}
try {
//如果没有任务,线程在此处,阻塞到workQueue该阻塞队列上。
// 如果该线程可以被淘汰,则使用带超时时间的poll方法,否则使用take一直阻塞到有任务添加到queue中。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
//捕获中断异常,shutdown时会中断这些阻塞的线程。
//被中断了,说明不是超时
timedOut = false;
}
}
}
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
* 为即将死亡的worker进行清理和簿记。仅从worker线程调用。
* 除非设置了completedAbruptly,否则假定workerCount已被调整以考虑退出。
* 此方法从工作线程集中删除线程,如果由于用户任务异常而退出工作线程,
* 或者如果正在运行的工作线程少于corePoolSize,或者队列为非空但没有工作线程,
* 则可能终止池或替换工作线程。
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
* worker是否由于用户任务异常导致死亡
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();//尝试终止线程池,后面终止的逻辑再细看,知道这时候会调用该方法即可。
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//状态小于STOP,代表是RUNNING或者SHUTDOWN
if (!completedAbruptly) {
//如果不是用户任务报错而执行worker的退出逻辑
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
//如果min==0,并且workerQueue不为空,则最小线程数为1,因为要有线程去执行queue中的任务呀
min = 1;
if (workerCountOf(c) >= min)
//如果当前线程数大于等于min,则无须处理。
return; // replacement not needed
}
//走到这里就说名上面的return没有执行,要添加一个worker了,然后也不给初始任务。
//情况1:用户任务报错执行了销毁,则创建一个新的worker来代替
//情况2:RUNNING状态,核心线程允许超时销毁,并且也销毁完了,当前线程数是0,但是任务队列非空,要创建worker来执行。
//情况3:shutdown状态,当前线程数是0,但是任务队列非空,要创建worker来执行。
//后两种逻辑其实和execute方法中的第二大部的处理一个意思。
addWorker(null, false);
}
}
}
总结:
runWorker执行的步骤大体如下几步:
- worker.unlock(),将worker的状态从-1变为0(从未启动变为已启动未锁定)。
- while(getTask()) 循环获取任务
- 2.1:worker.lock() 先锁定,表示正在执行任务,其它业务不用中断我。
- 2.2:beforeExecute() 执行钩子方法,如果报错,则不会继续执行,线程将跳出循环执行终止逻辑。
- 2.3:task.run() 执行用户任务,捕获异常,并将异常传递给afterExecute方法。
- 2.4:afterExecute(task, throw)执行钩子方法,如果报错,则不会继续执行,线程将跳出循环执行终止逻辑。
- 2.5:worker.unlock() 释放锁,表示目前没有再执行业务。
- processWorkerExit 跳出了while循环,没有获取到任务(超时时间已过,或者已关闭),线程执行终止逻辑。
特别注意:worker.lock之后对于STOP状态的判断逻辑,注释写的很清楚喽。
getTask获取任务:
- 如果是SHUTDOWN状态 并且 workerQueue为空,说明执行了shutdown,并且任务队列也已经处理完了,
则返回null,让线程消亡。 - 如果是STOP状态,表示执行了shutdownNow,返回null,让线程消亡。
- 根据该线程是否允许超时,决定使用pool(keepAliveTime)还是take方法。
processWorkerExit任务退出逻辑:
- 要尝试调用tryTerminate() 终止线程池。
- 要判断下是否需要再新建一个不带初始任务的Worker。
三:关闭逻辑shutdown
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* 启动有序的关闭,执行以前已提交的任务,但不接受新任务。
* 如果调用时已经关闭,没有其它影响。
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
* 此方法不会等待以前提交的任务完成执行。
* 使用awaitTermination来完成此操作。
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有权限关闭线程
checkShutdownAccess();
//状态改为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲的worker(注意,正在执行任务的worker是不会被中断的)
interruptIdleWorkers();
// 该类中onShutdown为空,啥都不执行
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();//尝试终止
}
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
/**
* Interrupts threads that might be waiting for tasks (as
* indicated by not being locked) so they can check for
* termination or configuration changes. Ignores
* SecurityExceptions (in which case some threads may remain
* uninterrupted).
* 中断可能正在等待任务的线程(表示未被锁定--如果在执行任务,worker首先会将自己进行锁定,runWorker中的逻辑),
* 以便它们可以检查终止或配置更改。
* 忽略SecurityExceptions(在这种情况下,某些线程可能会保持不中断)。
*
* @param onlyOne If true, interrupt at most one worker. This is
* called only from tryTerminate when termination is otherwise
* enabled but there are still other workers. In this case, at
* most one waiting worker is interrupted to propagate shutdown
* signals in case all threads are currently waiting.
* Interrupting any arbitrary thread ensures that newly arriving
* workers since shutdown began will also eventually exit.
* To guarantee eventual termination, it suffices to always
* interrupt only one idle worker, but shutdown() interrupts all
* idle workers so that redundant workers exit promptly, not
* waiting for a straggler task to finish.
* 如果是true,最多中断一个worker。
* 只有在以其他方式启用终止但仍有其他worker时,才从tryTerminate调用此函数。
* 在这种情况下,如果所有线程当前都在等待,
* 则最多会中断一个等待的工作线程来传播关闭信号(这个传播关闭信号的很关键,后面在tryTerminate中就会调用体现)。
* 中断任意线程可以确保自关闭开始以来新到达的worker最终也会退出。
* 为了保证最终的终止,总是只中断一个空闲的工作进程就足够了,
* 但是shutdown()会中断所有空闲的工作进程,
* 这样多余的工作进程就会迅速退出,而不是等待一个散乱的任务完成。
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
//未被中断,并且tryLock成功表示未在执行任务(runWorker中执行任务的时候会先lock的)
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
//如果是就循环处理一次,break出去不再继续循环,但是可不一定就能成功中断一个线程,比如说所有的worker都在执行任务,tryLock都失败。
//这时候其实也就没有传播了。
break;
}
} finally {
mainLock.unlock();
}
}
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
* 如果(状态为SHUTDOWN且池和队列为空)或(状态为STOP且池为空),则转换为TERMINATED状态。
* 如果有资格终止,但workerCount不为零,则中断一个空闲worker,以确保关闭信号传播。
* 必须在可能导致终止的任何操作之后调用此方法 -- 减少worker数量或者关闭期间从queue中移除任务。
* 该方法是非私有的,允许从ScheduledThreadPoolExecutor进行访问。
*
* 该方法是最终终止线程池的方法,
* 状态变化从SHUTDOWN->TIDYING->TERMINATED 或者 STOP->TIDYING->TERMINATED
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
//这里的三个if条件表明不能终止或不用继续执行:
//1. isRunning(c) 如果是RUNNING状态,则返回退出,不终止
//2. runStateAtLeast(c,TIDYING) 如果状态至少是TIDYING,就是说状态为TIDYING或者TERMINATED,则返回退出。
// 如果为TIDYING表示已经有一个线程调用该方法走到后面的逻辑了,该线程就退出不用处理了;如果为TERMINATED表示已经终止了,直接退出。
//3. runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty() SHUTDOWN状态是允许将队列中的任务执行完的,所以不能终止,则返回退出。
return;
//过了上面的if走到这里,表明可以终止了
//原因1:调用shutdown,目前状态为SHUTDOWN,并且queue为空了。
//原因2:调用shutdownNow,目前状态是STOP(STOP时queue肯定空了,看shutdownNow方法)。
if (workerCountOf(c) != 0) { // Eligible to terminate
/**
* 当前worker数量不为0,哪种场景呢?
* shutdown时,调用了interruptIdleWorkers(中断并关闭了空闲的worker),
* 那不空闲(正在执行任务,已经被lock)的worker还会继续存在,这里就可能不为0了。
*
* 传播终止信号(propagate shutdown signals)的逻辑。
* 这里的ONLY_ONE是true, interruptIdleWorkers的注释描述ONLY_ONE时说,用于传播终止的信号。
* 该方法tryTerminate是唯一一个执行终止的方法,但是这里就要return出去不继续执行了,
* 那么就需要传播给其它方法来再次调用此方法来进行终止,这就是传播的概念。
* 有那些方法会调用该方法呢?注释写的很清楚:可能导致终止的任何操作之后调用此方法 -- 减少worker数量或者关闭期间从queue中移除任务。
* 有一下几个方法:
* 1. shutdown() --关闭
* 2. shutdownNow() --立即关闭
* 3. addWorkerFailed(Worker) --添加worker失败
* 4. processWorkerExit(Worker, boolean) --worker执行完退出
* 5. remove(Runnable) --移除任务
* 6. purge() --移除任务
*
* 1. 剩余的worker都已经空闲,目前阻塞在queue.take(),则interruptIdleWorkers会中断其中一个,
* 然后再调用到这里,然后再去中断其中一个worker。直到所有都中断死亡,再跳过该if往后执行。
* 2. 剩余的worker都还在处理任务,interruptIdleWorkers一个都中断不成功,这里也就返回了,
* shutdown方法也就执行完了,当前状态就处在SHUTDOWN。那何时才能真正终止完成呢?
* runWorker()中,当worker执行完当前任务,再次getTask()获取任务时,当前状态会导致getTask()返回null,
* 会继续执行到processWorkerExit中,再被执行到本方法。直到所有的worker都按这套路走一遭,最后一个就跨过这个if往后执行了。
* 3. 剩余的worker有的已经空闲,有的还在处理,那就看interruptIdleWorkers循环的第一个worker是什么状态了。
*
* */
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//状态改为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();//钩子方法,该方法本类中为空方法,子类可以自己实现
} finally {
//状态改为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//给所有等待在该终止Condition上的线程发信号。
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
//这个方法就是其它线程可以来等待线程池终止,然后await在Condition上,
//等着终止完被唤醒。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
}
总结:
shutdown分为一下几步:
- mainLock.lock() 上锁
- 校验是否有权限关闭线程。
- 将运行状态改为SHUTDOWN。
- 中断空闲的worker,非空闲的(被lock的)不会被中断。
- 执行onShutdown()钩子方法。
- mainLock.unlock() 解锁
- 执行tryTerminate()终止方法,该方法返回后可不一定线程池就终止了。
tryTerminate方法还是比较重要的,东西不多,直接看代码里的注释吧。
awaitTermination允许其它线程等待线程池终止时被通知。
四:关闭逻辑shutdownNow
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
* 尝试停止所有正在执行的任务,停止正在等待的任务的处理,
* 并返回正在等待执行的任务的列表。从该方法返回时,
* 这些任务将从任务队列中排出(删除)。
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
* 此方法不会等待主动执行的任务终止。使用waittermination完成此操作。
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
* 除了尽最大努力尝试停止处理积极执行的任务之外,没有其他保证。
* 此实现通过{@link-Thread#interrupt}取消任务,
* 因此任何未能响应中断的任务可能永远不会终止。
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查权限,和shutdown一样
checkShutdownAccess();
//状态改为STOP
advanceRunState(STOP);
interruptWorkers();
//排空queue,并返回出去,所以一旦shutdownNow了,状态改为STOP了,queue就空了。
//execute的时候也就不会再往里offer了,即使临界状态offer进去了,也会再次判断给移除掉。
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止,和shutdown一样,上面已经分析过了
tryTerminate();
//返回所有queue中未执行的任务
return tasks;
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
* 中断所有的线程,尽管时活跃的。忽视SecurityExceptions。
* (这种情况下一些线程可能仍然未中断,可能阻塞在一些未想用中断的方法上,比如说AQS的acquireQueued就会吞掉中断响应)
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//只中断已启动的线程,这里就和runWorker中的再次判断是否时STOP状态的逻辑呼应上了。
//可能有些线程这时还未启动,然后立即又被启动了,就是shutdownNow和runWorker方法几乎同时执行导致。
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
}
总结:
shutdownNow和shutdown差别不大。
- 将状态改为STOP。
- 中断所有的已启动的线程(不管是否空闲),但是未启动的中断不了,所以再runWorker方法中有一段判断STOP状态的逻辑。
- 会将任务队列中的未执行的任务返回出去。