Executor接口
如果查看jdk文档,会发现java线程池都源自于这个超级接口Executor,但是这个接口本身比较简单:
public interface Executor {
/**
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,
这由 Executor 实现决定。
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution.
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
可以看到Executor 中只有一个execute 方法。此接口提供一种将任务提交与每个任务将如何运行的机制分离开来的方法,相比较为每个人物调用new Thread(Runnable r).start() ,我们更偏向于使用Executor (执行器)来运行任务:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
实现一个执行器也很简单:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
ExecutorService接口
Executor 提供的方法太少了!根本不能满足日常所需,而从它派生下来的接口ExecutorService 则显得更通用,毕竟它也是个Service。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
...
}
可以看到,ExecutorService 接口中包含了我们平常使用的线程池的绝大多数方法,其中的一些方法在上文已经介绍过了。
AbstractExecutorService
AbstractExecutorService是一个抽象类,并且实现了ExecutorService接口。
public abstract class AbstractExecutorService implements ExecutorService
在这个类中,提供了ExecutorService 一些方法的默认实现,比如submit ,invokeAll ,首先看submit 的实现:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
其中使用了newTaskFor 方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
newTaskFor 方法只是简单的将给定可调用任务包装成一个RunnableFuture ,使其具有取消运行的特性。而submit 中直接将任务交给execute() 运行.
再来看invokeAll() :
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
//创建一个list保存所有的结果
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f); //运行任务
}
for (Future<T> f : futures) {
if (!f.isDone()) { //依次取结果
try {
f.get(); //这里使用get是为了等待运行完成,如果没完成就会阻塞
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done) //如果发生异常,则取消所有任务
for (Future<T> f : futures)
f.cancel(true);
}
}
ThreadPoolExecutor简单介绍
ThreadPoolExecutor,线程池类,继承自 AbstractExecutorService
public class ThreadPoolExecutor extends AbstractExecutorService
构造方法
ThreadPoolExecutor 提供了四种构造方法实现(这里只介绍一种):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
有必要对每个参数解释一下:
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
配置规则
为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。jdk文档中建议在通常情况下,使用 Executors 提供的工厂方法配置,也就是提供好了的线程池。若非要手动配置,需要遵循以下规则:
核心和最大池大小
ThreadPoolExecutor 将根据 corePoolSize 和 maximumPoolSize 设置的边界自动调整池大小。当新任务在方法execute(java.lang.Runnable) 中提交时:
- 运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
- 运行的线程多于 corePoolSize 而少于 maximumPoolSize,则把任务放进队列,由空闲线程从队列中取任务,仅当队列满时才创建新线程。
- 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
- 如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE ),则允许池适应任意数量的并发任务。
还要注意以下两点:
在大多数情况下,核心和最大池大小仅基于构造器来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
当池中的线程数大于 corePoolSize 的时候,多余的线程会等待 keepAliveTime 长的时间,如果无请求可处理就自行销毁。
创建新线程
使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
ThreadFactory 是线程工厂,它是一个接口:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
ThreadPoolExecutor 中的 threadFactory 是由 Executors 工具类提供的:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
//// 创建的线程以“pool-N-thread-M”命名,N是该工厂的序号,M是线程号
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
// 设为非后台线程
if (t.isDaemon())
t.setDaemon(false);
// 优先级为normal
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
DefaultThreadFactory 是一个静态内部类
排队策略
前面说到,当线程池中运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程,将任务加入队列有三种策略(具体参见jdk文档)。
被拒绝的任务
两种情况下,新提交的任务将会被拒绝:
- 当 Executor 已经关闭
- Executor 将有限边界用于最大线程和工作队列容量,且已经饱和
被拒绝的任务, execute 方法都将调用其 RejectedExecutionHandler 的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:
- 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
- 在 ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
- 在 ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。
- 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
定义和使用其他种类的 RejectedExecutionHandler 类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。
钩子方法
此类提供两个 protected 可重写的 钩子方法:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
这两种方法分别在执行 每个任务 之前和之后调用。它们可用于操纵执行环境;注意这里是每个任务,即每次运行新任务时都会执行一遍。例如,重新初始化 ThreadLocal 、搜集统计信息或添加日志条目。此外,还可以重写方法 terminated() 来执行 Executor 完全终止后需要完成的所有特殊处理。
如果钩子 (hook) 或回调方法抛出异常,则内部辅助线程将依次失败并突然终止。
jdk文档中提供了一个可以暂停和恢复的线程池例子:
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused)
unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}
ThreadPoolExecutor运行原理
ThreadPoolExecutor深入剖析
线程池的五种状态
ThreadPoolExecutor 类中将线程状态( runState)分为了以下五种:
- RUNNING:可以接受新任务并且处理进入队列中的任务
- SHUTDOWN:不接受新任务,但是仍然执行队列中的任务
- STOP:不接受新任务也不执行队列中的任务
- TIDYING:所有任务中止,队列为空,进入该状态下的任务会执行 terminated()方法
- TERMINATED: terminated()方法执行完成后进入该状态
状态之间的转换
1.RUNNING -> SHUTDOWN
调用了 shutdown()方法,可能是在 finalize()方法中被隐式调用
2.(RUNNING or SHUTDOWN) -> STOP
调用 shutdownNow()
3.SHUTDOWN -> TIDYING
当队列和线程池都为空时
4.STOP -> TIDYING
线程池为空时
5.TIDYING -> TERMINATED
terminated()方法执行完成
线程池状态实现
如果查看 ThreadPoolExecutor的源码,会发现开头定义了这几个变量来代表线程状态和活动线程的数量:
//原子变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
这个类中将二进制数分为了两部分,高位代表线程池状态( runState),低位代表活动线程数( workerCount), CAPACITY代表最大的活动线程数,为2^29-1,下面为了更直观的看到这些数我做了些打印:
public class Test1 {
public static void main(String[] args) {
final int COUNT_BITS = Integer.SIZE - 3;
final int CAPACITY = (1 << COUNT_BITS) - 1;
final int RUNNING = -1 << COUNT_BITS;
final int SHUTDOWN = 0 << COUNT_BITS;
final int STOP = 1 << COUNT_BITS;
final int TIDYING = 2 << COUNT_BITS;
final int TERMINATED = 3 << COUNT_BITS;
System.out.println(Integer.toBinaryString(CAPACITY));
System.out.println(Integer.toBinaryString(RUNNING));
System.out.println(Integer.toBinaryString(SHUTDOWN));
System.out.println(Integer.toBinaryString(STOP));
System.out.println(Integer.toBinaryString(TIDYING));
System.out.println(Integer.toBinaryString(TERMINATED));
}
}
输出:
11111111111111111111111111111
11100000000000000000000000000000
0
100000000000000000000000000000
1000000000000000000000000000000
1100000000000000000000000000000
打印的时候会将高位0省略
可以看到,第一行代表线程容量,后面5行提取高3位得到:
111 - RUNNING
000 - SHUTDOWN
001 - STOP
010 - TIDYING
011 - TERMINATED
分别对应5种状态,可以看到这样定义之后,只需要通过简单的移位操作就可以进行状态的转换。
重要方法
execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**分三步执行
* 如果workerCount<corePoolSize,则创建一个新线程执行该任务
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //创建成功则return
return;
c = ctl.get(); //创建失败重新读取状态,随时保持状态的最新
}
/**
* workerCount>=corePoolSize,判断线程池是否处于运行状态,再将任务加入队列
* */
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); //用于double check
//如果线程池处于非运行态,则将任务从缓存队列中删除
if (! isRunning(recheck) && remove(command))
reject(command); //拒绝任务
else if (workerCountOf(recheck) == 0) //如果活动线程数为0,则创建新线程
addWorker(null, false);
}
//如果线程池不处于RUNNING状态,或者workQueue满了,则执行以下代码
else if (!addWorker(command, false))
reject(command);
}
可以看到,在类中使用了 Work类来代表任务,下面是 Work类的简单摘要:
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
Work类实现了 Runnable接口,使用了线程工厂创建线程,使用 runWork方法来运行任务
创建新线程时用到了 addWorker()方法:
/**
* 检查在当前线程池状态和限制下能否创建一个新线程,如果可以,会相应改变workerCount,
* 每个worker都会运行他们的firstTask
* @param firstTask 第一个任务
* @param core true使用corePoolSize作为边界,false使用maximumPoolSize
* @return false 线程池关闭或者已经具备关闭的条件或者线程工厂没有创建新线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 只有当rs < SHUTDOWN才有可能接受新任务
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); // 工作线程数量
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 不合法则返回
return false;
if (compareAndIncrementWorkerCount(c)) // 将工作线程数量+1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) // 判断线程池状态有没有改变,改变了则进行外循环,否则只进行内循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 创建新线程
Worker w = new Worker(firstTask);
Thread t = w.thread;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 再次检查状态,防止ThreadFactory创建线程失败或者状态改变了
int c = ctl.get();
int rs = runStateOf(c);
if (t == null || (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null))) {
decrementWorkerCount(); // 减少线程数量
tryTerminate();// 尝试中止线程
return false;
}
workers.add(w);// 添加到工作线程Set集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
t.start();// 执行任务
// 状态变成了STOP(调用了shutdownNow方法)
if (runStateOf(ctl.get()) == STOP && !t.isInterrupted())
t.interrupt();
return true;
}
再看 Work中的 runWork方法:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;//线程是否异常中止
try {
//先取firstTask,再从队列中取任务直到为null
while (task != null || (task = getTask()) != null) {
w.lock();
clearInterruptsForTaskRun();
try {
beforeExecute(w.thread, task);//实现钩子方法
Throwable thrown = null;
try {
task.run();//运行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//实现钩子方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;//成功运行,说明没有异常中止
} finally {
processWorkerExit(w, completedAbruptly);
}
}