Executor
是一个接口,仅要求实现void execute(Runnable command);
,要求是在各种场合(新开的线程、线程池内等等),同步(在execute方法里立刻执行)或异步得完成某一任务。
ExecutorService
接口,继承自Executor
,扩展了Executor
并添加了一些生命周期管理的方法。
一个ExecutorService
的生命周期有三种状态:运行、关闭、中止。
当调用ExecutorService.shutdown()
后,处于关闭状态,isShutdown()
方法返回true
。此时无法新加任务。
所有已添加的任务关闭后,Executor处于终止状态,isTerminated()
返回true。这要求之前肯定调用过shutdown
或shutdownNow
。可以用awaitTermination
阻塞式得等待所有任务都已经关闭。
此外,调用多了submit和invoke:
submit
和executor
的区别在于它的入参可以是Runnable
也可以是Callable
,而且有返回值Future<?>
invoke系(invokeAll
、invokeAny
)的入参都有Collection<? extends Callable<T>> tasks
,返回T或T的集合。
Future FutureTask
Future<V>
是一个接口,规定了异步执行的操作,通过get()
方法可以获得操作的结果,如果异步操作还没有完成,则get()
会使当前线程阻塞,可以指定等待时间。cancel
方法取消操作。
FutureTask<V>
是一个类,实现了RunnableFuture<V>
,该接口继承自Runnable
和Future<V>
内部持有一个Callable<V>
,构造函数需要一个Callable<V>
但也可以接受一个Runnable
(转化this.callable = Executors.callable(runnable, result);
)
异步的原理:
- 持有一个
volatile int state
,表面目前的状态,是新建还是完成还是取消之类的。 - 内部类
WaitNode
,是一个存储thread的链表,当前节点指向当前线程,next指向下一个可用线程。通过CAS操作更改运行线程。 - 通过
LockSupport.park
在运行完成前阻塞获取,LockSupport.unpark
取消阻塞。
ThreadPoolExecutor
ThreadPoolExecutor
继承自AbstractExecutorService
,后者实现了ExecutorService
首先实现了ExecutorService
的各方法
构造函数和各参数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数
- maximumPoolSize:最大线程数
- keepAliveTime:线程空闲时间
- TimeUnit 时间尺度
- workQueue 阻塞队列,用于存放等待着的线程
- threadFactory 线程工厂,给内部类worker提供线程
- rejectedExecutionHandler:任务拒绝处理器
提供了四种方式来处理任务拒绝策略- 直接丢弃(DiscardPolicy)
- 丢弃队列中最老的任务(DiscardOldestPolicy)。
- 抛异常(AbortPolicy)
- 将任务分给调用线程来执行(CallerRunsPolicy)。
内部有一个worker
类作为任务执行者,HashSet<Worker> workers
作为线程池,workQueue
作为等待的队列
状态通过final AtomicInteger ctl
保存
五个状态:
- RUNNING 接受新任务和处理队列里的任务
- SHUTDOWN 不接受新任务但是处理队列里的任务
- STOP 不接受新任务,不处理队列里的任务,中断处理中的任务
- TIDYING 所有任务都被关闭(terminated),workerCount为0,将调用
terminated
- TERMINATED
terminated
已被调用
状态转换:
- RUNNING -> SHUTDOWN 调用
shutdown()
,可能隐式地在finalize()
时调用 - (RUNNING or SHUTDOWN) -> STOP 调用
shutdownNow()
时 - SHUTDOWN -> TIDYING 当队列和池都空的时候
- STOP -> TIDYING 当池空的时候
- TIDYING -> TERMINATED 当
terminated()
调用结束时
execute策略:
Proceed in 3 steps:
- 少于corePoolSize的线程在运行时,尝试新建一个线程处理这个任务。addWorker方法检查运行状态和workerCount。
- 如果一个线程可以被移出队列,检查状态判断是否要新加一个线程还是取消这个任务
- 如果不能从队列里拿出一个线程,就新加一个线程。失败了就取消这个任务。
线程池的工作过程如下(转个别人的总结):
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
- 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
Executors
工具类
创建线程池,本质是生成ThreadPoolExecutor,举例:
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
- newCachedThreadPool
可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。 - newFixedThreadPool
定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。定长线程池的大小最好根据系统资源进行设置,如Runtime.getRuntime().availableProcessors()。 - newScheduledThreadPool
创建一个定长线程池,支持定时及周期性任务执行。 - newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
Executors
还提供默认的线程工厂defaultThreadFactory()