实际编程中,频繁创建和销毁线程开销很大,所以一般使用线程的方式是线程池。
很方便的,java给我们提供了现成的线程池创建函数ThreadPoolExecutor,这个创建函数也成了不少公司面试必考题,当然,要想彻底理清线程池执行过程,需要剖析源码,这里我们就来仔细分析分析。
首先是线程池创建函数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以把线程池类比成一个小公司,公司有少量正式员工,执行平时的一些工作量。如果工作量太大,正式员工忙不过来,会雇佣部分外包人员。
原则上,如果任务量突然变多,先把任务临时缓存起来,等正式员工有空闲时,交由正式员工由处理(节约成本嘛)。如果缓存队列满了,这时候就要考虑找外包人员了。
公司总预算有限,所以正式员工数量是固定的,且雇佣的外包人员也有最大人数限制。
如果工作量变少,为了节约成本,就要释放部分外包人员。
如果工作量实在太大了,正式员工、外包人员也达到最大预算人数,且所有人都在拼命完成工作任务,这时候,就要拒绝一部分任务了。
接下来这几个参数就好理解了。
- corePoolSize: 核心线程数量,可以类比正式员工数量,常驻线程数量。
- maximumPoolSize: 最大的线程数量,公司最多雇佣员工数量(包含外包人员)。常驻+临时线程数量。
- workQueue:任务等待队列,所有的正式员工都在处理任务,再来任务就先放到队列吧,队列如果也满了,那就要找外包了。
- keepAliveTime:非核心线程空闲时间,就是外包人员等了多久,如果还没有活干,就被解雇了。
- threadFactory: 创建线程的工厂,在这个地方可以统一处理创建的线程的属性。比如每个员工的名字,工号不一致,方便区分和安排任务。
- handler:线程池拒绝策,什么意思呢?就是当任务实在是太多,人也不够,需求池也排满了,还有任务咋办?默认是不处理,抛出异常告诉任务提交者,我这忙不过来了。
我们使用线程池的时候,一般是直接调用execute,提交一个任务,对应到线程池里是怎么处理的呢?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //关键步骤1
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //关键步骤2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //关键步骤3
reject(command);
}
首先得理解这个ctl是什么意思?
线程池里为了充分利用int型的每一位,使用一个AtomicInteger的ctl来记录线程池中线程的数量及当前线程池的状态。低29bit位表示线程池中线程的数量,高3bit位用来记录线程池的状态是RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED中的一种。
添加任务步骤
获取线程池中线程数量。
-
线程数量小于核心线程数吗。
- 小于核心线程数,添加一个核心线程,添加成功的话直接返回。
- 如果添加核心线程失败,因为是多线程同时执行,再获取一遍线程池中线程数量,继续下一步。
-
线程数量大于等于核心线程数或上面添加核心线程失败。
- 线程池还在运行且添加任务到任务队列成功。
- 重新检查线程池是否还在运行
- 线程池不在运行,且从任务队列删除任务成功,拒绝该任务。
- 线程池在运行,但线程池中没有线程(核心线程数也可以设置成0),添加一个非核心线程。
- 线程池不在运行或添加任务到任务队列失败
- 尝试添加非核心线程去处理该任务。
- 如果添加非核心线程失败,拒绝该任务。
- 重新检查线程池是否还在运行
- 线程池还在运行且添加任务到任务队列成功。
整个过程有点绕,可以对比图形,再理解一遍。
添加任务最关键的函数就是addWorker。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
要理解这个问题,首先要知道java里的标签是怎么用的,有点类似goto的意思,可以参考:https://blog.csdn.net/chanllenge/article/details/90266538
addWorker的核心思想是
- 添加线程时时,区分核心和非核心线程,并可指定该线程的第一个处理任务。
- 判断线程池状态及工作队列,线程数量等参数的合法性。
- 通过CAS自旋方式,增加线程数量。
- 加悲观锁,并结合参数合法性,添加一个线程worker,到线程队列workers。
- 线程添加成功且正常启动,返回true,其他情况,添加线程失败,移除该线程,并线程数量减1,返回false。
线程池中,通过一个set来存储所有的线程。
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
线程添加成功之后,该线程首先会执行指定的第一个处理任务,然后从工作队列的队首依次取任务去执行。
源代码:
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
看到这里,线程池的创建和执行你理解了吗?欢迎留言讨论!