线程池ThreadPoolExecutor构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize: 核心线程数
- maximumPoolSize: 最大线程数
- keepAliveTime: 非核心线程的最大空闲时间
- unit: keepAliveTime时间单位
- workQueue: 任务队列
- threadFactory: 线程工厂
- handler: 任务拒绝策略
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 工作线程数 < 核心线程数
if (addWorker(command, true))
// 添加核心线程成功,直接返回
return;
c = ctl.get();
}
// 上面未返回,则说明当前工作线程>=核心线程数,需要创建非核心线程
// ①线程池处于RUNNING状态,并且任务队列没有满,则将任务放入workQueue中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// ②如果当前线程处于非RUNNING或workQueue队列已满,尝试创建非核心线程,成功则直接执行任务,失败则使用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
在调用execute后,会根据当前的线程池状态、工作线程数、任务队列等进行不同的逻辑处理,大致流程图如下,部分异常场景未画出。
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// core标识为此正在被创建的线程是否为核心线程,仅仅只是在数量校验上起到作用,也就是说工作线程中没有所谓的核心线程与非核心线程的区别
return false;
if (compareAndIncrementWorkerCount(c))
// 修改工作数量成功,跳出外层循环
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 线程池状态变化,继续下一次外层循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// Runnable对象作为入参传入Worker中
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁保证线程安全:①workers为HashSet ②涉及到一些int的赋值、判断操作
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 通过ThreadFactory创建的线程,在执行start之前处于alive状态,说明使用者传入的ThreadFactory存在问题
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加工作线程至集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动新创建的工作线程,实际上是在t中执行java.util.concurrent.ThreadPoolExecutor.Worker.run
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker.run 执行任务
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在getTask失败时,则会执行processWorkerExit对线程进行销毁
从任务队列里获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 是否配置了allowCoreThreadTimeOut = true或者工作线程数 > 核心线程数
// ①满足上述条件则使用poll,非阻塞,如果超时未获取到任务,则r = null,此时会remove此worker,即销毁线程
// ②不满足上述条件则使用take,阻塞,直至获取到任务,即r != null
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
任务获取失败则销毁线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// remove worker, 即销毁线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
小结
线程池创建线程、执行任务、销毁线程的流程是比较简单的。从这块源码的学习中, 改变了我之前的错误观念:"线程池中的线程有区别,分为核心线程和非核心线程"。看了源码之后,并没有所谓核心线程/非核心线程的概念,所谓核心线程数只是一个限制工作线程池的线程数的int值,也就是说,在默认情况下,最后稳定下来的状态,应该是只有corePoolSize数量的线程会存活下来,而这些线程可能并不是最先创建的,并且不一定会一直存活,因为如果工作线程数如果再次超过核心线程数,那么可能在空闲淘汰时,原本稳定存活的线程或被淘汰。
Executors提供的4种线程池
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
- FixedThreadPool: 核心线程数=最大线程数,并且使用无界阻塞队列(队列容量为int最大值),所以线程数固定、keepAliveTime无效,容易因为任务数过多而导致OOM问题
- SingleThreadExecutor: 与FixedThreadPool基本一致,唯一区别是线程数固定为1,容易因为任务数过多而导致OOM问题
- CachedThreadPool: 核心线程数为0,最大线程数为int最大值,任务过多时,会创建大量线程,会消耗大量CPU资源,也可能导致OOM
- ScheduledExecetorService: 适合执行定时任务
Executors中提供的几种线程池都有一个特点,不够灵活,没有将线程池的7个参数都暴露给使用者,所以都存在一定的弊端。所以不推荐使用Executors创建线程池,而是通过new ThreadPoolExecutor自定义参数,手动创建线程池。