前言
JDK中为我们提供了一个并发线程框架,它是的我们可以在有异步任务或大量并发任务需要执行时可以使用它提供的线程池,大大方便了我们使用线程,同时将我们从创建、管理线程的繁琐任务中解放出来,能够更加快速的实现业务、功能。合理的使用线程池可以为我们带来三个好处:
- 降低资源消耗。通过重复利用已创建的线程来减少线程创建与销毁的开销。
- 提高响应速度。当任务到达时,任务可以不需要等待线程创建就直接运行。
- 提高线程的可管理性。线程是稀缺资源,不可能无限的创建,不仅会消耗大量的系统资源,还会影响系统的稳定性,通过使用线程池可以对线程进行分配、监控等。
ThreadPoolExecutor就是JDK提供的线程池的核心类,我们使用的Executors框架底层就是对ThreadPoolExecutor进行了封装。下面我们一起通过分析ThreadPoolExecutor的源码来了解JDK线程池的实现原理。
线程池的创建-ThreadPoolExecutor的构造
创建一个ThreadPoolExecutor需要传入一些参数,我们常用的一种ThreadPoolExecutor的构造函数如下所示。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
这些参数对应的是ThreadPoolExecutor的成员变量,我们通过这些内部成员变量也可以先一窥ThreadPoolExecutor的特性。ThreadPoolExecutor的主要成员变量如下:
private volatile ThreadFactory threadFactory;//创建线程的工厂类
private volatile RejectedExecutionHandler handler;//当线程池饱和或者关闭时,会执行该句柄的钩子(hook)
private volatile long keepAliveTime;//空闲线程的等待时间(纳秒)
private volatile boolean allowCoreThreadTimeOut;//默认为false,此时核心线程会保持活跃(即使处于空闲状态);如果为true,则核心线程会在空闲状态超时等待keepAliveTime时间等待任务
private volatile int corePoolSize;//线程池中保持的线程数,即使有些线程已经处于空闲状态,任然保持存活
private volatile int maximumPoolSize;//线程池最大值,最大边界是CAPACITY
private final BlockingQueue<Runnable> workQueue;//等待执行的任务队列
private final HashSet<Worker> workers = new HashSet<Worker>();//线程池中包含的所有worker线程
线程池创建后,来了一个新的任务需要执行,此时我们调用
public void execute(Runnable command)
方法,线程池此时指派一个线程来执行该任务,我们通过跟踪分析该方法的源码,理解线程池的运行、管理细节。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//线程池的状态控制变量
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
ctl是线程池的状态控制变量。该变量是一个AtomicInteger类型,它包装了两个域:workerCount,活跃的线程数;runState,表示线程池状态,RUNNING,SHUTDOWN等。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
workerCount由29位表示,因此线程池的线程数最多有(2^29-1)。runStae用来表示线程池 中的线程在它的整个生命周期中的不同状态,现在线程池提供了5中状态:
/** runState会随着时间单调递增,线程池的运行状态有以下这些转换:
* RUNNING -> SHOUTDOWN 线程池显示调用shutdown()方法
* (RUNNING or SHUTDOWN) -> STOP
* 调用shutdownNow()
* SHUTDOWN -> TIDYING
* 任务队列与线程池都为空时
* STOP -> TIDYING
* 线程池为空
* TIDYING -> TERMINATED
* 当钩子terminated()执行完毕
*
// runState is stored in the high-order bits
//线程池收到了一个新的任务,并且执行队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
//此时 线程池不接受新的任务,但还会执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//此时线程池不接受新的任务,不执行队列中的任务,同时中断正在 执行的任务
private static final int STOP = 1 << COUNT_BITS;
//所有任务都已经终止,同时workerCount为0,过渡到TRYING状态 的线程会运行钩子方法terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//terminated()方法 执行完毕
private static final int TERMINATED = 3 << COUNT_BITS;
在awaitTermination方法 上等待 的线程将会 在 线程的 runState变为TERMINATED时返回。
继续分析execute方法,接下来会有连续的三步:
- 如果正在运行的线程数小于corePoolSize,会启动一个线程来执行该任务,同时addWorker方法会原子的检查runState状态来保证线程 现在 处于 可以 运行的状态,同时修改workerCount数量。
- 如果线程池中活跃线程数大于corePoolSize,且线程池处于RUNNING状态,于是会将任务加入 等待队列。
-
如果任务 不能入队,我们会尝试添加一个新的线程,如果还是失败,我们会根据抛弃策略调用对应拒绝方法。
以上execute方法就包含了线程池执行一个新的任务的全部流程,如下图示:
线程池中各模块的工作示意图如下:
线程池中的线程-Worker原理分析
提交到线程池的任务会被封装成一个Worker,worker封装了一个线程和任务。由于Worker本身继承自AQS,是可以直接加锁的。提交任务的具体逻辑如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//判断线程池状态是否可以提交新的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判断线程池中的workerCount数目是否达到了线程池的边界值
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加workerCount数目
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//workers是一个集合,包含了所有池中的worker线程
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//worker被你加入集合后,线程开始执行任务
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
我们可以发现,在这里当workerCount通过CAS正确加1后,后需要获取一个全局锁mainLock,在加锁期间先对线程池的状态以及线程池内的线程数进行再次检查,正常后会把该新的worker线程加入workers集合,然后线程开始执行该任务。线程是怎么开始执行任务的呢?我们先看一下Worker的构造:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{...}
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//由于Worker本身就是Runnable的,所以创建一个新的线程的时候,就已自身作为参数了,当线程thread调用start启动了线程开始执行时,就会运行传入的Woker的run方法。
}
线程调用start方法启动的时候就是Worker的run方法开始执行。
public void run() {
runWorker(this);
}
/**
* Main worker run loop. Repeatedly gets tasks from queue and
* executes them, while coping with a number of issues:
worker重复的从队列里取出任务执行,同时处理以下一些问题
*
* 1. We may start out with an initial task, in which case we
* don't need to get the first one. Otherwise, as long as pool is
* running, we get tasks from getTask. If it returns null then the
* worker exits due to changed pool state or configuration
* parameters. Other exits result from exception throws in
* external code, in which case completedAbruptly holds, which
* usually leads processWorkerExit to replace this thread.
只要线程池处于RUNNING状态,就不停的从任务队列取出任务
*
* 2. Before running any task, the lock is acquired to prevent
* other pool interrupts while the task is executing, and
* clearInterruptsForTaskRun called to ensure that unless pool is
* stopping, this thread does not have its interrupt set.
取出任务后执行任务前,需要对Woker加锁,防止任务执行时发生中断
*
* 3. Each task run is preceded by a call to beforeExecute, which
* might throw an exception, in which case we cause thread to die
* (breaking loop with completedAbruptly true) without processing
* the task.
任务执行前会有一个前置的方法,该方法可能会抛出异常从而导致任务还未执行线程就退出
*
* 4. Assuming beforeExecute completes normally, we run the task,
* gathering any of its thrown exceptions to send to
* afterExecute. We separately handle RuntimeException, Error
* (both of which the specs guarantee that we trap) and arbitrary
* Throwables. Because we cannot rethrow Throwables within
* Runnable.run, we wrap them within Errors on the way out (to the
* thread's UncaughtExceptionHandler). Any thrown exception also
* conservatively causes thread to die.
*
* 5. After task.run completes, we call afterExecute, which may
* also throw an exception, which will also cause thread to
* die. According to JLS Sec 14.20, this exception is the one that
* will be in effect even if task.run throws.
*
* The net effect of the exception mechanics is that afterExecute
* and the thread's UncaughtExceptionHandler have as accurate
* information as we can provide about any problems encountered by
* user code.
*
* @param w the worker
*/
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//循环从任务队列里取出任务执行
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//执行task的前置拦截
Throwable thrown = null;
try {
task.run();//任务运行
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//执行task的后置拦截器
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//任务最后的清理工作
}
}
我们先看看是如何从任务队列取出任务的:
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 基于当前线程池的配置线程会在该方法上阻塞或是超时等待任务。
* 如果该worker由于以下几种情况必须退出,该方法会返回null:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 由于调用setMaximumPoolSize使得线程池的worker数量超过了maximumPoolSize
* 2. The pool is stopped.
* 线程池处于STOPPED状态
* 3. The pool is shutdown and the queue is empty.
* 线程池被关闭同时队列为空
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//addWorker方法中已经先增加了workerCount的数目,此时既然该任务不能够执行,则需要通过CAS减小workerCount的数目
return null;
}
boolean timed; // Are workers subject to culling?worker是否要被踢出
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//踢出任务队列首元素返回
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker方法的最后清理操作是这样的:
/**
* Performs cleanup and bookkeeping for a dying worker. Called
* only from worker threads. Unless completedAbruptly is set,
* assumes that workerCount has already been adjusted to account
* for exit. This method removes thread from worker set, and
* possibly terminates the pool or replaces the worker if either
* it exited due to user task exception or if fewer than
* corePoolSize workers are running or queue is non-empty but
* there are no workers.
* 对执行完run方法的worker进行清理和记录操作。该方法会从workers线程集合移除
* 当前worker对应的线程。如果Worker在run方法执行期间发生异常导致退出,那么completedAbruptly
* 是会被设置为true,此时我们会添加一个新的null任务。
*
* @param w the worker
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
tryTerminate会在符合条件的情况下转换线程池状态至TERMINATED,具体如下分析:
/**
* Transitions to TERMINATED state if either (SHUTDOWN and pool
* and queue empty) or (STOP and pool empty). If otherwise
* eligible to terminate but workerCount is nonzero, interrupts an
* idle worker to ensure that shutdown signals propagate. This
* method must be called following any action that might make
* termination possible -- reducing worker count or removing tasks
* from the queue during shutdown. The method is non-private to
* allow access from ScheduledThreadPoolExecutor.
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//线程池处于RUNNING状态或者
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
线程池的销毁-shutDown/shutDownNow
线程池使用完毕,主动关闭线程池。此时我们会调用shutDown()方法。
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
* 进行有序的任务关闭,此时线程池不接受新的任务,但是前面提交的任务还是会继续执行完
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
* 该方法不会等待前面提交的任务完全执行完,如需要可以使用awaitTermination
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//线程池池的全局锁
try {
checkShutdownAccess();//安全验证,确认线程池有权限关闭线程
advanceRunState(SHUTDOWN);//线程池状态转换为SHUTDOWN
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();//通过线程池的状况判断是否转移线程池的状态至TERMINATED
}
shutDownNow方法与shutDown有些不同,shutDown是所谓的‘Elegant’关闭模式,而shutDownNow则比较‘Rude’方式。shutDownNow会立即停止所有正在执行的任务,具体代码如下:
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
* 立即停止所有正在执行的任务,停止等待任务的执行,并返回正在等待执行的任务列表
* 该方法返回前会被从任务列里移除
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
* 该方法只会尽最大努力去停止正在运行的任务-通过Thread.interupt方法取消任务,因此如果任何一个任务无法响应中断就不会执行停止。
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);//线程池状态转换为STOP
interruptWorkers();
tasks = drainQueue();//
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
线程池的饱和策略-RejectedExecutionHandler
前面在介绍ThreadPoolExecutor的主要成员变量时,我们简单介绍了包和策略参数:
private volatile RejectedExecutionHandler handler;//当线程池饱和或者关闭时,会执行该句柄的钩子(hook)
在默认情况下,ThreadPoolExecutor使用抛弃策略。
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
ThreadPoolExecutor为我们提供了四种线程池饱和策略,也即对应的四种静态内部类。这些策略是在线程池与任务队列都满了的情况下,对新提交给线程池的任务执行的操作。也即前面我们分析过的execute方法在所有情况都无效的情况下执行的一步,调用对应饱和策略的钩子:
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
这四种策略如下:
- CallerRunsPolicy: 在线程池没有关闭(调用shut Down)的情况下,直接由调用线程来执行该任务。否则直接就丢弃该任务,什么也不做。
- AbortPolicy:直接抛出异常。
- AbortPolicy:直接丢弃该任务,什么也不做。
- DiscardOldestPolicy: 在线程池没有关闭(调用shutDown)的情况下,丢弃线程池任务队列中等待最久-即队列首部的任务,并尝试直接执行该触发饱和策略的任务。
最后的总结
我们观察前面分析的源码,包括线程池处于生命周期的一些阶段(如线程池提交任务,还是线程池的退出,销毁)都会发现一个问题,这些地方都会用到线程池的全局锁。
private final ReentrantLock mainLock = new ReentrantLock();
全局锁的使用在多线程调用ThreadPoolExecutor的情况下会导致性能问题。但是我们仔细思考一下会发现,向线程池提交任务时获取全局锁是在线程池还未预热完成(即线程池的活跃线程还小于corePoolSize)的情况发生的事情,当线程池的活跃线程超过corePoolSize后,以后在执行execute方法提交新的任务,主要还是执行我们前面分析execute方法时说的第二步,把任务添加到等待队列。所以后面不会出现对全局锁的争抢场景。也就是说,对全局锁的争抢只会出现在线程池预热的初期,但这个预热的过程是和corePoolSize有关的,我们需要关注。
最后,我们对ThreadPoolExecutor进行一下总结:
- 线程池有一个预热阶段,在线程池的活跃线程数未达到corePoolSize时,并发提交任务会出现对线程池全局锁的争抢。
- 线程池中的Worker数超过corePoolSize时,后续提交的任务都会进入任务等待队列。
- corePoolSize个活跃线程被线程池创建后,会循环从任务等待队列获取任务执行。
- 当线程池饱和或者关闭时,会执行对应的饱和策略。ThreadPoolExecutor默认使用使用抛弃策略。
以上就是ThreadPoolExecutor的源码分析,有没认识到的或理解有误的,欢迎指出、讨论。
参考文献
- 《Java并发编程的艺术》
- JDK1.7源码