ThreadPoolExecutor类继承体系:
ThreadPoolExecutor -> AbstractExecutorService -> ExecutorService -> Executor
Executors提供工厂方法,用于创建不同类型的线程池
- newFixedThreadPool()
- newSingleThreadExecutor()
- newCachedThreadPool()
- newScheduledThreadPool()
前两种线程池可能队列积压导致OOM,后两种可能创建非常多的线程(Integer.MAX_VALUE),甚至OOM
以下内容借鉴:
【链接】如何优雅的关闭Java线程池
http://url.cn/5DGL2xe
线程的状态
- NEW
- RUNNABLE(RUNNING)
- BLOCKED
- WAITING
- TIMED_WAIT
- TERMINATED
1. 新建(new) :新创建了一个线程对象。
2. 可运行(runnable) :线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取cpu 的使用权 。
3. 运行(running) :可运行状态( runnable) 的线程获得了cpu 时间片(timeslice) ,执行程序代码。
4. 阻塞(block) :阻塞状态是指线程因为某种原因放弃了cpu 使用权,也即让出了cpu timeslice,暂时停止运行。直到线程进入 可运行(runnable) 状态,才有机会再次获得cpu timeslice 转到 运行(running) 状态。阻塞的情况分三种: (一). 等待阻塞: 运行(running) 的线程执行o.wait()方法,JVM会把该线程放入等待队列(waitting queue)中。 (二). 同步阻塞: 运行(running) 的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池(lock pool)中。 (三). 其他阻塞: 运行(running) 的线程执行Thread.sleep(long ms)或t.join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入 可运行(runnable) 状态。
5. 死亡(dead) :线程run()、main() 方法执行结束,或者因异常退出了run()方法,则该线程结束生命周期。死亡的线程不可再次复生。
执行任务
- execute()
- submit()
可简单理解为:前者是不需要返回值,后者需要返回值(Future.get()),虽然都是异步处理。
关闭线程池
- shutdownNow()
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
1. 拒绝接收新提交的任务
2. 同时立刻关闭线程池,池中的任务不再执行
3. 小结:当我们调用线程池的shutdownNow时,如果线程正在getTask方法中执⾏,则会通过for循环进入到if语句,于是getTask 返回null,从而线程退出。不管线程池⾥是否有未完成的任务。如果线程因为执行提交到线程池里的任务而处于阻塞状态,则会导致报错。(如果任务里没有捕获InterruptedException异常),否则线程会执行完当前任务,然后通过getTask方法返回为null来退出。
- shutdown()
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
1. 拒绝接收新提交的任务
2. 同时等待池中的任务执行完毕后关闭线程池
3. 小结:当我们调用线程池的shutdownNow时,如果线程正在getTask方法中执⾏,则会通过for循环进入到if语句,于是getTask 返回null,从而线程退出。不管线程池⾥是否有未完成的任务。如果线程因为执行提交到线程池里的任务而处于阻塞状态,则会导致报错。(如果任务里没有捕获InterruptedException异常),否则线程会执行完当前任务,然后通过getTask方法返回为null来退出。
总结
- shutdownNow(),可能会引起报错,可能会使线程池关闭不了,但不一定。(isTerminated=false)
- shutdown(),要保证任务里不会有永久阻塞等待的逻辑,否则线程池也关闭不了。(isTerminated=false)
TestCase
// 1. 线程正在执行时,shutdownNow不会报错
final ExecutorService pool1 = Executors.newSingleThreadExecutor();
pool1.submit(() -> {
long startTime = System.currentTimeMillis();
System.out.println("enter the thread pool1");
long counter = 0L;
while (counter < Long.MAX_VALUE >> (Integer.SIZE - 4)) {
Object o = new Object();
o = null;
counter++;
}
System.out.println("weak up from sleeping pool1");
System.out.println("leave the thread pool1");
long finishTime = System.currentTimeMillis();
System.out.println("cost time " + (finishTime - startTime));
});
System.out.println("going to shutdown the pool1");
pool1.shutdownNow();
pool1.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("exit the pool1");
boolean flag = pool1.isTerminated();
System.out.println("pool1->" + flag);
System.out.println("=========================");
// 2. 线程阻塞时,shutdownNow将会抛出InterruptedException
final ExecutorService pool2 = Executors.newSingleThreadExecutor();
pool2.submit(() -> {
System.out.println("enter the thread pool2");
try {
// 为什么两种不同进入阻塞状态的实现,最终结果不一致
//Thread.currentThread().wait(100_000);
Thread.sleep(100_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("weak up from sleeping pool2");
System.out.println("leave the thread pool2");
});
System.out.println("going to shutdown the pool2");
pool2.shutdownNow();
pool2.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("exit the pool2");
flag = pool2.isTerminated();
System.out.println("pool2->" + flag);
System.out.println("=========================");
// 3. 线程永久阻塞,shutdown将无法关闭线程池
final ExecutorService pool3 = Executors.newSingleThreadExecutor();
pool3.submit(() -> {
System.out.println("enter the thread pool3");
// for (long l = 0L; l < Long.MAX_VALUE; l++) {
// Object o = new Object();
// o = null;
// }
// try {
// Thread.sleep(1000_000_000); // 模拟永久阻塞
// //Thread.currentThread().wait();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
try {
ServerSocket socket = new ServerSocket(8080);
socket.accept();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("weak up from sleeping pool3");
System.out.println("leave the thread pool3");
});
System.out.println("going to shutdown the pool3");
pool3.shutdownNow();
pool3.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("exit the pool3");
flag = pool3.isTerminated();
System.out.println("pool3->" + flag);