1. 前言
线程池是JAVA开发中最常使用的池化技术之一,可以减少线程资源的重复创建与销毁造成的开销。
2. 灵魂拷问:怎么做到线程重复利用?
很多同学会联想到连接池,理所当然的说:需要的时候从池中取出线程,执行完任务再放回去。
如何用代码实现呢?
此时就会发现,调用线程的start方法后,生命周期就不由父线程直接控制了。线程的run方法执行完成就销毁了,所谓的“取出”和“放回”只不过是想当然的操作。
这里先说答案:生产者消费者模型
3. ThreadPoolExecutor的实现
3.1 结构
首先看下ThreadPoolExecutor的继承结构
顶级接口是Executor,定义execute方法
ExecutorService添加了submit方法,支持返回future获取执行结果,以及线程池运行状态的相关方法
本文着重讲线程池的执行流程,因此将暂时忽略线程池的状态相关的代码,也建议新手看源码时从核心流程看起。
3.2 核心方法:execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 判断是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//添加worker,添加成功则退出
if (addWorker(command, true))
return;
c = ctl.get();
}
// 核心线程数已用完则放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 双重检查,避免入队完成后,所有线程已销毁,导致没有消费者消费当前任务
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列已满则开启非核心线程,达到最大线程数则使用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
execute方法就是一个生产的过程,主要分为开启线程和入队
开启线程会传入command(即当前任务),开启的线程会立即消费该任务
入队的任务则会由Worker消费
主要关注corePoolSize,maximumPoolSize,queueSize(任务队列长度),workerCount(当前worker数量)这几个参数,可以总结为以下:
已满 | 未满 | 操作 |
---|---|---|
corePoolSize | 开启核心线程 | |
corePoolSize | queueSize | 入队 |
queueSize | maximumPoolSize | 开启非核心线程 |
maximumPoolSize | 拒绝 |
3.2 消费者:Worker
Worker类实现Runnable接口,继承AQS,主要先关注thread和firstTask两个属性和run方法
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
从Worker的构造方法可以看出,thread就是线程池中真正消费任务的线程,创建时会传入this(即Worker对象),而Worker实现了Runnable,因此线程运行时就是执行了Worker的run方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 重点关注
while (task != null || (task = getTask()) != null) {
// ···
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 重点关注
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 重点关注
task = null;
// ···
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
再来看run方法,直接调用了ThreadPoolExecutor的runWorker方法,runWorker中有一个while循环,循环体执行了task.run方法
task首先会获取Worker中的firstTask属性,并将其置为null,因此firstTask只会执行一次,后续task将通过getTask方法获取。
因此Worker的工作流程可以概括为:消费完Worker的firstTask后,循环执行getTask获取任务并消费,获取不到task时,就退出循环,线程销毁。
此处便可以看出生产者消费者模型了。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
// ···
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
// 此处返回null,runWorker将退出循环
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker方法退出循环的条件是getTask返回null。
观察getTask,只有同时满足以下情况时才会返回null
条件 | 解读 | |||
---|---|---|---|---|
1 | wc > maximumPoolSize | (timed && timedOut) | workQueue.poll方法超时 | |
2 | wc > 1 | workQueue.isEmpty() | 队列任务全部执行完 | |
3 | compareAndDecrementWorkerCount(c) | cas减少workerCount成功 |
返回的task是通过workQueue.poll和workQueue.take得到的
两者执行时线程均会挂起,直至workQueue中有新的任务
不同之处在于poll方法阻塞keepAliveTime时间后会自动唤醒并返回null,此时timeOut置为true,即满足条件1
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// ···
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// ···
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// ···
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// ···
}
return workerStarted;
}
了解了Worker之后,再来看execute中调用的addWorker方法
方法有两个参数,firstTask即为Worker的firstTask,core则标记需要新增的是否是核心线程
retry循环与线程池状态相关,内层for循环则是重复尝试cas增加线程,若大于corePoolSize或者maximumPoolSize则新增失败,cas成功后,new一个Worker并start
3.3 总结
回到最初的问题,线程是如何做到重复利用的?
并不存在取出线程使用完再归还的操作,线程启动后进入循环,主动获取任务执行,退出循环则线程销毁。
execute方法控制新增Worker和任务入队
附:手写简易线程池
public class MyThreadPool implements Executor {
private int corePoolSize;
private int maximumPoolSize;
private BlockingQueue<Runnable> queue;
//记录当前工作线程数
private AtomicInteger count;
private long keepAliveTime;
private RejectHandler rejectHandler;
public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> queue, long keepAliveTime, RejectHandler rejectHandler) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.queue = queue;
this.keepAliveTime = keepAliveTime;
this.rejectHandler = rejectHandler;
count = new AtomicInteger(0);
}
@Override
public void execute(Runnable task) {
int ct = count.get();
//核心线程数未满,尝试增加核心线程
if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) {
new Worker(task).start();
return;
}
//入队
if (queue.offer(task)) {
return;
}
//队列已满,尝试增加非核心线程
if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) {
new Worker(task).start();
return;
}
//已达最大线程数,拒绝
rejectHandler.reject(task);
}
class Worker extends Thread {
Runnable firstTask;
public Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
@Override
public void run() {
Runnable task = firstTask;
firstTask = null;
while (true) {
try {
//getTask会阻塞
if (task != null || (task = getTask()) != null) {
task.run();
} else {
//getTask超时才会进入,直接退出,线程销毁
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//置空,否则不能getTask
task = null;
}
}
}
}
Runnable getTask() throws InterruptedException {
//标记是否超时过
boolean timedOut = false;
while (true) {
int ct = count.get();
//超出核心线程数才进入超时逻辑,即使timeOut由于线程poll超时过一次变成true,执行到这里如果不超出corePoolSize,可以再次进入take分支
if (ct > corePoolSize) {
//超出核心线程数
if (timedOut) {
//已超时过,尝试减少工作线程数,失败会continue,然后重新比较corePoolSize,重试减少线程数
if (count.compareAndSet(ct, ct - 1)) {
return null;
} else {
continue;
}
}
Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (task == null) {
//poll超时才进入
timedOut = true;
continue;
}
return task;
} else {
//必然能获取到task
return queue.take();
}
}
}
public static interface RejectHandler {
void reject(Runnable r);
}
public static void main(String[] args) {
MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> {
System.out.println(r + ": reject");
});
for (int i = 0; i < 3; i++) {
final int x = i;
new Thread(() -> {
for (int j = 0; j < 5; j++) {
final int y = j;
pool.execute(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalDateTime now = LocalDateTime.now();
System.out.println(String.format("线程i=%s, j=%s,执行结束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME)));
});
}
}).start();
}
}
}