在线程池详解一:线程池概念以及架构中提到了四个主要的快捷创建线程池方法,但是尽管 Executors 的工厂方法使用方便,在生产场景被很多企业的开发规范所禁用。要求通过标准构造器 ThreadPoolExecutor 去构造工作线程池。
1. 核心数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {
//...
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 对线程池内部各种变量进行互斥访问控制
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//...
}
每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
// Worker封装的线程
final Thread thread;
// Worker接收到的第1个任务
Runnable firstTask;
// Worker执行完毕的任务个数
volatile long completedTasks;
// ...
}
由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。
这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。
2.核心配置参数解释
上图的各个参数,解释如下:
参数一:corePoolSize:线程池中的常驻核心线程数,在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近视理解为今日当值线程,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中.
参数二:maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值大于等于1
参数三:keepAliveTime:多余的空闲线程存活时间,当空间时间达到keepAliveTime值时,多余的线程会被销毁直到只剩下corePoolSize个线程为止默认情况下:只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,知道线程中的线程数不大于corepoolSIze,
参数四:unit:keepAliveTime的单位
参数五:workQueue:任务队列,被提交但尚未被执行的任务.
参数六:threadFactory:表示生成线程池中工作线程的线程工厂,用户创建新线程,一般用默认即可
参数七:handler:拒绝策略,表示当线程队列满了并且工作线程大于等于线程池的最大显示 数(maxnumPoolSize)时如何来拒绝.
3.线程池的拒绝策略
handler:拒绝处理任务的策略
AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。(默认这种)
DiscardPolicy:也是丢弃任务,但是不抛出异常
DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
CallerRunsPolicy:由调用线程处理该任务
测试拒绝策略代码如下:
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
3,
5,
1,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
new ThreadPoolExecutor.AbortPolicy()
// new ThreadPoolExecutor.CallerRunsPolicy()
// new ThreadPoolExecutor.DiscardOldestPolicy()
//new ThreadPoolExecutor.DiscardPolicy()
);
for (int i = 0; i < 20; i++) {
int finalI = i;
executor.execute(() -> {
System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- 开始");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + "[" + finalI + "] -- 结束");
});
try {Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); }
}
executor.shutdown();
boolean flag = true;
try {
do {
flag = !executor.awaitTermination(1, TimeUnit.SECONDS);
} while (flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程池关闭成功。。。");
System.out.println(Thread.currentThread().getId());
}
}
4. 任务阻塞队列
Java 中的阻塞队列(BlockingQueue)与普通队列相比,有一个重要的特点:在阻塞队列为空时,会阻塞当前线程的元素获取操作。具体来说,在一个线程从一个空的阻塞队列中取元素时,线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒(唤醒过程不需要用户程序干预)。
Java 线程池使用 BlockingQueue 实例暂时接收到的异步任务,BlockingQueue 是 JUC 包的一个超级接口,比较常用的实现类有:
(1)ArrayBlockingQueue:是一个数组实现的有界阻塞队列 (有界队列),队列中元素按 FIFO排序;ArrayBlockingQueue 在创建时必须设置大小,接收的任务超出 corePoolSize 数量时,则任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小;若该阻塞队列满,则会为新的任务则创建线程,直到线程池中的线程总数> maximumPoolSize。
(2)LinkedBlockingQueue:是一个基于链表实现的阻塞队列,按 FIFO 排序任务,可以设置容量(有界队列),不设置容量则默认使用 Integer.Max_VALUE 作为容量 (无界队列)。该队列的吞吐量高于 ArrayBlockingQueue。如果不设置 LinkedBlockingQueue 的容量(无界队列),当接收的任务数量超出 corePoolSize数量时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工厂方法 Executors.newSingleThreadExecutor、Executors.newFixedThreadPool,使用了这个队列,并且都没有设置容量(无界队列)。
(3)PriorityBlockingQueue:是具有优先级的无界队列。
(4)DelayQueue:这是一个无界阻塞延迟队列,底层基于 PriorityBlockingQueue 实现的,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,而队列头部的元素是过期最快的元素。快捷工厂方法 Executors.newScheduledThreadPool 所创建的线程池使用此队列。
(5)SynchronousQueue:(同步队列) 是一个不存储元素的阻塞队列,每个插入操作必须等到另 一 个 线 程 的 调 用 移 除 操 作 , 否 则 插 入 操 作 一 直 处 于 阻 塞 状 态 , 其 吞 吐 量 通 常 高 于LinkedBlockingQueue。 快捷工厂方法 Executors.newCachedThreadPool 所创建的线程池使用此队列。与前面的队列相比,这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
5. 线程池的运行原理
1.在创建了线程池之后,等待提交过来的任务请求
2.当调用execute()方法添加一个请求任务的时候,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个程序
2.2 如果正在运行的线程数量大于或者等于corePoolSize,那么将这个任务放入队列
2.3 如果这个时候队列满了并且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻执行这个任务
2.4 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
3. 当一个线程完成任务的时候,它会从队列中取下一个任务来执行
4. 当一个线程无事可做超过一定时间(keepAliveTime)时:线程会判断如果当前运行的线程数大于corePoolSize,那么这个线程就会被停掉。