结合JDK(1.8)源码分析线程池(ThreadPoolExecutor)实现原理
我们平时所提的线程池,大多指的是ThreadPoolExecutor,而非ThreadPool,关于ThreadPool这里不做赘叙。集中探讨一下ThreadPoolExecutor。
- 带着问题解读源码,先看问题:
- JDK中有多种线程池,我们应该使用哪一种线程池?选择的依据是什么?
- 线程池中的线程会一直存活着吗?
- 线程池饱和之后,无法接收新任务,线程池如何处理这部分任务(线程池拒绝策略)?
- 线程池是如何实现将任务提交与任务执行分离的?
- 线程池中使用了“生产者-消费者”模式,那么它是如何实现的?
- 线程池中的各个参数分别代表什么?并且线程池中的线程数量如何确定?
如果上述问题,很清楚的话,也请同学留步,下面是从源码角度来分析上述问题。
为了便于后续描述,先贴出使用线程池的基本代码:
/**
* 演示线程池基本操作
* @author Fooisart
* Created on 2018-12-11
*/
public class ExecutorServiceDemo {
public static void main(String[] args) {
//1,线程池创建
ExecutorService executor = Executors.newFixedThreadPool(2);
//2,新建任务
Runnable task = new Runnable() {
@Override
public void run() {
System.out.println("The real task is executing!");
}
};
//3,提交任务到线程池
executor.submit(task);
//4,线程池关闭
executor.shutdown();
}
}
上述代码,演示了线程池的一个简单使用方式。文首提到,线程池一般指的是ThreadPoolExecutor,而上述示例代码中,并未出现ThreadPoolExecutor。那是不是表明,在Java中是不是还有其他类表示线程池呢?其实不然,示例代码中,有两个类:
- ExecutorService
- Executors
分别解释一下,这哥俩与ThreadPoolExecutor的关系。
- Executors 看一下源码注释,
Factory and utility methods for Executor
第一句话表明其真正意义,表明自己是一个工具类,进一步说是一个工厂类。这个工厂的产品是—ThreadPoolExecutor。组装不同的参数,生产出不同功能的线程池。Executors源码解读.
-
ExecutorService 不贴它源码了。从他的名字可以看出,它是一个服务(Service),为线程池服务。它是一个接口,提供了一些操作线程池的接口方法。
结合示例代码,分析ThreadPoolExecutor源码。
第1步创建线程池,已解释。第2部创建任务,线程的基础知识。看第3步,提交任务到线程池,跟进(AbstractExecutorService):
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
一共四行代码,第二行是任务封装(封装成FutureTask);第三行,execute,把上一步封装好的任务,在此执行。(BTW,之前读过1.7源码,这个方法是在ThreadPoolExecutor中的,1.8抽象出了一个AbstractExecutorService)。继续跟进,进入ThreadPoolExecutor.execute()方法。此时,正式进入ThreadPoolExecutor,它是线程池的核心类,先简要介绍一下ThreadPoolExecutor。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor中有多个构造函数,其中有一个最基础的构造函数,贴出的是它的说明。其中有7个参数,分别介绍,先从简单地讲起:
- threadFactory : 用来创建线程,一般是用默认即可。除非你想把你线程池中的线程编号,或者有规律地命名。
- handler : 拒绝策略。所谓的拒绝策略,即当我们自己所定义的线程池无法再添加任务时(什么时候无法添加一会说),那么会使用拒绝策略。看一下源码中对拒绝策略的说明:
1,In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime
RejectedExecutionException upon rejection.
简介:此为默认拒绝策略,会抛异常
2,In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute
itself runs the task. This provides a simple feedback control mechanism that will
slow down the rate that new tasks are submitted.
简介:把并行改串行,显然会降低线程池的执行效率。与策略1相比,优点是不
会抛异常,已不会丢任务,缺点也很明细,太慢啦!
3,In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
简介:这个很痛快,直接抛弃任务,也没有异常。效率是兼顾了,正确性与完整
性丢失了,或者起码得让我知道,任务被抛弃了
4,In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down,
the task at the head of the work queue is dropped, and then execution is retried
(which can fail again, causing this to be repeated.)
简介:无
- corePoolSize : 核心线程数,即一直存活在线程池中。当有新任务提价时,开始创建,直至创建的线程数达到corePoolSize值。
- workQueue : 工作队列,如果说线程数已经达到了corePoolSize,此时,又有新任务进入,咋办?那么这些任务会进入到等待队列中。
- maximumPoolSize : 最大线程数。前面提到了工作队列,既然是队列,就会有满的那一天。满了之后,咋办?则会接着创建临时线程(有点像快递公司在大促期间找的临时工),临时处理一下这些任务。
- unit : 时间单位。
- keepAliveTime : 存活时间。既然是存活时间,那代表什么存活时间呢?代表的是,上面提到的临时线程存活时间,配合时间单位(unit)参数,确定临时线程干完活之后,还能活多久(快递公司大促期间雇的临时工,雇佣时间)。
介绍完线程池各个参数的意义,回到ThreadPoolExecutor.execute方法:
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
* 介绍:该方法是任务执行,从注释中看出任务会在某个时间点执行。所有这个方法,只是保证了任务
* 提交。提交了之后,具体如何执行,还得参照线程池中线程的实际情况。是立即执行还是排队,
* 是用已有线程执行,还是创建新线程处理它,都是未知的。
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 以下为线程池的核心三步,仔细看上文中7个参数的介绍,下面三步也就容易理解了
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 第一步,如果线程池中线程数量小于核心线程数,则创建线程,然后调用执行。
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 第二步,达到核心线程数之后,加入队列。加入队列成功后,等待被取出执行。
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 第三步,如果队列满了(加入队列失败),继续创建线程,直至达到max
* 线程数。如果达到max线程数,开始使用某一种拒绝策略,拒绝任务
*/
/**
* ctl这个参数使用了二进制方式存了线程池的两个信息:
* ①当下线程池中线程数量;②线程池状态,比如线程池关闭了等。
*
* workerCountOf,获取线程池线程数量;
* isRunning() 判断线程池是否处于运行态
*/
int c = ctl.get();
//第一步
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//第二步
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//第三步
else if (!addWorker(command, false))
reject(command);
}
源码中已添加了一些我的个人理解,应该很容易理解了。回想一下,咱们使用线程池,主要是为了高效地执行任务。执行任务,才是我们最关心的。可是至今,仍未明显地看到任务执行,所以继续跟进。我已经趟平了,直接进入addWorder():
private boolean addWorker(Runnable firstTask, boolean core) {
//线程创建前的一些合理性校验
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//任务执行
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**
* 其实这里的firstTask是我们最初传进来要执行的任务,这里把它封装成了一个Worker。
* 看Worker源码可以知道,其实它是Runnable接口的一个实现类。在这里,Worker充当的
* 是一个线程的角色,它参数中有thread。那么线程的启动肯定要调用start()方法。
* 在下方也确实找到了start()方法,启动之后,根据线程的基本知识,肯定是进入了线程
* 的run()方法。
*/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
根据上文代码注释中,进入Worker.run()方法:
public void run() {
runWorker(this);
}
继续跟进runWorker(),
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
/**
* firstTask是我们自己提交的任务
*
* 可以看到,下午中task直接调用了run()方法。根据线程的基础知识,只用调用线程的start()方法,
* 才会启动一个线程。而调用线程的run()方法,则会当做普通的方法执行,不会异步执行。
* 其实在这里就解释了任务提交与执行分离。线程池帮我们创建了线程,然后使用它创建的线程串行地
* 调用了我们的run()方法,从而执行了我们的任务。
*/
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask()此处是不断地获取队列中的任务。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
至此,线程池已基本分析完了。文章开头介绍的那几个问题,没有一一解释,但如果是认真读到这里的话,那几个问题应该已经了然于胸了。
As always and let me know what you think, leave the comments below. Bye :)