一、池化技术
程序的运行,其本质上,是对系统资源(CPU、内存、磁盘、网络等等)的使用,如何高效的使用这些资源是编程优化演进的一个方向,池化技术就是非常重要的一项优化手段。
池化技术简单点来说,就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能等。在编程领域,比较典型的池化技术有:线程池、连接池、内存池、对象池等。
下面代码可以创建一个线程:
public class Application {
public static void main(String[] args) throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程运行......");
}
}).start();
}
}
实现Runnable接口就可以实现一个简单的线程。可以利用上多核CPU。当一个任务结束,当前线程就接收。但很多时候,我们不止会执行一个任务。如果每次都是如此的创建线程、执行任务,销毁线程,会造成很大的性能开销。
那能否一个线程创建后,执行完一个任务后,又去执行另一个任务,而不是销毁?这就是线程池能解决的问题。这也就是池化技术的思想,通过预先创建好多个线程,放在池中,这样可以在需要使用线程的时候直接获取,避免多次重复创建、销毁带来的开销。
二、线程池的使用
1. 通过Executors创建:
ExecutorService executorService1 = Executors.newFixedThreadPool(3);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
ExecutorService executorService3 = Executors.newCachedThreadPool();
ExecutorService executorService4 = Executors.newScheduledThreadPool(3);
executorService1.execute(new Runnable() {
@Override
public void run() {
}
});
通过Executors工厂类中的静态方法可以更简单的进行线程池的创建。在阿里巴巴Java开发手册中,明确说明不允许使用Executors,因为Executors的不正确使用会带来若干问题,如OOM等。
2. 通过ThreadPoolExecutor创建:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
}
});
我们在实践中开发中,应尽量选择此方式,并进行完善的异常控制机制。
三、线程池的类设计
我们先看一下ThreadPoolExecutor类概要图:
ExecutorService是真正的线程池接口。
Executor是线程池的顶级接口,只是一个执行线程的工具,只提供一个execute(Runnable command)的方法,真正的线程池接口是ExecutorService。
AbstractExecutorService实现了ExecutorService接口,实现了其中大部分的方法(有未实现的方法,所以被声明为Abstract)。
ThreadPoolExecutor,继承了AbstractExecutorService,是ExecutorService的默认实现。
另外,Executors也是ThreadPoolExecutor相关类,生产各种类型线程池,上文已作介绍,不推荐使用。
ThreadPoolExecutor类
1. 构造方法
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。
在ThreadPoolExecutor类中提供了四个构造方法:
/**
* 注释省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
/**
* 注释省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
/**
* 注释省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 注释省略.
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
前面三个构造方法缺省部分参数,会使用缺省参数的默认值作为相应实参,调用最后一个构造方法。通过构造方法即可创建线程池。
构造方法中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。
unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; // 天
TimeUnit.HOURS; // 小时
TimeUnit.MINUTES; // 分钟
TimeUnit.SECONDS; // 秒
TimeUnit.MILLISECONDS; // 毫秒
TimeUnit.MICROSECONDS; // 微妙
TimeUnit.NANOSECONDS; // 纳秒-
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
- ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
常用的workQueue类型: - SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现「线程数达到了maximumPoolSize而不能新建线程」的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。
- LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize。
- ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。
- DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
- ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
threadFactory:线程工厂,主要用来创建线程。如果没指定的话,默认会使用Executors.defaultThreadFactory(),一般来说,我们会在这里对线程设置名称、异常处理器等。
handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
2. 方法
- execute()
- submit()
- shutdown()
- shutdownNow()
- getQueue()
- getPoolSize()
- getActiveCount()
- getCompletedTaskCount()
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()和shutdownNow()是用来关闭线程池的。
getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,还有在下文提到的runwork()、addwork()、processworkerExit() 方法等等。
AbstractExecutorService类
下面我们看一下AbstractExecutorService的各方法:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {...};
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {...};
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) {...};
public <T> Future<T> submit(Callable<T> task) {...};
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {...};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {...};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {...};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {...};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {...};
}
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
ExecutorService类
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService继承了Executor接口。
Executor接口
public interface Executor {
void execute(Runnable command);
}
上述类与接口之间的关系
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
然后ThreadPoolExecutor继承了类AbstractExecutorService。
四、线程池的执行过程
AtomicInteger ctl 相关代码
/**
* 注释省略.
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
其中ctl这个 AtomicInteger 的功能很强大,其高3位用于维护线程池运行状态,低29位维护线程池中线程数量
- RUNNING:-1 << COUNT_BITS,即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务。
- SHUTDOWN:0 << COUNT_BITS,即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务。
- STOP:1 << COUNT_BITS,即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务。
- TIDYING:2 << COUNT_BITS,即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法。
- TERMINATED:3 << COUNT_BITS,即高3位为100,低29位为0,terminated()方法调用完成后变成此状态。
这些状态均由int型表示,大小关系为 RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED,这个顺序基本上也是遵循线程池从运行到终止这个过程。
runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态。
workerCountOf(int c) 方法:c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的线程数量。
ctlOf(int rs, int wc) 方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl。
核心方法源码
execute(Runnable command)方法
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* (待执行的command)
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
// 需要执行的任务command为空,抛出空指针异常
if (command == null) // 1
throw new NullPointerException();
/*
* 执行的流程实际上分为三步:
* 1. 如果运行的线程小于 corePoolSize,以用户给定的 Runable 对象新开一个线程去执行
* 并且执行addWorker方法会以原子性操作去检查 runState 和 workerCount,以防止当返回false的
* 时候添加了不应该添加的线程
* 2. 如果任务能够成功添加到队列当中,我们仍需要对添加的线程进行双重检查,有可能添加的线程在前
* 一次检查时已经死亡,又或者在进入该方法的时候线程池关闭了。所以我们需要复查状态,并有有必
* 要的话需要在停止时回滚入列操作,或者在没有线程的时候新开一个线程
* 3. 如果任务无法入列,那我们需要尝试新增一个线程,如果新建线程失败了,我们就知道线程可能关闭了
* 或者饱和了,就需要拒绝这个任务
*
*/
// 获取线程池的控制状态
int c = ctl.get(); // 2
// 通过workCountOf方法算workerCount值,小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 添加任务到worker集合当中
if (addWorker(command, true))
return; // 成功返回
c = ctl.get(); // 失败的话再次获取线程池的控制状态
}
/*
* 判断线程池是否正处于RUNNING状态
* 是的话添加Runnable对象到workQueue队列当中
*/
if (isRunning(c) && workQueue.offer(command)) { // 3
int recheck = ctl.get(); // 再次获取线程池的状态
// 再次检查状态
// 线程池不处于RUNNING状态,将任务从workQueue队列中移除
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);
// workerCount等于0
else if (workerCountOf(recheck) == 0) // 4
// 添加worker
addWorker(null, false);
}
// 加入阻塞队列失败,则尝试以线程池最大线程数新开线程去执行该任务
else if (!addWorker(command, false)) // 5
// 执行失败则拒绝任务
reject(command);
}
我们来说一下上面这个代码的流程:
1. 首先判断任务是否为空,空则抛出空指针异常
2. 不为空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中执行。若成功,则返回;失败的话再接着获取线程池控制状态,因为只有状态变了才会失败,所以重新获取
3. 判断线程池是否处于运行状态,是的话则添加command到阻塞队列,加入时也会再次获取状态并且检测状态是否不处于运行状态,不处于的话则将command从阻塞队列移除,并且拒绝任务
4. 如果线程池里没有了线程,则创建新的线程去执行获取阻塞队列的任务执行
5. 如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃
addWorker(Runnable firstTask, boolean core)方法
private boolean addWorker(Runnable firstTask, boolean core) {
//外部循环标记
retry:
//外层死循环
for (;;) {
//获取线程池控制状态
int c = ctl.get();
//获取runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/**
*1.如果线程池runState至少已经是SHUTDOWN
*2\. 有一个是false则addWorker失败,看false的情况
* - runState==SHUTDOWN,即状态已经大于SHUTDOWN了
* - firstTask为null,即传进来的任务为空,结合上面就是runState是SHUTDOWN,但是
* firstTask不为空,代表线程池已经关闭了还在传任务进来
* - 队列为空,既然任务已经为空,队列为空,就不需要往线程池添加任务了
*/
if (rs >= SHUTDOWN && //runState大于等于SHUTDOWN,初始位RUNNING
! (rs == SHUTDOWN && //runState等于SHUTDOWN
firstTask == null && //firstTask为null
! workQueue.isEmpty())) //workQueue队列不为空
return false;
//内层死循环
for (;;) {
//获取线程池的workerCount数量
int wc = workerCountOf(c);
//如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize
//返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//通过CAS操作,使workerCount数量+1,成功则跳出循环,回到retry标记
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS操作失败,再次获取线程池的控制状态
c = ctl.get(); // Re-read ctl
//如果当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//CAS由于更改workerCount而失败,继续内层循环
}
}
//通过以上循环,能执行到这是workerCount成功+1了
//worker开始标记
boolean workerStarted = false;
//worker添加标记
boolean workerAdded = false;
//初始化worker为null
Worker w = null;
try {
//初始化一个当前Runnable对象的worker对象
w = new Worker(firstTask);
//获取该worker对应的线程
final Thread t = w.thread;
//如果线程不为null
if (t != null) {
//初始线程池的锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取锁后再次检查,获取线程池runState
int rs = runStateOf(ctl.get());
//当runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//线程已存活
if (t.isAlive()) // precheck that t is startable
//线程未启动就存活,抛出IllegalThreadStateException异常
throw new IllegalThreadStateException();
//将worker对象添加到workers集合当中
workers.add(w);
//获取workers集合的大小
int s = workers.size();
//如果大小超过largestPoolSize
if (s > largestPoolSize)
//重新设置largestPoolSize
largestPoolSize = s;
//标记worker已经被添加
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
//如果worker添加成功
if (workerAdded) {
//启动线程
t.start();
//标记worker已经启动
workerStarted = true;
}
}
} finally {
//如果worker没有启动成功
if (! workerStarted)
//workerCount-1的操作
addWorkerFailed(w);
}
//返回worker是否启动的标记
return workerStarted;
}
简单说一下代码流程:
1. 获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步
2. 死循环,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,没有的话则对workerCount+1操作
3. 如果不符合上述判断或+1操作失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,否则继续内层循环
4. +1操作成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例,添加成功就启动该实例
addWorker方法有4种传参的方式(在execute方法中使用了前3种):
addWorker(command, true)
addWorker(command, false)
addWorker(null, false)
addWorker(null, true)
第一个:线程数小于corePoolSize时,放一个需要处理的task进Workers Set。如果Workers Set长度超过corePoolSize,就返回false。
第二个:当队列被放满时,就尝试将这个新来的task直接放入Workers Set,而此时Workers Set的长度限制是maximumPoolSize。如果线程池也满了的话就返回false。
第三个:放入一个空的task进workers Set,长度限制是maximumPoolSize。这样一个task为空的worker在线程执行的时候会去任务队列里拿任务,这样就相当于创建了一个新的线程,只是没有马上分配任务。
第四个:这个方法就是放一个null的task进Workers Set,而且是在小于corePoolSize时,如果此时Set中的数量已经达到corePoolSize那就返回false,什么也不干。实际使用中是在prestartAllCoreThreads()方法,这个方法用来为线程池预先启动corePoolSize个worker等待从workQueue中获取任务执行。
addWorkerFailed(Worker w)
private void addWorkerFailed(Worker w) {
//重入锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
//如果worker不为null
if (w != null)
//workers移除worker
workers.remove(w);
//通过CAS操作,workerCount-1
decrementWorkerCount();
tryTerminate();
} finally {
//释放锁
mainLock.unlock();
}
}
addWorker方法添加worker失败,并且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,并且workerCount做-1操作。
tryTerminate()
final void tryTerminate() {
//死循环
for (;;) {
//获取线程池控制状态
int c = ctl.get();
/*
*线程池处于RUNNING状态
*线程池状态最小大于TIDYING
*线程池==SHUTDOWN并且workQUeue不为空
*直接return,不能终止
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果workerCount不为0
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//获取线程池的锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
//通过CAS操作,设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//设置线程池的状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//发送释放信号给在termination条件上等待的线程
termination.signalAll();
}
return;
}
} finally {
//释放锁
mainLock.unlock();
}
// else retry on failed CAS
}
}
当对线程池执行了非正常成功逻辑的操作时,都会需要执行tryTerminate尝试终止线程池。
runWorker(Worker w)
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
//获取worker里的任务
Runnable task = w.firstTask;
//将worker实例的任务赋值为null
w.firstTask = null;
/*
*unlock方法会调用AQS的release方法
*release方法会调用具体实现类也就是Worker的tryRelease方法
*也就是将AQS状态置为0,允许中断
*/
w.unlock(); // allow interrupts
//是否突然完成
boolean completedAbruptly = true;
try {
//worker实例的task不为空,或者通过getTask获取的不为空
while (task != null || (task = getTask()) != null) {
//获取锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/*
*获取线程池的控制状态,至少要大于STOP状态
*如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP
*如果上述满足,检查该对象是否处于中断状态,不清除中断标记
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断改对象
wt.interrupt();
try {
//执行前的方法,由子类具体实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
//执行完后调用的方法,也是由子类具体实现
afterExecute(task, thrown);
}
} finally {//执行完后
//task设置为null
task = null;
//已完成任务数+1
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理并退出当前worker
processWorkerExit(w, completedAbruptly);
}
}
该方法的作用就是去执行任务。执行步骤如下:
1. 首先在方法一进来,就执行了w.unlock(),这是为了将AQS的状态改为0,因为只有getState() >= 0的时候,线程才可以被中断
2. 判断firstTask是否为空,为空则通过getTask()获取任务,不为空接着往下执行
3. 判断是否符合中断状态,符合的话设置中断标记
4. 执行beforeExecute(),task.run(),afterExecute()方法
5. 任何一个出异常都会导致任务执行的终止;进入processWorkerExit来退出任务
6. 正常执行的话会接着回到步骤2
getTask()
private Runnable getTask() {
//标志是否获取任务超时
boolean timedOut = false; // Did the last poll() time out?
//死循环
for (;;) {
//获取线程池的控制状态
int c = ctl.get();
//获取线程池的runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
*判断线程池的状态,出现以下两种情况
*1、runState大于等于SHUTDOWN状态
*2、runState大于等于STOP或者阻塞队列为空
*将会通过CAS操作,进行workerCount-1并返回null
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程池的workerCount
int wc = workerCountOf(c);
// Are workers subject to culling?
/*
*allowCoreThreadTimeOut:是否允许core Thread超时,默认false
*workerCount是否大于核心核心线程池
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
*1、wc大于maximumPoolSize或者已超时
*2、队列不为空时保证至少有一个任务
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/*
*通过CAS操作,workerCount-1
*能进行-1操作,证明wc大于maximumPoolSize或者已经超时
*/
if (compareAndDecrementWorkerCount(c))
//-1操作成功,返回null
return null;
//-1操作失败,继续循环
continue;
}
try {
/*
*wc大于核心线程池
*执行poll方法
*小于核心线程池
*执行take方法
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//判断任务不为空返回任务
if (r != null)
return r;
//获取一段时间没有获取到,获取超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在上面的runWorker方法当中我们可以看出,当firstTask为空的时候,会通过该方法来接着获取任务去执行,执行逻辑顺序如下:
1. 获取线程池控制状态和runState,判断线程池是否已经关闭或者正在关闭,是的话则workerCount-1操作返回null
2. 获取workerCount判断是否大于核心线程池
3. 判断workerCount是否大于最大线程池数目或者已经超时,是的话workerCount-1,-1成功则返回null,不成功则回到步骤1重新继续
4. 判断workerCount是否大于核心线程池,大于则用poll方法从队列获取任务,否则用take方法从队列获取任务
5. 判断任务是否为空,不为空则返回获取的任务,否则回到步骤1重新继续
processWorkerExit(Worker w, boolean completedAbruptly)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
/*
*completedAbruptly:在runWorker出现,代表是否突然完成的意思
*也就是在执行任务过程当中出现异常,就会突然完成,传true
*
*如果是突然完成,需要通过CAS操作,workerCount-1
*不是突然完成,则不需要-1,因为getTask方法当中已经-1
*
*下面的代码注释貌似与代码意思相反了
*/
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//生成重入锁
final ReentrantLock mainLock = this.mainLock;
//获取锁
mainLock.lock();
try {
//线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数
completedTaskCount += w.completedTasks;
//从HashSet<Worker>中移除
workers.remove(w);
} finally {
//释放锁
mainLock.unlock();
}
//因为上述操作是释放任务或线程,所以会判断线程池状态,尝试终止线程池
tryTerminate();
//获取线程池的控制状态
int c = ctl.get();
//判断runState是否小鱼STOP,即是RUNNING或者SHUTDOWN
//如果是RUNNING或者SHUTDOWN,代表没有成功终止线程池
if (runStateLessThan(c, STOP)) {
/*
*是否突然完成
*如若不是,代表已经没有任务可获取完成,因为getTask当中是while循环
*/
if (!completedAbruptly) {
/*
*allowCoreThreadTimeOut:是否允许core thread超时,默认false
*min-默认是corePoolSize
*/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//允许core thread超时并且队列不为空
//min为0,即允许core thread超时,这样就不需要维护核心核心线程池了
//如果workQueue不为空,则至少保持一个线程存活
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果workerCount大于min,则表示满足所需,可以直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果是突然完成,添加一个空任务的worker线程--这里我也不太理解
addWorker(null, false);
}
}
明显的,在执行任务当中,会去获取任务进行执行,那既然是执行任务,肯定就会有执行完或者出现异常中断执行的时候,那这时候肯定也会有相对应的操作。代码逻辑如下:
- 首先判断线程是否突然终止,如果是突然终止,通过CAS,workerCount-1
- 统计线程池完成任务数,并将worker从workers当中移除
- 判断线程池状态,尝试终止线程池
- 线程池没有成功终止
- 判断是否突然完成任务,不是则进行下一步,是则进行第三步
- 如允许核心线程超时,队列不为空,则至少保证一个线程存活
- 添加一个空任务的worker线程
Worker内部类
我们在上面已经算是挺详细地讲了线程池执行任务execute的执行流程和一些细节,在上面频繁地出现了一个字眼,那就是worker实例,那么这个worker究竟是什么呢?
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** 工作线程,如果工厂失败则为空. */
final Thread thread;
/** 初始化任务,有可能为空 */
Runnable firstTask;
/** 已完成的任务计数 */
volatile long completedTasks;
/**
* 创建并初始化第一个任务,使用线程工厂来创建线程
* 初始化有3步
*1、设置AQS的同步状态为-1,表示该对象需要被唤醒
*2、初始化第一个任务
*3、调用ThreadFactory来使自身创建一个线程,并赋值给worker的成员变量thread
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//重写Runnable的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
//调用ThreadPoolExecutor的runWorker方法
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//代表是否独占锁,0-非独占 1-独占
protected boolean isHeldExclusively() {
return getState() != 0;
}
//重写AQS的tryAcquire方法尝试获取锁
protected boolean tryAcquire(int unused) {
//尝试将AQS的同步状态从0改为1
if (compareAndSetState(0, 1)) {
//如果改变成,则将当前独占模式的线程设置为当前线程并返回true
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//否则返回false
return false;
}
//重写AQS的tryRelease尝试释放锁
protected boolean tryRelease(int unused) {
//设置当前独占模式的线程为null
setExclusiveOwnerThread(null);
//设置AQS同步状态为0
setState(0);
//返回true
return true;
}
//获取锁
public void lock() { acquire(1); }
//尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
//释放锁
public void unlock() { release(1); }
//是否被独占
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
我们可以看到Worker内部类继承AQS同步器并且实现了Runnable接口,所以Worker很明显就是一个可执行任务并且又可以控制中断、起到锁效果的类。