接口Executor比较简单,只定义了一个执行方法execute(Runnable command),Runnable将在此方法中得到执行。
public interface Executor {
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*该命令可以在新线程、池化线程或调用线程中执行,具体由{@code Executor}实现决定。
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
void execute(Runnable command);
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
public abstract class AbstractExecutorService implements ExecutorService {
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*为给定的runnable和默认值返回{@code RunnableFuture}。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
//执行任务ftask ,方法的具体实现在ThreadPoolExecutor类中
return ftask;
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
return ftask;
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException ignore) {}
catch (ExecutionException ignore) {}
return futures;
} catch (Throwable t) {
throw t;
private static <T> void cancelAll(ArrayList<Future<T>> futures) {
cancelAll(futures, 0);
/** Cancels all futures with index at least j. */
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
for (int size = futures.size(); j < size; j++)
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
public boolean isCancelled() {
return state >= CANCELLED;
public boolean isDone() {
return state != NEW;
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
} finally {
return true;
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
WaitNode next = q.next;
if (next == null)
q.next = null; // unlink to help gc
q = next;
callable = null; // to reduce footprint
* @throws CancellationException {@inheritDoc}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
else if (Thread.interrupted()) {
throw new InterruptedException();
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
else if (!queued)
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
return state;
parkNanos = nanos - elapsed;
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
if (ran)
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
* 用于保存任务并将任务交给工作线程的队列。
private final BlockingQueue<Runnable> workQueue;
* Set containing all worker threads in pool. Accessed only when holding mainLock.
* 包含池中的所有工作线程的集合。只有在持有主锁时才能访问。
private final HashSet<Worker> workers = new HashSet<Worker>();
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
* @param corePoolSize 核心线程数,长期保留在池中的线程数(即使它们是空闲的)
* 除非设置了{@code allowCoreThreadTimeOut},
* @param maximumPoolSize 池中允许的最大线程数
* @param keepAliveTime 当线程的数量大于核心线程数时,多余的空闲线程在终止之前等待新任务的最大时间。
* @param unit 参数{@code keepAliveTime}的时间单位
* @param workQueue 用于在执行任务之前保存任务的队列。
* 这个队列将只包含由{@code execute}方法提交的{@code Runnable}任务。
* @param threadFactory 创建新线程时使用的工厂
* @param handler 当达到了线程边界和队列容量后导致执行被阻塞时,需要使用的handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
* Proceed in 3 steps:
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
1. 如果运行的线程小于corePoolSize,则尝试使用给定的命令作为第一个任务来启动新线程。
2. 如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加一个线程(因为现有的线程在最后一次检查后死亡),
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//运行的线程数小于核心线程数
if (addWorker(command, true))//任务添加到执行队列中
c = ctl.get();
if (isRunning(c) && workQueue.offer(command)) {//线程池正在运行状态且任务插入缓存队列成功
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//线程池不在运行状态,将任务从缓存队列中移除
else if (workerCountOf(recheck) == 0)//线程池正在的线程数量为0
addWorker(null, false);//添加空任务
else if (!addWorker(command, false))//任务插入缓存队列失败,直接创建新任务线程执行任务
private boolean addWorker(Runnable firstTask, boolean core) {
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
try {
// Recheck while holding lock.锁上后再检查。
// Back out on ThreadFactory failure or if shut down before lock acquired.
int rs = runStateOf(ctl.get())
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable.预先检查t是否可以启动
throw new IllegalThreadStateException();
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
} finally {
if (workerAdded) {
workerStarted = true;
} finally {
if (! workerStarted)
return workerStarted;
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
try {
if (w != null)
} finally {
* Attempts to CAS-increment the workerCount field of ctl.
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
/** Delegates main run loop to outer runWorker */
public void run() {
// Lock methods
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
return true;
return false;
protected boolean tryRelease(int unused) {
return true;
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
} catch (SecurityException ignore) {
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts.允许打断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
} finally {
task = null;
//当 getTask()返回null时,说明此线程可以回收了
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
final ReentrantLock mainLock = this.mainLock;
try {
completedTaskCount += w.completedTasks;
} finally {
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
addWorker(null, false);
* 执行阻塞或定时等待任务,取决于当前的配置设置。如果在下面的情况下,这个worker必须退出,并返回null:
* 1。超过maximumPoolSize(调用setMaximumPoolSize设置)数量的worker。
* 2。线程池停了。
* 3。线程池关闭,队列为空。
* 4。这个worker超时等待一个任务,超时的worker在超时等待之前和之后都可能终止
* (即{@code allowCoreThreadTimeOut || workerCount > corePoolSize}),
* 如果队列不是空的,那么这个worker不是池中的最后一个线程。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.仅在必要时检查队列是否为空。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
return null;
int wc = workerCountOf(c);
// Are workers subject to culling? workers 会被回收淘汰吗?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
从实现在AbstractExecutorService 类中的submit(Runnable task)方法到实现在ThreadPoolExecutor类中的execute(Runnable command)方法,然后是ThreadPoolExecutor类中的addWorker(Runnable firstTask, boolean core)方法,最后到ThreadPoolExecutor类中runWorker(Worker w)方法,任务从提交到运行的所有逻辑都包含在其中。下面我们总结一下各个方法各自做了什么,最终达到理解线程池整个架构实现:
1、AbstractExecutorService 类中的submit(Runnable task)方法:
此方法将task封装成FutureTask类,然后执行execute(Runnable command)方法并返回,此方法比较简单没有什么复杂的东西。需要注意的是FutureTask类对Runnable 的二次封装主要是添加了任务的各种状态(包括新添加、执行中、取消、执行完成等状态),方便管理。
2、ThreadPoolExecutor类中的execute(Runnable command)方法:
3、ThreadPoolExecutor类中的addWorker(Runnable firstTask, boolean core)方法:
这个方法的主要任务是将Runnable 任务封装到Worker(包含了线程和任务)中,并触发了线程的运行。
4、ThreadPoolExecutor类中runWorker(Worker w)方法:
步骤3中Worker执行后最终都会来到runWorker方法,这个方法的主要作用是拿到Worker 中的线程,然后用这个线程来执行Worker以及缓存队列workQueue中的任务。线程的复用此方法中得到体现。
另外从这个方法的实现中我们还可以知道:任务线程Worker 分两种,一种是可回收的,另一种是不可回收的,这两者的主要区分原则就是当前工作线程数是否达到默认设置的核心线程数。所以当没那么多任务的时候闲置线程就可以被回收了。