Java线程池
线程池测试代码
public static void main(String[] args) {
/**
* 核线程数
* 最大线程数
* 非核心线程最大存活时间
* 非核心线程最大存存活时间单位
* 任务阻塞队列
* 线程工厂
* 任务拒绝策略
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
10,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r);
}
},
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 20; i++) {
int finalI = i;
threadPoolExecutor.execute(()->{
System.out.println(finalI +"XXXXXXXXXXX"+Thread.currentThread().getName());
});
}
}
线程池的执行过程:
线程池的状态切换
核心类ThreadPoolExecutor
系统自带的线程池,通过 ThreadPoolExecutor 实现的。
线程池的状态
//ctl int 类型的数值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// ctl 表述了两个状态:
// 1. 表示线程池当前的状态(高3位)
// 2. 表示线程池当前的工作线程个数(低29位)
// 29: 00000000 00000000 00000000 00011101
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大数量 57:0000000 0000000 0000000 00111010
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池的5种状态: runState存储在高位
// 10000000 00000000 0000000 00000001 左移 29 位 : 11100000 00000000 00000000 00000000
// 111
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000 00000000 0000000 00000000 左移 29 位: 00000000 00000000 00000000 00000000
// 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00000000 00000000 0000000 00000001 左移 29 位 : 00100000 00000000 00000000 00000000
// 001
private static final int STOP = 1 << COUNT_BITS;
// 00000000 00000000 0000000 00000010 左移 29 位 : 01000000 00000000 00000000 00000000
// 010
private static final int TIDYING = 2 << COUNT_BITS;
// 00000000 00000000 00000000 00000011 左移 29 位 :01100000 00000000 00000000 00000000
// 011
private static final int TERMINATED = 3 << COUNT_BITS;
// 计算出当前线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 计算当前线程池的工作线程个数
private static int workerCountOf(int c) { return c & CAPACITY; }
构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize 核心线程数
maximumPoolSize 最大线程数
keepAliveTime 非核心线程等待任务时的超时时间
unit 非核心线程等待任务时的超时时间单位
workQueue 多余任务等待队列(当线程池中所有线程都在工作,新来的任务进入队列)
threadFactory 线程工厂(线程创建)
handler 拒绝策略(阻止执行的处理程序)
执行优先级:核心线程 -> 队列 -> 非核心线程
执行过程execute
public void execute(Runnable command) {
//非空判断
if (command == null)
throw new NullPointerException();
/**
* 分为三步:
* 1.如果运行的线程数少于核心线程数,添加一个核心线程启动该任务
* 2.第一步的条件已不成立,进行第二步:加入阻塞队列进行排队
* 3.如果前两步都不成立:尝试添加一个新线程;如果它失败了,我们知道我们已经关闭或饱和,因此拒绝该任务。
**/
//获取线程池状态
int c = ctl.get();
//工作线程个数 是否小于 核心线程数
if (workerCountOf(c) < corePoolSize) {
//添加核心线程成功,返回
if (addWorker(command, true))
return;
// 如果在并发情况下,添加核心线程失败的线程,重新获取线程池状态
c = ctl.get();
}
// 当线程池为运行状态,将任务件加到工作队列 成功
if (isRunning(c) && workQueue.offer(command)) {
// 添加任务到工作队列成功
// 再次获取 ctl
int recheck = ctl.get();
// 判断线程池是否 RUNNING 状态,如果不是 RUNNING ,需要将任务从工作队列移除
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略(线程池状态不正确,执行拒绝策略)
reject(command);
// 判断工作线程个数是否为 0 (这时工作队列存在任务)
else if (workerCountOf(recheck) == 0)
// 工作线程数为0, 但是工作队列中有任务排队
// 添加一个空任务非核心线程,为了处理在工作队列排队的任务
addWorker(null, false);
}
// 任务添加到工作队列失败,添加非核心线程去执行当前任务
else if (!addWorker(command, false))
//添加非核心线程失败,执行拒绝策略
reject(command);
}
线程池的执行 addWorker
// 添加工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
// ========== 对线程池状态的判断,以及对线程数量的判断 =========
// 外层for循环标识
retry:
for (;;) {
//获取ctl值
int c = ctl.get();
//获取线程池的运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果线程池的状态不是 RUNNING ,就再次做后续判断,查看当前任务是否可以不处理
if (rs >= SHUTDOWN &&
// 线程池状态为 SHUTDOWN ,并且任务为空 , 并且工作队列任务不为空
// 如果满足了这三个要求,那就是要处理工作队列当前任务,否则结束
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
for (;;) {
//基础 ctl 获取当前工作线程数量
int wc = workerCountOf(c);
//判断工作线程是否大于最大值
if (wc >= CAPACITY ||
// 如果是核心线程,是否大于设置的 corePoolSize (核心线程数)
// 如果非核心线程,是否大于设置的 maximumPoolSize(最大线程数)
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 以 CAS 的方式,对工作线程 +1 ,如果成功
if (compareAndIncrementWorkerCount(c))
//直接跳出外层 for 循环
break retry;
//否则:重新获取 ctl
c = ctl.get(); // Re-read ctl
// 基于新获取的 ctl 拿到线程池的状态,判断是否和之前的 rs 状态一致
if (runStateOf(c) != rs)
// 说明并发操作导致线程池的状态变化,需要重新判断状态
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// ========== 添加工作线程,并启动工作线程 =========
//工作线程是否启动了
boolean workerStarted = false;
//工作线程是否添加了
boolean workerAdded = false;
// worker 就是工作线程
Worker w = null;
try {
// new Worker 构建工作线程,将任务扔到了 worker 中
w = new Worker(firstTask);
// 拿到 Worker 中绑定的 Thread 线程
final Thread t = w.thread;
// 肯定不为null(健壮性判断)
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//保持锁定时重新检查。如果ThreadFactory出现故障或在获取锁之前关闭,请退出。
//基于重新获取的 ctl,拿到线程池的状态
int rs = runStateOf(ctl.get());
//如果满足线程池的状态为 RUNNING,就添加工作线程
if (rs < SHUTDOWN ||
// 如果满足线程池的状态为 SHUTDOWN,并且传入的任务为 null
(rs == SHUTDOWN && firstTask == null)) {
//开始添加工作线程
//判断当前线程是否处于 run 状态 (健壮性判断)
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将构建好的 Worker 对象添加到了 workers
workers.add(w);
//获取工作线程个数
int s = workers.size();
//如果现在的工作线程数,大于历史最大的工作线程数,就重新赋值给 largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
//将工作线程添加的标识设置为 true
workerAdded = true;
}
} finally {
//释放锁
mainLock.unlock();
}
if (workerAdded) {
//添加共组线程成功,启动线程
t.start();
//将工作线程启动标识设置为 true
workerStarted = true;
}
}
} finally {
//如果启动工作线程失败
if (! workerStarted)
// 移除 workers 中的工作线程,将工作线程数 - 1 ,尝试修改线程池的状态为 TIDYING
addWorkerFailed(w);
}
return workerStarted;
}
//启动工作线程失败,做的补救操作
private void addWorkerFailed(Worker w) {
//加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//判断之前创建工作线程是否成功
if (w != null)
//如果成功,就将workers中的当前工作线程移除
workers.remove(w);
//将工作线程数 - 1
decrementWorkerCount();
//尝试修改线程池的状态为 TIDYING
tryTerminate();
} finally {
//释放锁
mainLock.unlock();
}
}
常见的三种线程池的创建
ExecutorService executorService = Executors.newCachedThreadPool();
ExecutorService executorService1 = Executors.newFixedThreadPool(10);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
newCachedThreadPool
缓存线程池,设置最大线程数,当线程空闲超过60秒,自动销毁线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool
固定线程池,核心线程数等于最大线程数,不存在自动销毁空闲线程。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor
单个线程的线程池,线程池中只存在一个线程。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
为什么不推荐使用自带的线程池?
-
newCachedThreadPool
maximumPoolSize : Integer.MAX_VALUE 任务有多少就需要创建多少 worker 。
创建线程数太多,占用资源,可能造成 CPU 100%;
-
newFixedThreadPool 和 newSingleThreadExecutor
LinkedBlockingQueue 默认大小 Integer.MAX_VALUE
当任务足够多时,都放入了队列,可能造成内存溢出。