一、ThreadPoolExecutor线程数执行优先级
上一节中对线程池构造函数中的参数做了简单介绍,提到了核心线程数和最大线程数,以及队列。那么任务在线程池执行是否有优先级呢?
如上图所示,核心线程数corePoolSize有2个,非核心线程数有3个,所以最大线程数maximumPoolSize=2+3=5。workerQueue capacity如图设置为6个。假设现在有12个任务,任务执行时间非常长。任务会按照task_0,task_1被核心线程数处理,接下来task_2——task_7会放入队列中。task_8——task_10被非核心线程数执行。task_11因为线程池已经被占满会被拒绝策略拒绝掉,默认是AbortPolicy抛出异常。
现在有一个问题,非核心线程数是先执行放入队列中的任务,后续新加进来的任务继续放到队列里还是直接跳过队列,按照上面说的直接执行task_8——task_10,通过代码来验证下:
public class ThreadPoolTest {
public static void main(String[] args) {
myTask();
}
public static void myTask() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
5,
5,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(6));
for (int i = 0; i < 12; i++) {
int tmp = i;
threadPoolExecutor.execute(() -> {
try {
System.out.println("--->" + Thread.currentThread().getName() + "--" + tmp);
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
threadPoolExecutor.shutdown();
}
}
******************************************************************
--->pool-1-thread-1--0
--->pool-1-thread-2--1
--->pool-1-thread-3--8
--->pool-1-thread-4--9
--->pool-1-thread-5--10
Exception in thread "main" java.util.concurrent.RejectedExecutionException: 省略
--->pool-1-thread-1--2
--->pool-1-thread-3--3
--->pool-1-thread-4--6
--->pool-1-thread-5--5
--->pool-1-thread-2--4
--->pool-1-thread-1--7
从结果中我们可以看到最后执行的是队列中的任务。这里面涉及到了提交优先级、执行优先级。任务I提交顺序是核心线程数->队列->非核心线程数,执行优先级是核心线程数->非核心线程数->队列。
二、ThreadPoolExecutor运行状态
ThreadPoolExecutor的运行状态有5种,分别为:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的 二进制 位数- 3后的剩余位数所表示的数是线程的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程最大个数(低29位) 00011111111111111111111111111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高 3 位用来表示线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- 运行(RUNNING):该状态下的线程池接收新任务并处理队列中的任务;线程池创建完毕就处于该状态,也就是正常状态;
- 关机(SHUTDOWN):线程池不接受新任务,但处理队列中的任务;线程池调用shutdown()之后的池状态;
- 停止(STOP):线程池不接受新任务,也不处理队列中的任务,并中断正在执行的任务;线程池调用shutdownNow()之后的池状态;
- 清理(TIDYING):线程池所有任务已经终止,workCount(当前线程数)为0;过渡到清理状态的线程将运行terminated()钩子方法;
- 终止(TERMINATED):terminated()方法结束后的线程池状态;
对于execute源码分析,发现一篇写的非常不错的博客,有兴趣的话可以了解一下:https://blog.csdn.net/weixin_38910645/article/details/109725243
3个if条件中注释可以看到提交优先级线程数->队列->非核心线程数
/**
* 执行任务的方法
*/
public void execute(Runnable command) {
//如果command为空,将抛出NullPointerException
if (command == null)
throw new NullPointerException();
/*
* 处理过程分为以下三个部分:
*
* 1. 如果线程池内的线程数小于corePoolSize,
* 则创建新的核心线程执行任务,参数command将会作为
* 这个新线程的第一个执行的任务。否则进入步骤2.
*
* 2. 如果任务可以成功进入阻塞队列,重新检查线程池的状态。
* 如果线程池已经停止了,则将刚刚入队的任务做出队处理
* 如果线程池内没有线程了,则创建一个新的线程。
*
* 3. 如果任务入队失败,则创建一个新的非核心线程。
* 如果创建新非核心线程失败了,则线程池可能已经关闭或饱和
* 因此需要拒绝这个任务
*/
//获取ctl对应的int值
int c = ctl.get();
//workerCountOf方法获取工作线程数
//如果工作线程数小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
//addWorker方法创建新的线程
//参数core:true-核心线程,false-非核心线程
//如果addWorker方法创建新的核心线程成功
if (addWorker(command, true))
//创建线程成功后,方法结束
return;
//如果addWorker方法创建新的核心线程失败
//重新获取ctl的int值(因为线程池的状态已经被修改)
c = ctl.get();
}
//isRunning方法判断线程池处于RUNNING状态
//offer方法将任务入队
//如果线程池处于RUNNING状态且入队成功
if (isRunning(c) && workQueue.offer(command)) {
//再次获取ctl的int值
//任务入队的过程中,线程池状态可能已经被修改
int recheck = ctl.get();
//如果线程池并非处于RUNNING状态
//并且成功删除刚刚入队的任务
if (! isRunning(recheck) && remove(command))
//则执行reject方法,拒绝此次提交的任务
reject(command);
//如果工作线程数为0
else if (workerCountOf(recheck) == 0)
//进入本分支有几种情况:
//1.线程池处于RUNNING状态,但工作线程数为0
//2.线程池处于非RUNNING状态,但是任务从阻塞队列删除失败,
//此时工作线程数为0
//创建新的非核心线程
addWorker(null, false);
}
//线程池处于非RUNNING状态
//或者线程池处于RUNNING状态,但任务入阻塞队列失败
//尝试通过addWorker方法创建一个非核心线程
//如果addWorker方法创建非核心线程成功,则方法结束
//如果addWorker方法创建非核心线程失败,则执行reject方法
else if (!addWorker(command, false))
reject(command);
}
/**
* 创建新的工作线程到线程池中
*/
private boolean addWorker(Runnable firstTask, boolean core) {
//外层循环标记
retry:
//自旋开始
for (;;) {
//获取ctl对应的int值
int c = ctl.get();
//获取线程池的运行状态
int rs = runStateOf(c);
//线程池的状态有以下几种:
//SHUTDOWN/STOP/TIDYING/TERMINATED
//或者线程池的状态不为SHUTDOWN
//或者fiestTask不等于空
//或者队列为空
//返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//添加工作线程失败
return false;
//自旋
for (;;) {
//获取工作线程数
int wc = workerCountOf(c);
//如果工作线程数大于CAPACITY
//或者创建的是核心线程且工作线程数大于等于corePoolSize
//或者创建的是非核心线程且工作线程数大于等于maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//添加工作线程失败
return false;
//CAS自增工作线程数
//如果成功则跳出最外层循环
if (compareAndIncrementWorkerCount(c))
break retry;
//重新读取ctl的值
c = ctl.get(); // Re-read ctl
//如果内层循环获取到的线程池工作状态
//不等于外层循环的工作状态
if (runStateOf(c) != rs)
//进入下一次内层循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//标记工作线程是否被启动
boolean workerStarted = false;
//标记工作线程是否被添加成功
boolean workerAdded = false;
//工作线程
Worker w = null;
try {
//创建一个新的工作线程
w = new Worker(firstTask);※※※※※※※※※※※※
//获取工作线程的thread属性
final Thread t = w.thread;
//如果t非空
if (t != null) {
//加可重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池的工作状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//线程状态检验
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//工作线程的HashSet中添加此工作线程
workers.add(w);
//工作线程数
int s = workers.size();
//如果工作线程数大于largestPoolSize
if (s > largestPoolSize)
//设置largestPoolSize为s
largestPoolSize = s;
//设置workerAdded为true
workerAdded = true;
}
} finally {
//解锁
mainLock.unlock();
}
if (workerAdded) {
//启动线程
t.start(); ※※※※※※※※※※※※※※※※※※※※※※※※
//设置workerStarted为true
workerStarted = true;
}
}
} finally {
//如果workerStarted为false即工作线程启动失败
if (! workerStarted)
//删除此工作线程
addWorkerFailed(w);
}
//返回工作线程启动结果workerStarted
return workerStarted;
}
在addWorker源码中,重点关注new Work(),Work是ThreadPoolExecutor的私有内部类实现了Runnable接口,runWorker()方法中,可以看到执行优先级为什么是核心线程数->非核心线程数->队列。
final void runWorker(Worker w) {
···
try {
while (task != null || (task = getTask()) != null) {
w.lock();
}
····
}