Executor的主要作用是解耦任务提交和任务执行(包括如何使用线程,如何调度)
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
Executor本身并不表示使用线程
ExecutorService提供了关闭机制以及提交任务返回Future对象用于追踪任务执行进度或取消任务。
先分析一下AbstractExecutorService实现中用到的Future的实现类FutureTask的实现机制
如果当前的任务是Runnable,通过RunnableAdapter转为Callable
FutureTask自身实现了Runnable,包装内部的Callable或Runnable
Future#get实现机制:如果任务还没开始,调用线程加入任务的等待队列,等待任务完成或取消时被唤醒,否则等待任务到达最终状态,正常执行返回结果或异常时抛出ExecutionException异常
Future#cancle实现机制:如果任务还没开始,状态改为INTERRUPTING或CANCELLED,如果支持中断,打断当前线程(参考run方法,先设置runner线程,再修改状态,所以可能当前状态是NEW,但是runner已经设置了),然后唤醒所有之前等待的线程。
AbstractExecutorService的实现机制:任务的具体执行都委托给从Executor继承的execute方法,主要实现了submit和invokeAll,invokeAny方法。
任务的执行都委托给Executor,所有提交的任务都用QueueingFuture包装,任务执行完加入内部的BlockingQueue。
invokeAny:先提交一个任务,然后循环检查ExecutorCompletionService的阻塞队列是否有已完成的任务,有就返回,没有就再提交一个新任务,直到任务都提交完,然后阻塞。第一个任务完成后,cancel所有可以cancel的任务。
AbstractExecutorService有两个具体的子类:ThreadPoolExecutor和ForkJoinPool,ScheduledThreadPoolExecutor又继承了ThreadPoolExecutor
ThreadPoolExecutor:
workQueue表示任务队列,workers表示当前执行任务的线程集合。
Worker继承了AbstractQueuedSynchronizer,自身就是一个简单的互斥锁,实现了Runnable,Worker在构造时内部会利用ThreadFactory产生一个线程,线程启动时,执行Worker自身的run方法。
Worker执行过程中,会通过getTask获取任务,每次执行任务之前都会获取worker自身的互斥锁
getTask通过返回null(线程池stop,或shutdown之后任务队列为空,或者动态调整参数之后线程太多,或者获取任务超时(说明任务太少了,不需要那么多线程)),控制Worker结束循环
Worker循环结束有两种原因:执行的任务抛出异常,getTask返回null。
如果是后者,再次检查以确保目前的线程数不低于最低要求,线程数不够时添加worker线程。因异常而结束任务循环也会添加新的worker线程。
添加worker失败的原因有三:线程池stop;shutdown之后任务队列为空;当前线程数超过最大线程数。worker添加成功之后,启动内部的线程,开始循环处理任务。
关键点在于,核心线程全部启动之后,任务会先加入任务队列,只有任务队列是有界队列,且队列满了才会启动非核心线程!!!
shutdown之后,修改状态为SHUTDOWN,然后打断所有idle线程,所谓idle,就是可以获取worker的互斥锁,说明worker当前在等待任务而不是执行任务,参考runWorker方法。如果当前所有worker正巧都在等待任务,所有worker都会被打断(processWorkerExit方法会在worker退出循环时调用,根据情况再添加worker)。tryTerminate中会先检查如果当前状态是SHUTDOWN但是任务队列不为空,不能进入terminal状态,如果当前是shutdown且任务队列为空且线程数为空,修改状态为过渡状态TIDYING,然后修改为最终状态TERMINATED。
打断所有已经启动的worker,返回所有还未执行的任务。
shutdown之后线程池并不一定关闭!!!所以正确的做法是shutdown之后调用awaitTermination等待所有任务执行完后所有线程被打断。
ThreadPoolExecutor可控制参数:
corePoolSize:核心线程数,worker数量小于corePoolSize时每次提交任务都启动一个core线程,可以使用set方法在运行时调整。
maximumPoolSize:最大线程数,包括core和非core线程,从上面的源码分析可以直到只有任务队列为有界队列时才会启动非core线程。
workQueue:任务队列,只有任务队列为有界队列时才会启动非core线程。
keepAliveTime:worker在指定时间内获取不到任务,说明此时人浮于事,需要裁员,getTask会返回null,结束获取任务超时的worker。
threadFactory:定义如何产生线程,默认直接new Thread。
handler:提交任务时任务队列满了或线程池shutdown之后的行为,默认抛出RejectedExecutionException异常,可选策略包括忽略(DiscardPolicy),在提交任务的线程中执行(CallerRunsPolicy),移除任务队列里最前面的任务(DiscardOldestPolicy)。
keepAliveTime:如果通过set设置了值,如果一个worker超过指定时间未获得任务就会timeout而结束循环,如果当前线程数超过了corePoolSize,不会再添加新的worker,默认不支持timeout。
allowCoreThreadTimeOut:默认线程数小于corePoolSize,timeout之后就会添加新的worker,如果设置了allowCoreThreadTimeOut,只有当前线程为0时才会添加新的worker。
下面分析一下ThreadPoolExecutor的子类ScheduledThreadPoolExecutor的实现机制:
从构造上看,主要是任务队列使用了DelayedWorkQueue,DelayedWorkQueue是一个简单的基于二叉堆实现的优先级阻塞无界队列,所有任务按触发时刻排序,keepAliveTime为0,不支持worker超时。从上文的分析可知,使用无界队列时是不会启动非core线程的,maximumPoolSize设置成了Integer.MAX_VALUE而不是corePoolSize,避免运行时修改corePoolSize时还要修改maximumPoolSize。
所有提交的任务都会用ScheduledFutureTask包装
任务先按触发时刻排序,同时触发的任务按提交顺序排序
如果是重复任务,任务执行完,计算下次触发时刻,重新加入任务队列。此处有一个细节:就算是fixed-rate的任务,也是上次执行完之后才会再次加入任务队列。
shutdown之后不允许提交新任务,如果是之前提交的延迟任务还没到时间或者是周期性任务,根据参数决定是否还能继续执行,默认运行继续等待执行延迟任务,不允许执行周期任务。
ForkJoinPool:日后补充!!!
下面来分析一下Executors里的静态方法构造的都是什么线程:
无界队列,不支持timeout,固定线程数。
newSingleThreadExecutor = newFixedThreadPool(1)
使用特殊的队列SynchronousQueue,相当于容量为1的阻塞队列,只有这样,如果已经有任务在等待执行了,再次提交任务时才会启动非core线程。