ThreadPoolExecutor
ThreadPoolExecutor是Executor执行框架最重要的一个实现类,提供了线程池管理和任务管理是两个最基本的能力。这篇通过分析ThreadPoolExecutor的源码来看看如何设计和实现一个基于生产者消费者模型的执行器。
线程池有多重要
线程是一个程序员一定会涉及到的一个概念,但是线程的创建和切换都是代价比较大的。所以,我们有没有一个好的方案能做到线程的复用呢?这就涉及到一个概念——线程池。合理的使用线程池能够带来3个很明显的好处:
1.降低资源消耗:通过重用已经创建的线程来降低线程创建和销毁的消耗
2.提高响应速度:任务到达时不需要等待线程创建就可以立即执行。
3.提高线程的可管理性:线程池可以统一管理、分配、调优和监控。
ThreadPoolExecutor 设计原则
线程池就好像生活中的工厂,对于一个工厂最重要的是“订单”和创建商品的“员工”。对应到线程池中就是订单=任务,员工=worker,
作为工厂必然满足生产者消费者模型,工厂接到订单分解为一个个任务会放入一个流水线上,而员工则会从流水线中拿到订单做处理。处理完毕后重新在流水线上获取任务。对应线程池来说外部线程产生一个任务调用线程池处理,线程池首先会将任务放入工作队列,线程池中worker线程不断从工作队列取出任务去处理(有条件无限循环),执行完毕继续从工作队列获取,直到线程池关闭或worker被销毁。
生产者是任务的提交者,是外部调用ThreadPoolExecutor的线程
工作队列是一个阻塞队列的接口,具体的实现类可以有很多种。
消费者是封装了线程的Worker类的集合
创建线程池,需要准备哪些 ?
线程池和工厂一样首先需要创建出来。而创建工厂需要明确如下准备工作:
工厂生成需要的员工数量,这里存在2个值,一个是工厂正常运作需要的最少员工数量我们在线程池中称为核心work数量corePoolSize,一个是工厂最多能够招聘员工是多少,我们在线程池中称为maximumPoolSize,
为了节省成本有些员工如果长期无法从工作队列获取任务(员工之间存在竞争关系)。那么需要淘汰一定数量的员工,我们在线程中规定超过keepAliveTime时间空闲的线程被淘汰。
需要购买一条流水线,我们在线程池中被称工作队列workQueue
需要招聘一名HR来雇佣员工,我们在线程中称为threadFactory
需要招聘售前团队,这里主要是负责工厂超负苛工作无力承担新的订单时拒绝策略,我们在线程中称为线程池的拒绝策略handler
我们线程池的工厂在创建之初为了节约成本并没有招聘员工,而是等到有订单任务时在运作起来。
源码实现
/**
* 创建一个线程池,
* @param corePoolSize 线程池中核心wor线程数量。
* @param maximumPoolSize 线程池中允许的最大worker数量
* @param keepAliveTime worker线程(非核心线程)空闲的时间,大于此时间是被销毁
* @param unit keepAliveTime的单位。TimeUnit
* @param workQueue 用来保存等待执行的任务的阻塞队列
* @param threadFactory 创建work工厂
* @param handler 线程池的拒绝策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
任务来了,如何运作 ?
前面说到一个线程需要运作起来需要任务。当一个订单被指派到工厂时是如何运作的呢?
1 当员工人数 < corePoolSize,每接到一个任务就会去雇佣一个新员工来完成这个任务.对于线程池来说就创建一个work标记为core work线程执行接收的任务
2 如果 (员工人数> corePoolSize) 且(员工人数 < maximumPoolSize),将任务放入流水线,不在雇佣员工。对于线程来说线程会将任务放入工作队列。不在创建新的worker.
3 如果情况2中流水线容量满了,说明当前任务已经超负荷。需要雇佣新员工来处理新的任务。对于线程池来说就创建一个worker来执行新任务
4 如果雇佣的员工已达到上线maximumPoolSize,且流水线容量也满,则新任务只好让售前拒绝。对于线程池来说就是交给RejectedExecutionHandler处理。
[图片上传失败...(image-5cf12c-1562081688936)]
作为一个工厂,也要有生命周期
对于一个工厂伴随着创建到倒闭都需要经历一个生命周期。对于一个工厂来说有如下几个状态
RUNNING:表示工厂正常运行,对应到线程正常运行。
SHUTDOWN:表示工厂正常申请倒闭,这时工厂不接收新的订单任务,对原先接收的订单任务(包括还在工作队列未执行,和正在执行的);对应到线程就是不再接收任务,但仍然会处理已接收的任务
STOP:表示工厂异常倒闭,这时工厂不仅不再接收新的订单任务,还会清理流水线中任务,对正在处理的任务进行终止;对应到线程池就是停止接收新的任务,清空工作队列中的任务。同时对work线程提交中断interrupt
TIDYING: 表示工厂已经没有需要执行的订单任务,等待执行最后的清理动作;对应到线程池表示不在存在任务,等待执行terminate函数(模板方法)
TERMINATED:工厂倒闭
源码实现
线程池用一个整数记录了线程状态和work线程数量
前3位记录线程池状态,后29位记录运行work数量
/**
* 用于记录线程池池的 状态和work线程数量
* 前3位记录线程池状态
* 后29位记录运行work数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
对于线程池不同状态对应二进制状态
RUNNING -- 对应的高3位值是111。
SHUTDOWN -- 对应的高3位值是000。
STOP -- 对应的高3位值是001。
TIDYING -- 对应的高3位值是010。
TERMINATED -- 对应的高3位值是011。
具体实现和相关方法
/**
* 用于记录线程池的 状态和work线程数量
* 前3位记录线程池状态
* 后29位记录运行work数量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/** Java 中Integer 类型长度为32位,线程池用一个int类型的前3位表示线程池的状态**/
private static final int COUNT_BITS = Integer.SIZE - 3;
/** 用来计算出当前线程池状态中间变量,同时也表示work最大数量
* 00011111 11111111 11111111 11111111
**/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/** -----------------线程池状态----------------- **/
/**
* 线程池RUNNING状态,当前状态下线程池可以接收新的任务,对新接收的任务进行处理,
* 工厂正常运行
*
* -1 二进制 11111111111111111111111111111111 左移动 29位 前三位 111
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 线程池SHUTDOWN状态,当前状态下线程池不在接收新任务,对之前接收的任务(其中包括还在队列等待和正在执行的任务)
* 工厂不在接收新的订单,工厂运行出现了问题
*
* 0 二进制 00000000000000000000000000000000 左移动 29位 前三位 000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 线程池STOP状态,当前状态下线程池不在接收新任务,对之前接收的任务存在队列没有处理的不在处理,正在执行做中断
* 工厂不在接收新的订单,工厂要倒闭了
*
* 1 二进制 00000000000000000000000000000001 左移动 29位 前三位 001
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 线程池TIDYING状态,当前没有待执行的任务,等待执行函数terminated()
* 工厂走倒闭程序,需要做最后清理工作
*
* 2 二进制 00000000000000000000000000000010 左移动 29位 前三位 010
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 执行函数terminated()
* 工厂关闭
* 3 二进制 00000000000000000000000000000011 左移动 29位 前三位 011
*/
private static final int TERMINATED = 3 << COUNT_BITS;
/** 计算获取当前线程池状态 **/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/** 计算获取当前运行work数量**/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
* 即根据线程池的状态和worker数量合并成整形 ctl
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
/** 判断当前线程池是否小于s,c表示当前线程池状态 **/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
/** 判断当前线程池是否大于等于s,c表示当前线程池状态 **/
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/** 判断当前线程池是否正在正常运行 RUNNING状态**/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS增加线程池中work数量(后29位可以直接整数运算)
* 成功返回true,失败返回false
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS减少线程池中work数量(后29位可以直接整数运算)
* 成功返回true,失败返回false
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 使用CAS减少线程池中work数量(后29位可以直接整数运算),失败则继续循环直到成功
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
流水线 原来是阻塞队列
前面总是说到工厂是流水线用来放任务,那流水线具体有哪些功能呢?流水线在线程池中是一个BlockingQueue,一个可以阻塞的队列。说具体就是当流水线满了再放任务会阻塞,当流水线不存在任务,work线程去获取任务时同样会阻塞
private final BlockingQueue<Runnable> workQueue;
BlockingQueue API
public interface BlockingQueue<E> extends Queue<E> {
//将元素插入队列尾部,方法在添加失败(比如队列已满)时会报 一些运行时错误.
boolean add(E e);
//将元素插入队列尾部,方法在添加失败(比如队列已满)时,不会抛出异常,只会返回false
boolean offer(E e);
//插入元素e至队尾, 如果队列已满, 则阻塞调用线程直到队列有空闲空间.
void put(E e) throws InterruptedException;
//插入元素e至队列, 如果队列已满, 则限时阻塞调用线程,直到队列有空闲空间或超时.
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//将队首的元素删除,队列为空则抛出异常
boolean remove(Object o);
//将队首的元素删除,队列为空则返回null(继承方法,方便统一写在这)
E poll();
//从队首删除元素,如果队列为空, 则阻塞调用线程直到队列中有元素.
E take() throws InterruptedException;
//从队首删除元素,如果队列为空, 则限时阻塞调用线程,直到队列中有元素或超时.
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//返回剩余可用容量,没有容量限制返回Integer.MAX_VALUE
int remainingCapacity();
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
和Queue接口比较会发现BlockingQueue接口扩展了4个和阻塞相关的核心方法:put(e)、take();offer(e, time, unit)、poll(time, unit)。总结如下
抛出异常:满队列时,执行入队会抛出异常;空队列时执行出队会抛出异常 。
返回特殊值:入队操作会返回布尔值;出队操作成功返回操作值,失败返回空值。
一直阻塞:满队列时,执行入队会进入条件等待队列,线程阻塞;空队列时,执行出队会进入条件等待队列,线程阻塞 。
超时退出:满队列时,执行入队会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
线程池中工作队列的策略
无界队列
可以使用LinkedBlockingQueue(基于链表的有界队列,FIFO),理论上是该队列可以对无限多的任务排队
将导致在所有corePoolSize线程都工作的情况下将新任务加入到队列中。这样,创建的线程就不会超过corePoolSize,也因此,maximumPoolSize的值也就无效了
有界队列
可以使用ArrayBlockingQueue(基于数组结构的有界队列,FIFO),并指定队列的最大长度
使用有界队列可以防止资源耗尽,但也会造成超过队列大小和maximumPoolSize后,提交的任务被拒绝的问题,比较难调整和控制
不排队,直接提交
将任务直接交给线程处理而不保持它们,可使用SynchronousQueue
如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中(corePoolSize-->maximumPoolSize扩容)
Executors.newCachedThreadPool()采用的便是这种策略
如何雇佣一个员工
线程池中一个work的本质是一个Runnable(线程)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
而ThreadFactory创建一个work线程,本质就是创建一个Thead,并将work这个Runnable设置到其属性种,启动这个Thead
public class DefaultThreadFactory implements ThreadFactory{
...省略代码
@Override
public Thread newThread(Runnable r) {
/** 这里r就是work **/
Thread t = new Thread(r, prefix + nextId.incrementAndGet());
try {
if (t.isDaemon()) {
if (!daemon) {
t.setDaemon(false);
}
} else {
if (daemon) {
t.setDaemon(true);
}
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
}
为什么要实现AQS同步状态,作为一个工厂员工同一个时间只能完成一个任务。因而需要在其开始工作和结束工作获取同步状态(加锁),释放同步状态(解锁)。
员工又是如何工作的?
我们知道work是一个Runnable(线程),那么执行线程最重要的是run方法。我们来看下work的run方法,run存在一个有条件的无限循环,work会不段获取任务执行。
/** 工作线程执行,调用外部TheadPoolExecutor.runWorker方法 */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
/** 获取当前线程 **/
Thread wt = Thread.currentThread();
/** 获取执行任务**/
Runnable task = w.firstTask;
/** 将任务从work清理 **/
w.firstTask = null;
...省略代码
try {
/**
* 如果当前work中存在任务则执行,不存在则从WorkQueue获取任务
* getTask()!=null 时work永远不停止
**/
while (task != null || (task = getTask()) != null) {
/** 获取work独占同步状态(表示任务) **/
w.lock();
...省略代码
/** 处理任务 **/
task.run();
...省略代码
/**释放work独占同步状态 **/
w.unlock();
...省略代码
}
}
社会的残酷,淘汰机制
社会是残酷的,当工厂员工多而任务少,作为老板当然想淘汰一些员工,怎么淘汰,如果淘汰?这就需要实现一个淘汰机制。即一个work长期无法获取任务时。而何时开启淘汰机制有2种情况。
何时开启淘汰机制
int c = ctl.get();
int rs = runStateOf(c);
/** 是否允许回收核心work线程 **/
private volatile boolean allowCoreThreadTimeOut;
/** 判断是否需要开启work淘汰机制 **/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
当allowCoreThreadTimeOut=true时,淘汰机制随时开启
当allowCoreThreadTimeOut=false时,wc>corePoolSize 时淘汰机制才开启。
如何实现淘汰
我们知道线程是不可控的。work之所以能够无限运行是因为那个有条件的无限循环,如果我们退出那个循环那么work线程自然销毁,也就说work被淘汰了。那么那个条件是什么?对就是getTask()返回null,
那么如何实现超时呢?你应该知道BlockingQueue有一个获取任务超时的方法
poll,如果长时间没有获取任务则返回null
/**
* 从WorkQueue获取任务
* 同时用来判断work何时退出销毁
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
/** 无限循环,
* 当work超过指定时间没有获取时,设置timedOut = true进行二次遍历时销毁当前work **/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
...省略代码
/** 获取work数量 **/
int wc = workerCountOf(c);
/** 判断是否需要开启work淘汰机制 **/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 以下几种情况直接销毁当前work
* 超时没有获取任务timedOut=tue,for循环遍历第二次时
* 当前任务超过maximumPoolSize
* **/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 如果开启work淘汰机制超时获取任务,调用poll阻塞获取任务,存在超时,如果超时没有获取到任务
* 设置timedOut = true 进入第二次循环销毁
*
* 如果没开启work淘汰机制超时获取任务,调用take阻塞获取任务
* 【这里的阻塞都能被中断响应!!】
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
售前到底在做啥
private volatile RejectedExecutionHandler handler;
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
生意不好做,工厂如何倒闭
我们知道线程不不比员工,你说让结束手头工作就能结束的。当你调用shutdown()时,需要保证每个员工都退出的。说不定很可能他还在傻傻在任务队列那等待呢。
既然要退出还是要退出那个有条件无限循环。退出还是要找到那个getTask()返回null,现在我们来看看之前那些省略的代码。当线程池状态不为运行状态时会返回null.
/**
* 从WorkQueue获取任务
* 同时用来判断work何时退出销毁
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
/** 无限循环,
* 当work超过指定时间没有获取时,设置timedOut = true进行二次遍历时销毁当前work **/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/** 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则停止worker - 1,return null **/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
...省略代码
try {
/**
* 如果开启work淘汰机制超时获取任务,调用poll阻塞获取任务,存在超时,如果超时没有获取到任务
* 设置timedOut = true 进入第二次循环销毁
*
* 如果没开启work淘汰机制超时获取任务,调用take阻塞获取任务
* 【这里的阻塞都能被中断响应!!】
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
那如果work线程在工作队列那阻塞呢?这时就要中断该线程了。注意了workQueue.poll是可以响应中断的哦!
线程扩展
子类实现扩展
/**
* 模板方法给子类实现,执行任务前的操作
*/
protected void beforeExecute(Thread t, Runnable r) { }
/**
* 模板方法给子类实现,执行任务后的操作
*/
protected void afterExecute(Runnable r, Throwable t) { }
/**
* 模板方法给子类实现,线程池状态从TIDYING到TERMINATED需要做的清理动作
*/
protected void terminated() { }
工作队列扩展
可以修改offer方法逻辑在特定时候返回false,从而达到如果 (员工人数> corePoolSize) 且(员工人数 < maximumPoolSize)依旧可以雇佣新员工(创建work)
Executors静态工厂创建几种常用线程池
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
创建一个指定工作线程数的线程池,其中参数 corePoolSize 和 maximumPoolSize 相等,阻塞队列基于LinkedBlockingQueue
它是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是在线程池空闲时,即线程池中没有可运行任务时,它也不会释放工作线程,还会占用一定的系统资源
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
创建一个可缓存工作线程的线程池,默认存活时间60秒,线程池的线程数可达到Integer.MAX_VALUE,即2147483647,内部使用SynchronousQueue作为阻塞队列;
在没有任务执行时,当线程的空闲时间超过keepAliveTime,则工作线程将会终止,当提交新任务时,如果没有空闲线程,则创建新线程执行任务,会导致一定的系统开销
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
初始化的线程池可以在指定的时间内周期性的执行所提交的任务,在实际的业务场景中可以使用该线程池定期的同步数据