0. ThreadPoolExecutor简介
-
ExecutorService
的一种实现类,提供线程池的管理方法
ThreadPoolExecutor
继承了AbstractExecutorService
抽象类,主要提供了线程池生命周期的管理、任务提交的方法。
提交任务:execute
submit
方法
关闭线程池:shutdown
shutdownNow
方法
1. 主要属性介绍
ctl
AtomicInteger ctl
线程池的状态及容量控制,低29位表示容量,高3位表示状态
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
corePoolSize
核心线程数,当前运行线程数小于此数目时直接创建核心线程,大于此数目时会先将任务入队列,入队列失败才会再创建非核心线程,但保证总数目不大于maximumPoolSize
,失败执行reject
方法。
maximumPoolSize
线程池中最多线程数目。
keepAliveTime
线程存活时间,线程数目大于corePoolSize
或者allowCoreThreadTimeOut
为true
时,如有线程在此时间内没有执行任务则会结束线程。
allowCoreThreadTimeOut
是否允许核心线程到时间后结束线程。
workQueue
BlockingQueue
对象,存放待执行任务。
2. 主要方法介绍
构造方法
构造方法有4个,但是最终都是调用最后一个,主要是设置一些属性
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
execute
向线程池提交任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 如果当前运行的线程数小于corePoolSize,尝试启动一个新线程
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 成功放入队列,重复检查是否需要创建一个新线程
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
* 不能入队,尝试添加一个新线程,如果失败,拒绝任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//添加一个核心线程
if (addWorker(command, true))
return;
c = ctl.get();
}
//达到corePoolSize,尝试放入等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//二次检查,若线程池关闭,移除任务,拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
//若当前没有线程,添加一个非核心线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//放入等待队列失败,尝试添加非核心线程
else if (!addWorker(command, false))
//添加非核心线程失败,拒绝任务
reject(command);
}
addWorker
添加一个工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//检查队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//检查线程容量限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//线程数+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将worker放入工作集中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动刚才的线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker
在创建worker对象时,线程参数是worker自身
this.thread = getThreadFactory().newThread(this);
所以启动worker线程时执行的是runWorker
方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
getTask
方法负责从workQueue
等待队列中取出待执行任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//getTask表示某个worker的当前任务完成,来取下一个任务,如果线程池已经关闭,则不继续执行,worker数目-1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//数量超出限制或者超时,worker数目-1
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//限时用poll,否则take阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//r为null,超时了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
runWorker
方法执行上面取到的task
final void runWorker(Worker w) {
//获取执行线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//如果线程池STOP了,但是wt没中断,中断之
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行task
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//worker退出时执行,如果是异常中断,可能会新建一个worker来代替
processWorkerExit(w, completedAbruptly);
}
}
reject
任务提交失败时,拒绝任务
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
ThreadPoolExecutor
提供了4中拒绝策略,分别是
-
CallerRunsPolicy
在调用线程中执行 -
AbortPolicy
丢弃任务,抛出RejectedExecutionException
-
DiscardPolicy
仅丢弃任务 -
DiscardOldestPolicy
丢弃队列中最早的任务,然后添加本任务
shutdown、shutdownNow
关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查权限,确保调用者有关闭权限
checkShutdownAccess();
//将线程池状态设置为shutdown
advanceRunState(SHUTDOWN);
//中断空闲线程
interruptIdleWorkers();
//结束回调
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//状态设置为stop
advanceRunState(STOP);
//中断所有线程
interruptWorkers();
//返回未执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
3. 线程池的使用
通过JUC包内提供的工具类Executors
来创建一个线程池
- 线程数固定的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
- 单线程的线程池,顺序执行任务
其中FinalizableDelegatedExecutorService
是ExecutorService
的另一个实现类,使用了代理模式,其行为全部代理给ThreadPoolExecutor
对象。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
- 缓存线程池
没有核心线程,随用随建,60s内无任务则结束
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
4. 总结
- 通过JUC工具类
Executors
创建线程池 - 通过
execute
submit
向线程池提交任务- 提交
Callable
任务时,submit
会返回Future
对象,可以通过此对象获取结果 -
submit
也是通过execute
方法来提交任务
- 提交
- 提交任务时
- 如果当前线程数小于核心线程数,会创建一个核心线程,即使当前有空闲线程
- 如果大于核心线程数,任务会入队,入队失败的话,会创建一个非核心线程来处理,如果创建失败,则会拒绝任务
- 线程的结束
- 非核心线程在
keepAliveTime
时间内未执行任务则会结束 - 如果
allowCoreThreadTimeOut
为true
,核心线程在keepAliveTime
时间内未执行任务也会结束
- 非核心线程在
- 拒绝任务策略
-
CallerRunsPolicy
在调用线程中执行 -
AbortPolicy
丢弃任务,抛出RejectedExecutionException
-
DiscardPolicy
仅丢弃任务 -
DiscardOldestPolicy
丢弃队列中最早的任务,然后添加本任务
-
- 线程池的结束
-
shutdown
关闭线程池,不再接受新任务,但是会执行完等待队列的任务 -
shutdownNow
关闭线程池,执行完或中断当前运行线程,返回等待队列的任务列表
-
5. 参考
- ThreadPoolExecutor源码build 1.8.0_121-b13版本
- 并发编程3:线程池的使用与执行流程