基础
学习一个类,我们应该先从其字段开始。首先看看ThreadPoolExecutor对应的属性有哪些。
private volatile int corePoolSize; // 核心线程数,线程池在阻塞获取任务时可以保持永久存活的线程的最大值。当线程池内的线程超过此值的线程会通过poll(keepAliveTime)获取任务
private volatile int maximumPoolSize; // 线程池中允许的最大的线程数,这里使用volatile修饰,保证多线程下的可见性
private volatile long keepAliveTime; // Woker从workQueue获取任务的最大等待时间,超过这个时间后,worker会被回收掉(run方法执行完毕,线程不可复生)
private final BlockingQueue<Runnable> workQueue; // 提交的任务的排队队列,这是一个接口,通过不同的策略实现不同的线程池机制
private int largestPoolSize; // 线程池中最大的pool size,只会增加不会减少,其是一个统计信息
private final HashSet<Worker> workers = new HashSet<Worker>(); // 内部运行的Worker存放的地方,通过mainLock保证线程安全
private final ReentrantLock mainLock = new ReentrantLock(); //内部的一个独占锁,主要保证线程池的一些统计信息(最大的线程数、完成的任务数)和worker添加到集合的安全性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程安全类型,最高位为符号位,次高3位为状态值,低28位为当前的线程数
private volatile boolean allowCoreThreadTimeOut; // 是否允许核心线程从阻塞队列获取任务时销毁。默认为false
private volatile ThreadFactory threadFactory; // 内部为worker提供任务执行的线程的生成工厂。我们通过自定义的工厂来使得业务日志更为清晰或者执行不同的业务逻辑
private volatile RejectedExecutionHandler handler; // 拒绝策略,默认拒绝策略为抛出异常。线程池的拒绝策略是策略模式在JDK中的一个应用点。可以自定义拒绝策略,在生产者的速度远远大于消费者时将超出的任务持久化到外部存储。
其中corePoolSize、maximumPoolSize、keepAliveTime等变量使用volatile修饰,是因为线程池提供了public的set方法让我们可以对其进行修改,这里需要使用volatile来使得修改对多线程可见。
其他属性的修改在mainLock的控制下进行。
线程池状态
了解线程池必须了解其状态机制。线程池内部使用AtomicInteger类型的clt属性来进行状态控制。其中次高三位分别表示running、shutdown、stop、tidying、teminated这5种状态
常用的方法
- 任务提交
public void execute(Runnable command) {
// NPE检查,线程池不允许提交NULL任务
if (command == null)
throw new NullPointerException();
int c = ctl.get(); // 获取当前的clt,AtomicInteger类型保证线程安全
if (workerCountOf(c) < corePoolSize) { //如果当前运行的线程数小于核心线程数
if (addWorker(command, true)) //如果添加核心线程数成功则方法返回
return;
c = ctl.get();//执行到这里必定是添加核心线程失败,重新读取最新的clt
}
/**
* 这里分析一下添加核心态worker失败的几种场景:
* 1、线程池为shutdown以上的状态
* 2、当前线程池中运行的worker的数量超过其本身最大限制(2^29 -1 )
* 3、当前线程池中运行的worker的数量超过corePoolSize
*/
// 如果线程池处于running状态,则将当前提交的任务提交到内部的阻塞队列进行排队等待worker处理
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/**
* double check是否线程池仍在运行中
* 如果线程池不在running状态则将刚才进行排队的任务移除,并拒绝此次提交的任务
* 如果此时在线程池中运行的worker数量减少到0(corePoolSize为0的线程池在并发的情况下会出现此场景)
* 则添加一个不携带任何任务的非核心态的worker去处理刚才排队成功的任务
*/
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//如果排队失败(有界的阻塞队列)则添加一个非核心态的worker
//添加失败:当前运行的worker数量超过maximumPoolSize或者本身最大的限制;线程池状态在shutdown以上
reject(command);
}
- 新增处理线程(worker)
private boolean addWorker(Runnable firstTask, boolean core) {
//自旋进行线程状态check
retry:
for (;;) {
int c = ctl.get(); //读取最新的clt,其本身具有可见性
int rs = runStateOf(c);
// 检查线程池状态是否在shutdown以上
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
/**
* 自旋进行worker数量自增
* 如果当前新增的是核心态的worker则与corePoolSize进行比较
* 如果当期新增的是非核心态的worker则与maximumPoolSize进行比较
* 不满足数量限制则直接添加失败,进入后续的排队 or 拒绝流程
*/
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/**
* 通过CAS进行worker数量+1。为什么不直接调用AtomicInteger提供的incrementAndGet() 方法?
* 因为我们是需要将worker数量+1,而后者并不能提供单纯的+1功能。将c-> c+1而不是变成c -> c + N
*/
if (compareAndIncrementWorkerCount(c))
break retry; //如果CAS成功则跳出自旋
c = ctl.get(); // 重新读clt,代码执行到这里意味着clt的值必定被其他线程修改,本次读会从主存读取最新的值到工作内存
if (runStateOf(c) != rs)// 如果线程池状态发生变化(只有running状态才接受新任务),则跳到外层循环执行拒绝
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 代码执行到此处,意味着worker的数量成功+1,则可以进行worker的构造过程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// new 一个worker,将本次提交的任务封装到其内部
w = new Worker(firstTask);
final Thread t = w.thread; // worker内部真正用来执行任务的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
/**
* 进行线程池状态检查,thread状态检查,进行运行的最大线程数(largestPoolSize)统计
* 将worker添加到wokrers容器(HashSet)中
* 修改workerAdded为true
*/
try {
...省略此处代码
} finally {
mainLock.unlock();
}
//在这里workerAdded为false:thread已经调用该start方法;线程池状态为shutdown以上
if (workerAdded) {
// 启动worker内部的线程,其会调用worker内部的run方法
t.start();
// 添加成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 执行任务 (worker的工作流程)
什么是worker?
private final class Worker extends AbstractQueuedSynchronizer implements Runnable Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; //外部提交的任务 this.thread = getThreadFactory().newThread(this); // 真实执行任务的线程 }
从这里我们可以看出其实际是一个Runnable,并且是AQS的子类,那么我们可以简单的猜测到其能够进行并发的控制(lock、unlock)
final void runWorker(Worker w) {
//在添加worker的流程中执行thread.start()之后真实执行的方法
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 获取当前worker携带的任务
w.firstTask = null;
/**
* 直接unlock???在unlock之前一定要lock吗?从这里我们可以看出不一定
*/
w.unlock(); // 修改state为0,将占用锁的线程设为null(第一次执行之前没有线程占用)
boolean completedAbruptly = true;
try {
// 自旋。先执行自己携带的任务,然后从阻塞队列中获取一个任务直到无法获取任务
while (task != null || (task = getTask()) != null) {
// 将state修改为1,设置占有锁的线程为自己
w.lock();
/**
* check线程池的状态,如果状态为stop以上(stop以上不执行任务),则中断当前线程
* 如果当前线程已被中断(其他线程并发的调用线程池的shutdown()或shutdownNow()方法),则check线程池状态是否为stop以上
* 最后如果当前线程未被中断则中断当前线程(不可能!笔者还未想到此种场景)
*/
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);// 空方法,留给子类实现
Throwable thrown = null;
try {
task.run(); //执行外部提交的任务,通过try-catch来保证异常不会影响线程池本身的功能
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);// 空方法,留给子类实现
}
} finally {
task = null;
w.completedTasks++; //已完成任务数量统计
w.unlock();
}
}
// 如果执行到这里代表非核心线程在keepAliveTime内无法获取任务而退出
completedAbruptly = false;
} finally {
/**
* 从上面可以看出如果实际业务(外部提交的Runnable)出现异常会导致当前worker终止
* completedAbruptly 此时为true意味着worker是突然完成,不是正常退出
*/
processWorkerExit(w, completedAbruptly);// 执行worker退出收尾工作
}
}
- 获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 自旋获取任务(因为是多线程环境)
for (;;) {
int c = ctl.get();// 读取最新的clt
int rs = runStateOf(c);
/**
* 1、线程池状态为shutdown并且任务队列为空
* 2、线程池状态为stop状态以上
* 这2种情况直接减少worker数量,并且返回null从而保证外部获取任务的worker进行正常退出
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/**
* 1、允许核心线程退出
* 2、当前的线程数量超过核心线程数
* 这时获取任务的机制切换为poll(keepAliveTime)
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 1、线程数大于maximumPoolSize(什么时候会出现这种情况? 当maximumPoolSize初始设置为0或者其他线程通过set方法对其进行修改)
* 2、线程数未超过maximumPoolSize但是timed为true(允许核心线程退出或者线程数量超过核心线程)
* 并且上次获取任务超时(没获取到任务,我们推测本次依旧会超时)
* 3、在满足条件1或者条件2的情况下进行check:运行线程数大于1或者任务队列没有任务
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) // CAS进行worker数量-1,成功则返回null进行worker退出流程,失败则继续自旋
return null;
continue;
}
try {
// 如果允许超时退出,则调用poll(keepAliveTime)获取任务,否则则通过tack()一直阻塞等待直到有任务提交到队列
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;// 当等待超过keepAliveTime时间未获取到任务时,标记为true。在下次自旋时会进入销毁流程
} catch (InterruptedException retry) {
// 什么时候会抛出异常?当调用shutdown或者shutdownNow方法触发worker内的Thread调用interrupt方法时会执行到此处
timedOut = false;
}
}
}
- 关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 利用排它锁进行上锁,保证只有一个线程执行关闭流程
mainLock.lock();
try {
// 安全检查
checkShutdownAccess();
// 内部通过自旋+CAS修改线程池状态为shutdown
advanceRunState(SHUTDOWN);
// 遍历所有的worker,进行线程中断通知
interruptIdleWorkers();
// 钩子函数
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 进行最后的整理工作
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
...和shutdown类似,将状态修改为stop并返回在任务队列排队的任务 ...
return tasks;
}
总结
线程池能为我们减少线程创建的开销,但是相应参数的设置需要不断测试从而到达一个相对最优的配置
- 过大的线程数可能导致CPU切换过于频繁从而导致效率降低
- 过小的线程数可能导致CPU利用率不高
- 有界队列可以防止资源耗尽,但是我们需要考虑在生产速度大于消费速度时提交任务带来的拒绝问题
- 无界队列在消费速度小于生产队列时可能导致频繁的GC从而降低系统响应速度
以上所述都是个人学习源码之中的一点心得体会,如果不实之处,望大家谅解和指正