1 概述
线程池即包含一个或者多个线程的集合,类似的还有对象池,数据库连接池,他们本质上都是一样的,只是因为集合里的元素类型不同,所以名字不同而已。之所以要使用线程池这种模式,是因为创建线程是有一定开销的,如果在线程使用频繁且线程生命周期不长的场景(例如Web环境下,一个请求响应的生命周期可能非常短)下,创建线程、销毁线程的开销绝对不容忽视,线程池可以重用线程,当线程处理完任务之后不会直接销毁,而是根据某种策略来决定是应该销毁还是将线程重新放入池中,以此降低线程创建和销毁的开销。
Java里的线程池与工作队列是密切相关的,在工作队列里保存了所有等待执行的任务,当有线程空闲的时候就会从工作队列里取出一个任务,执行任务,执行完毕后返回线程池,等待下一个任务。ThreadPoolExecutor类即线程池的实现类,该类实现了Executor接口,所以属于Executor框架里的一员,Executor框架是一个任务执行的抽象,目的是提供一种将“任务提交”和“任务执行”分离的机制。下面是ThreadPoolExecutor类的继承体系:
2 线程池的使用
在J.U.C包下有一个Executors类,该类包含了很多和线程池有关静态工厂方法,例如newFixedThreadPool,newWorkStealingPool等,通过这些静态工厂方法,我们可以很方便的使用线程池。下面是一个示例:
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
executorService.execute(() -> {
System.out.println("hello, world");
});
}
executorService.shutdown();
executorService.awaitTermination(1000, TimeUnit.SECONDS);
}
代码中首先调用Executors.newFixedThreadPool()方法获取了一个ExecutorService实例,这个ExecutorService实例可以说是使用Executor框架的基础,很多操作例如execute(),submit(),shutdown()等都是通过该类的对象来操作的。在调用newFixedThreadPool()的时候,传入了一个参数4,这里代表线程池中有4个线程(但实际上,并不是那么简单,稍后介绍到ThreadPoolExecutor源码的时候,会详细介绍构造函数中各个参数的意义),然后调用了4次execute方法,其实就是提交了4个任务的意思(这里的4只是我随意设的,实际上和newFixedThreadPool的参数没有什么直接关系,完全可以设置成100,意思就是提交100个任务),提交任务之后,Executor框架会根据线程池的配置以及执行策略来执行任务。
在把任务提交完之后(并不意味着任务也执行完了),可以调用shutdown()方法,该方法的功能是让Executor框架停止接受新的任务,最后调用awaitTermination()方法,该方法是一个阻塞方法,会阻塞当前线程,当所有的任务都执行完毕或者设置的时间到期之后,线程会被唤醒,继续执行代码。
这是最基本的线程池使用方法,直接使用Executors类的静态工厂方法获取ExecutorService实例,下面简单介绍一下该类下和线程池有关的几个静态工厂方法:
- newCachedThreadPool,创建一个带有缓冲功能的线程池,线程池的最大容纳量是Integer.MAX_VALUE,但如果没有任务提交的时候,并不会把线程存在线程池里,比较适合线程使用频繁的场景。
- newFixedThreadPool,创建一个固定容量的线程池,线程的最大容量即传入的参数值,在没有任务提交的情况下,线程会被保存在池里,使用场景比较宽泛。
- newScheduledThreadPool,创建一个可以调度的线程池,例如每3s执行一次任务等。
- newSingleThreadExecutor,创建只有一个线程的线程池。
- newSingleThreadScheduledExecutor,创建只有一个线程的线程池,该线程池具有调度的功能。
- newWorkStealingPool,创建一个具有工作窃取功能的线程池,工作窃取是一种高效的模式,例如A线程很忙,B线程很闲,B线程就可以从与A线程绑定的队列末尾取出任务并执行,属于一种非公平的模式,但CPU利用率有很大的提升,不会出现“一核有难,七核围观”的情况。Java7出现的Fork-join框架非常依赖这种模式。
读者朋友可以自己试试这些个方法,体验一下有什么不同,这里我就不再逐一举例说明了。
3 ThreadPoolExecutor类
这一节是本文最核心,最重要的部分,也是最复杂的部分(我写的时候都觉得难以下笔)。主要介绍三个方面:
- ThreadPoolExecutor的构造函数及其各个参数的具体意义。
- ThreadPoolExecutor的几个重要方法源码分析。
- 如何自定义线程池。
3.1 ThreadPoolExecutor的构造函数及其各个参数的具体意义
ThreadPoolExecutor有很多重载的构造函数,但只是参数的个数不同而已,最根本的构造函数是有7个参数的那个重载形式,其他的构造函数最终都会调用这个构造函数,其源码如下所示:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
关键是参数的意义,其内部逻辑无非就是检查参数合法性以及对字段进行赋值而已。下面来逐一看看各个参数的意义:
- corePoolSize,即核心线程数量,也是线程池至少要维护的数量,如果当前运行的线程少于这个值且有新的任务进来,那么就创建一个新的线程直接执行任务。如果当前运行的线程大于这个值,但小于maximumPoolSize,那么该任务会被提交到阻塞队列中,等待被调度,如果阻塞队列满了,才会创建新的线程去执行任务。
- maximumPoolSize,即线程池最大容量,如果当前运行线程数量等于这个值,即已经达到上限了,而且理论上此时队列应该是已经满了,所以如果再有任务提交,那么该任务会被拒绝,拒绝逻辑根据配置的值可能会不同。
- keepAliveTime,线程池维护线程所允许的空闲时间,当线程池中的线程大于corePoolSize时,如果此时没有新的任务提交,核心线程之外的线程也不会立即被销毁,而是等待一段时间后,如果仍然没有任务提交才会销毁。这个时间如果设置的合适,可以大大提高线程池性能,如果不合适,可能会造成性能降低,所以要小心设置该值。
- unit,keepAliveTime设置的时间单位。
- workQueue,工作队列,必须是BlockingQueue的子类,工作队列主要有两种类型,分别是有界队列和无界队列,两种类似各有各的优缺点,适合的场景也不同,需要慎重选择。
- threadFactory,线程工厂,线程都是通过线程工程创建的,该参数非常重要。
- handler,拒绝策略,在介绍maximumPoolSize有提到过如果当前运行的线程数量已经达到maximumPoolSize,新提交的任务会被拒绝,而拒绝的策略就是根据这个参数决定的,JDK默认实现了4中策略,分别是:
- AbortPolicy,直接抛出异常,是默认策略。
- CallerRunsPolicy,用调用者所在线程来执行任务。
- DiscardOldestPolicy,丢弃阻塞队列中最靠前的任务,即最老的任务,然后执行新提交的任务。
- DiscardPolicy,直接丢弃任务,也是一个常用的策略。
大家可以去看看Executors里和线程池有关的方法源码,然后再对照这里的各个参数的意义,可能会有惊喜!
3.2 ThreadPoolExecutor的几个重要方法源码分析
阅读源码最重要的就是找到入口点,找到入口点,然后慢慢的跟着走下去,遇到不懂的方法再进到该方法里去看它的源码,这样就不容易迷失在复杂的源码海洋里了。ThreadPoolExecutor.execute就是其中一个入口点,下面就从该方法开始,慢慢阅读源码,揭开ThreadPoolExecutor的神秘面纱!
下面是execute的源码(由于篇幅限制,我删除了一些注释):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//clt记录着runState和workerCount,ctl是一个AtomicIneger类型的变量
int c = ctl.get();
//workerCountOf()会返回当前活跃的线程数,如果小于corePoolSize就执行内部逻辑
if (workerCountOf(c) < corePoolSize) {
//addWorker()会尝试创建一个Worker并启动和其绑定的线程,command就是任务
//如果成功启动,该方法会返回true,否则返回false
if (addWorker(command, true))
return;
//再次获取ctl值
c = ctl.get();
}
//如果当前线程处于RUNNING状态并且工作队列没满
//如果队列已经满的话,workQueue.offer()会返回false
if (isRunning(c) && workQueue.offer(command)) {
//重新取得ctl值
int recheck = ctl.get();
//如果不是处于运行状态,由于之前的addWorker方法会把任务放入队列里,所以调用remove()方法来移除任务,完事之后调用reject来执行拒绝逻辑
if (! isRunning(recheck) && remove(command))
reject(command);
//如果当前活跃的线程数是0,那么就直接添加任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果能执行到这里,说明线程已经不是RUNNING状态或者工作队列已满(隐含的条件是workerCount >= corePoolSize)。如果此时执行任务失败,就执行拒绝逻辑。
else if (!addWorker(command, false))
reject(command);
}
上图就是execute的逻辑流程图(图是网上找的,在最后我会给出出处)。
下面是addWorker()的源码:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取运行状态
int rs = runStateOf(c);
//SHUTDOWN状态表示不再接受新任务,如果rs >= SHUTDOWN,那就继续后面的判断
//后面的判断有三个:
//1. 如果rs == SHUTDOWN,说明处于不再接受任务的状态
//2. 如果firstTask == null,说明这是空任务
//3. !workQueue.isEmpty(),如果队列不空,该表达式返回true
//如果以上三个条件都成立,记得还有最外层的!符号,即要以上三个条件都不成力并且rs >= SHUTDOWN,这整个if才会成立,最后返回false,表示添加worker失败。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取workcount
int wc = workerCountOf(c);
//如果workcount大于CAPACITY,就直接返回false,表示添加worker失败
//或者,如果当前core参数为true,即表示添加核心线程,那么如果workcount大于corePoolSize,那么就不应该创建新线程,所以会返回false,如果当前core为false,即表示添加非核心线程,如果workcount大于maximumPoolSize,那么也不应该继续创建线程,最后返回false即可。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用CAS来增加workerCount,可能需要多次尝试
if (compareAndIncrementWorkerCount(c))
break retry;
//重新的ctl值
c = ctl.get();
//如果此时的ctl值和rs值不相等,说明状态以及改变,然后继续从头开始执行循环
if (runStateOf(c) != rs)
continue retry;
}
}
//线程启动标志
boolean workerStarted = false;
//添加到worker队列的标志
boolean workerAdded = false;
Worker w = null;
try {
//创建一个新的Worker对象
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//加锁
mainLock.lock();
try {
//获取当前的状态
int rs = runStateOf(ctl.get());
//如果小于SHUTDOWN,表示是RUNNING状态,那么就判断t是否是活跃状态,如果是的话,就抛出异常,因为可能由于并发的原因,该线程已经被其他的Worker使用了
//如果现在状态是SHUTDOWN状态且任务是空任务,并且线程处于活跃状态,那么就抛出异常
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
//如果一切正常,就继续执行下面的逻辑
//获取worker集合的大小
int s = workers.size();
//如果大小大于largestPoolSize,就将largestPoolSize设置为s
if (s > largestPoolSize)
largestPoolSize = s;
//能走到这,说明worker已经被添加到worker集合里了
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
//如果worker已经被添加
if (workerAdded) {
//启动线程
t.start();
//线程启动标志设置为true
workerStarted = true;
}
}
} finally {
//如果线程没有启动
if (! workerStarted)
//调用添加addWorkerFailed()的逻辑
addWorkerFailed(w);
}
//最后返回线程启动标志
return workerStarted;
}
上面的两段源码,都用到了worker,那么这个worker是什么呢?下面是Worker类的源码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
因为涉及到AbstractQueuedSynchronizer,即AQS,这里先不介绍其相关的方法,其实只要知道Worker保存着什么东西,也基本能理解了。Worker还实现了Runnable接口,所以也完全可以作为一个任务提交给线程,实际上,Worker就是对线程的封装,ThreadPool的各个操作的对象主要就是Worker,并不会直接操作Thread。
Worker里两个重要字段是thread和firstTask,即线程实例和具体的任务。thread实例是由ThreadFactory产生的,调用ThreadFactory.newThread()即可获得一个thread实例,该方法接受一个Runnable实例,Worker类在获取thread传入的参数是this,即worker自己本身,从这可以看出,Worker和Thread其实是一个双向绑定的关系,当thead调用start方法的时候,会执行worker里run()逻辑,worker里的run只调用了一个runWorker()方法,该方法会判断各种状态,然后决定采用哪种方式执行任务,例如如果当前的任务为空,那么就尝试从工作队列里取出任务,然后执行。这里我就不带着大家分析runWroker方法以及其他方法了,比较不是专门源码分析的文章,希望大家能自己去看看,肯定能加深对源码的理解。
3.3 自定义线程池
我将要介绍的“自定义线程池”并不是指从头开始写一个线程池,而是指的自定义自己的线程工厂,拒绝策略等参数。如果是从头开始写一个线程池,那无异于“重复造轮子”,而且算是比较复杂的一个项目了,底层要考虑的东西很多,各个线程之间如何协调,如何防止发生死锁,如何防止内存泄露等问题都需要考虑。
我们可以通过继承ThreadPoolExecutor来重写部分方法,因为ThreadPoolExecutor本身设计的时候就是可扩展的,可定制的,也预留了一些方法来让扩展者自行实现,例如beforeExecute()和afterExecute()等方法。同样,ThreadFactory和RejectedExecutionHandler都是我们可以自己实现的。下面是一个示例:
//继承ThreadPoolExecutor,重写方法,最好还是不要重写execute等重要的方法,可能会造成逻辑混乱
public class MyThreadPool extends ThreadPoolExecutor {
public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("before execute");
System.out.println(t + " will execute!");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("after execute");
System.out.println(Thread.currentThread() + "finish execute");
}
}
//实现ThreadFactory接口,重写newThread方法即可
public class MyThreadFactory implements ThreadFactory {
//这里简单粗暴的直接new一个线程
@Override
public Thread newThread(Runnable r) {
Thread newThread = new Thread(r);
newThread.setName("myThread : " + newThread.getId());
return newThread;
}
}
//拒绝策略,实现RejectedExecutionHandler
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
//没做什么事
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("reject " + r);
}
}
//测试类
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService =
new MyThreadPool(1, 1, 10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),
new MyThreadFactory(), new MyRejectedExecutionHandler());
for (int i = 0; i < 4; i++) {
executorService.execute(() -> {
System.out.println("hello,world");
});
}
executorService.shutdown();
executorService.awaitTermination(1000, TimeUnit.SECONDS);
}
其实很简单,阅读来源码之后,对于为什么可以自定义线程池的配置,应该是非常容易理解了的,就不多说了。
4 小结
本文介绍了线程池的基本概念,其特点、好处以及简单使用,之后还详细分析了ThreadPoolExecutor类的部分源码,希望读者能接着阅读剩余的源码,加深理解,最后还尝试自定义线程池的各项配置,ThreadPoolExecutor本身就是可扩展的,所以这个过程非常简单。
5 参考资料
深入理解 Java 线程池:ThreadPoolExecutor
《Java并发编程实战》