1. 线程池的实现原理
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地
从队列中取任务,这是一个典型的生产者—消费者
模型。
要实现这样一个线程池,有几个问题需要考虑:
- 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有
界的,当队列满了之后,调用方如何处理? - 线程池中的线程个数是固定的,还是动态变化的?
- 每次提交新任务,是放入队列?还是开新线程?
- 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
有3种做法:- 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程
池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询。
- 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程
- 不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制。
- 使用阻塞队列。(最优)
很显然,
做法3
最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡
眠/轮询带来的资源消耗和延迟。 ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻塞队列来实现的
2. 线程池的类继承体系
其中这两个是核心的类
- ThreadPoolExector
可以执行某个任务 - ScheduledThreadPoolExecutor
可以执行某个任务,还可以周期性地执行任务
3. ThreadPoolExecutor
3.1. 核心数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 对线程池内部各种变量进行互斥访问控制
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//...
}
每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
final Thread thread;
// Worker封装的线程
Runnable firstTask;
// Worker接收到的第1个任务
volatile long completedTasks;
// Worker执行完毕的任务个数
// ...
}
核心配置参数解释
ThreadPoolExecutor在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:在线程池中始终维护的线程个数。
- maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
- keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
- blockingQueue:线程池所用的队列类型。
- threadFactory:线程创建工厂,可以自定义,有默认值Executors.defaultThreadFactory() 。
- RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略。
3.3. 线程池的优雅关闭
线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任
务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是
瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
正确关闭线程池的步骤
关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接
下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:
// executor.shutdownNow();
executor.shutdown();
try {
boolean flag = true;
do {
flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} while (flag);
} catch (InterruptedException e) {
// ...
}
任务的提交过程分析
任务的执行过程分析
线程池的4种拒绝策略
默认策略
AbortPolicy
- CallerRunsPolicy
策略1:调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧: - AbortPolicy
策略2:线程池抛异常: - DiscardPolicy
策略3:线程池直接丢掉任务,神不知鬼不觉: - DiscardOldestPolicy
策略4:删除队列中最早的任务,将当前任务入队列:
4. Executors工具类
concurrent包提供了Executors工具类,利用它可以创建各种不同类型的线程池。
线程池类型
# 固定大小线程池(newFixedThreadPool),核心线程数是固定的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
# 单线程线程池(newSigleThradExecutor)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
# 缓存线程池(newCachedThreadPool),根据任务量增加
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
# 定时、周期性线程池(newScheduledThreadPool),适合时间轮,定时任务触发
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
- 固定大小线程池(newFixedThreadPool),核心线程数是固定的
- 缓存线程池(newCachedThreadPool),根据任务量增加
- 单线程线程池(newSigleThradExecutor)
- 定时、周期性线程池(newScheduledThreadPool),适合时间轮,定时任务触发
最佳实践
不同类型的线程池,其实都是由前面的几个关键配置参数配置而成的。
在《阿里巴巴Java开发手册》中,明确禁止使用Executors创建线程池,并要求开发者直接使用ThreadPoolExector或ScheduledThreadPoolExecutor进行创建。这样做是为了强制开发者明确线程池的运行策略,使其对线程池的每个配置参数皆做到心中有数,以规避因使用不当而造成资源耗尽的风险。
5. ScheduledThreadPoolExecutor
- 周期性单线程线程池(newSingleThreadScheduledExecutor)
6. CompletableFuture用法
7. ForkJoinPool
简介
ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合
并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的
Map/Reduce,多个线程并行计算。
相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。
假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余
4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。
利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而
实现任务计算的负载均衡。