如果觉得写的还可以请关注微信公众号:程序猿的日常分享,定期更新分享。
Executors工厂
线程池主要有三类,一个是用于创建立即执行的线程池ThreadPoolExecutor,一个是用于创建执行定时任务类线程池ScheduledThreadPoolExecutor,还有就是在 jdk1.7引入的基于fork/join框架的分治线程池ForkJoinPool。基于这3种线程池实现类,Executors为我们提供了6种可以快速创建的线程池分别是:
1、FixedThreadPool
2、CachedThreadPool
3、ScheduledThreadPool
4、SingleThreadExecutor
5、SingleThreadScheduledExecutor
6、WorkStealingPool
各个线程池解析
FixedThreadPool
第一种线程池是FixedThreadPool,它的核心线程数和最大线程数是一样的,所以可以把它看作是固定线程数的线程池,它的特点是线程池中的线程数除了初始阶段需要从 0 开始增加外,之后的线程数量就是固定的,就算任务数超过线程数,线程池也不会再创建更多的线程来处理任务,而是会把超出线程处理能力的任务放到任务队列中进行等待。而且就算任务队列满了,到了本该继续增加线程数的时候,由于它的最大线程数和核心线程数是一样的,所以也无法再增加新的线程了。创建线程池代码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
CachedThreadPool
第二种线程池是 CachedThreadPool,可以称作可缓存线程池,它的特点在于线程数是几乎可以无限增加的(实际最大可以达到 Integer.MAX_VALUE,为 2^31-1,这个数非常大,所以基本不可能达到),而当线程闲置时还可以对线程进行回收。也就是说该线程池的线程数量不是固定不变的,当然它也有一个用于存储提交任务的队列,但这个队列是 SynchronousQueue,队列的容量为0,实际不存储任何任务,它只负责对任务进行中转和传递,所以效率比较高。
当我们提交一个任务后,线程池会判断已创建的线程中是否有空闲线程,如果有空闲线程则将任务直接指派给空闲线程,如果没有空闲线程,则新建线程去执行任务,这样就做到了动态地新增线程。创建线程池代码如下:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
ScheduledThreadPool
第三个线程池是 ScheduledThreadPool,它支持定时或周期性执行任务,它通过使用了延时队列来实现了调度的功能。创建代码如下:
//Executors类
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
//ScheduledThreadPoolExecutor类
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
SingleThreadExecutor
第四种线程池是 SingleThreadExecutor,它会使用唯一的线程去执行任务,原理和 FixedThreadPool 是一样的,只不过这里线程只有一个,如果线程在执行任务的过程中发生异常,线程池也会重新创建一个线程来执行后续的任务。这种线程池由于只有一个线程,所以非常适合用于所有任务都需要按被提交的顺序依次执行的场景,而前几种线程池不一定能够保障任务的执行顺序等于被提交的顺序,因为它们是多线程并行执行的。创建代码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
SingleThreadScheduledExecutor
第五个线程池是 SingleThreadScheduledExecutor,它实际和第三种 ScheduledThreadPool 线程池非常相似,它只是 ScheduledThreadPool 的一个特例,内部只有一个线程,创建代码如下:
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
以上5个线程池是jdk1.5就已经提供的创建线程池方法,总结下来他们使用的ThreadPoolExecutor参数主要分为两类,一类使用了最大线程数是Integer.MAX_VALUE,暂且算是无限线程;一类是使用了无界队列。
1、无限线程:CachedThreadPool、ScheduledThreadPool、SingleThreadScheduledExecutor,它们的缺点是如果任务量很大的情况下,不断去创建新的线程,可能会导致服务器资源耗尽。所以生产环境中不推荐使用。
2、无界队列:FixedThreadPool、SingleThreadExecutor,它们的缺点也是在任务量很大的情况下,会不断的往队列中储存任务,导致等待队列无限大,耗尽服务器资源。所以在生产环境中同样不推荐使用。
WorkStealingPool
最后,我们来看下第六种线程池 WorkStealingPool,这个线程池是在 JDK 8 加入的,它使用的是JDK7中新加入的基于Fork/Join框架的线程池ForkJoinPool,主要用法和之前的线程池是相同的,也是把任务交给线程池去执行,线程池中也有任务队列来存放任务。参数parallelism表示并行度,可以理解为线程数,默认使用系统CPU可用的处理器数量作为最大线程数,也可以自定义设置,他的最大值是0x7FFF,也就是32767。
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
但是 ForkJoinPool 线程池和之前的线程池有两点非常大的不同之处:
第一点它非常适合执行可以产生子任务的任务。
如图所示,我们有一个 Task,这个 Task 可以产生三个子任务,三个子任务并行执行完毕后将结果汇总给 Result,比如说主任务需要执行非常繁重的计算任务,我们就可以把计算拆分成三个部分,这三个部分是互不影响相互独立的,这样就可以利用 CPU 的多核优势,并行计算,然后将结果进行汇总。这里面主要涉及两个步骤,第一步是拆分也就是 Fork,第二步是汇总也就是 Join,到这里你应该已经了解到 ForkJoinPool 线程池名字的由来了。
第二点不同之处在于内部结构,之前的线程池所有的线程共用一个队列,但 ForkJoinPool 线程池中每个线程都有自己独立的任务队列。
ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。一旦线程中的任务被 Fork 分裂了,分裂出来的子任务放入线程自己的 deque 里。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
总结:
ForkJoinPool 线程池和其他线程池很多地方都是一样的,但重点区别在于它每个线程都有一个自己的双端队列来存储分裂出来的子任务。ForkJoinPool 非常适合用于递归的场景,例如树的遍历、最优路径搜索等场景。