源码不会骗你的!!!
一、背景
JAVA通过多线程的方式实现并发,为了方便线程池的管理,JAVA采用线程池的方式对线线程的整个生命周期进行管理。当然,对简单的并发自已也可以对Thread进行人工管理,但并不是此文的重点,而且不建议方式。本文的重点是研究ThreadPoolExecutor管理线程池的策略,让大家对ThreadPoolExecutor的工作原理和过程有一个透彻的理解。
二、几个关系
我们通常采用如下方法创建一个程池:
public class TestThread {
@Test
public void testCallable() throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
List<Future<Integer>> results = new ArrayList<>();
int i = 10;
while (i-- > 0) {
results.add(exec.submit(new GetRand()));
}
for (Future<Integer> n : results) {
System.out.println(n.get());
}
}
}
上便通过一个工场类Executors创建了一个工作类,工场类返回一个ExecutorService 对象。Executors可以返回多种类型的线程池,原谅我简单啰嗦一下这几种线程池
newCachedThreadPool() | 缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中;缓存型池子通常用于执行一些生存期很短的异步型任务因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。能reuse的线程,必须是timeout IDLE内的池中线程,缺省 timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。 |
newFixedThreadPool(int) | newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程;-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子;-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器;-从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同:fixed池线程数固定,并且是0秒IDLE(无IDLE) cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE |
newScheduledThreadPool(int) | 这个池子里的线程可以按schedule依次delay执行,或周期执行 |
SingleThreadExecutor() | -单例线程,任意时间池中只能有一个线程;-用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE) |
以上几种线程池都都反回了ExecutorService对象,也就是实际是靠ExecutorService来管理线程的整个生命周期。进一步地,我们知道ExecutorService是一个接口,没有具体实现,最后的具体实现应该由ThreadPoolExecutor实现的(当然不包括周期线程池)。别问为什么,请自觉补充接口化编程;我们来看一下几个类的关系,这里有两条路线:
Executor 定义了一个execute接口,ExecutorService继承了Executor,并定义了管理线程生命周期的接口;
- (1)AbstractExecutorService 实现了ExecutorService;ThreadPoolExecutor继承了AbstractExecutorService;这条线是我们关注的重点.
- (2)ScheduleExecutorService 继承了ExecutorService,并增加周期调度的接口;ScheduledThreadPoolExecutor 实现了ScheduleExecutorService,用来管理周期线程池;(本文不介绍)
继承关系如下图所示:
我们再来看一下Executors工场类产生的线程池的方式如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
其最终的实现方式都是通过ThreadPoolExecutor进行实现的;通过上面的介绍,我们知道线程生命周期的管理,在本质上是由ThreadPoolExecutor来实现的,因此只需要透彻理解ThreadPoolExecutor的实现原理即可了解其如何管理线程。
三、ThreadPoolExecutor
3.1 构造参数
先来看一下ThreadPoolExecutor的构造函数:
ublic ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
//默认ThreadFactory,默认handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 默认handler
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
可以看到ThreadPoolExecutor有四个构造函数,只有参数有所不同,其它三个构造函数的具体实现都是由第一个构造函数来完成的。那我们只需要来研究一下第一个构造函数就可了!!!!
先来看一下每个参数的含义(先了解一下大概意思):
先看一下接口注释
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
看懂了,就没必要看下面的中文解释了哈:
corePoolSize:核心线程池大小;啥意思呢?就是线程池应该维护的最小线程的数量, 线程池数量小于该值,则来一个新线程时,就会创建一个新线程,无论线程池中有无线程空闲.
maximumPoolSize: 最大线程池大小;它表示线程池中最大创建线程池的数量.
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于. corePoolSize,
即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了. allowCoreThreadTimeOut(boolean)方法,
在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
unit:和keepAliveTime相当于同一个参数,有以下几个单位:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
BlockingQueue<Runnable> workQueue:阻塞的任务队例;用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;//内部维护一个数组,FIFO策略
LinkedBlockingQueue;//队列使用FIFO策略,内部维护了一个单向链表,默认最大容量是Integer.MAX_VALUE,动态生成节点
所以,线程池最多只有corePoolSize个thread被创建,其他都会在queue中被阻塞
适用场景,确保每个请求都能被执行,不被拒绝
SynchronousQueue;//相当于队列长度为0,因此只要达到 maximumPoolSize就会拒绝新提交的任务
threadFactory:线程工厂,用来创建线程.
handler:当阻塞队列和线程池都满了后,拒绝任务的策略,有以下几种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
3.2 执行过程
我们以execute为例来看一个任务的执行过程(如下流程图):
我们用源码解释一下具体的实现过程:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
再来翻译一下:
任务会通过创建一个新线程或者用线程池中的空闲线程来执行;
如果该任务由于executor被关闭或者队列已满的原因被拒绝执行,刚会交给RejectedExecutionHandler来处理.
前面是函数注释哈,再看函数内的注释:
任务执行要经过以下三个步骤(三个if):
- 1.如果当前线程池中的线程数量小于corePoolSize,则通过addWorker新创建一个线程来执行任务,addWorker会检查workerCount和runState,如果创建线程失败则返回false(addWorker稍后解释)
- 2.如果runState = RUNNING,且成功加入队列当中,还需要进行双因素验证,如果线程池关闭,则移除队列中的线程,并reject;如果当前线程池无线程,则新创建一个线程;
- 3.如果无法将任务加入队例,则尝试新建线程;如果失败,则reject
解释一下线程池的runState(来自参考文献的拷贝).
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
从上述过程中知道有三个关键操作:addWorker(添加任务),workQueue.offer(将任务添加至队列),reject(拒绝任务);下面我们来看看这三个方法的具体执行过程:
addWorker(添加任务)
同样先读源码,execute代码中,每个条件都执行了addworker,但参数都有不同,大家注意。
addWorker源码:
/**
* Checks if a new worker can be added with respect to current
* pool state and the given bound (either core or maximum). If so,
* the worker count is adjusted accordingly, and, if possible, a
* new worker is created and started, running firstTask as its
* first task. This method returns false if the pool is stopped or
* eligible to shut down. It also returns false if the thread
* factory fails to create a thread when asked. If the thread
* creation fails, either due to the thread factory returning
* null, or due to an exception (typically OutOfMemoryError in
* Thread.start()), we roll back cleanly.
*
* @param firstTask the task the new thread should run first (or
* null if none). Workers are created with an initial first task
* (in method execute()) to bypass queuing when there are fewer
* than corePoolSize threads (in which case we always start one),
* or when the queue is full (in which case we must bypass queue).
* Initially idle threads are usually created via
* prestartCoreThread or to replace other dying workers.
*
* @param core if true use corePoolSize as bound, else
* maximumPoolSize. (A boolean indicator is used here rather than a
* value to ensure reads of fresh values after checking other pool
* state).
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
下面按照不同条件来说明addWorker都做了什么,直接在源码中注释了哈:
current_thread_num < corePoolSize
/*********** execute **************/
public void execute(Runnable command) {
... ...
if (workerCountOf(c) < corePoolSize) { // 当前线程数 < corePoolSize
if (addWorker(command, true))
return;
c = ctl.get();
... ...
}
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) { // 参数 firstTask != null, core = true
... // 验证是否满足可新增线程的条件,曰:满足^_^
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 通过ThreadFactory创建一个线程,并且线程用于执行firstTask
final Thread t = w.thread;
if (t != null) {
... ...
try {
... ...
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
... ...
workers.add(w);
workerAdded = true;
}
} finally {
... ...
}
if (workerAdded) { // 上面检查是否确实添加线程成功,曰:成功
t.start(); // 线程启动,调用worker.run
workerStarted = true;
}
}
} finally {
... ...
}
... ...
}
/*************** worker.run ***************/
public void run() { runWorker(this);}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // firstTask不为null
w.firstTask = null;
... ...
try {
while (task != null || (task = getTask()) != null) { // 一进来就满足,就执行当前这个task
w.lock();
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
... ...
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
maximumPoolSize > current_thread_num >= corePoolSize
/*********** execute **************/
public void execute(Runnable command) {
... ...
if (isRunning(c) && workQueue.offer(command)) { // 放入阻塞队列 - workQueue.offer(command)
int recheck = ctl.get();
// 成功加入阻塞队列后,仍需要进行double-check,以防 线程终止了或者线程池在进入这个方法的时候已经shutdown了
if (! isRunning(recheck) && remove(command)) // 如果double check失败,remove用来回滚 workQueue.offer 的操作,执行 workQueue.remove(task)
reject(command); // 拒绝当前的任务
else if (workerCountOf(recheck) == 0) // 如果当前没有线程就创建一个
addWorker(null, false); // 注意参数是 (null, false)
} else if (!addWorker(command, false)) // 如果不能放入阻塞队列,那么久创建一个thread执行当前任务
reject(command);
... ...
}
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) { // 参数 firstTask = null, core = false
... ...
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
... ...
try {
... ...
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
... ...
workers.add(w);
workerAdded = true;
}
} finally {
... ...
}
if (workerAdded) {
t.start(); // 线程启动,调用worker.run
workerStarted = true;
}
}
} finally {
... ...
}
return workerStarted;
}
/*************** worker.run ***************/
public void run() { runWorker(this);}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // firstTask == null
w.firstTask = null;
... ...
try {
while (task != null || (task = getTask()) != null) { // 1. 第一次进入,task=null,执行getTask;2. 获取到非null的task之后,执行task
... ...
beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);
... ...
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
/********* getTask ************/
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
... ...
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 是否设置了超时时间或者线程数已经达到corePoolSize
... ...
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 如果设置了超时,通过 workQueue.poll 取出还有效的任务
workQueue.take(); // 如果没有设置超时,通过 workQueue.take取出任务
... ...
} catch (InterruptedException retry) {
... ...
}
}
}
这一个过程最难理解,即maximumPoolSize > current_thread_num >= corePoolSize时,会往队列中尾部添加一个任务,并从头部中取出一个任务来运行.
current_thread_num >= maximumPoolSize
/*********** execute **************/
public void execute(Runnable command) {
... ...
else if (!addWorker(command, false)) // addWorker = false,执行reject
reject(command);
... ...
}
/********* addWorker ************/
private boolean addWorker(Runnable firstTask, boolean core) {
for (;;) {
... ...
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 超过maximumPoolSize之后,返回false; 上面 addWorker=false
return false;
}
}
... ...
}
这个就是直接reject了
workQueue
workQueue决定如何添加队列,如何取队列,和并发关系并不大,而且是独立的一块,所以这里就不再详情介绍,给出三篇文档:
SynchronousQueue.
LinkedBlockingQueue.
ArrayBlockingQueue.
reject(拒绝任务)
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
按照四个不同的reject策略详述,RejectedExecutionHandler,默认使用 AbortPolicy.
class | comment |
---|---|
DiscardPolicy | 什么也不做,丢弃当前的task |
DiscardOldestPolicy | executor如果被关闭,什么也不做;如果没被关闭,丢弃队列中最老的task【queue.poll】,重新执行execute(task)【放入队列】 |
CallerRunsPolicy | 如果当前executor没有被关闭,那么使用当前执行execute方法的线程执行task;关闭了的话,就什么也不做 |
AbortPolicy | 抛出一个 RejectedExecutionException |
你会根据自己的业务场景创建线程池了吗?
参考文献:
https://gold.xitu.io/entry/587601a7b123db4a2ed68485/view.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html