概述
在之前的一篇博客里谈谈ThreadPoolExecutor的实现已经对ThreadPoolExecutor中的线程如何运行进行了简单的介绍,本文将介绍线程池是如何进行结束的,并对上篇文章遗留问题进行解答。
功能介绍
java中线程池提供了两个关闭方法shutdown和shutdownNow,两个方法的具体使用如下:
//执行该方法,线程处于shutdown状态,线程池不允许再提交任务,但是已提交的任务会继续执行直到结束
void shutdown();
//执行该方法,线程会处于stop状态,线程池会试图停止正在执行的任务,并返回没有执行成功的任务列表
List<Runnable> shutdownNow();
源码分析
我们知道,ThreadPoolExecutor在会将每个线程封装为一个Worker对象,该对象会持有任务并且在执行完其创建时的第一个任务firstTask后,会阻塞的从workQueue中获取任务。前面我们讲到shutdown方法会将已提交的任务执行直到结束,同时会将空闲线程回收(可以先思考下,如何判断线程是否空闲?)。我们进入源码进行分析:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
//全局加锁,保证只会有一个线程内执行该方法
mainLock.lock();
try {
//权限检查,忽略
checkShutdownAccess();
//将线程池状态置为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历每个Worker对象
for (Worker w : workers) {
Thread t = w.thread;
//如果没有被中断,并且持有Worker的互斥锁,说明该woker对象为空闲线程并且没有被中断
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断worker对象持有的线程,因为该线程可能正在阻塞在任务队列中
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
我们可以回到我之前的博客谈谈ThreadPoolExecutor的实现查看Worker类在执行任务之前会首先获取到自身的互斥锁,这样如果获取不到Worker的互斥锁,则说明该worker正在执行任务,这就回答了我们上面的问题,也是为什么Worker类实现AQS的原因(这里并不是为了并发安全,只是为了判断线程是否正在执行任务,下面会有更深刻的认识)。当我们在主线程中中断了worker中持有的线程,woker线程中的runWorker方法会执行最外围的processWorkerExit函数销毁线程,进而完全结束worker的生命周期。
而正在执行的线程在执行完其任务也会因为获取不到任务进入processWorkerExit函数,结束线程生命周期。
下面,我们继续看下shutdownNow方法如何实现的,相比于shutdown方法,该方法简直太狠了直接对所有的线程执行中断,具体代码如下:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//将线程池状态置为STOP
advanceRunState(STOP);
//中断所有线程包括
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
//这个函数就比较狠了,无论是否持有锁,只要线程没有被中断,就中断Worker持有的线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//循环遍历中断线程
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
//无论是否持有互斥锁,只有线程没有被中断就执行interrupt
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
使用建议
在日常开发中,我们一般不会去使用shutdownNow,这个方法会导致部分任务无法执行。我们通常会调用shutdown方法使线程池不接受新的任务,然后等正在执行的任务执行完成后再结束。下面我给出一个个人觉得比较优雅的结束线程池的使用示例:
public class ThreadPoolExecutorTest {
private static Logger logger = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
private static ExecutorService executorService = Executors.newFixedThreadPool(3);
public static void main(String []args) {
for (int i = 0; i< 5; i++) {
executorService.submit(new SleepRunnable());
}
executorService.shutdown();//执行该方法相当于通知线程池结束
try {
//阻塞等待线程池结束
executorService.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
//出现异常是强制结束
List<Runnable> notExcuteRunnables = executorService.shutdownNow();
logger.info("awaitTermination exception, notExcuteRunnables = {}", notExcuteRunnables, e);
}
}
//测试任务,简单的sleep 1s
static class SleepRunnable implements Runnable {
@Override
public void run() {
try {
logger.info("sleep in thread = {}", Thread.currentThread().getName());
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.info("exception in sleep", e);
}
}
}
}
原文
袁琼琼的技术博客,欢迎指针
http://yuanqiongqiong.cn/2019/07/15/ThreadPoolExecutor%E5%85%B3%E9%97%AD%E7%BA%BF%E7%A8%8B%E6%B1%A0%E8%AF%A6%E8%A7%A3/