成员变量
BlockingQueue 任务阻塞队列
corePoolSize 核心线程数
maximumPoolSize 最大线程数
allowCoreThreadTimeOut 是否允许核心线程超时退出
keepAliveTime 非核心线程超时时间
ThreadFactory 线程池工厂,线程从该工厂中new
RejectedExecutionHandler 拒绝策略
AbortPolicy 直接抛出异常
DiscardPolicy 直接抛弃
DiscardOldestPolicy 抛弃最老的任务
CallerRunsPolicy 调用线程池线程去执行。
自定义 实现RejectedExecutionHandler,并自己定义策略模式
其中很重要的一个变量ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
是原子整型变量,主要存线程池状态和工作线程数
高三位存储线程池状态,低29位存储工作线程数
private static final int RUNNING = -1 << COUNT_BITS; 运行态
private static final int SHUTDOWN = 0 << COUNT_BITS; 关闭
private static final int STOP = 1 << COUNT_BITS; 已停止
private static final int TIDYING = 2 << COUNT_BITS; 整理
private static final int TERMINATED = 3 << COUNT_BITS; 结束
线程池状态转换
初始是running状态,能够接受新提交的任务,同时也能够处理阻塞队列中的任务
调用shutdown方法,转换成shutdown状态,不接收新任务,只能够继续处理阻塞队列中的任务
调用shutdownnow方法,转换成stop状态,不接收新任务,同时也不能够继续处理新任务,中断线程。
调用shutdown 或者shutdownnow方法,线程池从running状态转换成shutdown状态或者stop状态后,会调用tryTerminate方法,该方法中,等所有线程执行完之后,线程池状态会转换成TIDYING 状态,随后会调用terminated方法,完成后
最后finally方法中线程池状态转换成TERMINATED 状态。
附上 tryTerminate 代码
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 等到线程池线程个数为0时转换成TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//最后转换成TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
方法解析
execute
线程池入口在execute函数 主要流程如下:
1、新进来一个runnable 检查工作线程是否已经超过核心线程,如果没有,启动核心线程去执行addwokder方法
2、如果已经超过核心线程个数,runnable放到阻塞队列中去
3、如果阻塞队里已满,会启动非核心线程执行addwoker
4、如果启动非核心线程去执行失败,执行拒绝策略
核心代码如下
int c = ctl.get();
// 步骤1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 步骤二
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 步骤三
else if (!addWorker(command, false))
// 步骤四
reject(command);
addWorker
该函数主要功能是封装runnable为Woker,然后去执行。
先了解下woker,是ThreadPoolExecutor的内部类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
继承了aqs和Runnable
主要成员变量
/** Thread this worker is running in. Null if factory fails. /
final Thread thread; 工作线程
/* Initial task to run. Possibly null. */
Runnable firstTask; 封装任务
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这里主要是一些线程池状态校验,和线程数校验
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取工作线程个数,校验是否超过核心线程或者最大线程
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//封装woker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 这里查看线程池状态,如果是running状态可以添加worker,或者是SHUTDOWN 状态,不是新增加的任务(shuntdown状态不接收新任务,只接受阻塞队列任务)
//这种情况会在processWorkerExit方法中可见,具体看该方法解析
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动woker线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 如果失败从workers中移除
addWorkerFailed(w);
}
return workerStarted;
}
runWorker
woker继承runnable接口,run方法的实现是该函数,即线程执行执行该方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
// 允许中断
w.unlock(); // allow interrupts
// 是否因为异常退出循环
boolean completedAbruptly = true;
try {
// 如果task为空,则通过getTask从阻塞队列中来获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果是STOP状态,中断线程,不在继续执行。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//空函数,可被重写
beforeExecute(wt, task);
Throwable thrown = null;
try {
//任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//空函数,可被重写
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
总结一下runWorker方法的执行过程:
while循环不断地通过getTask()方法获取任务;
getTask()方法从阻塞队列中取任务;
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
调用task.run()执行任务;
如果task为null则跳出循环,执行processWorkerExit()方法;
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。
getTask
该方法主要是从阻塞队列中来获取任务
private Runnable getTask() {
// timeOut变量的值表示上次从阻塞队列中取任务时是否超时
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
* 1. rs >= STOP,线程池是否正在stop;
* 2. 阻塞队列是否为空。
* 如果以上条件满足,则将workerCount减1并返回null。
* 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
// 对于超过核心线程数量的这些线程,需要进行超时控制 ,即非核心线程有超时限制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
* timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
* 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
* 如果减1失败,则返回重试。
* 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null,针对非核心线程。
* 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,说明已经超时,timedOut设置为true
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
timedOut = false;
}
}
}
如果返回null runwoker方法会跳出循环 执行processWorkerExit方法
processWorkerExit
该方法是线程退出,修改线程池监控信息,包括增加完成任务数,减少工作线程数
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly线程执行时是否发生异常。
// 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
// 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计完成的任务数
completedTaskCount += w.completedTasks;
// 从workers中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
* 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
tryTerminate
该函数是尝试去终止线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 如果是运行状态,退出返回
* 如果是TIDYING状态,说明已经正在终止,不必继续尝试终止,返回
* 如果是SHUTDOWN状态,但是阻塞队列不为空,说明还有些任务没执行完,返回。
**/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 加锁设置状态TIDYING,
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//结束动作,可重写
terminated();
} finally {
//设置最终状态TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒termination Condition等待队列,相应的等待操作在awaitTermination函数
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdown
该方法主要就是把线程池状态设置成SHUTDOWN,然后尝试终止线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow
该方法主要就是把线程池状态设置成STOP,然后终止运行中线程,返回阻塞队列剩余任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
finalize
ThreadPoolExecutor重写了finalize方法,即在垃圾回收的时候会执行,调用shutdown方法,保证线程池垃圾回收时,线程池没有线程了
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}