为什么需要线程池
对象复用思想在编程中有很多应用,不论是线程池还是连接池都是一种对象复用的思想。今天来谈谈Java里面的线程池。
Java中创建和销毁一个线程是比较昂贵的操作,需要系统调用。频繁创建和销毁线程会影响系统性能。于是线程池应运而生。
Executor框架
Executor框架是java中的线程池实现。Executor是最顶层的接口定义,它的子类和实现主要包括ExecutorService,ScheduledExecutorService,ThreadPoolExecutor,ScheduledThreadPoolExecutor,ForkJoinPool等。其结构如下图所示:
Executor:Executor是一个接口,其只定义了一个execute()方法:void execute(Runnable command);
,只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。
ExecutorService:ExecutorService在Executor的基础上加入了线程池的生命周期管理,我们可以通过ExecutorService#shutdown或者ExecutorService#shutdownNow方法来关闭我们的线程池。ExecutorService支持提交Callable形式的任务,提交完Callable任务后我们拿到一个Future,它代表一个异步任务执行的结果。关于shutdown和shutdownNow方法我们需要注意的是:这两个方法是非阻塞的,调用后立即返回,不会等待线程池关闭完成。如果我们需要等待线程池处理完成再返回可以使用ExecutorService#awaitTermination来完成。
shutdown方法会等待线程池中已经运行的任何和阻塞队列中等待执行的任务执行完成,而shutdownNow则不会,shutdownNow方法会尝试中断线程池中已经运行的任务,阻塞队列中等待的任务不会再被执行,阻塞队列中等待执行的任务会作为返回值返回。
ThreadPoolExecutor:是线程池中最核心的类,这里着重说一下这个类的各个构造参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:线程池的核心线程数目,当一个请求进来时如果当前线程池中线程数量小于这个值,则直接通过ThreadFactory新建一个线程来处理这个请求,如果已有线程数量大于等于这个值则将请求放入阻塞队列中。
- maximumPoolSize:线程池的最大线程数目,当线程池数量已经等于corePoolSize并且阻塞队列也已经满了,则看线程数量是否小于maximumPoolSize:如果小于则创建一个线程来处理请求,否则使用“饱和策略”来拒绝这个请求。对于大于corePoolSize部分的线程,称作这部分线程为“idle threads”,这部分线程会有一个最大空闲时间,如果超过这个空闲时间还没有任务进来则将这些空闲线程回收。
- keepAliveTime和unit:这两个参数主要用来控制idle threads的最大空闲时间,超过这个空闲时间空闲线程将被回收。这里有一点需要注意,ThreadPoolExecutor中有一个属性:
private volatile boolean allowCoreThreadTimeOut;
,这个用来指定是否允许核心线程空闲超时回收,默认为false,即不允许核心线程超时回收,核心线程将一直等待新任务。如果设置这个参数为true,核心线程空闲超时后也可以被回收。 - workQueue:阻塞队列,超过corePoolSize部分的请求放入这个阻塞队列中等待执行。阻塞队列分为有界阻塞队列和无界阻塞队列。在创建阻塞队列时如果我们指定了这个队列的“capacity”则这个队列就是有界的,否则是无界的。这里有一点需要注意:使用线程池之前请明确是否真的需要无界阻塞队列,如果阻塞队列是无界的,会导致大量的请求堆积,进而造成内存溢出系统崩溃。
- threadFactory:是一个线程池工厂,主要用来为线程池创建线程,我们可以定制一个ThreadFactory来达到统一命名我们线程池中的线程的目的。
- handler:饱和策略,用来拒绝多余的请求。饱和策略有:CallerRunsPolicy:请求脱离线程池运行(调用者caller线程来运行这个任务);AbortPolicy:抛出RejectedExecutionException异常;DiscardPolicy:丢弃这个任务,即什么也不做;DiscardOldestPolicy:将阻塞队列中等待时间最久的任务删除(即队列头部的任务),将新的任务加入队尾。
ScheduledThreadPoolExecutor:ThreadPoolExecutor子类,它在ThreadPoolExecutor基础上加入了任务定时执行的功能。
Executors
Executors是一个工厂类,主要用来创建ExecutorService,ScheduledExecutorService等线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
这个静态方法用来创建固定线程数目的线程池,可以看到其设置的corePoolSize和maximumPoolSize都是nThreads,其设定的阻塞队列是无界的,也就说多余的请求将一直积压在队列中进行等待,<font color="#FF0000">有可能造成内存溢出</font>。这也是阿里编码规范不推荐用Executors来创建ThreadPoolExecutor的原因。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
这个方法用来创建一个“工作窃取(work stealing)”ForkJoinPool线程池,这里不做展开,后面会介绍ForkJoinPool。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
注意到这个方法创建的核心线程数是0,maximumPoolSize是一个最大值,空闲线程的最大空闲时间是一分钟,阻塞队列是一个SynchronousQueue。这个类其实挺有意思的,SynchronousQueue主要拥有数据交换而不是队列。当SynchronousQueue中有一个元素时,put请求会被阻塞住,直到有消费者从队列中拿走这个元素,同理如果队列为空,消费者也必须阻塞等待有生产者放入新的元素。
注意到这些静态方法可以方便我们创建线程池,但是你必须清楚每个方法创建的线程池的各个参数设置,就像newFixedThreadPool这个方法创建的线程池的阻塞队列是无界的。所以我更倾向推荐大家自己直接用ThreadPoolExecutor来定制自己的线程池。
ForkJoinPool
Doug Lea在JDK7中引入了Fork/Join框架,ForkJoinPool不同于ThreadPoolExecutor,它是一种基于"分治"思想的计算框架。java8的stream API中很多地方都有用到ForkJoinPool。ForkJoinPool中的工作线程会对自己的任务按照一定的粒度进行拆分,一个大任务拆分成多个子任务之后,子任务放入工作队列中等待执行。当一个线程的工作队列为空是可以从其他线程的工作队列中steal任务执行。这也是"work-stealing"的由来。
基本思想
- ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
- 每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是"LIFO"方式,也就是说每次从队尾取出任务来执行。
- 每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
- 在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
- 在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
fork
fork将任务放入任务隶属的工作线程的工作队列中。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
join
- 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
- 查看任务的完成状态,如果已经完成,直接返回结果。
- 如果任务尚未完成,但处于自己的工作队列内,则完成它。
- 如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。
- 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
- 递归地执行第5步。
<div align="center">
<img src="{{ site.url }}/images/posts/java/flowchart-of-join.png" alt="join"/>
</div>
以上就是 fork() 和 join() 的原理,这可以解释 ForkJoinPool 在递归过程中的执行逻辑,但还有一个问题:
最初的任务是 push 到哪个线程的工作队列里的?这就涉及到 submit() 函数的实现方法了。
submit
其实除了前面介绍过的每个工作线程自己拥有的工作队列以外,ForkJoinPool 自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工作队列被称为 submitting queue 。
submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。
总结
- Java中线程是一个比较昂贵的对象,线程的频繁创建和销毁会影响性能,因此借助"对象复用"思想产生线程池。由线程池来管理我们的线程。
- 使用Executors创建线程池时要明确创建的阻塞队列是否有界,如果是无界队列则饱和策略将失效,所有请求将一直排队等待被执行,可能会产生内存溢出的风险。最好自己创建ThreadPoolExecutor。
- ForkJoinPool一般配合stream API来使用,它适合做一些CPU密集型任务而不是I/O密集型任务。
- 线程池不要申明为本地变量,如果每次请求都要创建和销毁线程池,那么线程池也就失去了它的意义,应该将线程池申明为全局的。