[toc]
概念
线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销,调度线程的开销等等,同时也降低了计算机的整体性能,线程池维护多个线程,等待监督管理者分配并发执行的任务,这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
而本文描述的线程池是JDK中提供的ThreadPoolExecutor类。当然使用线程池可以带来一系列好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,减低线程创建和销毁造成的损耗
- 提高响应速度:任务到达时无需等待线程创建即可立即执行
- 提高线程的可管理性:线程时稀缺资源,如果无限制创建,不仅会销毁系统资源,还会因为线程的不合理分布导致资源调度不平衡,降低系统稳定性,使用线程池可以进行统一的分配,调优和监控
- 提供更多强大的功能:线程池具备可扩展性,允许开发人员向其中增加更多的功能,比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题,在并发环境下,系统不能够稳定在任意时刻中,有多少任务需要执行,有多少资源需要投入,这种不确定性将带来以下若干问题:
- 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大
- 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险
- 系统无法合理管理内部的资源分布,会降低系统的稳定性
为解决资源分配问题,线程池采用了池化思想,池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
典型的池化策略包括:
- 内存池:预先申请内存,提升申请内存速度,减少内存碎片
- 连接池:预先申请数据库连接,提升申请连接的速度,降低系统开销
- 实例池:循环使用对象,减少资源在初始化和释放时的昂贵损耗
线程池核心设计与实现
总体设计
Java中的线程池核心实现类是ThreadPoolExecutor,本章基于JDK1.8的源码分析Java线程池的核心设计与实现,我们先看一下ThreadPoolExecutor的UML类图,了解一下ThreadPoolExecutor的继承关系。
ThreadPoolExecutor实现的顶层接口是Executor,顶层接口Executor提供了一种思想:将任务提交和任务执行解耦,用户无需关注如何创建线程,如果调度线程来执行任务,用户只需要提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调度和任务执行的部分,ExecutorService接口增加了一些能力:
- 扩充执行任务能力,补充可以为一个或一批异步任务生成Future的方法
- 提供了管控线程池的方法,比如停止线程池的运行,AbstractExecutorService则是上层的抽象类,将执行的任务的流程串联起来,保证下层的实现只需要关注一个执行任务方法即可,最下层实现类ThreadPoolExecutor实现最复杂的运行部分,ThreadPoolExecutor将会一方面维护自身的生命周期,另一方面同时管理线程和任务,是两者良好的结合从而执行并行任务。
ThreadPoolExecutor是如何运行,如何同时维护线程和执行任务呢?其运行机制如下图所示:
线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。线程池的运行主要分为两部分:任务管理、线程管理。任务管理部分充当了生产者的角色,当任务提交后,线程池会判断该任务后续流转:
- 直接申请线程执行该任务
- 缓冲到队列中等待线程执行
- 拒绝该任务。
线程管理部分是消费者,他们被统一维护在线程池内,根据任务请求进行维护线程的分配,当线程执行任务后则会继续获取新的任务去执行,最终当线程获取不到任务时候,线程就会被回收(核心线程除外)。
接下来我们按照以下三个部分去详细讲解线程池运行机制:
- 线程池如何维护自身状态
- 线程池如何管理任务
- 线程池如何管理线程
生命周期的管理
线程池运行的状态,并不是用户显式的设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCounnt)。具体实现中,线程池将运行状态(runState)、线程数量(workerCount)两个关键参数的维护放在了一起,如下代码所示:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,他同时包含两部分的信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount),高三位保存runState,低29位保存workerCount,两个变量之间互相不干扰。用一个变量去存储两个值,可避免在做相关决策时,出现不一致的情况,不必为了维护两者一致,而占用锁资源,通过阅读线程池源码可以发现,经常出现同时判断线程池运行状态和线程数量的情况。
关于内部封装的获取生命周期状态,获取线程池线程数量的计算方法如下代码所示:
private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl
ThreadPoolExecutor的运行状态有五种分别为:
运行状态 | 状态描述 |
---|---|
RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务 |
SHUTDOWN | 关闭状态,不能接受新提交的任务,但可以继续处理阻塞队列中已保存的任务 |
STOP | 不能接受新的任务,也不处理队列中任务,会中断正在处理任务的线程 |
TIDYING | 所有任务都已终止了,workerCount(有效线程)为0 |
TERMINATED | 在terminated()方法执行完成后进入该状态 |
其生命周期转换如下所示:
任务执行机制
任务调度
任务调度是线程池的主要入口,当用户提交一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解线程池的核心运行机制。
首先,所以任务的调度都是由execute方法(即使调用的是submit,最终也是通过execute执行)完成的,这部分完成的工作是:检查现在线程池中的运行状态、运行线程数,运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝任务,其执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务
- 如果workerCount<corePoolSize,则创建并启动一个线程来执行新提交的任务
- 如果workerCount>=corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中
- 如果workerCount>=corePoolSize&&workerCount<maxumumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务
- 如果workerCount>=maximumPoolSize,并且线程池内的阻塞队列已经满,则根据拒绝策略处理该任务,默认的处理方式是直接抛异常。
流程图如下所示:
任务缓冲
任务缓冲模块是线程池能够管理任务的核心部分,线程池的本质是对任务和线程的管理,而做到这一点的关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者和消费者模式,通过一个阻塞队列来实现的。阻塞队列中缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockintQueue)是一个支持两个附加操作的队列,这两个附加操作是:在队列为空时,获取元素的线程会等待队列变为非空,当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景。生产者是往队列里添加元素,下面展示线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:
使用不同的队列可用实现不一样的任务存储策略,在这里我们可以在介绍一下阻塞队列的成员:
名称 | 描述 |
---|---|
ArrayBlockingQueue | 一个数组实现的有界的阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁 |
LinkedBlockingQueue | 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序,此队列默认长度为Integer.MAX_VALUE,所以默认创建的该队列有容量危险 |
PriorityBlockingQueue | 一个支持优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能同保证优先级元素的顺序 |
DelayQueue | 一个实现PriorityBlockingQueue实现的延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素,只有延时期满后才能从队列中获取元素 |
SynchronousQueue | 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素,支持公平锁和非公平锁,SynchronousQueue一个使用场景是在线程池里,Exectors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后就会被回收。 |
LinkedTransferQueue | 一个由联邦结构组成的无界阻塞队列,相当于其他队列,LinkedTransferQueue对了transfer和tryTransfer方法 |
LinkedBlockingDeque | 一个链表结构组成的双向阻塞队列。队列头和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。 |
任务申请
由上文的任务分配部分可知,任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会字词从队列中申请任务去执行。第一种情况仅仅出现在线程初始创建的时候,第二种情况是线程获取任务的绝大数情况。
线程需要从任务缓存模块中不断获取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信,这部分策略由getTask方法实现,其执行流程如下图所示:
getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态,如果线程池现在不应该持有那么多线程,则会返回null值,工作线程Worker会不断接受新任务去执行,而当工作线程Worker接受不到任务的时候,就会开始被回收。
任务的拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉任务,采取任务的拒绝策略,保护线程池。
拒绝策略是一个接口,其设计如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
用户可以通过实现这个接口去定值拒绝策略,也可以选择JDK提供的四种已有的拒绝策略,其特点如下:
名称 | 描述 |
---|---|
AbortPolicy | 丢弃任务并抛出RejectedExecution异常。这是线程初次默认的拒绝策略,在任务不能再提交的时候,抛出异常,及时反馈程序运行状态,如果是比较关键的业务,推荐使用此拒绝策略,这样在系统不能承受更大并发量的时候,能够及时的通过异常发现 |
DiscardPolicy | 丢弃任务,但是不抛出异常,可能会使我们无法大仙系统的异常状态,建议一些无关紧要的业务采取此策略 |
DiscardOldPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝的任务,是否采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量 |
CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务,这种情况需要让所有的任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅增大了吞吐量的手段,最终必须要让每个任务都执行完毕 |
Worker线程管理
Worker线程
线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。我们看一下他的部分代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;//Worker持有的线程
Runnable firstTask;//初始化的任务,可以为null
}
Worker这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask,thread是在调用构造方法时通过ThreadFactory创建的线程,可以用了执行任务,firstTask用他保存传入的第一个任务,这个任务可以有也可以为null,如果这个值是非null,那么线程就会在启动初期立即执行这个任务,也就是核心线程创建时的情况,如果这个值是null,那么就需要创建一个线程取执行任务列表(workerQueue)中的任务,也就是非核心线程的创建,Workeer执行任务的模型如下图所示:
线程池需要管理线程的生命周期,需要在线程长时间不运行的时候进行回收,线程池使用一张Hash表去持有线程的引用,这样就可以通过添加引用,移除引用这样的操作来控制线程的生命周期,这个时候需要的就是如何判断线程是否在运行。
Worker是通过继承了AQS,使用AQS来实现独占锁这个功能,没有使用可重入锁ReentrantLock,而是使用AQS,为的是实现不可重入的特性去反应线程现在的执行状态。
- lock方法一旦获取了独占锁,表示当前线程正在执行任务中。
- 如果正在执行任务,则不应该中断线程
- 如果该线程现在不是独占锁,也就是空闲状态,说, 他没有在处理任务,这是可以对线程进行中断
- 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法,中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态,如果线程时空闲状态则回收。
在线程回收过程中就是用了这种特性,回收过程如下图所示:
Worker线程增加
增加线程时通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上一个步骤完成的,该步骤仅仅完成增加线程,并使用他运行,最后返回是否成功这个结果,addWorker方法有两个参数:firstTask、core。
- firstTask:参数用于指定新增的线程执行的第一个任务,该参数可以为null
- core参数为true表示新增的线程时会判断当前活动线程是否少于corePoolSize,false表示新增线程前需要判断当前活动线程是否少于maximumPoolSize,其执行流程如下图所示:
Worker线程回收
线程池中线程的销毁依赖于JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池觉得哪些线程需要回收时,只需要将其引用消除即可,Worker被创建出来后,就会不断进行轮询,然后获取任务执行,核心线程可以无限等待获取任务,非核心线程需要限时获取任务,当Worker无法获取到任务,也就是获取任务为空时,循环结束,Worker会主动消除自身在线程池内的引用
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}
//getTask轮询获取任务,直到获取成功,核心线程可以一直阻塞,但是非核心线程不能一直阻塞
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//线程是否需要回收,如果核心线程也可以被回收那么就是true,如果wc>corePoolSize说明线程池内的线程大于核心线程数,可以被回收的,如果allowCoreThreadTimeOut不能被回收,并且线程池的线程数小于等于核心线程,那么不能回收可以一直阻塞等待
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果需要回收,那么调用poll方法设置获取任务阻塞时长,如果不能回收调用take,可以一直阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程回收的工作是在processWorkerExit方法完成的。
事实上,这个方法中,将线程引用移出线程池就已经结束苏了线程销毁的部分,但由于引起线程销毁的可能性很多,线程池,还要判断是什么引发了这次销毁,是否要改变线程池现阶段状态,是否会根据新的状态,重新分配线程。
Worker线程执行任务
在Worker类中的run方法调用了runWork方法来执行任务的,runWork方法的执行过程下:
- while循环不断通过getTask()方法获取任务
- getTask()方法从阻塞队列中取任务
- 如果线程池正在停止,那么要保证当前线程是中断状态,要么保证当前线程不是中断状态
- 执行任务
- 如果getTask结果为null,则跳出循环,执行processWorkerExit()方法,销毁线程。
流程图如下: