1. 线程复用
ThreadPoolExecutor是如何实现线程复用的呢?
让我们直接从ThreadPoolExecutor源码中找寻答案。
- ThreadPoolExecutor.execute()为入口
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 当工作线程数小于核心线程数,就创建一个工作线程,核心方法就是addWorker()
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- addWorker()方法
private boolean addWorker(Runnable firstTask, boolean core) {
// 省略上面部分代码
boolean workerStarted = false;
boolean workerAdded = false;
// 创建一个Worker实例,将firstTask传给Worker实例
// 然后,取得Worker里的thread的属性,并在下面运行这个thread
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- Worker类
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// 将firstTask存入到Worker里面,后面直接从Worker里取得这个任务执行
this.firstTask = firstTask;
// 从线程工厂中创建一个线程
this.thread = getThreadFactory().newThread(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// while循环先执行firstTask,然后getTask()从阻塞队列中取任务来执行
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- getTask()方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 判断从阻塞队列中take()取任务时,如果队列中没有任务了会调用await()阻塞当前线程,这样线程就不会回收了
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结
- 在实际执行的线程外部套一个Thread,外部Thread的run()方法来执行while循环来执行任务
- 当Thread的run方法执行完一个任务之后,会循环地从阻塞队列中取任务来执行,这样执行完一个任务之后就不会立即销毁了
- 然后通过从阻塞队列中取任务来执行,在默认不改变allowCoreThreadTimeOut的前提下(默认是false),如果工作线程数大于核心线程数,则通过poll()从队列取任务;如果工作线程数小于核心线程数,则通过take()从队列取任务;这2个方法区别是take()取任务时,如果队列中没有任务了会调用await()阻塞当前线程,这样线程就不会回收了
- 线程的唤醒是在execute时,当调用workQueue.offer()方法,将任务放入阻塞队列时,会调用Condition.signal()方法唤醒一个之前阻塞的线程
2. 线程销毁
核心还是在getTask()方法中
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// wc的工作线程数如果大于核心线程数,timed会返回true,执行下面代码
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,结合BlockingQueue的阻塞超时来实现的,超时没有获取到任务,则返回null,会跳出外层while循环,销毁线程
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3. 为什么用BlockingQueue
- 获取等待任务的时候,直接用阻塞代替通知轮询,提高性能,减少代码复杂度。
- 复用阻塞超时获取等待任务实现线程超时销毁,设计精巧。
- 本身就是支持并发操作的,不用额外维护线程安全。