1 线程池原理
所谓线程池,就是有一个池子,里面存放着已经创建好的线程,当有任务提交个线程池执行时,池子中的某个线程会主动执行该任务.如果池子中的线程不够应付数量众多的任务时,则需要自动扩充新的线程到池子中,但是该数量是有限的;当任务较少时,池子中的线程又会自动回收,释放资源。
一个完整的线程池应该具备如下要素:
- 任务队列:用于缓存提交的任务。
- 线程数量管理功能:一个线程池必须能够很好地管理和控制线程数量,可通过如下三个参数实现:创建线程池时初始地线程数量init;线程池自动填充时最大地线程数量max;在线程池空闲时需要释放线程但是也要维护一定数量地活跃数量或者核心数量core。
- 任务拒绝策略:如果线程数量已达到上线且任务队列已满,则需要有相应地拒绝策略来通知任务提交者。
- 线程工厂:主要用于个性化定制线程,比如将线程设置为守护线程以及设置线程名称等。
- QueueSize:任务队列主要存放提交地Runnable,但是为了防止内存溢出,需要有limit数量对其进行控制。
- KeepAlive时间:该时间主要决定线程各个重要参数自动维护地时间间隔。
2 线程池具体实现
想要实现线程池就需要实现以下几个要素功能:
- 线程池状态
- 任务的执行
- 线程池中的线程初始化
- 任务缓存队列及排队策略
- 任务拒绝策略
- 线程池的关闭
- 线程池容量的动态调整
因为java5引入了concurrent
包,里面就包含一些线程池的实现类,我们就不必再自己手动实现一遍了。
大体类关系:
2.1 线程池状态
在ThreadPoolExecutor中定义了一个ctl变量用于保存线程池状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
不过这个ctl不只是保存线程池状态那么简单。根据注释:
ctl是一个原子整型变量,它是由worker数量(有效的线程数量)和runState(是否处于运行,关闭等状态)两个概念组成的。
为了将两个概念打包成一个整数,作者限制workerCount到2^29-1(约等于500,000,000)而不是2^31-1(约为二十亿)。
COUNT_BITS
表示移位个数,Integer.SIZE的值为32,所以29就可以表示为Integer.SIZE-3
CAPACITY
表示容量,那么2^29-1就可以表示为(c << COUNT_BITS)-1
相当于就是用整型的0到28位表示线程数量,用29到31位表示线程池状态。
运行状态表示:
- RUNNING:
- 可接受新任务且可以处理已添加的任务
- 线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- SHUTDOWN
- 不能再接受新任务,但是可以继续执行已添加的任务
- 调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN
- STOP:
- 不能再接受新任务,也不处理已添加的任务,并且会中断执行中的任务。
- 调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
- TIDYING:
- 当前所有任务已终止,ctr记录的workCount为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中实现是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
- 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING
- TERMINATED
- 线程池彻底终止,就变成TERMINATED状态
- 线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
还有几个对ctl进行计算的方法:
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
- runStateOf:获取运行状态
- workerCountOf:获取活动线程数
- ctlOf:获取运行状态和活动线程数的值
另外还有6个方法与ctr相关:
// 比较大小
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 比较大小
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// 判断state是否在运行状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2.2 任务的执行
首先要了解下线程池的一些成员:
// 这个队列用于缓存任务,将任务转交给线程。
private final BlockingQueue<Runnable> workQueue;
// 线程池的主要状态锁,对线程池大小,runState都使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
// 存放线程池所有工作线程的集合,当拥有mainLock时才能访问
private final HashSet<Worker> workers = new HashSet<Worker>();
// 支持等待终止的等待条件
private final Condition termination = mainLock.newCondition();
// 记录有过的最大的线程数
private int largestPoolSize;
// 完成的任务个数,只能由工作线程更新
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝策略执行器:当执行时线程池饱和、shutdown会用到
private volatile RejectedExecutionHandler handler;
// 保活时间:即空闲线程最长的等待时间
private volatile long keepAliveTime;
// 允许核心线程在空闲时用此时间来等待
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数
private volatile int corePoolSize;
// 最大线程数
private volatile int maximumPoolSize;
然后还要了解下worker:因为 Doug Lea 把线程池中的线程包装成了一个个 Worker,翻译成工人,就是线程池中做任务的线程。所以到这里,我们知道任务是 Runnable(内部叫 task 或 command),线程是 Worker。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** 真正执行任务的thread */
final Thread thread;
/** 为什么有firstTask?因为新建线程时可能会有一个现成的task可以执行,它就可以当作该新建线程执行的第一个任务,如果没有,可以后面到任务队列里去取 */
Runnable firstTask;
/** 用于存放此线程完全的任务数,注意了,这里用了 volatile,保证可见性*/
volatile long completedTasks;
// Worker 只有这一个构造方法,传入 firstTask,也可以传 null
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 调用 ThreadFactory 来创建一个新的线程,这里创建的线程到时候用来执行任务
this.thread = getThreadFactory().newThread(this);
}
/** 代理Thread的run方法,这里调用了外部类的 runWorker 方法 */
public void run() {
runWorker(this);
}
...
然后直接看execute()实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 如果当前线程数少于核心线程数,那么直接添加一个 worker 来执行任务,
// 创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务(firstTask)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
// 添加任务成功,那么就结束
return;
c = ctl.get();
}
// 如果到这里,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
// 如果线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查,如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果当前线程数为0,则新建worker线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池添加worker失败,一般是线程数达到极限值,这样的话作拒绝处理
else if (!addWorker(command, false))
reject(command);
}
总的逻辑注释也有,主要有3步:
- 1 如果运行的线程数少于核心线程数,就会新建一个线程并且将该任务作为该线程的第一个任务。用addWorker()来添加新线程,它会自动检查runState和workerCount,如果它返回false,可以防止在不应该添加线程时发生错误。
- 2 如果一个task任务被成功加入队列,我们仍然需要再次检查我们是否成功添加work线程或者线程池已经执行了shutDown(),如果线程池已经停止则需要回滚任务队列事务,或者是之前添加work线程失败则重新尝试新添加一个worker线程。
- 3 如果我们不能入队一个任务,则尝试添加一个新线程。如果此操作失败,那么应该是线程池被shutDown或者饱和了,所以应处理为拒绝
addWorker如何实现?
// 检查一个worker在当前线程池状态和给定的边界(核心值或最大值)是否能被添加。如果是这样,worker个数将得到相应地调整,如果有可能,将创建并启动一个新worker,并将运行firstTask作为其第一个任务。
// 如果线程池已停止或者shutdown,本方法会返回false;如果线程创建失败也会返回false,原因要么是线程工厂返回null,要么是其它什么异常,这会方便回滚。
// firstTask 新线程应该首先运行该任务(如果没有该任务,则为null)。当线程数少于corePoolSize线程时(在方法execute()中),使用初始化的第一个任务创建worker(在方法execute()中)来绕过队列。最初,空闲线程通常是通过prestartCoreThread创建的,或者用来替换其他垂死的worker。
// core ture是设置corePoolSize作为边界,否则用maximunPoolSize作为边界
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
首先死循环检查,注意这里有个retry用法:
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果状态是stop或者是处于shutdown,队列为空,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果线程数超过最大极限值或者设定的边界值,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// workCount自增1结束循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 否则,再次循环检查,知道满足上述条件
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
然后是新建一个worker,加入到集合,并且执行worker,返回worker添加结果:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 如果线程池状态是running或者是shutdown,但是当前task不为空,满足条件可以添加并执行worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// task对应的Thread处于活跃状态,则抛出状态异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 更新最大线程数
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// worker成功添加,则执行worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;