一 为什么要使用线程池
- 降低资源消耗。
- 提高响应速度。
- 提高线程的可管理性。线程是稀缺资源,使用线程池可以避免线程无限制的创建,并进行统一分配,调优和管理。
二 线程池实现原理
- 当前线程数小于corePoolSize,则创建新线程来执行任务。
- 当前线程数大于等于corePoolSize,则将任务加入BlockingQueue。
- 若队列已满,则创建新线程处理任务。
- 若线程数超过maxinumPoolsize, 则任务将被拒绝,并调用RejectExecutionHandler.rejectedExecution() 方法。
三 线程池使用
3.1 创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
** corePoolSize **:核心池的大小,该参数与之后的线程池实现原理有很大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用prestartAllCoreThreads()和prestartCoreThread()方法,从方法名字可以看出,是预创建线程的意思,即在没有任务到来之前,就创建corePoolSize个线程或1个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
** maximumPoolSize **:线程池中的最大线程数,表示线程池中最多能创建多少个线程。
** keepAliveTime **:表示线程没有任务执行时最多存活多久。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,知道线程池中的线程不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean value)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
** unit **:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; // 天
TimeUnit.HOURS; // 时
TimeUnit.MINUTES; // 分
TimeUnit.SECONDS; // 秒
TimeUnit.MILLISECONDS; // 毫秒
TimeUnit.MICROSECONDS; // 微妙
TimeUnit.NANOSECONDS; // 纳秒
- ** workQueue **: 一个阻塞队列,用来存储等待执行的任务。该参数也很重要,会对线程池的运行过程产生巨大影响,一般而言,有一下几种选择:
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序;
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列;
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列;
PriorityBlockingQueue:一个具有优先级的无限阻塞队列;
** threadFactory **:线程工厂,主要用于创建线程;
** handler **:饱和策略,即当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略:
ThreadPoolExecutor.AbortPolicy:
丢弃任务并抛出RejectedExecutionException异常;
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常;
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程);
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务;
每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。
corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;
如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。不过,在分析问题时,可以知道线程池是否满过。
3.2 提交任务
两种方式:
// 不需要返回值
threadpool.execute(new Runnable() {
@Override
public void run() {
// do something
}
});
// 需要返回值
Future<Object> future = threadpool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
});
try {
Object result = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
threadpool.shutdown();
}
3.3 关闭线程池
可以通过调用shutdown或shutdownNow方法来关闭线程池。
他们的原理都是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以,无法响应中断的任务可能永远无法停止。
区别在于,shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有正在执行或者暂停任务的线程,并返回等待执行任务的列表,而shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
只要调用这两个方法中的任意一个,isShutdown方法都会返回true。当所有任务都关闭后,才表示线程池关闭成功,这时调用isTerminated方法会返回true。
3.4 合理配置线程池
根据任务的性质来配置
- 任务性质:CPU密集型,IO密集型,混合型。
CPU密集型应配置尽可能小的线程,如N(CPU) + 1;IO密集型任务应配置尽可能多的线程,如2 * N(CPU);可通过Runtime.getRuntime().availableProcessors();
- 任务优先级:高,中,低。
可使用优先级队列。
- 任务执行时间:长,中,短。
可用不同规模的线程池处理。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,CPU空闲较多,线程数应设置大些。
建议使用有界队列
增加系统稳定性和预警能力。
线程池的监控
- taskCount 返回过去任务的大概总数(包含queue size)。
- completedTaskCount 已完成任务数量,<= taskCount
- largestPoolSize 曾创建过的最大线程数
- getPoolSize 线程池的线程数量
- getActiveCount 活动线程数
可扩展线程池进行监控,如,任务平均执行时间,最大执行时间等。通过重写,beforeExecute, afterExecute等方法。