一、线程池简介
作用:
线程池作用就是限制系统中执行线程的数量,重复利用已创建的线程来降低系统开销。
使用线程池的好处:
1、减少在创建和销毁线程上的资源消耗
2、限制系统中执行线程的数量,避免程序创建大量线程而导致系统性能下降甚至崩溃
二、JAVA实现
1、重点类ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, //核心线程的数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间的单位
BlockingQueue<Runnable> workQueue, //保存待执行任务的队列
ThreadFactory threadFactory, //创建新线程使用的工厂
RejectedExecutionHandler handler // 当任务无法执行时的处理器
)
//目前正在运行的工作线程,Worker实现Runnable接口,继承AbstractQueuedSynchronizer同步队列
private final HashSet<Worker> workers = new HashSet<>();
private final BlockingQueue<Runnable> workQueue; //待执行任务的队列
corePoolSize:核心线程池数量
在线程数少于核心数量时,有新任务进来就新建一个线程,即使有的线程没事干
等超出核心数量后,就不会新建线程了,空闲的线程就得去任务队列里取任务执行了
maximumPoolSize:最大线程数量
包括核心线程池数量 + 核心以外的数量
如果任务队列满了,并且池中线程数小于最大线程数,会再创建新的线程执行任务
keepAliveTime:核心池以外的线程存活时间
如果给线程池设置 allowCoreThreadTimeOut(true),则核心线程在空闲时头上也会响起死亡的倒计时
如果任务是多而容易执行的,可以调大这个参数,那样线程就可以在存活的时间里有更大可能接受新任务
TimeUnit unit 存活时间的单位,枚举
workQueue:保存待执行任务的阻塞队列
线程池中使用的队列是 BlockingQueue 接口,常用的实现有如下几种:
ThreadPoolExecutor的三种队列SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue
ArrayBlockingQueue:基于数组、有界,按 FIFO(先进先出)原则对元素进行排序
LinkedBlockingQueue:基于链表,按FIFO (先进先出) 排序元素
吞吐量通常要高于 ArrayBlockingQueue
Executors.newFixedThreadPool() 使用了这个队列
SynchronousQueue:不存储元素的阻塞队列
每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
吞吐量通常要高于 LinkedBlockingQueue
Executors.newCachedThreadPool使用了这个队列
PriorityBlockingQueue:具有优先级的、无限阻塞队列
threadFactory:每个线程创建的工厂
可以给线程命名,设置个优先级
handler:饱和策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
也可以自己实现
2、主要流程
当currentSize<corePoolSize时,直接启动一个核心线程并执行任务。
当currentSize>=corePoolSize、并且workQueue未满时,添加进来的任务会被安排到workQueue中等待执行。
当workQueue已满,但是currentSize<maximumPoolSize时,会立即开启一个非核心线程来执行任务。
当currentSize>=corePoolSize、workQueue已满、并且currentSize>maximumPoolSize时,调用handler默认抛出RejectExecutionExpection异常。
//ThreadPoolExecutor.execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//1.当前池中线程比核心数少,新建一个线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.核心池已满,但任务队列未满,添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //如果这时被关闭了,拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0) //如果之前的线程已被销毁完,新建一个线程
addWorker(null, false);
}
//3.核心池已满,队列已满,试着创建一个新线程
else if (!addWorker(command, false))
reject(command); //如果创建新线程失败了,说明线程池被关闭或者线程池完全满了,拒绝任务
}
Worker.run -> ThreadPoolExecutor.runWork -> ThreadPoolExecutor.getTask获取阻塞列队中的任务
//ThreadPoolExecutor.getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//如果线程池处于关闭之后或已关闭任务队列为空,则重置工作线程数
decrementWorkerCount();
return null;//返回null任务
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
//如果线程池正在运行,根据是否允许空闲线程等待任务和
//当前工作线程与核心线程池数量比较值,判断是否需要超时等待任务
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
//如果当前工作线程数,小于最大线程数,空闲工作线程不需要超时等待任务,
//则跳出自旋,即在当前工作线程小于最大线程池的情况下,有工作线程可用,
//任务队列为空。
break;
if (compareAndDecrementWorkerCount(c))
//减少工作线程数量失败,返回null
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//如果与自旋前状态不一致,跳出本次自旋
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//如果非超时则直接take,否则等待keepAliveTime时间,poll任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3、JAVA建议使用的线程池实现类
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。
newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue;
newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。
另外阿里编码规约中强制要求线程池不允许使用 Executors 去创建,这边做下了解