一、简介
一种管理线程的技术,可以缓存线程
(1)复用已经存在的线程,减少线程的创建和销毁
(2)提高响应时间,已有线程可以直接执行任务,无需等待线程创建
(3)方便管理线程数量,防止过多的线程占用太多内存,同时能防止过多的线程切换,提高CPU效率
二、构造函数
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
int corePoolSize: 核心线程数,在不设置 allowCoreThreadTimeOut 为 true 的情况下,核心线程就算没事做也不会被销毁。
int maximumPoolSize : 最大线程数
long keepAliveTime:超时时长,一个非核心线程(设置 allowCoreThreadTimeOut 为 true 也同样作用于核心线程)在处于闲置状态(没事做)超过这个时长就 会被销毁。
TimeUnit unit:时间单位,秒、毫秒、微秒等
BlockingQueue<Runnable> workQueue: 阻塞队列
ThreadFactory threadFactory:线程池工厂,可以自己实现,方便设置线程的名字以及优先级
RejectedExecutionHandler handler:拒绝策略
三、常见的线程池
CachedThreadPool
可缓存线程池
Executors.newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
1. 内部没有核心线程,线程的数量是有没限制的。
2. 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。
3. 没有工作的线程(闲置状态)在超过了60S没有任务,就会销毁。
FixedThreadPool
定长线程池
Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
1. 最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。
2. 如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。LinkedBlockingQueue默认大小是Integer.MAX_VALUE,可能导致内存占用过多
ScheduledThreadPool
定时任务线程池
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
1. 设置了核心线程数,最大线程数也是 Integer.MAX_VALUE。
2. 这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。
SingleThreadExecutor
单线程池
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
1、只有一个线程且不回收
2、LinkedBlockingQueue默认大小是Integer.MAX_VALUE,可能导致内存占用过多
四、阻塞队列
常用的几种队列
(1)ArrayBlockingQueue:基于数组实现,固定大小,其构造必须指定大小。其所含的对象是FIFO顺序排序的。
(2)LinkedBlockingQueue:基于链表实现,默认大小Integer.MAX_VALUE,可传入。其所含的对象是FIFO顺序排序的。
(3)PriorityBlockingQueue:类似于LinkedBlockingQueue,但是其所含对象的排序不是FIFO,而是依据对象的自然顺序或者构造函数的Comparator决定。无界队列
(4)SynchronousQueue:不存储元素的阻塞队列,大小为0,每一个put都必须等待take操作,提供任务的转发。
(5) DelayQueue:基于PriorityQueue, 支持延时获取元素, 无界
常用方法
抛异常 | 返回特殊值 | 一直阻塞 | 超时退出 | |
---|---|---|---|---|
插入数据 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除数据 | remove(e) | poll() | take() | poll(time, unit) |
take:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
五、拒绝策略
1、拒绝时机
线程池没有能力处理新任务:最大线程数大于且队列数量到达了最大值
线程池被调用了shutdown关闭之后,还没有完全停止的时候。
2、策略
AbortPolicy:默认的策略,直接抛出RejectedExecutionException ,RuntimeException
DiscardPolicy:默默丢弃任务,不进行任何通知。实际上内部啥也没干
DiscardOldestPolicy:丢弃掉在队列中存在时间最久的任务,然后重新提交任务
CallerRunsPolicy:让提交任务的线程去执行任务,不会丢失业务数据,阻塞了提交任务的线程,减缓提交任务的速度
3、拒绝策略使用
关键业务要自定义拒绝策略,可以记录日志,也可以把拒绝结果返回给用户
六、合理使用线程池
java 开发手册<并发处理>中片段:
[图片上传失败...(image-cdc18e-1600494356170)]
1、自定义线程池时,使用自定义ThreadFactory,指定线程名,方便排查问题
2、分析业务数量级,如果会很大,必须自定义线程池,限定最大线程池,使用有界队列,防止jvm OOM
3、即使限制了最大线程数,过多的线程会导致线程切换频繁,cpu效率地下,也要考虑为别的业务让出cpu
4、核心线程数,看任务,如果时一天执行一次,设置为0,跑完释放内存。如果常用,看任务量
5、拒绝策略要重写,默认的策略AbortPolicy会导致crash
七、线程池的复用原理
1、回顾线程创建
new Thread(new Runnable() {
@Override
public void run() {
}
}).start();
创建一个线程,调用start()启动线程,线程执行完run方法,就进入Dead状态。start()方法只能调用一次
2、线程池提交任务
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Runnable() {
@Override
public void run() {
}
});
3、复用原理
复用线程的本质:提交任务,在任务内死循环执行提交下来的任务
public void run() {
while (true) {
if(tasks available) {
Runnable task = taskqueue.dequeue();
task.run();
} else{
// wait or shutdown
}
}
}
线程创建执行任务流程:
(1) execute提交任务
判断当前工作线程数是否小于核心线程数,需要创建新线程则进入addWorker
(2) addWorker 创建线程
private final class Worker implements Runnable{
final Thread thread;
Runnable firstTask;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
// 把 Worker 作为 thread 运行的任务
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
(3)创建Worker
Worker以提交下来的任务作为构造参数,并创建一个新的线程
w = new Worker(firstTask);
final Thread t = w.thread;
workers.add(w);
(4)runWorker
调用Worker内部变量thread的start()方法启动线程
final Thread t = worker.thread;
t.start();
线程启动后,调用到Worker内部的run方法
public void run() {
runWorker(this);
}
(5)循环执行任务
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} finally {
task = null;
}
}
}
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take()
八、关闭线程池
shutdown() : 调用之后,线程池不是立刻就关闭,等待任务队列中的任务执行完才彻底关闭。同时,还会根据拒绝策略拒绝新提交的任务
isShutdown(): 返回线程池是否已经开始了关闭工作,返回不代表已经彻底关闭。
isTerminated(): 检测线程池是否真正的关闭了
shutdownNow(): 强制关闭线程池,给所有线程发送interrupt中断信号,然后所有的任务转移到另外一个List中。方便调用这做一些补救操作。
参考文档
Java 阻塞队列--BlockingQueue: https://www.cnblogs.com/bjxq-cs88/p/9759571.html
Java 线程池中的线程复用是如何实现:https://www.jianshu.com/p/f84a61917b03
Tomcat 高并发之道原理拆解与性能调优:https://mp.weixin.qq.com/s/4_En4hHeolYyO0RcliGxCQ
Java线程池的拒绝策略:https://www.cnblogs.com/eric-fang/p/11584142.html