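1. Thread pool overview
A thread pool manages thread resources: it avoids the cost of repeatedly creating and destroying threads and so improves resource utilization. The JDK implementation is essentially built on the producer-consumer model, as shown in the figure:
A task submitted to the pool normally waits in a blocking queue and is later executed by a consumer, called a worker. (This is a simplification: while the worker count is below the core pool size, a new worker is created and runs the task directly; the details are analyzed below.) This article explains how ThreadPoolExecutor works, based on JDK 8.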
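2. Basic variables
- workerCount: indicating the effective number of threads (the number of threads in the pool)
- runState: indicating whether running, shutting down etc (the lifecycle state of the pool)
- ctl: the combination of workerCount and runState (In order to pack them into one int, we limit workerCount to (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 billion) otherwise representable.) (ctl is a single int held in an AtomicInteger: the high 3 bits store the run state and the low 29 bits store the worker count. The pool updates it with atomic operations, mostly CAS, to stay thread-safe.)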
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
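To make the bit layout concrete, here is a minimal standalone sketch that copies the constants and helper methods from the listing above into a throwaway class. The class name CtlDemo is an assumption made for illustration; it is not part of the JDK.

```java
// Standalone model of the ctl packing shown above.
// CtlDemo is a hypothetical name; the constants mirror the JDK 8 listing.
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY = (1 << COUNT_BITS) - 1;    // low 29 bits set

    static final int RUNNING = -1 << COUNT_BITS;          // high bits 111
    static final int SHUTDOWN = 0 << COUNT_BITS;          // high bits 000
    static final int STOP = 1 << COUNT_BITS;              // high bits 001
    static final int TIDYING = 2 << COUNT_BITS;           // high bits 010
    static final int TERMINATED = 3 << COUNT_BITS;        // high bits 011

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println("ctl bits    = " + Integer.toBinaryString(c));
        System.out.println("state kept  = " + (runStateOf(c) == RUNNING));
        System.out.println("workerCount = " + workerCountOf(c));
        // The states are numerically ordered, which is why the pool can
        // compare them with < and >= (for example rs >= SHUTDOWN).
        System.out.println("ordered     = "
                + (RUNNING < SHUTDOWN && SHUTDOWN < STOP
                   && STOP < TIDYING && TIDYING < TERMINATED));
    }
}
```

Because RUNNING is the only negative value, a check such as `c < SHUTDOWN` is enough to test whether the pool is still running.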
The pool has five states:
- RUNNING: Accept new tasks and process queued tasks
- SHUTDOWN: Don't accept new tasks, but process queued tasks
- STOP: Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
- TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method (terminated() is an extension point exposed by the pool: a subclass can override it to run its own logic at this stage)
- TERMINATED: terminated() has completed
The state transitions are also spelled out clearly in the source comments:
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
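The flow chart is as follows:
shutdown is the graceful option: it lets the pool finish the tasks already in the blocking queue before moving on. shutdownNow is more forceful: it interrupts the workers, and the queued tasks that have not started are removed from the queue and returned to the caller instead of being executed.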
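The graceful path and the terminated() hook can be observed with a small experiment. This is a minimal sketch using only the public ThreadPoolExecutor API; the class name GracefulShutdownDemo, the method run(), and the chosen pool sizes are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shutdown() lets queued tasks finish, then the
// pool passes through TIDYING (terminated() hook) to TERMINATED.
class GracefulShutdownDemo {
    static String run() {
        AtomicInteger tasksRun = new AtomicInteger();
        AtomicBoolean hookCalled = new AtomicBoolean();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void terminated() {
                hookCalled.set(true);   // invoked during the TIDYING state
            }
        };
        CountDownLatch gate = new CountDownLatch(1);
        // The first task blocks, so the next two wait in the queue.
        pool.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            tasksRun.incrementAndGet();
        });
        pool.execute(tasksRun::incrementAndGet);
        pool.execute(tasksRun::incrementAndGet);

        pool.shutdown();                        // RUNNING -> SHUTDOWN
        boolean shutdownFlag = pool.isShutdown();
        gate.countDown();                       // let the blocked task finish
        try {
            boolean done = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "isShutdown=" + shutdownFlag + " tasksRun=" + tasksRun.get()
                    + " hookCalled=" + hookCalled.get() + " terminated=" + done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

All three tasks run even though two of them were still queued when shutdown() was called, and the hook fires before awaitTermination returns.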
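3. How the pool processes a task
The overall flow is as follows:
Task processing breaks down into four steps (the steps marked in the figure above):
- Submit the task and, based on the worker count, decide whether to create a worker object
- Create the worker object (a worker holds one of the pool's threads)
- Start the thread inside the worker
- Run the submitted task directly, or fetch a pending task from the blocking queue and run it
Whenever a thread is about to be created, the pool checks whether the current worker count is consistent with its configuration and adjusts accordingly; the details are discussed below. How a task reaches a worker depends on the worker count relative to the core pool size: either a newly created worker runs it directly, or it is placed in the blocking queue for a running worker to pick up. This is also covered in detail later.
3.1 Step 1: submitting a task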
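public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // Read ctl, which packs the run state and the worker count
    int c = ctl.get();
    // Fewer workers than corePoolSize: hand the task to addWorker so that
    // the newly created worker runs it as its first task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Worker count has reached corePoolSize (or addWorker failed): if the
    // pool is still running, enqueue the task so that a running worker
    // takes it from the head of the queue
    if (isRunning(c) && workQueue.offer(command)) {
        // Double check: re-read ctl in case the pool was shut down or the
        // worker count dropped to 0 in the meantime
        int recheck = ctl.get();
        // Pool no longer running: remove the task from the queue (remove()
        // also calls tryTerminate()) and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // No workers left: create one without a first task to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Reached when the pool is not running or the queue is full: try to add
    // a worker bounded by maximumPoolSize; if that fails (pool shut down or
    // already at maximumPoolSize), reject the task
    else if (!addWorker(command, false))
        reject(command);
}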
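The order in which execute tries its three options (core worker, queue, non-core worker) before rejecting can be checked from the outside. The sketch below is a hedged illustration, assuming a pool with corePoolSize 1, maximumPoolSize 2, and a queue of capacity 1; ExecuteOrderDemo and run() are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records pool size and queue size after each submit.
class ExecuteOrderDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch gate = new CountDownLatch(1);
        // Every task blocks until the gate opens, so no worker becomes idle.
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocked);   // below corePoolSize: new core worker
            snapshot(pool, log);
            pool.execute(blocked);   // core is full: task is queued
            snapshot(pool, log);
            pool.execute(blocked);   // queue is full: new non-core worker
            snapshot(pool, log);
            try {
                pool.execute(blocked);   // queue full and at maximumPoolSize
                log.append("accepted");
            } catch (RejectedExecutionException e) {
                log.append("rejected");  // default handler is AbortPolicy
            }
        } finally {
            gate.countDown();
            pool.shutdown();
        }
        return log.toString();
    }

    private static void snapshot(ThreadPoolExecutor pool, StringBuilder log) {
        log.append("pool=").append(pool.getPoolSize())
           .append(" queue=").append(pool.getQueue().size()).append(';');
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that the third task jumps ahead of the queued second task: it becomes the first task of the new non-core worker, while the second task is still waiting in the queue.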
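3.2 Step 2: creating the worker
addWorker is responsible for creating Worker objects. Each Worker owns exactly one thread, so creating a worker really means creating a pool thread that will execute tasks. Worker also extends AbstractQueuedSynchronizer, so it carries its own lock for concurrency control: runWorker (discussed below) acquires the worker's lock before running each task, and the interruption of idle threads triggered by shutdown must acquire the same lock first. The overall addWorker flow is as follows: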
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        // Current run state
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // In SHUTDOWN the pool still drains its queue, so a worker may be
        // added while workQueue is non-empty, provided firstTask == null
        // (that is, the request does not carry a newly submitted task)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            // Current worker count
            int wc = workerCountOf(c);
            // Fail if the count has reached the 29-bit CAPACITY or the bound
            // selected by core: corePoolSize or maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Increment the worker count with CAS; on success leave both loops
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // CAS failed, so ctl changed: re-read the count and state
            c = ctl.get();  // Re-read ctl
            // Run state changed: go back to retry and re-check the state
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
            // (state unchanged: re-read the count and try the CAS again)
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create the worker. Its constructor asks threadFactory for a new
        // thread and passes the worker itself as that thread's Runnable.
        // When firstTask is non-null (for example while below corePoolSize),
        // the submitted task is run directly by this thread.
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // Proceed if running, or if in SHUTDOWN with no new task attached
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // Make sure the factory did not hand back a started thread
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // Register the worker in the workers set
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the worker's thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // The thread did not start: addWorkerFailed removes the worker from
        // workers if present, decrements the count, and calls tryTerminate()
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
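One way to watch addWorker from the outside is to count calls to the ThreadFactory, since the Worker constructor is what asks the factory for a thread. The following is a minimal sketch under that assumption; WorkerCreationDemo, run(), and the thread name prefix are hypothetical, while prestartAllCoreThreads, getPoolSize, and getLargestPoolSize are public ThreadPoolExecutor methods.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: each successful addWorker call creates one thread
// through the ThreadFactory and registers one worker.
class WorkerCreationDemo {
    /** Returns {started, threadsCreated, poolSize, largestPoolSize}. */
    static int[] run() {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory countingFactory = r ->
                new Thread(r, "demo-worker-" + created.incrementAndGet());
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), countingFactory);
        try {
            // Starts core workers with no first task until the core bound
            // is reached; returns how many were started.
            int started = pool.prestartAllCoreThreads();
            return new int[] {
                started, created.get(), pool.getPoolSize(), pool.getLargestPoolSize()
            };
        } finally {
            pool.shutdown();   // idle workers are interrupted and exit
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("started=" + r[0] + " created=" + r[1]
                + " poolSize=" + r[2] + " largest=" + r[3]);
    }
}
```

The factory is called exactly twice: the attempt to add a third core worker fails the count check in the retry loop before any Worker object is constructed.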
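3.3 Step 3: calling runWorker
runWorker is executed by the thread inside each worker. The task to run is held in the worker's firstTask field; if firstTask is null, the worker calls getTask to fetch a task from the blocking queue, as the following flow chart shows:
Put simply, runWorker is where a thread actually executes tasks, but its design reflects several considerations:
- Thread reclamation: how an already started thread is reclaimed when the pool's state or configuration changes. For state changes, runWorker reads the pool state before each task; if the state is STOP, TIDYING, or TERMINATED, it calls interrupt to make sure the worker thread is interrupted. Configuration changes are handled through getTask: as soon as getTask returns null, the thread leaves the while loop and finishes. For example, with allowCoreThreadTimeOut at its default of false, a thread beyond the core pool size is reclaimed after it has waited on an empty queue for the keep-alive time; likewise, whether a thread is reclaimed after the core pool size is changed at runtime is decided in getTask. getTask is analyzed in detail later.
- Extensibility: hook methods surround the execution of each task. beforeExecute runs before the task and afterExecute runs after it; users can override both to add their own logic.
- Exception handling: an exception can come from beforeExecute, from the task itself (the most common case), or from afterExecute. When one occurs, completedAbruptly stays true and the exception propagates out of runWorker, ending the worker thread. The subsequent processWorkerExit call then invokes addWorker to create a new thread that replaces the one terminated by the exception.
The detailed runWorker flow is as follows:
Source for reference: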
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run the first task if the worker was created with one; after that,
        // fetch tasks from the blocking queue
        while (task != null || (task = getTask()) != null) {
            // Acquire the worker's own lock; shutdown() tries to take the
            // same lock before interrupting a worker it believes is idle
            w.lock();
            // Re-check the pool state: at STOP or beyond, make sure this
            // thread is interrupted
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook that runs before the task; left for users to override
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Actually run the task submitted to the pool
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook that runs after the task; left for users to override
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Cleanup for both exits: an exception was thrown, or getTask()
        // returned null
        processWorkerExit(w, completedAbruptly);
    }
}
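The hooks and the replacement of a worker killed by an exception can both be seen in a short experiment. This is a sketch under stated assumptions: a pool with exactly one thread, tasks submitted through execute (not submit, which would capture the exception in a Future), and a no-op uncaught exception handler installed only to keep the output quiet. HookDemo and run() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records hook calls around a failing task and a
// normal task in a single-threaded pool.
class HookDemo {
    static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        ThreadFactory quietFactory = r -> {
            Thread t = new Thread(r);
            // Swallow the exception that kills the first worker thread.
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), quietFactory) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                events.add("before");
            }
            @Override
            protected void afterExecute(Runnable r, Throwable thrown) {
                events.add(thrown == null ? "after:ok" : "after:" + thrown.getMessage());
            }
        };
        // The first task throws: its worker exits abruptly and is replaced.
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        // The second task still runs, on a different worker thread.
        pool.execute(() -> events.add("task2"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

afterExecute receives the exception because runWorker stores it in thrown before rethrowing, and the second task is not lost because processWorkerExit adds a replacement worker.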
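processWorkerExit is an important part of runWorker: it performs cleanup and bookkeeping for a dying worker, meaning a worker whose thread reached the finally block either because getTask returned null or because an exception was thrown during execution. By the time processWorkerExit runs, the pool's configuration and state may differ from what the worker saw inside runWorker, so processWorkerExit makes its own checks. The pool exists to maintain a bounded number of threads dynamically, so every operation that may add a thread re-checks both state and configuration.
The source of processWorkerExit is as follows: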
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // On an abrupt exit (exception), the worker count has not been
    // decremented yet, so do it here
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // Remove this worker from the workers set
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    // Check the pool state again
    if (runStateLessThan(c, STOP)) {
        // Normal exit (getTask() returned null). If core threads never time
        // out, no replacement is needed while the worker count is at least
        // corePoolSize. If core threads may time out, no replacement is
        // needed when the queue is empty, and at least one worker must
        // remain when the queue still holds tasks.
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Abrupt exit, or too few workers left: add a replacement. Whatever
        // has changed in the configuration, addWorker validates it again.
        addWorker(null, false);
    }
}
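The replacement decision at the end of processWorkerExit is easier to reason about as a pure function. The sketch below is a simplified model, not JDK code: it assumes the pool state is below STOP, takes the queue's emptiness and the remaining worker count as plain parameters, and only answers whether addWorker(null, false) would be attempted. ReplacementModel and needsReplacement are hypothetical names.

```java
// Simplified model of the replacement decision in processWorkerExit,
// assuming the run state is RUNNING or SHUTDOWN. Hypothetical names.
class ReplacementModel {
    static boolean needsReplacement(boolean completedAbruptly,
                                    boolean allowCoreThreadTimeOut,
                                    int corePoolSize,
                                    boolean queueEmpty,
                                    int remainingWorkers) {
        if (completedAbruptly) {
            return true;                 // always try to replace a crashed worker
        }
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && !queueEmpty) {
            min = 1;                     // keep one worker while tasks are queued
        }
        return remainingWorkers < min;   // replace only if below the minimum
    }

    public static void main(String[] args) {
        // Crashed worker: replacement attempted regardless of counts.
        System.out.println(needsReplacement(true, false, 2, true, 5));
        // Normal exit with enough core workers left: no replacement.
        System.out.println(needsReplacement(false, false, 2, true, 2));
        // Core timeout enabled, queue not empty, no workers left: replace.
        System.out.println(needsReplacement(false, true, 2, false, 0));
    }
}
```

A true result only means the pool tries to add a worker; in the real code addWorker can still refuse, for example when the pool has reached maximumPoolSize.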
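3.4 Step 4: calling getTask
getTask fetches a task according to the pool's current configuration (which can be changed after the pool has started). Depending on that configuration it either blocks until a task is available or waits for a bounded time; if no task arrives in time, it returns null. Once getTask returns null, runWorker leaves its while loop and the thread proceeds until it is reclaimed. getTask returns null in the following cases:
- The worker count exceeds the maximum pool size, for example because setMaximumPoolSize lowered the maximum while the pool was running
- The pool is in the STOP state or beyond
- The pool is in SHUTDOWN and the blocking queue is empty
- The worker timed out waiting for a task and is subject to termination (allowCoreThreadTimeOut is true, so core threads have a keep-alive limit, or the current worker count exceeds the core pool size)
The main flow chart is as follows:
Source for reference: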
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        // If the pool is at STOP or beyond, or in SHUTDOWN with an empty
        // queue, decrement the worker count and return null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        // True if core threads may time out or the worker count exceeds
        // corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // Cull this worker (return null) when either
        // 1. its previous poll timed out and it is subject to timeout: this
        //    is how idle threads above corePoolSize are reclaimed, and how
        //    core threads are reclaimed when allowCoreThreadTimeOut is set
        // 2. the worker count exceeds maximumPoolSize
        // Safeguard (wc > 1 || isEmpty): whatever the configuration, at
        // least one worker is kept while the queue still holds tasks
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // Timed mode: wait at most keepAliveTime; on timeout the next
            // iteration returns null. Otherwise block until a task arrives.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // Got a task: return it
            if (r != null)
                return r;
            // The poll timed out: set the flag
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
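The timed branch of getTask can be observed by letting idle workers expire. The following is a hedged sketch that assumes a 50 ms keep-alive with allowCoreThreadTimeOut enabled, and polls getPoolSize with a generous deadline because the exact moment of reclamation depends on scheduling. IdleTimeoutDemo and run() are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: idle core threads time out in getTask() once
// allowCoreThreadTimeOut is enabled.
class IdleTimeoutDemo {
    /** Returns {workersStarted, poolSizeAfterIdling}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);       // requires keepAliveTime > 0
        int started = pool.prestartAllCoreThreads();

        // Wait (up to 5 seconds) for both idle workers to time out and exit.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        try {
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int remaining = pool.getPoolSize();
        pool.shutdown();
        return new int[] { started, remaining };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("started=" + r[0] + " remaining=" + r[1]);
    }
}
```

With allowCoreThreadTimeOut left at false, the same workers would block in workQueue.take() indefinitely and the pool size would stay at 2.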
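3.5 Rejection policies
When the blocking queue is full and no further worker can be added because the worker count has reached the maximum pool size, or when the pool is no longer running, execute calls reject, which triggers the rejection policy. The four built-in implementations of the RejectedExecutionHandler interface are: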
- AbortPolicy: throws a RejectedExecutionException
/**
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }
    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}
- CallerRunsPolicy: as long as the pool has not been shut down, runs the rejected task directly in the calling thread
/**
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }
    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
- DiscardOldestPolicy: as long as the pool has not been shut down, discards the task at the head of the blocking queue (the oldest waiting task) and then retries submitting the new task
/**
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }
    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
- DiscardPolicy: silently discards the rejected task without doing anything
/**
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }
    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}
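To see a policy in action, the sketch below saturates a deliberately tiny pool (one thread, queue capacity one, both chosen for illustration) and checks which thread ends up running the rejected task under CallerRunsPolicy. CallerRunsDemo and run() are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: with CallerRunsPolicy, a rejected task runs
// synchronously in the thread that called execute().
class CallerRunsDemo {
    static boolean run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<Thread> ranOn = new AtomicReference<Thread>();
        try {
            pool.execute(blocked);   // occupies the only worker
            pool.execute(blocked);   // fills the queue
            // Queue full and at maximumPoolSize: rejected, so the handler
            // runs the task right here before execute() returns.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in caller thread: " + run());
    }
}
```

Because the caller is busy running the task itself, this policy also slows down the submitter, which acts as a simple form of back-pressure.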
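4. Shutting down the pool
A running pool can be shut down by calling shutdown or shutdownNow. The main difference is that shutdown sets the pool state to SHUTDOWN and lets the tasks in the blocking queue finish before the pool moves to TIDYING, while shutdownNow sets the state to STOP, interrupts all workers, and removes the queued tasks without executing them.
The shutdown method: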
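public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Check that the caller is permitted to shut the pool down
        checkShutdownAccess();
        // Advance the pool state
        advanceRunState(SHUTDOWN);
        // Interrupt idle threads: a worker whose tryLock succeeds is idle,
        // because it is not holding its lock to run a task
        interruptIdleWorkers();
        // Hook provided for ScheduledThreadPoolExecutor
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Try to terminate: once the queue and the pool are empty, the state
    // moves to TIDYING, terminated() runs, and the state becomes TERMINATED
    tryTerminate();
}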
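The shutdownNow method:
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // Set the pool state to STOP
        advanceRunState(STOP);
        // Walk the workers set and interrupt every started thread
        interruptWorkers();
        // Remove the queued tasks that never started and return them
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // Try to terminate: once the pool is empty, the state moves to TIDYING,
    // terminated() runs, and the state becomes TERMINATED
    tryTerminate();
    return tasks;
}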
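The effect of shutdownNow on a running task and on queued tasks can be checked with a short experiment. This is a minimal sketch assuming a single-threaded pool with one blocked task and three queued tasks; ShutdownNowDemo and run() are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: shutdownNow() interrupts the running task and
// hands back the queued tasks without running them.
class ShutdownNowDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);   // never counted down
        AtomicBoolean interrupted = new AtomicBoolean();
        AtomicInteger queuedRan = new AtomicInteger();

        pool.execute(() -> {
            started.countDown();
            try {
                never.await();                 // blocks until interrupted
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(queuedRan::incrementAndGet);   // these stay queued
        }
        try {
            started.await();
            List<Runnable> pending = pool.shutdownNow();   // state -> STOP
            boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
            return "pending=" + pending.size() + " interrupted=" + interrupted.get()
                    + " queuedRan=" + queuedRan.get() + " terminated=" + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Interruption is cooperative: a task that ignores interrupts keeps running after shutdownNow, and the pool cannot reach TERMINATED until that task returns.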
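5. Conclusion
At its core, a thread pool dynamically maintains a bounded number of threads to execute tasks, following the producer-consumer model. A full walk through the code shows that the implementation is far from simple, mainly because:
- The pool state changes
- The worker count changes
- The configuration can be changed at runtime through public methods, for example the maximum pool size and the core pool size
The difficulty lies in change: whether the pool is adding a thread, interrupting a thread, or even fetching a task, the code always has to check the run state and the worker count.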
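As a small illustration of configuration changing at runtime, the sketch below raises the core pool size while tasks are waiting in the queue. It relies on the behavior of setCorePoolSize, which starts additional core workers for queued tasks when the size grows; the pool sizes and the names DynamicResizeDemo and run() are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: growing corePoolSize at runtime starts new
// workers for tasks that are already queued.
class DynamicResizeDemo {
    /** Returns {poolSizeBefore, poolSizeAfter}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            // One task runs on the single core worker, two wait in the queue.
            for (int i = 0; i < 3; i++) {
                pool.execute(blocked);
            }
            int before = pool.getPoolSize();
            pool.setCorePoolSize(3);           // kept below maximumPoolSize
            int after = pool.getPoolSize();
            return new int[] { before, after };
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("before=" + r[0] + " after=" + r[1]);
    }
}
```

The new workers are created through addWorker(null, true), so the same state and count checks described in section 3.2 apply to this path as well.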