1、写在前面
在此记录,以备遗忘
2、核心要点
2.1、为什么需要线程池
2.2、创建线程池的代码实现
2.3、ThreadPoolExecutor参数说明
2.4、ThreadPoolExecutor引发的问题
3、具体操作
3.1、为什么需要线程池
线程池解决了两个不同的问题:
1)提升性能:它们通常在执行大量异步任务时,由于减少了每个任务的调用开销,并且它们提供了一种限制和管理资源(包括线程)的方法,使得性能提升明显;
2)统计信息:每个ThreadPoolExecutor保持一些基本的统计信息,例如完成的任务数量。
3.2、创建线程池的代码实现
1)创建线程有很多种样式,工具类和框架,但是所有的线程池都要从ThreadPoolExecutor开始说起
2)ThreadPoolExecutor有几个构造方法,准确理解里面对应的【参数含义】和【使用场景】
public class TestThreadPool1 {
// 创建一个线程池提交线程任务
public static void main(String[] args) {
// 创建线程池参数
int corePoolSize = 2;
int maximumPoolSize = 4;
long keepAliveTime = 10;
BlockingQueue<Runnable> myQueue = new ArrayBlockingQueue<>(2);
MyTreadFactory myTreadFactory = new MyTreadFactory();
MyRejectPolicy myRejectPolicy = new MyRejectPolicy();
// 创建线程池并执行
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
TimeUnit.SECONDS, myQueue, myTreadFactory, myRejectPolicy);
for (int i = 1; i <= 10; i++) {
Test01ThreadRunnable task = new Test01ThreadRunnable();
threadPoolExecutor.execute(task);
}
}
// 创建一个Runnable执行任务
static class Test01ThreadRunnable implements Runnable {
private int count = 5;
@Override
public void run() {
for (int i = 0; i < 7; i++) {
if (count > 0) {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + "----currentIndex:" + (count--));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
// 创建一个自定义的线程工程类,定义线程的名称和常规属性
static class MyTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
public static class MyRejectPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
// 可做日志记录等
private void doLog(Runnable r, ThreadPoolExecutor e) {
System.err.println( r.toString() + " rejected"+" ----->completedTaskCount: " + e.getCompletedTaskCount());
}
}
}
3.3、ThreadPoolExecutor参数说明
当前部分参考:线程池之ThreadPoolExecutor概述
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue<Runnable> | 线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
3.3.1、corePoolSize和maximumPoolSize对比
线程池执行器将会根据corePoolSize和maximumPoolSize自动地调整线程池大小。
1)taskCount < corePoolSize 即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求
2)corePoolSize < taskCount < maximumPoolSize 仅当队列已满时才会创建新线程【此时和workQueue的容纳数量有关系】
1)当提交任务>corePoolSize时,开始向workQueue中进行放置
2)一般情况corePoolSize+workQueue <= maximumPoolSize,否则workQueue还没有放置满就开始和maximumPoolSize进行比对,执行拒绝策略
3)最初和corePoolSize比较,最后和maximumPoolSize比较
3)taskCount > maximumPoolSize 直接执行拒绝策略
3.3.2、ThreadFactory 线程工厂
目的:使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态,可以在工程类中为线程指定名称。
- 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。
3.3.3、Keep-alive times 线程存活时间
如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止 ( 请参阅getKeepAliveTime(TimeUnit) )。这提供了一种在不积极使用线程池时减少资源消耗的方法。
默认情况下,keep-alive策略仅适用于存在超过corePoolSize线程的情况。 但是,只要keepAliveTime值不为零,方法allowCoreThreadTimeOut(boolean)也可用于将此超时策略应用于核心线程。
3.3.4、BlockingQueue 队列【参考corePoolSize和maximumPoolSize对比】
BlockingQueue用于存放提交的任务,队列的实际容量与线程池大小相关联。
1)如果当前线程池任务线程数量小于核心线程池数量,执行器总是优先创建一个任务线程,而不是从线程队列中取一个空闲线程。
2)如果当前线程池任务线程数量大于核心线程池数量,执行器总是优先从线程队列中取一个空闲线程,而不是创建一个任务线程。
3)如果当前线程池任务线程数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到超出maximumPoolSize,如果超时maximumPoolSize,则任务将会被拒绝。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
3.3.5、Rejected tasks 拒绝任务
拒绝任务有两种情况:
1) 线程池已经被关闭;
2)任务队列已满且maximumPoolSizes已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。
预定义了四种处理策略:
- AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;
- CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
- DiscardPolicy:直接丢弃新提交的任务;
- DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);
- 我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。
3.4、ThreadPoolExecutor引发的问题
直接使用ThreadPoolExecutor,要手动设置corePoolSize和maximumPoolSize等参数,由于人为介入可能会导致无限制的创建线程或者没有没有为特定场景设定好线程池的BlockingQueue类型,Executors工具类为我们预配置了几种线程池,我们敦促程序员使用更方便的Executors的工厂方法直接使用。
- Executors.newCachedThreadPool(无界线程池,自动线程回收)
- Executors.newFixedThreadPool(固定大小的线程池);
- Executors.newSingleThreadExecutor(单一后台线程);
具体内容参看:线程池工具类Executors
4、课后习题
4.1、ThreadPoolExecutor全参构造方法有几个值?分别代表什么含义?
参考本文3.2章节
4.2、代码实现【自定义的拒绝策略】
4.3、ThreadPoolExecutor和ExecutorService和Executors之间的关系
1)ThreadPoolExecutor:线程池基础类
2)ExecutorService:线程池拒绝策略静态创建
3)Executors:线程池工具类【Alibaba开发推荐使用该工具类】
4.3、Hook methods 钩子方法
ThreadPoolExecutor为提供了每个任务执行前后提供了钩子方法,重写beforeExecute(Thread,Runnable)和afterExecute(Runnable,Throwable)方法来操纵执行环境; 例如,重新初始化ThreadLocals,收集统计信息或记录日志等。此外,terminated()在Executor完全终止后需要完成后会被调用,可以重写此方法,以执行任殊处理。
注意:如果hook或回调方法抛出异常,内部的任务线程将会失败并结束。
4.4、Queue maintenance 维护队列
getQueue()方法可以访问任务队列,一般用于监控和调试。绝不建议将这个方法用于其他目的。当在大量的队列任务被取消时,remove()和purge()方法可用于回收空间。
4.5、Finalization 关闭
如果程序中不在持有线程池的引用,并且线程池中没有线程时,线程池将会自动关闭。如果您希望确保即使用户忘记调用 shutdown()方法也可以回收未引用的线程池,使未使用线程最终死亡。那么必须通过设置适当的 keep-alive times 并设置allowCoreThreadTimeOut(boolean) 或者 使 corePoolSize下限为0 。
一般情况下,线程池启动后建议手动调用shutdown()关闭。
4.5、如何合理配置线程池的大小
本节来讨论一个比较重要的话题:如何合理配置线程池大小,仅供参考。
一般需要根据任务的类型来配置线程池大小:
- 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
- 如果是IO密集型任务,参考值可以设置为2*NCPU
- 当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。