一、前言
如果我们平时接触过多线程开发,那肯定对线程池不陌生。在我们原先的学习中,我们了解到,如果我们需要创建多个线程来执行程序,那么只需要简单使用Thread,Runnable或者Callable就可以完成我们所需的功能。
但线程频繁的创建与销毁是需要系统开销的,我们举几个例子来说下使用线程池的优点:
- 例如,创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,那么如果T1+T3>T2,那么是不是说开启一个线程来执行这个任务太不划算了!这时候使用线程池,线程池缓存线程,可用已有的闲置线程来执行新任务,避免了T1+T3带来的系统开销;
- 使用多线程会占用系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而产生阻塞的情况,而使用线程池能有效的控制线程的最大并发数,避免这种问题的产生;
- 使用线程池还可以进行延迟执行,定时执行等操作;
本文测试使用的JDK版本为JDK 8.0;
二、Java中的线程池实现
Java中的线程池实现是通过Executor
框架体系和工具类Executors
来实现的,其中最核心的类是ThreadPoolExecutor
这个类,我们将围绕着这个类来进行学习。我们先来看一下Executor框架的继承关系:
1. 接口简单介绍
-
-
Executor,线程池框架最基础的任务执行接口,Executor框架中几乎所有类都直接或者间接实现 Executor 接口,该接口提供了一种将
任务提交
与任务执行
分离开来的机制,该接口只有一个方法,用来执行已经提供的线程任务:
void execute(Runnable command);
-
Executor,线程池框架最基础的任务执行接口,Executor框架中几乎所有类都直接或者间接实现 Executor 接口,该接口提供了一种将
-
-
ExcutorService,继承自
Executor
接口,扩展了对任务各种操作的接口,该接口是我们最常用的线程池接口,我们来看一下它的一些方法:
public interface ExecutorService extends Executor { /** * 启动一次有顺序的关闭,之前提交的任务正常执行,新的任务不再执行 */ void shutdown(); /** * 试图停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表 */ List<Runnable> shutdownNow(); /** * 如果执行程序已经停止,则返回false */ boolean isShutdown(); /** * 如果所有任务都在关闭后完成,返回true */ boolean isTerminated(); /** * 调用此方法,在shutdown请求发起后,除非以下任何一种情况发生, 否则当前线程将一直阻塞,直到所有任务执行完成: 1、所有任务执行完成 2、发生超时; 3、当前线程被中断 */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /** * 提交一个有返回值的任务,多个重载方法,其中一个是指定返回值 */ <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); /** * 执行给定的一组任务,返回持有任务执行完成的结果和状态的Future的list, 对于每一个返回的结果,Future.isDone = true 完成的任务可能正常结束或者抛出异常结束, 如果在任务执行过程中参数Collection改变了,那么返回结果是不确定的。 */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; /** * 和上面接口一致,不同的是 如果所有任务执行完成或者超时, 那么对于每一个返回的结果,Future.isDone = true */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; /** * 执行给定的一组任务,只要有一个执行成功就返回结果, 不论正常返回还是异常结束,未执行的任务都会被取消; */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; /** * 和上面接口一致,不同的是,该接口在未超时情况下,只要有一个执行成功就返回结果; */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
-
ExcutorService,继承自
- AbstractExecutorService,ExecutorService接口的抽象类实现,提供ExecutorService执行方法的默认实现,该类实现了submit、invokeAny和invokeAll方法,并且提供了newTaskFor方法返回一个RunnableFuture对象。
-
- ScheduledExecutorService,ExecutorService的另一个实现,用于延迟或定时执行任务,提供了如下几个方法:
/** * 创建并执行一个在给定的延迟之后的ScheduledFuture */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * 创建并执行一个周期性动作,在给定的初始延迟之后执行,后续按照周期执行 * 也就是先延迟initialDelay后执行,下次是initialDelay+period执行, * 再下次是initialDelay+period * 2,initialDelay+period * 3依次执行; * 如果遇到异常,则会停止执行,否则该任务将仅通过取消或终止执行器终止; */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** * 创建并执行一个周期性动作,在给定的初始延迟之后执行, * 在每一次执行终止和下一次执行开始之间都存在给定的延迟执行; */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
2. ThreadPoolExecutor介绍
ThreadPoolExecutor是Java线程池中最核心的类了,我们学习线程池很大概念上就是学习这个类的使用,我们来看一下它的构造方法,参数及相应的实现。
2.1 构造方法
ThreadPoolExecutor共有4个构造方法,但其实参数一共就7种类型,我们主要就来看一下这具体的7种类型:
// 5个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
// 6个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
// 6个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
// 7个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
2.2 corePoolSize
该线程池中核心线程的最大值。
- 线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程。核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态);
- 如果指定
ThreadPoolExecutor
的allowCoreThreadTimeOut
这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉;
2.3 maximumPoolSize
该线程池中线程总数最大值。
线程总数 = 核心线程数 + 非核心线程数。
2.4 keepAliveTime
该线程池中非核心线程闲置的超时时长。
一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉,而如果设置
allowCoreThreadTimeOut = true
,则会作用于核心线程;
2.5 TimeUnit
keepAliveTime的单位,TimeUnit是一个枚举类型,其实前面已经仔细学习过这个类,这里不多说了:Java线程-Lock学习(五).
2.6 BlockingQueue workQueue
该线程池中的任务队列,用于维护等待执行的Runnable对象。当所有的核心线程都在执行时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。该任务队列决定了线程池的排队策略。常用的workQueue类型有:
- SynchronousQueue,这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现
线程数达到了maximumPoolSize而不能新建线程
的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大;- LinkedBlockingQueue,无界队列,这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize;
- ArrayBlockingQueue,有界队列,可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则添加的新的线程将发生异常:RejectedExecutionException 表示被拒绝策略拒绝了,也就是说线程超出了线程池的总容量;
- DelayQueue,队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务;
2.7 ThreadFactory threadFactory
线程工厂,用来创建线程,通过newThread()方法提供创建线程的功能。通过源码我们知道,通过newThread()方法创建的线程都是非守护线程,并且线程优先级都是Thread.NORM_PRIORITY
。可以通过Executors.defaultThreadFactory()
来创建默认的线程工厂。
可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。
2.8 RejectedExecutionHandler handler
用于抛出异常的,比如说上面的队列发生了异常,可以通过指定该参数来对异常进行抛出处理,有4种可选类型:
- CallerRunsPolicy,不在新线程中执行任务,而是强制由调用者所在的线程来执行任务;
- AbortPolicy,默认处理,直接抛出异常,然后不再执行相应的任务;
- DiscardPolicy,不执行任务,也不抛出异常,也就是忽略这个任务;
- DiscardOldestPolicy,将队列中最前面的那个任务丢弃,然后执行新任务;
3. ThreadPoolExecutor实现及注意事项
3.1 ThreadPoolExecutor的执行策略
上面介绍参数的时候其实已经说到了ThreadPoolExecutor执行的策略,这里再总结一下,当一个任务被添加进线程池时:
- 线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务;
- 线程数量达到了corePools,则将任务移入队列等待;
- 队列已满,新建线程(非核心线程)执行任务;
- 队列已满,总线程数又达到了maximumPoolSize,就会由上面配置的异常处理RejectedExecutionHandler来抛出异常。
3.2 ThreadPoolExecutor任务提交
前面说了这么多,一直在介绍ThreadPoolExecutor的各个参数,现在我们new了一个线程池后,如何执行线程任务呢?其实有两种方式,Executor.execute()
、ExecutorService.submit()
,submit
方法和execute
方法不同的是它能够返回任务执行的结果Future,来看一个简单的例子:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i =0 ;i < 4; i++) {
threadPoolExecutor.execute(() -> System.out.println("threadName:" + Thread.currentThread().getName()));
}
Future<Integer> future = threadPoolExecutor.submit(() ->
System.out.println("threadName:" + Thread.currentThread().getName()), 1);
System.out.println(future.get());
threadPoolExecutor.shutdown();
}
4. Executors工具类
Executors工具类是Executor框架的一个工具帮助类,提供了4种创建线程池的方式,这4种方式都是直接或间接通过ThreadPoolExecutor
来实现的,一般情况下我们可以通过该工具类来创建线程池,如果该工具类的几个方法满足不了的情况下,我们可以自定义实现。
4.1 CachedThreadPool方法
创建可缓存的线程池,看源码就知道了:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建线程简单例子:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
该方法创建的线程池线程数量无限制,有空闲线程则复用空闲线程,无空闲线程则创建新的线程;
4.2 FixedThreadPool方法
创建固定数量的线程池,来看下源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建线程简单例子:
ExecutorService executorService= Executors.newFixedThreadPool(10);
ExecutorService executorService= Executors.newFixedThreadPool(10, Executors.defaultThreadFactory());
该方法创建的线程池中的最大线程数固定,超出的线程会进入队列等待;
这里再简单说下无界队列LinkedBlockingQueue的问题,无界队列的队列大小无限制,使用无界队列做为阻塞队列时要尤其当心,因为newFixedThreadPool
采用就是 LinkedBlockingQueue,当任务耗时较长或者QPS很高时可能会导致大量新任务在队列中堆积,有可能会导致cpu和内存飙升服务器挂掉。
这里可参考一个线上问题:一次Java线程池误用引发的血案和总结
4.3 ScheduledThreadPool方法
创建支持延迟和定时的固定数量的线程池,看下源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
可以看到,该方法是通过ScheduledThreadPoolExecutor
的构造方法来实现的,但底层仍然是通过ThreadPoolExecutor
的构造方法来实现的,并且任务队列是DelayQueue
,简单例子:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, Executors.defaultThreadFactory());
4.4 SingleThreadExecutor方法
创建一个只有单个线程的线程池,看下源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
简单例子:
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
通过该方法创建的线程池有且只有一个工作线程执行任务,任务会按照队列的顺序来执行。
4.5 newSingleThreadScheduledExecutor方法
该方法结合了ScheduledThreadPool方法和SingleThreadExecutor方法,就不多说了。
三、总结
其实,JDK7之后还引入了一种采用分治思想的fork/join
框架,该类框架的接口也继承了ExecutorService,所以说也算是一种特殊的线程池,下一章再专门来学习该框架。
- 在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartCoreThread或prestartallcorethread方法;这两个方法是用于预创建线程,也就是即使没有任务来的时候就预先创建corePoolSize或1个线程;
1. ExecutorService关闭方式shutdown和shutdownNow区别
针对shutdown方法而言:
调用该方法后不允许继续往线程池内添加线程,线程池的状态变为SHUTDOWN状态,而所有在调用shutdown()方法之前提交到ExecutorSrvice的任务都会执行,并且一旦所有线程执行任务结束,ExecutorService才会真正关闭;
而shutdownNow方法则是:
调用该方法后,会将线程池的状态变为stop状态,然后试图停止当前正在执行的任务,并返回在等待中没有执行的任务列表;
这里参考自:JAVA线程池shutdown和shutdownNow的区别,而有关这两个方法及awaitTermination方法的使用方面的注意事项,可以参考:[翻译][Java]ExecutorService的正确关闭方法
2. 有关阿里巴巴开发手册规范中的一项内容
在代码中使用Executors创建线程池的时候,idea的阿里规范扫描插件会给出一项警告:
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
- 1) FixedThreadPool 和 SingleThreadPool :
允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。- 2) CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。
所以这里建议自定义实现线程池。
3. 参考文档及文章
首先强烈推荐JDK-API文档,另外,本文还参考了:
海子-Java并发编程:线程池的使用
线程池,这一篇或许就够了
并发编程网-聊聊并发(三)Java线程池的分析和使用