一 概述
1.1线程池的类继承结构
class ThreadPoolExecutor extends AbstractExecutorService
abstract class AbstractExecutorService implements ExecutorService
interface ExecutorService extends Executor
interface Executor
Executor
首先从最顶层的Executor开始。依旧先从理解API文档开始:
一个用来执行已提交的runnable列表任务的对象。该接口提供了一种将任务提交从任务执行中解耦的机制,包括从中分离出线程使用、调度的细节。我们可以更多地使用Executor而不是动辄显式地创建线程。或许我们是这样使用的:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
在实际的执行中并没有严格要求executor按照异步执行。
在一些简单的示例中,一个executor可以迅速执行一个任务线程
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
不过这不是正常的用法,更多的时候我们会把任务线程提交给不同实现的线程池来执行。不一样的线程池会在线程在什么时候执行、如何被执行这些细节上采取不一样的实现。
更常用的可实现的接口是ExecutorService,在此基础上有一个广泛使用的实现ThreadPoolExecutor。
这个顶层的接口下只声明了一个方法
void execute(Runnable command);
即用来执行每一个传入线程池的任务线程的方法。这个方法在不同的线程池下可能就有 不一样的实现了。
ExecutorService
继承了顶层的Executor接口,然后在这个基础上定义了一系列关于线程池的执行方法。
提供了终止线程池的方法。这里有两种不同的策略:shutdown()方法允许之前已经提交的任务线程在线程池终止之前继续执行;shutdownNow()则会阻止等待中的任务被执行并且尝试停止当前正在执行的线程。
通过这些终止措施,一个executors不会再有正在执行中的任务,不会有新提交的任务,也不会再有等待被执行的任务。一个不再被使用的ExecutorService应该被终止以释放资源。
接口继承了来自父类的execute方法,创建一个future对象来检测和切换线程的状态。invokeAny与invokeAll是常用的批量执行方法。批量执行一组任务并等待直到至少一个任务/全部任务完成。
API中的使用示例
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}
终止一个线程池的示例
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
二 线程池核心类ThreadPoolExecutor
2.1 API阅读
ExecutorService各自的实现对提交上来的任务有不同的执行方式,常用的方方式是executor工厂方法。
线程池解决了两个问题:通过减少单个任务的调用开销提升了在执行大量的异步任务时的表现;提供用来管理线程池资源的机制(比如线程),也提供ing一些基础的统计信息,比如已完成任务的数量。
现有的工厂方法能够提供一些常用类型的线程池,比如
Executors.newCachedThreadPool()
Executors.newFixedThreadPool(int)
Executors.newSingleThreadExecutor()
如果要自行配置,那么如下指南需要仔细阅读
Core and maximum pool sizes:ThreadPoolExecutor会根据corepoolsize与maximum的参数值自适应、调整当前线程池的大小。当一个新的任务以execute方法被提交,并且当前线程池中正在运行的线程数量少于corepoolsize的值时,会创建一个新的线程来处理这个任务请求,即使其他的工作线程处于空闲状态。如果当前正在运行的线程数超过了corepoolsize而小于maximum,则只有在队列已满的情况下才会创建新的线程。把corepoolsize和maximum设置为相同的值,可以得到一个固定容量大小的线程池;通过把maximum设置为一个接近无限大的值比如如Integer.MAX来让线程池可以接纳无限多的任务。更常用的方式是在使用构造函数的时候就设置好corepoolsize与maximum的值。然后在使用中通过方法setCorePoolSize(int) and setMaximumPoolSize(int) 来动态修改值。
预先启动执行任务的线程:默认情况下,执行线程会在任务被提交的时候才创建,但是我们可以通过方法prestartCoreThread()和 prestartAllCoreThreads() 来预先创建执行线程。
创建新的线程:一般通过ThreadFactory来创建新的线程。这个可以通过Executors.defaultThreadFactory()实现,创建出来的线程有相同的线程组、相同的线程优先级NORM_PRIORITY、和非守护线程性质。通过不同的线程池,可以编辑线程名称、线程组、优先级和是否是守护线程等。
keep alive times:如果当前线程池的工作线程数超过了corepoolsize,多出的线程会在超过存活时间过后终止。这可以节约资源。如果稍后线程池的使用频率再次回升,新的线程又会被创建。这个参数依然可以动态地修改setKeepAliveTime(long, TimeUnit)。默认情况下超时终止的策略只有在工作线程数超过corepoolsize的时候才有效。不过这个也可以修改,allowCoreThreadTimeOut(boolean)方法可以让线程池中的所有线程都能再超时终止策略下生效。
-
Queue队列:任何阻塞队列都可以用来运输和状态被提交的任务,此队列的大小和线程池size相关。
- 如果正在运行的线程数少于corepoolsize,那么executor会创建新的线程,而不是将任务入队。
- 如果正在运行的线程数大于等于corepoolsize,那么executor会将请求任务入队而不是创建新的线程。
- 如果请求不能排队,则会创建一个新线程,除非这会超过maximumPoolSize,在这种情况下,该任务将被拒绝。
//TODO 排队策略
-
Rejected tasks拒绝任务:在线程池已经被终止或者有界队列达到饱和的情况下,通过submit提交的新的任务会被拒绝。所有情况下execute方法都会调用RejectedExecutionHandler的rejectedExecution(Runnable, ThreadPoolExecutor)方法。有如下的拒绝策略可用:
- ThreadPoolExecutor.AbortPolicy 默认的拒绝策略是队列达到饱和后再有新的任务提交则抛出异常RejectedExecutionException。
- ThreadPoolExecutor.CallerRunsPolicy 由调用execute方法的线程来执行这个任务,包含一个较简单的反馈机制,减缓新提交任务的执行速度。
- ThreadPoolExecutor.DiscardPolicy 直接拒绝被提交的任务
- ThreadPoolExecutor.DiscardOldestPolicy 如果exexutor没有终止,那么丢弃队首的任务,然后重复尝试执行新加入的任务。
Hook methods钩子方法:这个类提供受保护的前置执行方法beforeExecute(Thread, Runnable)和后置执行方法afterExecute(Runnable, Throwable),用来在执行execute方法前后执行。可以用来清理执行环境,比如初始化threadlocal,收集一些静态信息或者加日志等等。此外,terminated() 方法可以重写用来在线程池中止后立刻执行一些清理工作。
Queue maintenance 维护队列:getQueue() 方法用于在调试和监管任务队列,不建议在在使用这个方法的时候做其他的操作。有两个补充方法remove(Runnable)和purge() 用于在队列中有大量的排队任务被取消时协助回收。
2.2 线程池简单使用
public class demo {
public static void main(String[] args) {
/*
* corePoolSize核心池大小10;maximumPoolSize线程池最大容量20;空闲线程存活时间50秒;任务队列长度10
* */
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 20, 50, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
while(true) {
executor.execute(new Task());
}
}
}
这个demo中我们直接使用了ThreadPoolExecutor的构造来创建了一个线程池,各项的配置参数都已经注明,不过这并不是推荐选项。相对地,concurrent包下有一个Executor类下有四种已经给出的实现来生成线程池。这种执行方式的内部,依然是使用ThreadPoolExecutor来实现,不同地地方在于帮我们配置好了一部分参数。这里着重说前面两种线程池。
CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
额,如果硬要生硬地翻译的话,我觉得可以称之为缓冲线程池。仅在有需要的情况下创建新的线程。看着这个构造会发现CorePoolSize值为0,而MaximumPoolSize几乎为无限大。设定了空闲线程的存活时间为60秒。注意这里使用的任务队列SynchronousQueue。这是一种很特殊的阻塞队列,如果有线程试图从队列中插入元素,这次插入操作成功的前提是此刻有线程试图从队列中获取元素。
总结
- 这个线程池制定了空闲线程的回收策略,线程空闲时间超过1min则会被回收。因为它的核心池值为0而最大值特别大,所以只要是运行中的线程都会进入回收策略的监管范围内。
- 虽然ThreadPoolExecutor的默认拒绝策略是抛出异常,但是因为maximumpoolsize为MAX,所以大量的任务也不会产生异常抛出的场景。
FixedThreadPool
Executors.newFixedThreadPool(int nThreads);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
定长线程池,任务队列是没有规定长度的,即一个无界阻塞队列。在一个具体的时刻,最多会有nThreads条线程正在执行任务。注意它调用ThreadPoolExecutor的构造入参。对于空闲的工作线程的处理方式上,没有存活时间设定,即被创建出来的工作线程会一直存活,直到线程池被关闭。如果在线程池满负荷运转的时候,有新的任务提交上来,那么这些任务会等待直到有线程完成任务空闲出来。
定长线程池采用的rejectPolicy是abortPolicy。但是测试结果表明持续添加任务并不会抛出rejectException。这里的原因是因为线程池采用了无界队列。因为任务队列LinkedBlockingQueue本身是没有长度限制的,所以不会达到任务队列长度的上限,因此也就不会有rejectException异常抛出了。
此外,定长线程池的池子大小的关键参数corePoolSize与MaximumPOOLSize大小一致。即为一个固定容量大小的线程池。
定长线程池简单的总结:
- CorePoolSize与MaximumPoolSize值相等的固定容量大小的线程池
- 虽然调用的ThreadPoolExecutor构造采用了默认的任务拒绝策略,但是使用了不限长度的阻塞队列LinkedBlockingQueue,所以在不断地添加任务的场景中不会抛出rejectException异常。所有的额外的任务都会被加入到任务队列中排队等待。
- 没有设定空闲线程的存活时间-设置的值为0。即创建出来的线程会一直存活。而不会因为空闲超时销毁掉。
ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize)
定时任务线程池,这个类的具体实现较前两者要复杂一些,这里不赘述了。创建一个定长线程池,支持定时、周期性地执行任务。
SingleThreadExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
仅有一个执行任务线程的线程池,遵从FIFO先进先出顺序。这里也不具体展开了。