1.对线程池的理解
1.1 艰辛摸索
看过许多关于线程池的介绍和讲解,看过方腾飞的 《并发编程的艺术》 也看过很多博客关于线程池的讲解,但是总觉得自己理解的不太好,总觉得哪里缺点。后来自己也花时间查看源码,自己去琢磨,多问为什么,结合一些博客的讲解。现在我想把这段时间的成果记录下来,也是为了加深自己的理解吧。
下面所有的源码都是java8 源码。
1.2 线程池的好处
- 降低性能消耗、提高响应速度:对于应用需要频繁创建线程,而且线程任务都比较简单,比如一些IO任务,线程的生命周期都很短;而线程的创建需要花费一定的CPU时间,所以当任务到来时如果线程已经准备就绪了,而不是重新创建,则会大大提高系统的响应速度。
- 对线程的集中管理监控:将创建的线程规约在线程池里,则可以对线程的数量和运行状态进行管理并进行监控,可对系统的线程资源进行集中管理。
2.线程池内的一些属性
2.1 线程池参数
看下面的这个线程池的构造器,他有许多参数,这些参数都代表什么意思?
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize 核心的线程池数量
- maximumPoolSize 线程池内最大的线程数量
- keepAliveTime 线程存活时间
- unit 时间单位 比如 秒 分钟
- workQueue 存储任务的阻塞队列
ArrayBlockingQueue 有界阻塞队列,此队列会满。
SynchronousQueue 没有容量的阻塞队列,一次添加必须等待一次获取。反之亦然。
LinkedBlockingQueue 无界阻塞队列 这个阻塞队列用于不会满
- threadFactory 自定义线程工厂
1.通过自定义线程池 可以给线程池内,线程都赋予更有意义的线程名。
也可以根据喜好做些别的事情,比如记录一下何时创建线程之类的。
2.如果没有则会采用默认的线程池。Executors.defaultThreadFactory()
- handler 拒绝策略
拒绝策略java 默认提供了四种实现。都是ThreadPoolExecutor的内部类。
1 CallerRunsPolicy :如果当前线程池 处于运行状态 ,直接使用当前线程执行任务,如果是终止状态,则悄悄的直接抛弃。
2. AbortPolicy :对拒绝任务抛弃处理,并且抛出异常。默认采用。
rejectedExecution 内直接抛异常。
3. DiscardPolicy :对拒绝任务直接无声抛弃,没有异常信息。查看源码发现他的rejectedExecution 函数就是一个空实现。
4. DiscardOldestPolicy :对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个任务,然后把拒绝任务加到队列。查看源码
if (!e.isShutdown()) {
e.getQueue().poll();//获取并移除队列head
e.execute(r);//再次execute任务
}
3.Executors 提供的几种线程池
3.1 newFixedThreadPool
- 固定线程数的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
1.核心 线程数 和 最大线程数一致。
2.采用的阻塞队列是 无界阻塞队列,LinkedBlockingQueue。也就是在极端情况下,阻塞队列 会一直增长,直到堆内存溢出,需要谨慎使用该线程池。
3.2 newCachedThreadPool
- 缓冲线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
0.在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
1.初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待
,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程?
2.这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。
3.这个最大线程数是 Integer.MAX 使用中需要注意。
3.3 newWorkStealingPool
jdk 1.8 新加入的。创建一个带并行级别的线程池,并行级别决定了同一时刻最多有多少个线程在执行,如不穿如并行级别参数,将默认为当前系统的CPU个数
3.4 newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。
4. 源码
下面介绍下线程池的源码,由于对写作没有天赋,我就尽我最大努力把源码分析的浅显易懂一些。我介绍下线程池的一些状态属性,然后再从提交一个任务开始跟着源码进行描述一下我对源码的理解。
4.1 线程池状态 和 线程统计
ctl
先看看线程池中最重要的一个属性 ctl,我觉得是Control的简写,ctl控制着整个线程池的运转。ctl是AtomicInteger类型,线程池利用ctl 的高3位作为记录当前线程池的状态。利用低29位记录线程池中线程数,所以线程池中线程的最大容量为 2^29。
默认值是 1110 0000 0000 0000 0000 0000 0000 0000 = -536 870 912 ;COUNT_BITS
COUNT_BITS = Integer.SIZE - 3;
COUNT_BITS 的意思是 一个整型数 有29位用于统计线程池内线程数;RUNNING 运行状态
RUNNING 是线程池的初始状态,是一个int 类型的常量,值为 -1 左移 29 位即为 -536 870 912,线程池 的状态
只有RUNNING 是负数的。SHUTDOWN
RUNNING --shutDowm()--> SHUTDOWN
RUNNING状态调用shutDowm()函数进入SHUTDOWN状态。是一个int类型的常量 值为0 ;
此时线程池不接收新任务,但能处理已添加的任务。STOP
(RUNNING or SHUTDOWN) ---shutdownNow()--> STOP
RUNNING状态或者SHUTSOWN状态调用shutDowmNow()函数进入SROP状态。是一个int类型的常量 值为536 870 912 ;
不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。TIDYING
是一个int类型的常量 值为1 073 741 824 ;
SHUTDOWN -> TIDYING:当线程池内线程数为0并且队列内任务数量为0时
STOP -> TIDYING:当线程池内线程数量为0时TERMINATED
是一个int类型的常量 值为1 610 612 736 ;
线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。CAPACITY (线程的最大容量)
1 转移 29位 ,就是 0010 0000 0000 0000 0000 0000 0000 0000,
然后 减1 ,就变成了0001 1111 1111 1111 1111 1111 1111 1111,刚好就是 低29位 为1 ,就是线程内线程的最大容量 。
4.2 execute 执行任务
在未来的某个时刻执行给定的任务,这个任务会被一个新线程或者一个已经存在的线程执行。
如果任务不能提交执行,那是因为线程池不处于运行状态或者已经到达容量上限。
那么这个任务就会被RejectedExecutionHandler处理。
execute()代码逻辑:
1.当通过excute(Runable) 提交一个任务时,如果当前线程池内线程数量小于corePoolSize 数量,即使线程池内线程处于闲置状态,线程池也会创建一个新的线程。
2.如果线程内线程数大于corePoolSize 数量小于 maximumPoolSize 数量,则会将任务 提交到 workQueue(阻塞队列)。
3.如果workQueued队列已经满了,而当前线程池内线程数量小于maximumPoolSize 则创建一个新的线程执行。
4.如果此时线程池内线程数量 不小于maximumPoolSize 数量,则执行拒绝策略。
/**
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.如果正在运行的线程数 小于 corePoolSize,创建一个线程执行command.
if (workerCountOf(c) < corePoolSize) {
//创建一个线程执行command
if (addWorker(command, true))
return;
//如果失败 获取ctl recheck runstate
c = ctl.get();
}
//2.如果是运行状态 并且追加command 到队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新检查运行状态 不是运行中状态 则 从队列移除 command,让RejectedExecutionHandler 处理command。
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
//如果当前运行中的线程数是0 ,则创建一个新线程。
addWorker(null, false);
}
//3.如果添加任务到队列失败,则新建一个线程执行任务。
else if (!addWorker(command, false))
//如果新建线程失败,让RejectedExecutionHandler 处理command。
reject(command);
}
4.3 addWorker
检查是否可以根据当前线程池状态和绑定条件(核心线程或者最大线程)添加一个新的工作线程。
如果可以的话那么调整运行的线程数量。
并且 如果线程成功创建和运行的话,那么firstTask将是新线程的第一个任务被执行。
如果线程池已经关闭或者正在关闭,函数将返回false.
如果线程创建失败,可能的原因是因为线程工厂返回null,或者异常(最可能的异常就是在执行Thread,start()时产生OutOfMemoryErro )。
接着回滚以上操作。
参数:core:如果为true 增加核心线程,false 增加最大线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry://外层循环 循环检查运行状态
for (;;) {//
int c = ctl.get();//获取 ctl 值
int rs = runStateOf(c);//获取当前线程池状态
// Check if queue empty only if necessary.
//如果有必要的话 检查任务阻塞队列是否为空
//这里第一个条件检查 运行状态 是否为RUNNING 状态。因为只有RUNNING状态不满足 rs>= SHUTDOWN 条件。
//第二个条件 咋一看 不好懂,有点晕。
//仔细分析下,当运行状态为SHUTDOWN 状态时,线程池会执行完阻塞队列内的任务。但是不再接收新任务。
//也就是说 如果 是shutdown 状态,firstTask is null ,阻塞队列里面还有任务时,是允许创建新的新工作线程的。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内循环 调整工作线程数
for (;;) {
//获取当前工作线程数
int wc = workerCountOf(c);
//如果 大于 最大容量 capacity, 或者 根据绑定条件 大于 核心线程 或者 最大线程数。return false。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas 调整 工作线程数
if (compareAndIncrementWorkerCount(c))
break retry;//如果成功 跳出外层循环
c = ctl.get(); // Re-read ctl
//cas 调整 工作线程数失败
//检查 线程状态 是否改变
//如果没有改变 继续执行 内循环
//如果状态 改变 执行外循环。
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//开始添加 worker
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个 工作线程
//worker 构造器内部 调用 线程工厂 创建一个线程。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//同步锁 阻塞的获取锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取池运行状态
int rs = runStateOf(ctl.get());
//rs 即为当前线程池运行状态,如果状态是RUNNING 或者
//第一个条件 是RUNNING 状态
//第二个条件 是SHUTDOWN状态时 firstTask 必须是 null ,因为 shutdown 状态 不能提交新的任务。但可以创建新工作线程。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//t.isAlive() 如果线程已经start 则返回true ,所以这里是判断 线程t 是不是还可以执行start()。
if (t.isAlive()) // precheck that t is startable
//
throw new IllegalThreadStateException();
//工作线程 添加到 集合workers 中
workers.add(w);
//修改 poolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//释放 同步锁
mainLock.unlock();
}
if (workerAdded) {
//如果添加成功 启动线程
t.start();
workerStarted = true;
}
}
} finally {
//如果线程启动失败 回滚操作
if (! workerStarted)
addWorkerFailed(w);w
//线程安全的 从工作线程集合中移除 如果w存在的话,就CAS 调整 工作线程数。并且尝试停止线程池
}
return workerStarted;
//返回启动成功 与否
}
4.3 addWorkerFailed
执行addWorker添加任务失败之后 调用该方法执行回滚操作。
回滚操作主要做了以下三件事情:
1. 从workers 工作线程集合中移除 worker
2. worker Count 减 1
3. 尝试终止线程池,如果满足终止条件的话 就会终止线程池。
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取主锁
try {
if (w != null)
//从workers 移除指定worker
workers.remove(w);
//workerCount 减 1
decrementWorkerCount();
//尝试终止线程池 终止的条件比较苛刻
tryTerminate();
} finally {
mainLock.unlock();//释放锁
}
}
4.4 runWorker 执行任务
添加worker 成功之后,worker就开始执行runWorker()了。
这个方法是比较重要的方法,是线程池的核心。是worker 线程 运行循环,重复地从队列中获取任务并执行它们,同时处理以下一些问题:
1.从worker 的firstTask 开始执行,随着线程的运行,worker不停的执行getTask() 获取任务,如果拿不到任务,就会根据线程的池配置信息 来决定是否让 worker退出。如果worker执行过程中抛出异常,会导致worker退出,并试图创建一个新的线程替代当前worker。
2.执行任务之前,获取锁确保任务执行不被其他线程干扰。如果不是STOP状态,worker线程需要清除中断标记。
3.处理每个任务之前都要执行钩子函数 beforeExecute(),如果钩子函数抛出异常那么也会出发worker 过早的退出,并尝试创建一个新的worker 替代它。
4.假如执行beforeExecute()顺利,接下来就是 执行任务了。如果执行中出现异常,那么就会捕获这些异常,进行包装成 runtimeException 和 error,Throwable也被包装成 Error的因为Ruunabl.run 不能抛出Throwabl 异常。最后把这些异常 和 worker 发给 afterExecute()处理。
5.任务执行完成后,执行afterExecute(),也可能会产生异常,任何的异常都会导致worker 销毁。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//获取firstTask
w.firstTask = null;//清除 firstTask
w.unlock();
// 允许worker 中断,为什么unlock()就可以允许中断?因为在worker构建实列时。worker的state 属性被设置为-1,在调用池shutDownNow()函数分别对worker进行终止时,会判断worker 的 state >= 0 ,如果不符合 就不会终止。
boolean completedAbruptly = true;//是否突然完成,就是如果执行任务时,意外退出,该标记就不会被修改为false.
try {
1.先执行woker的firstTask 如果firstTask 已经执行,从任务阻塞队列获取任务。
2.直到获取不到任务跳出while循环。
while (task != null || (task = getTask()) != null) {
w.lock();//获取锁
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
1.如果 小于 STOP 状态 ,就进行 中断标记的清除,
2.Thread.interrupted() 会清除当前线程的 中断标记。
3.清除中断标记是为了消除 外部代码 对worker 的中断操作。
4.如果大于等于 STOP状态 就会中断线程。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务前 处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后处理操作
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;//执行任务数++;
w.unlock();//施放锁。
}
}
completedAbruptly = false;//不是意外退出。
} finally {
//处理worker 退出。
processWorkerExit(w, completedAbruptly);
}
}
4.5 getTask() 获取任务
阻塞或者限时的通过getTask() 函数获取任务。不仅仅是获取任务,同时通过是不是返回null控制者worker 线程的退出与否。以下情况会导致函数返回null,导致worker 退出:
1.当前池执行的线程数超过了maximumPoolSize设置。
2.线程池stop状态
3.线程池时shutdown 状态 并且 任务阻塞队列 是空的。
4.当前worker 获取任务时等待超时。具体的需要根据是否允许核心线程退出,是否大于核心线程,设置判断
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
//timedOut 是否已经超时的标记,如果超时获取任务 ,timedOut 设置为 true,就会影响到后面的逻辑判断。
for (;;) {//循环的判断线程池状态
int c = ctl.get();//获取ctl 值
int rs = runStateOf(c);//获取线程池运行状态
// Check if queue empty only if necessary.
1.如果是大于等于 shutdown 状态 ,并且 是大于等于stop状态 那么获取任务失败,线程退出
2.如果是shutdown 状态,并且 任务阻塞队列 workQuenu is empty 那么获取任务失败,线程退出。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();//调整工作线程数减1 ;
return null;//返回 task is null
}
//获取池正在运行工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
//是否 执行定时的获取任务标记
// 如果 运行核心线程退出 或者 正在运行线程数大于核心线程 ,执行定时的获取任务。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//判断当前worker 是否满足 退出条件,如果满足就执行cas 调整线程数 减 1 。并返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//三元表达式
//wordQuenu.poll 是等待keepAliveTime 时间 获取任务,如果超时返回null
//workQuenu.take 阻塞的获取任务,直到获取到一个任务为止。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;//如果 获取到的 r is null,那么说明 执行了 poll 获取任务超时。
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
4.6 processWorkerExit 处理worker退出
- 对即将死亡worker 进行清理 和 记账(统计worker执行的任务数)
- 如果worker突然死亡(执行中有异常) 就需要调整运行的线程数
- 将worker 从worker线程集合中移除。(除名)
- 可能会终止线程池
- 在小于STOP状态前提下,如果由于执行 用户任务异常导致线程退出,创建新线程替代
- 在小于STOP状态前提下,不允许核心线程退出,如果运行中的线程小于核心线程,创建新线程替代。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果runWorker中出现异常,那么调整woker 线程数量
decrementWorkerCount();//调整数量,循环cas 保证调整成功
final ReentrantLock mainLock = this.mainLock;//
mainLock.lock();//获取锁
try {
completedTaskCount += w.completedTasks;//给即将死亡的工人记录
workers.remove(w);//花名册除名
} finally {
mainLock.unlock();//释放锁
}
tryTerminate();//尝试终止池
int c = ctl.get();
//寻找新的worker 替代死亡的worker
if (runStateLessThan(c, STOP)) {//如果 池STOP 那么就没必要找新的worker啦
if (!completedAbruptly) {//如果 正常执行 终止
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//判断需要保留的最小worker数量
if (min == 0 && ! workQueue.isEmpty())//如果是 0 并且 任务 队列 不为 0
min = 1;//至少保留一个 把活干完
if (workerCountOf(c) >= min) //如果 当前worker 数量 大于 至少 保留数量
return; // 不需要 寻找替代者
}
addWorker(null, false);// 找新的worker 替代 firstTask is null
}
}
4.7 tryTerminate 尝试终止池
1.如果池是shutdown 并且 阻塞队列 为空 并且 工作线程数 为空 转换为终止状态
2.如果 池是 stop 并且 工作线程为空 转换为 终止状态
3.满足终止条件,但是工作线程不为 0 ,终止一个空闲线程
final void tryTerminate() {
for (;;) {
int c = ctl.get();
1.如果池处于 允许状态,不终止池。
2.如果池状态 >= TIDYING,说明池正在终止,不需要再次终止
3.如果 池状态 == shutdown 并且 workQuenu 不为空,需要执行完剩下的任务,不终止。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果池可以被终止 但是 工作线程 不为 0 ,即使 都池状态 和 队列 都满足终止条件
//但是 工作线程数 还存在 那么就要终止一个空闲线程
//为什么终止一个空闲线程?
//因为此时任务队列为空, 有worker 在getTask 时 从阻塞队列中 take 任务 ,
//take 方法会一直阻塞worker 直到有任务,但是现在池不再接受新的任务,所以worker 会被一直阻塞
//由于take通过LockSupport.park()阻塞线程,而LockSupport.park()能响应中断信号
//所以通过终止 被阻塞的 worker 使其抛出 InterruptedException
//抛出异常之后 worker最终会被处理退出 ,就会调用该函数,又会终止下一个被take阻塞的worker
//这样所有被take 阻塞的线程 会 一个一个的 被终止。
//这一点线程池的设计者 真是很厉害啊 ,想了好久 才发现这一点。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {//设置pool 为 TIDYING ,线程数为0
try {
terminated();//转换为终止状态
} finally {
ctl.set(ctlOf(TERMINATED, 0));//设置pool 为shutdown ,线程数为0
termination.signalAll();//唤醒所有等待在终止condition上的线程
}
return;
}
} finally {
mainLock.unlock();//施放锁
}
// else retry on failed CAS cas 修改 ctl 失败 ,重试。
}
}
4.8 interruptIdleWorkers
中断可能等待任务的线程。如果队列任务是空的,有的线程会被一直阻塞,调用该方法会中断那些被一直阻塞的线程:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}