Integer.SIZE = 32
COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29
为了方便做如下假设
Integer.SIZE = 10 则
COUNT_BITS = 10 - 3 = 7
CAPACITY = (1 << COUNT_BITS) - 1
CAPACITY: 0001111111
RUNNING: 1111111111 -1<< 7 => 1110000000
SHUTDOWN: 0000000000 0 << 7 => 0000000000
STOP: 0000000001 1 << 7 => 0010000000
TIDYING: 0000000010 2 << 7 => 0100000000
TERMINATED: 0000000011 3 << 7 => 0110000000
runStateOf 线程池的运行状态,获取高三位
c & ~capacity
1110000000 running
& 1110000000
=
1110000000
workerCountOf 工作线程的容量,获取后7位
c & CAPACITY
1110000000
& 0001111111
=
0001111111
通过上面的内容可以看到高三位作为状态控制位,后面的作为capacity容量大小
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
|: 两个位只要有一个为1,那么结果就是1,否则就为0
ctl
1110000000
| 0000000000
= 1110000000 保留高3位的线程池的运行状态
The main pool control state, ctl, is an atomic integer packing two conceptual fields
workerCount, indicating the effective number of threads
runState, indicating whether running, shutting down etc
ctl有两个作用, workerCount 有效的线程数, runState 运行状态running,shutting,STOP,TIDYING,TERMINATED
状态解释说明
The runState provides the main lifecycle control, taking on values:
RUNNING
: Accept new tasks and process queued tasks
SHUTDOWN
: Don't accept new tasks, but process queued tasks
STOP
: Don't accept new tasks, don't process queued tasks,
and interrupt in-progress tasks
TIDYING
: All tasks have terminated, workerCount is zero,
the thread transitioning to state TIDYING
will run the terminated() hook method
TERMINATED
: terminated() has completed
ThreadPoolExecutor 核心参数
ThreadFactory
线程工厂类,创建工作线程
RejectedExecutionHandler
拒绝策略类, 默认为AbortPolicy
,其他还有CallerRunsPolicy
,DiscardPolicy
,DiscardOldestPolicy
keepAliveTime
线程的空闲时间,超过空闲时间,线程会被回收
allowCoreThreadTimeOut
允许核心线程超时,默认core线程不会被回收,当为true时,核心线程的Idle时间超过keepAliveTime
也会被回收
corePoolSize
核心线程数
maximumPoolSize
最大线程数
线程池的工作原理
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize,
in which case, the task will be rejected.
如果正在运行的线程数少于corePoolSize,Executor添加新线程,而不是排队。
如果大于等于corePoolSize大小的线程数正在运行,Executor对请求进行排队,而不是添加新线程。
如果无法将请求放入队列中,则将创建一个新线程,除非创建的线程数超过了maximumPoolSize大小,在这种情况下,该任务将被拒绝。
ThreadPoolExecutor.execute(Runnable command)执行流程分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//如果当前worker数量小于corePoolSize,尝试直接创建一个worker,创建成功后直接返回
if (addWorker(command, true))
return;
//如果创建失败,重新获取ctl的值
c = ctl.get();
}
//如果当前worker大于等于corePoolSize ,ThreadPoolExecutor状态是运行中,将task添加到queue中
if (isRunning(c) && workQueue.offer(command)) {
//重新检查ctl,
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//当线程池不处于运行中状态,而且task被添加到队列中了,则使用拒绝策略来处理添加的task
reject(command);
else if (workerCountOf(recheck) == 0)
//当当前的worker数量为0,创建一个新的worker,firstTask为null,
addWorker(null, false);
}
//当当前worker数量大于等于corePoolSize,并且queue队列已经满了,则直接创建新的worker
else if (!addWorker(command, false))
//如果线程池不是running,或者当前的worker的数量超过了maximumPoolSize
reject(command);
}
ThreadPoolExecutor.addWorker(Runnable command)执行流程分析
//core: 如果为true,以corePoolSize作为边界,否则以maximumPoolSize作为边界
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//线程池的状态不是running,返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//worker count超过了边界,返回false
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个worker,在worker中会使用ThreadFactory创建一个线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将worker添加到workers Set集合中
workers.add(w);
int s = workers.size();
//记录线程池的最大数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//调用Thread.start方法执行执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker
//Worker继承AQS,所以Worker是线程安全的,
//Worker实现Runnable接口,作为参数传递给getThreadFactory().newThread()方法,当Thread.start()时,会执行Worker的run方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果firstTask不为null,或者从队列中取到了task
while (task != null || (task = getTask()) != null) {
w.lock();
try {
//钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//task就是我们调用execute(Runnable command)提交的任务
task.run();
} finally {
//钩子方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理worker退出逻辑,主要是将worker从workers移除,中断worker相关的thread,worker count -1
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// allowCoreThreadTimeOut为true或者wc大于corePoolSize核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果worker count大于maximumPoolSize, 并且worker queue为空, 返回null task,
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果timed为true,poll超时获取,如果超时,返回null
//如果timed为false, (allowCoreThreadTimeOut为false, worker count小于corePoolSize),阻塞获取
//从而保证小于corePoolSize的worker不会清理掉
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
核心方法是: addWorker
,runWorker
,getTask
.
通过上面的分析可以回答一下几个问题
- 如果worker count大于corePoolSize,则通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)超时获取task方法,如果返回的是null,则执行processWorkerExit方法做worker的清理工作
2、如果worker count <= corePoolSize并且allowCoreThreadTimeOut为false,通过workQueue.take()方法阻塞获取task,从而不让processWorkerExit方法执行达到worker的保活的目的