本文章讲解的内容是Java线程池源码分析。
本文章分析的相关的源码基于Java Development Kit(JDK) 13。
概述
线程是操作系统的内核资源,是CPU调度的最小单位,所有的应用程序都运行在线程上,它是我们实现并发和异步的基础。在Java的API中,Thread是实现线程的基础类,每创建一个Thread对象,操作系统内核就会启动一个线程,在Thread的源码中,它所有的关键方法都是本地方法(Native Method),内部实现是大量的JNI的调用,因为线程的实现必须由操作系统提供直接支持。在Linux操作系统中,每一个Java thread对应一个native thread,它们是一一对应的。在Android中,创建线程的过程中会调用Linux API中的pthread_create函数。
线程的调用会存在以下问题:
- 线程不是轻量级资源,大量创建线程会消耗系统大量资源,传统的阻塞调用会导致系统存在大量的因为阻塞而不能运行的线程,这是非常浪费系统资源。
- 线程运行状态和阻塞状态的切换会存在相当大的开销,一直以来都是优化点,例如:Java虚拟机在运行时会对锁进行优化,就像自旋锁、锁粗化和锁消除等。
线程池(Thread Pool)是一种基于池化思想管理线程的工具,使用线程池有如下好处:
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程的创建和销毁带来的损耗。
- 提高响应速度:任务到达时,无需等待线程创建就可以立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备拓展性,允许开发人员向其中增加更多的功能。
结构
ThreadPoolExecutor的UML类图,如下图所示:
ThreadPoolExecutor类继承AbstractExecutorService抽象类,AbstractExecutorService抽象类实现ExecutorService接口,ExecutorService接口继承Executor接口。
Executor
接口Executor可以执行提交的Runnable对象的任务,它的思想是可以将任务提交和每个任务运行机制(例如:线程的使用和线程的调度)解耦,我们无需关注线程是怎样创建的,也无需关注线程是怎样被调度执行任务的,只需要提供Runnable对象。源码如下所示:
// Executor.java
package java.util.concurrent;
public interface Executor {
// 在之后的某个时段执行给定的任务,该任务可以在新线程中执行,也可以在线程池中的线程中执行,也可以在调用线程中执行,这个是由Executor的实现决定的
void execute(Runnable command);
}
ExecutorService
接口ExeutorService可以提供如下能力:
- 扩充Executor的能力:提供了为一个或者多个异步任务生成Future的方法。
- 提供管理线程的能力:提供了终止线程池运行的方法。
源码如下所示:
// ExecutorService.java
package java.util.concurrent;
import java.util.Collection;
import java.util.List;
public interface ExecutorService extends Executor {
// 启动有序关闭,在这个关闭过程中,会继续执行先前提交的任务,但是不接受新任务,如果已经关闭,调用这个方法没有额外影响
void shutdown();
// 尝试停止所有正在执行的任务,停止等待任务的处理,并且返回等待执行的任务列表
List<Runnable> shutdownNow();
// 如果Excutor已经关闭,就返回true,否则返回false
boolean isShutdown();
// 如果关闭后所有任务都已经完成,就返回true,要注意的是,除非先调用shutdown()方法或者shutdownNow()方法,否则这个方法永远不会返回true
boolean isTerminated();
// 阻塞,直到所有任务在关闭请求,或者发生超时,或者当前线程被中断(要注意的是,以最先发生的为准)后完成执行执行
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交一个有返回值的任务以执行,并且返回一个表示该任务挂起结果的Future,Future的get方法将在任务成功完成时返回任务的结果
<T> Future<T> submit(Callable<T> task);
// 提交一个可运行的任务以执行,并且返回一个表示该任务挂起结果的Future,Future的get方法将在任务成功完成时返回任务的结果
<T> Future<T> submit(Runnable task, T result);
// 提交一个可运行的任务以执行,并且返回一个表示该任务挂起结果的Future,Future的get方法将在任务完成后返回null
Future<?> submit(Runnable task);
// 执行给定的任务,当所有任务完成时,返回包含其状态和结果的Future列表,Future列表中的每个元素中的isDone()方法是返回true,要注意的是,一个已完成的任务可以正常终止,也可以通过抛出异常终止,如果在执行此操作时修改了给定的集合,则此方法的结果将未知
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 执行给定的任务,当所有任务完成时,返回包含其状态和结果的Future列表,Future列表中的每个元素中的isDone()方法是返回true,在返回时,未完成的任务将被取消,要注意的是,一个已完成的任务可以正常终止,也可以通过抛出异常终止,如果在执行此操作时修改了给定的集合,则此方法的结果将未知
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 执行给定的任务,如果有的话,就返回一个已成功完成的任务的结果(例如:没有抛出异常),在正常或者异常返回时,未完成的任务将被取消,要注意的是,如果在执行此操作时修改了给定的集合,则此方法的结果将未知
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 执行给定的任务,如果有任务在给定超时结束之前完成,就返回已成功完成的任务的结果(例如:没有抛出异常),在正常或者异常返回时,未完成的任务将被取消,如果在执行此操作时修改了给定的集合,则此方法的结果将未知
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService
抽象类AbstractExecutorService提供了接口ExecutorService执行方法的默认实现。这个抽象类使用由newTaskFor方法返回的RunnableFuture来实现submit方法、invokeAny方法和invokeAll方法。
线程池分为两个部分:任务管理和线程管理,它使用了生产者消费者模型,任务管理充当生产者的角色,线程管理充当消费者的角色。
任务管理
当任务提交后,线程池会执行以下事情:
- 申请线程执行任务。
- 将任务放到缓冲队列中,等待执行。
- 拒绝执行任务。
线程管理
线程池根据任务进行线程分配,当线程执行完当前任务后会继续执行下个任务,如果线程获取不到任务就会被回收。
运行状态
线程池的运行状态使用变量ctl控制,源码如下所示:
// ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 运行状态(runState)会存储在高阶位中
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 包装和拆装ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
COUNT_BITS的值是Integer.SIZE(值是32)减3,也就是29,可以看到运行状态(runState)都是基于COUNT_BITS做左移运算,也就是运行状态会存储在高3位中。
变量ctl使用两个值维护,分别是运行状态(runState)和工作线程数量(workerCount),高3位保存运行状态,低29位保存工作线程数量,这样做的好处是可以避免执行相关逻辑的时候,如果出现不一致的情况下,不需要因为维护两者的一致,而占用锁资源。可以发现这里采用位运算,根据之前阅读源码的经验,采用位运算会比基本运算效率要高。
线程池的五种状态:
- RUNNING:接受新任务,并且处理阻塞队列中的任务。
- SHUTDOWN:不接受新任务,但是会去处理阻塞队列中的任务。
- STOP:不接受新任务,不处理阻塞队列中的任务,也会中断正在进行中的任务。
- TIDYING:所有任务都已经终止,线程数量是零,转换到这个状态的线程将调用terminated()钩子方法(hook method)。
- TERMINATED:调用terminated()方法完成后就会进入这个状态。
线程池的状态先是RUNNING状态,然后分成两种情况:
- 如果调用shutdown()方法,就会进入SHUTDOWN状态,然后进入TIDYING状态,阻塞队列为空,线程池中的工作线程数量为零,最后调用terminated()方法后进入TERMINATED状态。
- 如果调用shutdownNow()方法,就会进入STOP状态,然后进入TIDYING状态,线程池中的工作线程数量为零,但是阻塞队列不一定为空,最后调用terminated()方法后进入TERMINATED状态。
任务调度
任务调度是线程池的核心机制,相关的逻辑在execute(Runnable command)方法中,源码如下所示:
// ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
// 如果变量command为空,就抛出NullPointerException异常
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果workerCount(工作线程数量)小于corePoolSize(核心池大小),就创建并且启动一个线程执行新提交的任务
if (addWorker(command, true))
// 如果添加任务成功,就返回
return;
// 如果添加任务失败,就再次获取运行状态和工作线程数量
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
// 如果当前线程池的运行状态是RUNNING状态,并且添加任务成功,执行以下逻辑
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 如果当前线程池的运行状态不是RUNNING状态,并且移除任务成功,就调用reject(Runnable command)方法,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0)
// 如果没有工作线程,就调用addWorker(Runnable firstTask, boolean core)方法,第一个参数firstTask传入null,表示在线程池中创建一个线程,但是不启动它;第二参数core传入false,表示将线程池的最大线程数量设为maximumPoolSize的值
addWorker(null, false);
}
else if (!addWorker(command, false))
// 如果添加任务失败,就调用reject(Runnable command),拒绝任务
reject(command);
}
一些重要的成员变量,源码如下所示:
// ThreadPoolExecutor.java
// 用于保存任务并且将其传递给工作线程的队列,可以使用使用isEmpty()方法判断队列是否为空,例如:判断是否决定从SHUTDOWN状态进入TIDYING状态
private final BlockingQueue<Runnable> workQueue;
// 对变量和工作线程加上可重入的独占锁。虽然可以使用其他可以处理并发问题的集合,但是使用ReentrantLock会更加可取,其中一个原因是这种方法序列化了interruptIdleWorkers,从而可以避免不必要的中断,特别是在shutdown的时候,否则,已经退出的线程将并发地中断那些尚未中断的线程,它还对largestPoolSize等相关的变量加锁,shutdown()方法和shutdownNow()持有这个锁,以确保工作线程是稳定的,能正确处理中断状态
private final ReentrantLock mainLock = new ReentrantLock();
// 包含线程池中所有工作线程的集合(仅在持有mainLock锁时访问)
private final HashSet<Worker> workers = new HashSet<>();
// 等待条件,以支持等待终止
private final Condition termination = mainLock.newCondition();
// 最大池大小(仅在持有mainLock锁时访问)
private int largestPoolSize;
// 已完成任务的计数器,仅在工作线程终止时更新(仅在持有mainLock锁时访问)
private long completedTaskCount;
// 创建线程的工厂
private volatile ThreadFactory threadFactory;
// 处理拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲线程等待工作的超时时间(以纳秒为单位),当workerCount(工作线程的数量)大于corePoolSize或者allowCoreThreadTimeOut时,线程使用这个超时时间
private volatile long keepAliveTime;
// 如果返回false(默认),核心线程即使在空闲时也保持活动;如果返回true,核心线程使用keepAliveTime来超时等待工作
private volatile boolean allowCoreThreadTimeOut;
// 核心池大小,它是保持活动的工作线程的最小数量(并且不允许超时等)
private volatile int corePoolSize;
// 最大池大小
private volatile int maximumPoolSize;
// 默认的拒绝策略是AbortPolicy
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
// 调用shutdown()方法和shutdownNow()方法需要的权限
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
执行过程如下所示:
- 检查线程池的运行状态,保证是在RUNNING状态下执行任务,如果不是这个状态,就直接拒绝。
- 如果workerCount(工作线程数量)小于corePoolSize(核心池大小),就创建并且启动一个线程执行新提交的任务。
- 如果workerCount大于等于corePoolSize,并且如果阻塞队列还没满,就将新提交的任务放到阻塞队列中。
- 如果workerCount大于等于corePoolSize,并且阻塞队列已经满了,并且workerCount小于maximumPoolSize,就创建并且启动一个线程执行新提交的任务。
- 如果workerCount大于等于corePoolSize,并且阻塞队列已经满了,并且workerCount大于等于maximumPoolSize,就根据拒绝策略处理这个任务,默认的处理方式是抛出RejectedExecutionException异常。
部分过程是在addWorker(Runnable firstTask, boolean core)方法中,后面会详细讲解。
任务缓冲
任务缓冲是线程池能够管理任务的核心机制,它是通过一个阻塞队列(BlockingQueue)实现的,阻塞队列缓存任务,工作线程从阻塞队列中获取任务,它符合生产者消费者模型,生产者是添加元素的线程,消费者是获取元素的线程。
阻塞队列(BlockingQueue)的数据结构是队列,它支持两个附加操作,分别是:
- 在阻塞队列为空的时候,获取元素的线程会等待该队列变为非空。
- 在阻塞队列已经满的时候,添加元素的线程会等待该队列变为可用。
源码如下所示:
// BlockingQueue.java
public interface BlockingQueue<E> extends Queue<E> {
// 在不超过队列容量的前提下,将指定的元素添加到队列中,成功就返回true,如果队列已经满了,就抛出IllegalStateException异常,当使用有容量限制的队列时,最好还是调用offer(E e)方法
boolean add(E e);
// 在不超过容量的前提下,将指定的元素添加到队列中,成功就返回true,如果队列已经满了,就返回false,当使用有容量限制的队列时,调用这个方法会比add(E e)方法好
boolean offer(E e);
// 将指定的元素添加到队列中,如果没有足够的空间,就等待
void put(E e) throws InterruptedException;
// 将指定的元素添加到队列中,如果没有足够的空间,就等待到指定的等待时间
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 检索并且删除队列的头元素,如果没有足够的空间,就等待
E take() throws InterruptedException;
// 检索并且删除队列的头元素,如果没有足够的空间,就等待到指定的等待时间
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 返回队列在理想情况下(在没有内存或者资源约束的情况下)可以不阻塞地接受额外元素的数量,如果没有限制,就返回Integer.MAX_VALUE的值
int remainingCapacity();
// 如果队列中包含一个或者多个指定的元素(通过equals方法判断是否是同一个元素),就删除这些元素,并且返回true,否则返回false
boolean remove(Object o);
// 如果队列中包含一个或者多个指定的元素(通过equals方法判断是否是同一个元素),就返回true,否则返回false
boolean contains(Object o);
// 从队列中删除指定的集合c中的元素,如果指定的集合c是该队列或者该队列的某个元素的某些属性阻止将其添加到指定的集合中,就抛出IllegalArgumentException异常,如果在执行此操作时修改了给定的集合,则此方法的结果将未知
int drainTo(Collection<? super E> c);
// 从队列中删除指定的集合c中的数量是maxElements的元素,如果指定的集合c是该队列或者该队列的某个元素的某些属性阻止将其添加到指定的集合中,就抛出IllegalArgumentException异常,如果在执行此操作时修改了给定的集合,则此方法的结果将未知
int drainTo(Collection<? super E> c, int maxElements);
}
看下接口BlockingQueue的实现类,可得知阻塞队列有以下几种类型:
- ArrayBlockingQueue:使用数组实现的有容量限制的阻塞队列,按照先进先出(FIFO,First-In-First-Out)的原则对元素进行排序,支持公平锁和非公平锁。
- DelayQueue:实现延迟获取的没有容量限制的阻塞队列,可以指定延迟时间,只有达到指定的延迟时间,才能获取该队列中的元素。
- LinkedBlockingDeque:使用双向链表实现的阻塞队列,队头和队尾都可以添加或者删除元素,在并发环境下,可以将锁的竞争最多降到一半。
- LinkedBlockingQueue:使用链表实现的有容量限制的阻塞队列,按照先进先出(FIFO,First-In-First-Out)的原则对元素进行排序,默认长度是Integer.MAX_VALUE,所以默认创建该队列可能有容量危险,该队列通常比基于数组实现的队列具有更高的吞吐量,但是在并发环境下,性能会比较差。
- LinkedTransferQueue:使用链表实现的没有容量限制的阻塞队列,按照先进先出(FIFO,First-In-First-Out)的原则对元素进行排序,和其他队列对比,多出了两个方法,分别是:transfer(E e)方法和tryTransfer(E e, long timeout, TimeUnit unit)方法。
- PriorityBlockingQueue:支持线程优先级排序的没有容量限制的阻塞队列,默认是按自然顺序排序,也就是从优先级低到高排序,也可以实现compareTo()方法来指定元素的排序规则,但是不能保证同优先级元素的顺序。
- SynchronousQueue:不存储元素的阻塞队列,支持公平锁和非公平锁,每一个插入操作都必须要等待另一个线程的移除操作,每一个移除操作都必须要等待另一个线程的插入操作,其中,Executors.newCachedThreadPool()使用了这个队列。
任务申请
任务申请相关的逻辑在getTask()方法中,源码如下所示:
// ThreadPoolExecutor.java
private Runnable getTask() {
boolean timedOut = false;
// 循环执行
for (;;) {
// 获取线程池的运行状态和工作线程数量
int c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
// 如果线程池的运行状态至少是SHUTDOWN状态,并且至少是STOP状态,或者阻塞队列为空,就执行以下逻辑
// 调用decrementWorkerCount()方法,减少成员变量ctl的workerCount的值
decrementWorkerCount();
// 返回null
return null;
}
// 获取工作线程数量
int wc = workerCountOf(c);
// 获取线程是否是回收状态
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
// 如果线程数量大于maximumPoolSize的值,也就是线程数量过多,就返回null
return null;
continue;
}
try {
// 如果线程是可回收的,就调用poll(long timeout, TimeUnit unit)方法,否则调用take()方法
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
执行过程如下所示:
- 获取线程池的运行状态和工作线程数量。
- 如果线程池已经停止执行,就返回null,否则执行步骤3。
- 如果线程池数量过多,就返回null,否则执行执行步骤4。
- 如果该线程是可回收的,就调用poll(long timeout, TimeUnit unit)方法,检索并且删除队列的头元素,如果没有足够的空间,就等待到指定的等待时间;如果该线程是不可回收的,就调用take()方法,检索并且删除队列的头元素,如果没有足够的空间,就等待。
任务拒绝
任务拒绝是线程池的保护机制,相关的逻辑在RejectedExecutionHandler中,源码如下所示:
// RejectedExecutionHandler.java
public interface RejectedExecutionHandler {
// 当线程池不能接受任务时,会调用这个方法
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
看下接口RejectedExecutionHandler的实现类,可得知拒绝策略有以下几种类型:
- ThreadPoolExecutor.AbortPolicy:线程池的默认拒绝策略,也就是ThreadPoolExecutor和ScheduledThreadPoolExecutor的默认拒绝策略,丢弃任务,并且抛出RejectedExecutionException异常,建议使用这个策略,因为可以方便通过异常发现。
- ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程处理该任务,这种情况下,需要等到所有任务执行完毕,这种策略适合大量计算的任务类型。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务,这种策略适合经常需要丢弃旧的任务类型。
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常,使用这种策略会导致我们无法发现异常。
工作线程管理
Worker类是线程池的工作线程的类,它是ThreadPoolExecutor类的一个被关键字final修饰的内部类,继承了AbstractQueuedSynchronizer类,并且实现了Runnable接口,源码如下所示:
// ThreadPoolExecutor.java
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 序列化版本号
private static final long serialVersionUID = 6138294804551838833L;
// 正在运行的工作线程,如果创建失败的,就为空
final Thread thread;
// 要运行的初始任务,可能为空
Runnable firstTask;
// 线程任务计数器
volatile long completedTasks;
// 从ThreadFactory中创建的第一个线程和任务
Worker(Runnable firstTask) {
// 在运行工作线程之前禁止中断
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
// 调用runWorker(Worker w)方法
runWorker(this);
}
// 以下是加锁的方法,值是0表示未解锁状态,值是1表示解锁状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
根据前面的源码可得知,线程池使用HashSet存储所有的工作线程,也就是存储所有的Worker对象,这样就可以方便地通过添加或者删除元素来控制线程池中的线程。
Worker类通过继承AbstractQueuedSynchronizer类实现独占锁,独占锁的意思是每次只有一个线程持有锁,没有使用ReentrantLock的原因是因为它是可重入锁,线程池不能允许工作线程能够多次获取锁,所以使用AbstractQueuedSynchronizer。
AbstractQueuedSynchronizer是依赖先进先出(FIFO,First-In-First-Out)的队列的阻塞锁和相关同步器(信号量、事件等)实现的,里面维护着一个int的state。
添加工作线程
看下addWorker(Runnable firstTask, boolean core)方法,源码如下所示:
// ThreadPoolExecutor.java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
// 获取线程池的运行状态和工作线程数量
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
// 如果线程池已经停止,就添加工作线程失败
return false;
// 循环执行
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
// 获取工作线程数量,如果工作线程数量大于等于核心池大小或者最大池大小,就添加工作线程失败
return false;
// 增加workerCount(原子操作)
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加失败,就再次获取ctl
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
// 如果中途有其他线程修改线程池的运行状态,使其变成SHUTDOWN状态,就跳到retry标签处,回到最外层的循环
continue retry;
}
}
// Worker对象对应的线程是否已经启动
boolean workerStarted = false;
// Worker对象是否添加到HashSet成功
boolean workerAdded = false;
Worker w = null;
try {
// 使用firstTask创建Worker对象
w = new Worker(firstTask);
// 根据Worker对象创建一个线程
final Thread t = w.thread;
if (t != null) {
// 如果线程不为空,就创建ReentrantLock对象
final ReentrantLock mainLock = this.mainLock;
// 对mainLock加锁
mainLock.lock();
try {
// 保持锁定状态时再次检查,在ThreadFactory失败或者在获得锁之前关闭退出
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 如果线程池处于小于STOP状态,也就是RUNNING状态,就执行以下逻辑
if (t.getState() != Thread.State.NEW)
// 如果线程不是创建状态,就抛出IllegalThreadStateException异常
throw new IllegalThreadStateException();
// 将Worker对象添加到HashSet中
workers.add(w);
// Worker对象成功添加到HashSet
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
// 如果线程数量大于最大池大小,就将该值赋值给最大池大小
largestPoolSize = s;
}
} finally {
// 对mainLock解锁
mainLock.unlock();
}
if (workerAdded) {
// 如果Worker对象成功添加到HashSet中,就启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 如果线程没有启动,就调用addWorkerFailed(Worker w)方法,执行回滚工作线程的创建,也就是做一些清理工作
addWorkerFailed(w);
}
// 返回线程是否启动成功
return workerStarted;
}
执行流程如下所示:
- 检查线程池是否已经停止,如果是,就添加线程失败,否则执行步骤2。
- 检查线程池是否正在停止,如果是,就执行步骤3,否则添加线程失败。
- 检查线程是否用于执行剩余任务,如果是,就执行步骤4,否则添加线程失败。
- 获取工作线程数量,检查线程池的运行状态是否发生改变,如果有改变,执行步骤1,否则执行步骤5。
- 检查工作线程数量是否大于核心池大小或者最大池大小(取决于布尔型形式参数core,core是true,就使用核心池大小;core是false,就使用最大池大小),如果是,就添加线程失败,否则执行步骤6。
- 把Worker对象存储到HashSet中,如果成功添加,就启动线程,否则执行步骤4。
执行工作线程
// ThreadPoolExecutor.java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 调用getTask获取任务
while (task != null || (task = getTask()) != null) {
// 对Worker对象加锁
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 如果线程大于等于STOP状态,并且线程还不是中断状态,就中断线程
wt.interrupt();
try {
// 调用beforeExecute(Thread t, Runnable r)方法,这个方法可以给子类去实现
beforeExecute(wt, task);
try {
// 执行任务
task.run();
// 调用afterExecute(Runnable r, Throwable t)方法,这个方法是可以给子类去实现
afterExecute(task, null);
} catch (Throwable ex) {
// 如果有异常,就调用afterExecute(Runnable r, Throwable t)方法,这个方法是可以给子类去实现
afterExecute(task, ex);
throw ex;
}
} finally {
// task设为null,准备下一个任务
task = null;
// 完成任务数量的值自增
w.completedTasks++;
// 对Worker对象解锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 调用processWorkerExit(Worker w, boolean completedAbruptly)方法,回收工作线程
processWorkerExit(w, completedAbruptly);
}
}
执行流程如下所示:
- 如果firstTask不为空,执行firstTask,调用getTask()方法获取任务执行。
- 如果firstTask为空,调用getTask()方法获取任务执行。
回收工作线程
// ThreadPoolExecutor.java
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
// 如果completedAbruptly是true的话,证明workerCount的值还没有被减少
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
// 对mainLock加锁
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 移除HashSet中的对应的Worker对象
workers.remove(w);
} finally {
// 对mainLock解锁
mainLock.unlock();
}
// 调用tryTerminate()方法,这个方法会调用terminated()方法,转换成TERMINATED状态
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
执行流程如下所示:
- 检查任务执行情况,例如:检查workCount的值是否被减少。
- 移除HashSet中对应的Worker对象。
- 线程池的运行状态变为TERMINATED状态。
线程池的种类
在Executors类中封装了五种线程池:
newFixedThreadPool
源码如下所示:
// Executors.java
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
该线程池线程数量固定,核心池大小(coolPoolSize)和最大池大小(maximumPoolSize)都一样,并且keepAliveTime是0L,使用的阻塞队列是LinkedBlockingQueue,它使用链表实现的有容量限制的阻塞队列,按照先进先出(FIFO,First-In-First-Out)的原则对元素进行排序,默认长度是Integer.MAX_VALUE,所以默认创建该队列可能有容量危险,该队列通常比基于数组实现的队列具有更高的吞吐量,但是在并发环境下,性能会比较差。
适用于在已知并发压力下,执行耗时长的任务的场景。
newWorkStealingPool
源码如下所示:
// Executors.java
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
该线程池可以拥有多个队列,以便减少连接数,默认线程数量是当前计算机可用的CPU数量。
适用于耗时长、需要并发执行任务的场景。
newSingleThreadExecutor
源码如下所示:
// Executors.java
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
该线程池线程数量只有一个,核心池大小(coolPoolSize)和最大池大小(maximumPoolSize)都1,并且keepAliveTime是0L,使用的阻塞队列是LinkedBlockingQueue,它使用链表实现的有容量限制的阻塞队列,按照先进先出(FIFO,First-In-First-Out)的原则对元素进行排序,默认长度是Integer.MAX_VALUE,所以默认创建该队列可能有容量危险,该队列通常比基于数组实现的队列具有更高的吞吐量,但是在并发环境下,性能会比较差。
适用于需要保证任务执行顺序的场景。
newCachedThreadPool
源码如下所示:
// Executors.java
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
该线程池可以缓存线程,它会根据需要创建新线程,但是会在可用的时候重用之前创建的线程,核心池大小(coolPoolSize)是0,最大池大小(maximumPoolSize)是Integer.MAX_VALUE,并且keepAliveTime是60L,使用的阻塞队列是SynchronousQueue,它不存储元素的阻塞队列,支持公平锁和非公平锁,每一个插入操作都必须要等待另一个线程的移除操作,每一个移除操作都必须要等待另一个线程的插入操作。
适用于执行耗时比较短的任务的场景。
newScheduledThreadPool
源码如下所示:
// Executors.java
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
该线程池可以安排任务在给定的延迟后运行或者定期运行,使用的阻塞队列是DelayedWorkQueue,它是一个支持延迟的阻塞队列。
适用于执行周期性任务的场景。
我的GitHub:TanJiaJunBeyond
Android通用框架:Android通用框架
我的掘金:谭嘉俊
我的简书:谭嘉俊
我的CSDN:谭嘉俊