execute执行流程
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl低3位表示线程池状态,高19位表示当前线程数
int c = ctl.get();
// 如果线程池未达到核心线程数,则创建线程执行
if (workerCountOf(c) < corePoolSize) {
// 创建线程执行任务成功,则返回
if (addWorker(command, true))
return;
// 由于并发未创建成功,获取最新ctl值
c = ctl.get();
}
// 线程池状态是running,且任务入队成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池不是running,则将任务出队
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 线程池是running状态,但是工作线程为0,则创建空任务线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 入队失败,则进行创建线程执行
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
// 这个for基本逻辑就是累加统计的线程数,用CAS重试实现
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池状态不是running状态
if (rs >= SHUTDOWN &&
// 这里对应前面任务为空的情况,如果状态为shutdown,且队列为空,则返回失败
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果线程数量大于线程数,则返回失败,这里有两种情况,超过核心数,后面还会重试
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果累加当前线程数成功则跳出重试
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新获取,进行重试
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 线程启动成功标志
boolean workerStarted = false;
// 线程添加works成功标志
boolean workerAdded = false;
Worker w = null;
try {
// 创建work线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 线程池状态是running 或 状态为shutdown 且 任务为空,则添加到wokers
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加works成功则启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 任务不为空 或 阻塞队列中有任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 进行中断线程判断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 线程任务前置处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 线程任务后置执行
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
// 当调用shutdown时,getTask阻塞停止调用该方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试中断所有线程任务
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}