一、前期知识概要
1、设计模式对象池(资源池)
在我们的日常生活我们听过水池,电池等等,水池了用来存放水,电池用来存放电,而在编程的世界中的池是用来存放一组资源
资源池(resource pool)被认为是一种设计模式,这里的资源主要是指系统资源, 这些资源不专属于某个进程或内部资源。客户端向池请求资源, 并使用返回的资源进行指定的操作。当客户端使用完资源后, 会把资源放回池中而不是释放或丢弃掉。
总结一句话: 需要时,从池中提取,不用时,放回池中
举个栗子: 对象池就想我们公司的仓库,比如我们去公司上班,公司会给我们提供一个工位,行政人员会给我们提供相应的办公设备,那这个时候她首先会看一下库房中,如果库房中有,直接从库房中拿,如果库房中没有,那就会去网上或者商店购买一个新的。如果员工离职了正常情况下会将员工的能用的办公物品放到库房。
2、应用场景
它用在当对象的初始化过程代价较大或者使用频率较高时,比如线程池,数据库连接池等。运用对象池化技术可以显著地提升性能。
二、为什么要使用
创建线程对象不像其他对象一样在JVM分配内存即可,还要调用操作系统内核的API,然后操作系统为线程分配一系列的资源,这个成本就很高了。所以线程是一个重量级对象,应该避免频繁创建和销毁
降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
引用《Java并发编程的艺术》
三、Java线程池的架构设计
1、说明
Java里面线程池的顶级接口是Executor,该类位于
java.util.concurrent
,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。
2、重要类说明
类或者接口 | 说明 |
---|---|
ExecutorService | 真正的线程池接口。 |
ScheduledExecutorService | 定时任务与线程池功能结合使用 |
ThreadPoolExecutor | ExecutorService的默认实现。重点 |
ScheduledThreadPoolExecutor | 周期性任务调度。 |
3、结构图
4、Executor
-
说明
Executor接口只有一个execute方法,执行提交Runnable任务,用来替代通常启动线程的方法
-
方法
execute(Runnable r)
-
举个栗子
/*以前*/ Thread t = new Thread(); t.start(); /*使用线程池*/ Thread t = new Thread(); executor.execute(t)
5、ExecutorService
-
说明
ExecutorService接口继承自Executor接口,真正的线程池核心类。提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。
-
核心方法
方法名 返回值 说明 submit(Callable task) Future<T>
提交一个可运行的任务执行,并返回一个表示该任务结果 submit(Runable task) Future<T>
提交一个可运行的任务执行,并返回一个表示该任务结果 shutdown() 布尔 阻止新来的任务提交,对已经提交了的任务不会产生任何影响。当已经提交的任务执行完后,它会将那些闲置的线程进行中断,这个过程是异步的 shutdownNow() List<Runable>
设置线程池的状态为STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表 isShutdown() 布尔 检测线程池是否正处于关闭中 isTerminated() 布尔 所有任务在关闭后完成,则返回 true
。awaitTermination() 布尔 定时或者永久等待线程池关闭结束 -
举个栗子
private static int TASK_COUNT = 10; public static void main(String[] args) { /*1. 创建线程池对象 */ ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 8, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());); /*2. 提交任务*/ for (int i = 0; i < TASK_COUNT; i++) { pool.execute(() -> { System.out.println(Thread.currentThread().getName() + ":----->在执行任务"); }); } /*3. 关闭连接池*/ pool.shutdown(); /* 或者 */ // pool.shutdownNow() }
6、ScheduledExecutorService
-
说明
ScheduledExecutorService是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,任务是并发执行,互不影响。
核心方法
-
关系图
-
示例代码
public static void main(String[] args) { // 创建定时任务线程池 ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); //设置日期格式 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 提交一个任务两秒之后开始执行 executorService.schedule(() -> { System.out.println("++++++++++++++++++++子线程:" + df.format(new Date())); }, 2, TimeUnit.SECONDS); System.out.println("主线程: " + df.format(new Date())); // executorService.shutdown(); }
7、工作流程(了解)
四、线程池的状态与生命周期
1、概要
线程池有运行、关闭、停止、清空状态、结束五种状态,结束后就会释放所有资源
- RUNNING(运行): 接受新的任务和处理队列中的任务
- SHUTDOWN(关闭): 不接受新的请求,但会处理已经添加到队列中的任务
- STOP(停止): 不接收新任务,也不处理队列任务,并且中断所有处理中的任务。
- TIDYING(整理):所有任务都被终结,有效线程为0。会触发terminated()方法
- TERMINATED(结束):当terminated()方法执行结束
2、流程图
五、ThreadPoolExecutor
1、线程池的创建
-
构造方法
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
2、参数概要
参数 | 概要 |
---|---|
corePoolSize | 池中所保存的线程数,包括空闲线程。 |
maximumPoolSize | 池中允许的最大线程数。 |
keepAliveTime | 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 |
unit - keepAliveTime | 时间单位。 |
workQueue | 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。 |
threadFactory | 执行程序创建新线程时使用的工厂 |
handler | 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 |
3、参数详解
3.1、corePoolSize(必要参数)
核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。
3.2、maximumPoolSize(必要参数)
线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
3.3、keepAliveTime(必要参数)
线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
3.4、unit(必要参数)
指定keepAliveTime参数的时间单位
可选值 说明 TimeUnit.DAYS 天 TimeUnit.HOURS 小时 TimeUnit.MINUTES(常用) 分钟 TimeUnit.SECONDS(常用) 秒 TimeUnit.MILLISECONDS(常用) 毫秒 TimeUnit.MICROSECONDS 微秒(千分之一毫秒) TimeUnit.NANOSECONDS 毫微秒(千分之一微秒)
3.5、workQueue
任务队列。Runnable对象就存储在该参数中
3.6、threadFactory(可选)
线程工厂。用于指定为线程池创建新线程的方式
3.7、handler(可选)
-
说明
拒绝策略。有两种情况会触发拒绝策略
- 队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常
- 当线程池被调用shutdown()后
-
可选值
策略 说明 AbortPolicy 直接抛出异常。默认值 CallerRunsPolicy 只用调用者所在线程来运行任务。 DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务。 DiscardPolicy 不处理,丢弃掉。
4、举个栗子
-
有返回值
public static void start() { /* * 创建线程池,并发量最大为5 * LinkedBlockingDeque,表示执行任务或者放入队列 */ ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); // 存储线程的返回值 List<Future<String>> results = new LinkedList<>(); for (int i = 0; i < 10; i++) { // 调用submit可以获得线程的返回值 int num = i; Future<String> result = executor.submit(() -> num + ""); results.add(result); } //如果不调用,awaitTermination将一直阻塞 executor.shutdown(); //1天,模拟永远等待 try { System.out.println(executor.awaitTermination(1, TimeUnit.DAYS)); } catch (InterruptedException e) { e.printStackTrace(); } //输出结果 for (int i = 0; i < 10; i++) { try { System.out.println(results.get(i).get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
5、corePoolSize、workQueue 、maximumPoolSize的关系
- 默认情况下,线程池在初始的时候,线程数为0。当接收到一个任务时,如果线程池中存活的线程数小于corePoolSize核心线程,则新建一个线程。
- 如果所有运行的核心线程都都在忙,超出核心线程处理的任务,执行器更多地选择把任务放进队列,而不是新建一个线程。
- 如果一个任务提交不了到队列,在不超出最大线程数量情况下,会新建线程。就根据指定的拒绝策略来处理,默认抛出异常。
- 如线程闲置时,线程池会根据keepAliveTime设置的时间回收大于corePoolSize的线程
六、ScheduledThreadPoolExecutor
1、简介
ScheduledThreadPoolExecutor用来执行周期性任务的调度。在这之前的实现需要依靠Timer和TimerTask或者其它第三方工具来完成。它主要有以下两个作用
- 延时执行任务。
- 周期性重复执行任务。
Timer ScheduledThreadPoolExecutor 单线程 多线程 单个任务执行时间影响其他任务调度 多线程,不会影响 基于绝对时间 基于相对时间 一旦执行任务出现异常不会捕获,其他任务得不到执行 多线程,单个任务的执行不会影响其他线程
2、示例代码
- 执行一次
private static final int TASK_COUNT = 3;
public static void main(String[] args) throws InterruptedException {
// 创建大小为2的线程池
ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(2);
for (int i = 0; i < TASK_COUNT; i++) {
// 只执行一次
scheduledThreadPool.schedule(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 5, TimeUnit.SECONDS);
}
// 关闭线程池
scheduledThreadPool.shutdown();
boolean isStop;
// 等待线程池终止
do {
isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.HOURS);
System.out.println("等待任务结束中...");
} while (!isStop);
System.out.println("所有工作完成!!! 线程池关闭");
}
-
周期执行任务
private static final int TASK_COUNT = 3; public static void main(String[] args) throws InterruptedException { // 1. 创建大小为2的线程池 ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(2); // 2. 周期性执行,每2秒执行一次 scheduledThreadPool.scheduleAtFixedRate(() -> { try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } }, 0, 2, TimeUnit.SECONDS); } // 3.关闭线程池 scheduledThreadPool.shutdown(); boolean isStop; // 等待线程池终止 do { isStop = scheduledThreadPool.awaitTermination(1, TimeUnit.HOURS); System.out.println("等待任务结束中..."); } while (!isStop); System.out.println("所有工作完成!!! 线程池关闭"); }
七、任务队列
1、名词解释
1.1、什么叫有界
有界就是有固定大小的队列,无界表示无上限
1.2、什么叫队列
Queue 一个队列就是一个先入先出(FIFO)的数据结构
Queue接口与List、Set同一级别,都是继承了Collection接口。
2、常用队列
2.1、ArrayBlockingQueue
-
作用
采用数组实现的有界阻塞线程安全队列。如果向已满的队列继续塞入元素,将导致当前的线程阻塞。如果向空队列获取元素,那么将导致当前线程阻塞。
-
构造方法
构造方法 参数说明 public ArrayBlockingQueue(int capacity) 构造指定大小的有界队列 public ArrayBlockingQueue(int capacity, boolean fair) 构造指定大小的有界队列,指定为公平或非公平锁 public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) 构造指定大小的有界队列,指定为公平或非公平锁,指定在初始化时加入一个集合 -
示例代码
public class ArrayBlockingQueueExample { public static final int COUNT = 100; public static void main(String[] args) throws Exception { ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10); ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 8, 60, TimeUnit.SECONDS, queue); for (int i = 0; i < COUNT; i++) { TimeUnit.SECONDS.sleep(1); executor.execute(() -> System.out.println("线程池---数组实现的有界阻塞线程安全队列" + Thread.currentThread().getName())); } executor.shutdown(); } }
2.2、LinkedBlockingQueue
-
作用
一个由链表结构组成的有界阻塞队列(也可以当无界阻塞队列)。此队列按 FIFO(先进先出)原则。Executor.newFixedThreadPool()默认队列
-
构造方法
构造方法 参数说明 public LinkedBlockingQueue() 在未指明容量时,容量默认为Integer.MAX_VALUE public LinkedBlockingQueue(int capacity) 构造指定大小的有界队列
2.3、SynchronousQueue;
-
作用
一个不存储元素的有界阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
-
构造方法
构造方法 参数说明 public SynchronousQueue() 默认情况下不保证排序, public SynchronousQueue(boolean fair) 如果设置true队列可保证线程以 FIFO 的顺序进行访问 -
示例代码
public class SynchronousQueueExample { public static final int COUNT = 100; public static void main(String[] args) { SynchronousQueue<Runnable> queue = new SynchronousQueue<>(); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, Integer.MAX_VALUE, 1, TimeUnit.SECONDS, queue); for (int i = 0; i < COUNT; i++) { executor.execute(() -> System.out.println("线程池---同步队列" + Thread.currentThread().getName())); } executor.shutdown(); } }
-
分析
假设当前有2个核心线程
此时来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。”,所以A被添加到queue中。
又来了一个任务(B),且核心2个线程还没有忙完。接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。
此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程”,所以必然会新建一个线程来运行这个任务。
但是如果这三个任务都还没完成,继续来了一个任务,queue中无法插入(任务A还在queue中),而线程数达到了maximumPoolSize,所以只好执行异常策略了。
为了避免这种情况:,所以在使用SynchronousQueue通常要求maximumPoolSize是无界的(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。
2.4、PriorityBlockingQueue
-
作用
一种优先级队列,元素并不是以FIFO的方式出/入队。默认大小为11,不可以插入 null 值。当队列满的时候会进行扩容,是真正意义上的无界(仅受内存大小限制),它不像ArrayBlockingQueue那样构造时必须指定最大容量,也不像LinkedBlockingQueue默认最大容量为Integer.MAX_VALUE;
-
构造方法
构造方法 参数说明 PriorityBlockingQueue() PriorityBlockingQueue(int initialCapacity) 指定初始化队列长度 PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) 指定初始化队列长度,自定义比较器 -
示例代码
八、Executors(了解)
1、说明
对于新手来说要配置一个线程池还是比较有难度的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池
2、注意注意注意
3、Executors静态方法
1、newSingleThreadExecutor
-
作用
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。
-
方法
Executors.newSingleThreadExecutor()
-
应用场景
保证所有任务的执行顺序按照任务的提交顺序执行
不适合并发但可能引起IO阻塞性及影响UI线程响应的操作,如数据库操作、文件操作等
2、newFixedThreadExecutor
-
作用
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程
-
方法
Executors.newFixedThreadExecutor()
-
应用场景
控制线程最大并发数
-
举个栗子
3、newCacheThreadExecutor
-
作用
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小
-
方法
Executors.newCacheThreadExecutor()
-
应用场景
适合执行大量、耗时少的任务
-
举个栗子
4、newScheduleThreadExecutor
-
作用
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求
-
方法
Executors.newScheduleThreadExecutor()
-
示例代码
// 1. 创建 定时线程池对象 & 设置线程池线程数量固定为5 ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 2. 创建好Runnable类线程对象 & 需执行的任务 Runnable task =new Runnable(){ public void run() { System.out.println("执行任务啦"); } }; // 3. 向线程池提交任务 scheduledThreadPool.schedule(task, 1, TimeUnit.SECONDS); // 延迟1s后执行任务 scheduledThreadPool.scheduleAtFixedRate(task,10,1000,TimeUnit.MILLISECONDS);// 延迟10ms后、每隔1000ms执行任务
九、面试题
1、shutdown
- shutdown()有什么作用?
阻止新来的任务提交,对已经提交的任务不会产生任何影响 当已经提交的任务执行完成之后,那些闲置的线程会被回收
这个过程是异步的。
- 如何阻止新来的任务提交?
通过将线程池的状态改成SHUTDOWN,当再将执行execute提交任务时,如果测试到状态不为RUNNING,则抛出rejectedExecution,从而达到阻止新任务提交的目的。
- 为何对提交的任务不产生任何影响?
在调用中断任务的方法时,它会检测workers中的任务,如果worker对应的任务没有中断,并且是空闲线程,它才会去中断。另外的话,workQueue中的值,还是按照一定的逻辑顺序不断的往works中进行输送的,这样一来,就可以保证提交的任务按照线程本身的逻辑执行,不受到影响。
2、shutdownNow
-
shutdownNow()有什么功能?
阻止新来的任务提交,同时会中断当前正在运行的线程,即workers中的线程。另外它还将workQueue中的任务给移除,并将这些任务添加到列表中进行返回。
-
如何阻止新来的任务提交?
通过将线程池的状态改成STOP,当再将执行execute提交任务时,如果测试到状态不为RUNNING,则抛出rejectedExecution,从而达到阻止新任务提交的目的.
-
如果我提交的任务代码块中,正在等待某个资源,而这个资源没到,但此时执行shutdownNow(),会出现什么情况?
当执行shutdownNow()方法时,如遇已经激活的任务,并且处于阻塞状态时,shutdownNow()会执行1次中断阻塞的操作,此时对应的线程报InterruptedException,如果后续还要等待某个资源,则按正常逻辑等待某个资源的到达。例如,一个线程正在sleep状态中,此时执行shutdownNow(),它向该线程发起interrupt()请求,而sleep()方法遇到有interrupt()请求时,会抛出InterruptedException(),并继续往下执行。在这里要提醒注意的是,在激活的任务中,如果有多个sleep(),该方法只会中断第一个sleep(),而后面的仍然按照正常的执行逻辑进行。
附: 线程池工具类
public class ThreadPoolUtil {
/**
* 核心线程池大小
*/
private static final int CORE_POOL_SIZE = 20;
/**
* 最大可创建的线程数
*/
private static final int MAX_POOL_SIZE = 100;
/**
* 线程池维护线程所允许的空闲时间
*/
private static final int KEEP_ALIVE_SECONDS = 5;
/**
* 队列最大长度
*/
private static final int QUEUE_CAPACITY = 1000;
public volatile static ExecutorService pool;
/**
* 获取线程池
* @return
*/
public static ExecutorService getPool(){
if(null == pool){
synchronized (ThreadPoolUtil.class) {
if (null == pool) {
pool = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_SECONDS,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(QUEUE_CAPACITY),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
}
return pool;
}
/**
* 获取线程池
* @return
*/
public static ExecutorService getPoolNew(){
if(null == pool){
synchronized (ThreadPoolUtil.class) {
if (null == pool) {
/**
* 1. 缓存线程池
* {@Code Executors.newCachedThreadPool()}
* corePoolSize=0, maxinumPoolSize=+∞,队列长度=0 ,
* 因此线程数量会在corePoolSize到maxinumPoolSize之间一直灵活缓存和变动,
* 且不存在队列等待的情况,一来任务我就创建,用完了会释放。
*
* 2. 定长线程池
* {@Code Executors.newFixedThreadPool()}
* corePoolSize= maxinumPoolSize=构造参数值, 队列长度=+∞。因此不存在线程不够时扩充的情况
*
* 3. 定时器线程池
* {@Code Executors.newScheduledThreadPool()}
* 提交定时任务用的,构造参数里会带定时器的间隔和单位。 其他和FixedThreadPool相同,属于定长线程池。
*
* 4. 单线程池
* {@Code Executors.newScheduledThreadPool()}
* corePoolSize=maxinumPoolSize=1, 队列长度=+∞,只会跑一个任务, 所以其他的任务都会在队列中等待,因此会严格按照FIFO执行
*/
pool = Executors.newFixedThreadPool(50);
}
}
}
return pool;
}
/**
* 执行线程(无返回值)
*/
public static void execute(ThreadExecuteTask threadExecuteTask,Object... objects){
ExecutorService pool = getPool();
pool.execute(() -> {
threadExecuteTask.execute(objects);
});
}
/**
* 执行线程(有返回值)
*/
public static <T> Future<T> submit(ThreadSubmitTask<T> threadSubmitTask,Object... objects){
ExecutorService pool = getPool();
Future<T> future = pool.submit(() -> {
return threadSubmitTask.submit(objects);
});
return future;
}
}
interface ThreadExecuteTask{
/**
* 无返回值方法
* @param objects
*/
void execute(Object... objects);
}
interface ThreadSubmitTask<T>{
/**
* 有返回值方法
* @param objects
* @return
*/
T submit(Object... objects);
}