ThreadPoolExecutor
线程池的实现类。
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize——最大核心线程数。核心线程即使空闲也不会被销毁,除非调用allowCoreThreadTimeOut(true)。
- maximumPoolSize——线程池最多可运行的线程数。该值不小于corePoolSize。
- keepAliveTime——非核心线程在空闲状态的存活时间。
- unit——keepAliveTime的时间单位。
- workQueue——存放等待执行的任务的队列,只有通过execute(Runnable)提交的任务才可能进入该队列。
- threadFactory——线程池创建通过该工厂创建线程。
- handler——在线程池满载的情况下,提交的任务交由handler处理。
以上各参数均有响应的setter方法。
ThreadPoolExecutor提供了四种handler:
- CallerRunsPolicy——在线程池未关闭的情况,直接在调用execute(Runnable)方法的线程执行任务。
- AbortPolicy——抛出一个RejectedExecutionException。
- DiscardPolicy——丢弃
- DiscardOldestPolicy——丢弃等待队列中最早提交的那个任务,然后重新提交新的任务。
ThreadPoolExecutor默认使用AbortPolicy。
线程池状态和线程数量的指示字段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是一个32位原子整型,高3位表示线程状态(runState),剩余位代码线程数量(workerCount)。所以线程池可容纳的最大线程数是(2^29)-1。
wokerCount未必和存活的线程数一致,比如使用ThreadFactory创建线程失败时,或者线程终结前仍然进行着某些工作时。
线程池状态有以下几种:
- RUNNING(-1)——可接收新的任务,可执行队列中的任务。
- SHUTDOWN(0)——不再接收新的任务,仍可执行队列中的任务。
- STOP(1)——不再接收新的任务,不再执行队列中的任务,中断正在执行的任务。
- TIDYING(2)——过渡状态。所有任务都已终结,workerCount等于0,terminated()方法执行之前的状态。可以覆写terminated()做一些清理工作。
- TERMINATED(3)——terminated()执行之后的状态。
状态的更改是递增的,可能的状态改变如下:
- RUNNING -> SHUTDOWN——调用了shutdown()方法,可能是通过finalize()隐式调用的。
- (RUNNING or SHUTDOWN) -> STOP——调用了shutdownNow()。
- SHUTDOWN -> TIDYING——线程池和队列都为空。
- STOP -> TIDYING——线程池为空。
- TIDYING -> TERMINATED——terminated()方法执行完毕。
awaitTermination()方法在状态为TERMINATED时返回。
public void execute(Runnable command)
提交任务有几种情况:
- 工作线程数 < 核心线程数——创建新的核心线程,并处理该任务。
- 工作线程数 >= 核心线程数,队列未满——添加到队列。
- 队列满,工作线程数 < 最大线程数——创建非核心线程处理任务。
- 交由handler处理。
public boolean prestartCoreThread()
手动启动一个核心线程,这样新来的任务就可以直接运行,从而减少线程启动的时间。如果所有核心线程都已启动,返回false。
public int prestartAllCoreThreads()
手动启动所有核心线程。返回启动的核心线程数。
public void allowCoreThreadTimeOut(boolean value)
设置空闲时,核心线程是否允许超时关闭。存活时间同非核心线程。
public boolean allowsCoreThreadTimeOut()
查询核心线程在空闲时,是否允许超时关闭。
public BlockingQueue<Runnable> getQueue()
获取等待队列。
public boolean remove(Runnable task)
从等待队列中删除task。
public void purge()
将等待队列中所有已经取消的Future任务立即移除。
public int getActiveCount()
返回正在执行任务的线程数。
public int getLargestPoolSize()
返回线程池中出现过的最大的线程数量,不大于最大线程数。
public long getTaskCount()
返回线程池总共执行过的以及正在执行的任务数,该值是个大概值。
public long getCompletedTaskCount()
返回线程池总共执行过的任务数,该值是个大概值。
以上是线程池的基本理解和使用,不想深究的话,到这里也就可以了。
下面是需要注意的点。
关于新建线程
线程池使用ThreadFactory来创建线程,如果没有显示提供ThreadFactory,则使用默认的Executors#DefaultThreadFactory来创建线程:
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
从以上代码可以得到如下信息:
- 线程名称均是“pool-XX-thread-XX”形式。
- 均是非daemon线程。
- 优先级均是
Thread.NORM_PRIORITY
。
关于队列选择
常用的队列选择策略有以下几种:
- 直接传递——使用SynchronousQueue同步队列实现。该队列不保存任务,可以理解为一个单纯的管道。其“放入”和“取出”都是阻塞的,每个“放入”操作都要等待一个“取出”操作,反之亦然。直接传递,通常要求最大线程数量不受限制,以避免新提交的任务交由handler处理;但这就会导致线程数量不可控。该策略在任务间有内部依赖时,可避免锁住。
- 无限队列——使用一个无容量限制的队列,比如LinkedBlockingQueue(FIFO队列)或者PriorityBlockingQueue(可自定义Comparator)。这样在核心线程都在工作时,新的任务会被添加到队列中,最大线程数不会超过核心线程数,也就是说maximumPoolSize这个参数将不起作用。该策略适合任务间完全独立,相互不影响的情况。所谓无限,并非是真的无限,只是容量非常大而已,可能是Integer.MAX_VALUE,也可能是其他值。
- 有限队列——比如ArrayBlockingQueue。可以避免资源的过度消耗,但控制起来比较复杂,需要权衡队列容量和最大线程数的关系:使用大队列和小线程数量,会减少CPU的使用,以及操作系统资源的占用,但会降低吞吐率;使用小队列和大线程数,可以充分利用CPU资源,但可能会加大调度开支,同样降低吞吐率。
关于RejectedExecutionHandler
上面介绍了四种默认的RejectedExecutionHandler,同样也可以自己定义。需要注意的是,RejectedExecutionHandler的选择需要参照线程数量和队列选择策略。比如无限队列的情况,可以随意设置。
关于覆写ThreadPoolExecutor
ThreadPoolExecutor提供了3个protected的hook方法。beforeExecute(Thread, Runnable)
和afterExecute(Runnable, Throwable)
分别在每个任务的执行前/后调用,terminated()
方法在线程池终结时调用。
以下是一个覆写的例子,添加了pause|resume方法:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) {
super(...);
}
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
下面学习线程池的实现原理。
线程在哪里
线程池的线程由内部类Worker持有。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker类继承自AbstractQueuedSynchronizer,该父类这里不深究,只需要知道它是一个锁的实现即可。Worker类还实现了Runnable接口。
由构造方法可知,Worker在创建时会通过ThreadFactory.newThread
方法创建一个线程,并将自身作为Runnable对象传递给该线程。
protected修饰的方法是覆写的父类方法,暂且不用管。lock、tryLock、unlock、isLocked是自身锁的调用方法。
Worker的run方法中调用了runWorker方法。如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//获取Worker类的thread
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//getTask为阻塞方法
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||//当前线程池状态至少为STOP,不考虑offset,其值为1。则满足的状态为STOP、TIDYING、TERMINATED。
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//hook方法,可重写
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//hook方法,可重写
}
} finally {
task = null;
w.completedTasks++;//已完成的任务数加1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//Worker退出时的清理操作
}
}
由源码可知,该方法不断从队列中取出任务,交由该Worker所在的线程执行,是执行任务的最终场所。
顺腾摸瓜,这里涉及到getTask
和processWorkerExit
两个方法。
先看processWorkerExit:
/**
* @param completedAbruptly if the worker died due to user exception
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();//由异常引起的,需要ctl变量中存储的工作线程数量减1。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);//workers是一个Set,存储了所有活动的Worker。
} finally {
mainLock.unlock();
}
tryTerminate();//该方法留意一下,下面会介绍
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//线程池的状态低于STOP,包括RUNNING、SHUTDOWN
if (!completedAbruptly) {//非异常引起的死亡,比如空闲状态超过了存活时间。
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);//重新添加一个Worker。
}
}
该方法为死掉的Worker做一些清理工作。首先将死亡的Worker已经完成的任务数添加到线程池已完成的任务数中,紧接着从线程池的Worker集合中删除该Worker,然后调用tryTerminate,最后根据线程池的状态等,判断是否需要重新添加Worker到Worker集合中。
getTask方法从队列中取出一个任务并返回。在Worker由于一些原因退出时,返回null。这些原因包括:
- 线程数量超过maximumPoolSize(比如通过setMaximumPoolSize修改了最大线程数量)。
- 线程池被stop。
- 线程池被关闭,并且队列为空。
- 等待取出任务超时,而超时的Worker需要终结。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);//线程池状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//对应情况2、3
//该条件等价于if (rs >= STOP || (rs >= SHUTDOWN && workQueue.isEmpty()))
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);//当前线程数量
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//当前Worker是否允许超时关闭
//对应情况1、4
if ((wc > maximumPoolSize || (timed && timedOut))//线程数量大于maximumPoolSize或者达到超时条件
&& (wc > 1 || workQueue.isEmpty())) {//并且此时队列为空,或者线程数大于1(在队列非空时,至少需要保留一个Worker来执行任务)
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://允许超时关闭则调用poll超时阻塞方法
workQueue.take();//否则调用take,一直阻塞下去,直到新任务到来
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在processWorkerExit方法中,我们接触到了addWorker方法。
该方法根据线程池的状态的最大线程数量,决定是否向线程池添加新的Worker。遇到以下条件之一时,添加失败,返回false:
- 线程池已经stop或者达到shutdown的条件。
- 当前线程数达到上限。
- ThreadFactory创建线程失败。
Worker创建失败时,会回滚一些数据。
/**
* @param firstTask 新Worker需要执行的第一个任务,可为null
* @param core 是否是核心线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry://添加一个标签
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))//对应情况1
//等价于if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty()))
//之所以有firstTask != null是因为SHUTDOWN后不再接收新任务。
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||//CAPACITY为线程池所能容纳的最大线程数量2^29-1
wc >= (core ? corePoolSize : maximumPoolSize))//对应情况2
return false;
if (compareAndIncrementWorkerCount(c))//ctl字段中的Worker数量加1,此时Worker还没有被实际创建,
//也证实了讲ctl字段时提到的“wokerCount未必和存活的线程数一致”
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//线程池状态发生变化时,重新执行前面的逻辑
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {//线程创建成功
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {//SHUTDOWN后不再接收新任务
if (t.isAlive()) // precheck that t is startable
//t.isAlive()返回true,则表明t.start已被调用过,而正常来说,此时还未调用t.start
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)//largestPoolSize是线程池中出现过的最大线程的数量
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//执行数据回滚
}
return workerStarted;
}
接下来看addWorkerFailed方法。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();//讲ctl字段的workerCount减1
tryTerminate();//查看是否需要终结线程池
} finally {
mainLock.unlock();
}
}
processWorkerExit和addWorkerFailed都涉及到了tryTerminate方法。
仅tryTerminate方法会将线程池状态置为TIDYING和TERMINATED,且需要满足以下条件之一:
- 当前状态为SHUTDOWN,并且线程池和队列都为空。
- 当前状态为STOP,并且队列为空。
在执行了可能导致线程池TERMINATED的操作后,必须调用该方法,比如减少了worker的数量,或者shutdown之后从队列中移除了任务。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||//线程池正在运行中
runStateAtLeast(c, TIDYING) ||//线程池正在terminate
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))//或者不满足条件1
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);//workerCount不为0时,中断一个空闲的worker,将中断信号传递下去(如何传递请往下看)
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();//hook方法,可重写
} finally {
ctl.set(ctlOf(TERMINATED, 0));//此处可以看出TIDYING只是一个过渡态。
termination.signalAll();//用于唤醒awaitTermination阻塞方法
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
至此我们就见到了可重写的3个hook方法:beforeExecute
、afterExecute
、terminate
。
由termination.signalAll()
这一语句,简单看一下awaitTermination:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
while (!runStateAtLeast(ctl.get(), TERMINATED)) {
if (nanos <= 0L)
return false;
nanos = termination.awaitNanos(nanos);//状态不是TERMINATED,则阻塞下去,等待termination.signalAll()唤醒
}
return true;
} finally {
mainLock.unlock();
}
}
tryTerminate方法中调用了interruptIdleWorkers,仅此处调用传递的onlyOne参数为true:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
前面说了,调用interruptIdleWorkers方法是为了将终结信号传递下去,那究竟是如何传递的呢?
- interruptIdleWorkers会中断一个Worker。
- Worker中断,则runWorker方法就会调用finally块中的processWorkerExit方法,参数completedAbruptly为true。
- processWorkerExit方法中会再次调用tryTerminate方法,从而完成终结信号的传递。
至此我们就学习了线程池的实现原理。