前情提要
如果你是一个面向对象语言的从业者,那么你一定或多或少都对线程池有所了解,在工作中或许也曾用过线程池。那么当有人问到线程池的工作原理时,你能把线程池的工作模型讲清楚吗?
什么是线程池,为什么要用线程池
在一个应用程序中,我们需要多次使用线程。而线程的创建和销毁工作都是需要占用系统资源的,那么能不能把这部分系统开销节约出来,让应用程序更加关注于处理任务而不是在花时间在处理任务的同时还要处理线程相关的开销呢?
答案是可以的。而且正是基于上面的问题,所以在Java中引入了线程池。那么线程池是怎么工作的呢?
简而言之,线程池内部维护了一个或多个线程的集合,通过在任务到达之前就创建好可用线程的方式来达到复用和平衡系统开销。
通过上面的说明,可以简单列举出使用线程池的优势:
- 提高响应速度:当有任务到达时,基于已存在的线程,可直接开始任务,不用在线程创建和销毁上花额外的时间和资源;
- 降低系统资源消耗:通过复用的方式减少了线程创建销毁次数;
- 除了上面两点之外,线程池还提供了其他的便利之处。比如:控制线程的并发数,防止应用程序无节制的创建线程资源(比如在一个8核CPU的机器上同时跑10000个线程,这个系统调度时间都够应用程序喝一壶了)。
另外,线程池是一种池化思想的实现。比如数据库的连接池、HttpClient连接池等等都是池化思想的落地方案。池化思想是一种技术手段,没有严格的落地准则,只要你达到复用的目的就行。池化技术是一种学习成本低,收益却相当可观的技术。如果你在工作中有遇到大量创建对象,并且这些对象可进行复用的场景,想要提高系统响应速度,减少系统开销,请大胆尝试把池化技术引入到项目中吧。
Java线程池的实现
到目前为止,Java给我们提供的线程池的生态可以说是比较健全的。这些所有的线程池的模型都直接或间接实现了java.util.concurrent包下面的Executor接口。由于类图相当大,这里就不贴出来了。可自行下去查看。
在所有的线程池实现种类中,常用的有ThreadPoolExecutor、ForkJoinPool、ScheduledThreadPoolExecutor等。本篇只会设涉及到ThreadPoolExecutor相关的内容,对于其他的类型可自行查看源码及Api文档。因为这部分源码都相对简单,在你真正了解了ThreadPoolExecutor的工作方式之后再去看其他种类的线程池时就可以举一反三,类比分析了。
ThreadPoolExecutor线程池的工作原理
以ThreadPoolExecutor为例,来看一下线程池的主要工作原理。ThreadPoolExecutor有好几个构造方法,因为其他的构造方法都是基于全参构造方法实现的。这里直接来看一下ThreadPoolExecutor的全参构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看到该构造方法有7个参数,分别为corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory和handler。下面分别了解一下这七个参数是什么,有什么作用。
(一)corePoolSize
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* 除非设置了allowCoreThreadTimeOut,否则即使它们处于空闲状态也要保留在池中的线程数
corePoolSize叫核心线程数,意思就是核心工作线程的个数。这里的“核心”一词是相对于“临时”线程来说的,我们可以把核心工作线程想象为正式编制内的员工,临时工作线程就相当于外包工。当一个项目工期很紧,公司内的正式员工不够用时,很多公司就会招聘外包工来赶项目进度。
在源码注释中提到了allowCoreThreadTimeOut属性。当allowCoreThreadTimeOut设置为false时,即便线程池中所有线程都处于空闲(没事可做)的时候,线程池仍然要维护这些空闲的线程。而当allowCoreThreadTimeOut设置为true的时候,表示我希望线程池中所有的线程都处于没事可干的时候,能销毁所有线程,而不是让这些空闲的线程一直占用着资源。
(二)maximumPoolSize
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* 池中允许的最大线程数
maximumPoolSize好理解,定义也很简单:最大线程数,也就是当前线程池所能承受的最大的线程数量是多个。
(三)keepAliveTime
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* 当线程数大于核心数时,这是多余的空闲线程将在终止之前等待新任务的最长时间。
关于keepAliveTime的参数描述不太容易理解,这里暂且给一个定义叫最长空闲时间。当线程池中当前存活的线程数量比允许的核心线程数(corePoolSize)大的时候,对当前线程池中的所有空闲线程进行活性检测,一旦超时(空闲时间比keepAliveTime长),就销毁这个线程。事实上,当allowCoreThreadTimeOut设置为true时,尽管当前线程数并不大于核心线程数,也会触发空闲线程的活性检测,因为这个活性检测的结果会作为是否销毁一个线程的依据。换句话说,要销毁一个空闲的线程,那么它的活性检测结果一定是超时。
(四)unit
* @param unit the time unit for the {@code keepAliveTime} argument
* keepAliveTime参数的时间单位
上面提到keepAliveTime是一个时间范围,提到时间就应该有单位,unit就是keepAliveTime的单位。
(五)workQueue
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* 在执行任务之前用于保留任务的队列。 此队列将仅保存由{@code execute}方法提交的{@code Runnable}任务。
workQueue就是工作队列的意思,可理解为任务的暂存。为什么叫暂存呢,workQueue并不处理任务,它只负责把收到的任务转交给线程池中的具体线程去执行。什么时候需要用到workQueue呢?当线程池中的线程已经达到了核心线程数,并且所有线程都处于活动状态(正干着其他任务)的时候,才会将任务放到workQueue中去,等线程池中的线程有空闲了再来队列中取任务执行。
workQueue是阻塞队列,因为阻塞队列不需要考虑额外的同步处理及唤醒策略。同步处理很好理解,说白了就是在某一个时刻永远只有一个线程在进行入队和出队的操作,这是为了保证线程安全。而唤醒策略稍微难理解一点,比方说,当前的线程需要到队列中取一个任务,而队列中没有任务,那么这个线程会被阻塞。一旦队列中有了任务,那么线程会自动被唤醒去拿任务。如果这里使用的不是阻塞队列,那么我们就得去写阻塞线程和唤醒线程的代码。
(六)threadFactory
* @param threadFactory the factory to use when the executor
* creates a new thread
* 线程池创建新线程时要使用的工厂
threadFactory是用来定制线程的创建过程的,该参数在需要使用其他的线程创建过程时使用。
(七)handler
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* 因为线程数和队列容量都已经达到极限时,所要采取的机制
handler可理解为饱和策略,意思是线程池中的线程数和队列中都已经堆满了任务时,如果还有任务被提交进来时,该线程池该怎么处理这种情况。
在ThreadPoolExecutor里面定义了4种handler的策略,他们分别是:
- CallerRunsPolicy:这个策略重试添加当前的任务,他会自动重复调用execute()方法,直到成功为止;
- AbortPolicy:对饱和任务抛弃处理,并且抛出异常;
- DiscardPolicy:对饱和任务直接抛弃;
- DiscardOldestPolicy:抛弃队列里面等待最久的一个线程,然后把饱和任务加到队列中。
工作流程分析
我们已经把所有的参数都看了一遍,在每个参数中,已经对这个参数的作用做了简单的介绍。下面我们来看一下线程池到底是怎么工作的,各个参数在里面起了什么作用,以及怎么协调工作的。
比如,如下的代码是怎么在线程池中工作的呢?
public class ThreadPoolTest {
public static class Say implements Runnable{
private int threadId;
Say(int threadId){
this.threadId = threadId;
}
@Override
public void run() {
System.out.println("第 " + threadId + "个线程 say Hello...");
try {
// 模拟长耗时任务
Thread.sleep(10000);
}catch (Exception e){
System.out.println("第 " + threadId + "个线程 have a problem...");
}
System.out.println("第 " + threadId + "个线程 say Bye...");
}
}
public static void main(String[] args){
/**
* 线程池:核心线程数=2
* 最大线程数=3
* 最长空闲时间=60s
* 有界队列=6
* 饱和策略=抛弃任务并抛出异常
*/
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,3,60,
TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(6),
Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
for(int i = 1 ; i < 11 ; i++){
threadPoolExecutor.execute(new Say(i));
}
threadPoolExecutor.shutdown();
}
}
为了方便演示整个任务的提交及执行情况,此处假设一个极端的场景:假设直到最后一个任务进来时,第一个任务都还没有执行完。这样的场景可以排除因为某个任务很快被执行完后,当前线程又接了另外的任务对结果造成的影响。在代码中,我加了一个Thread.sleep()的方法,就是模拟这些任务都是耗时任务。
针对上面的代码执行过程,我们可以画一个简单的图来进行描述。
现在针对上面的代码片段结合图片进行工作流程分析:
- 线程池初始化,此时线程池中没有运行的线程;
- 第一个任务提交进来,创建一个线程,这个线程为核心线程,此时线程池中运行的核心线程为1;
- 第二个任务提交进来,创建一个线程,这个线程为核心线程,此时线程池中运行的核心线程为2;
- 第三个任务提交进来,此时核心线程数已达到最大值,将该任务丢进队列中;
- 第四个任何一直到第八个任务同上一步;
- 第九个任务提交进来,此时队列已满,创建一个临时线程,此时线程池中运行的核心线程为2,临时线程为1;
- 第十个任务提交进来,此时线程数已达到最大线程数,队列也处于满载,此时执行饱和策略;
看一下代码片段最终的执行结果:
// 第 2个线程 say Hello...
// 第 9个线程 say Hello...
// 第 1个线程 say Hello...
// Exception in thread "main" java.util.concurrent.RejectedExecutionException: ***
// 第 2个线程 say Bye...
// 第 1个线程 say Bye...
// 第 9个线程 say Bye...
// 第 4个线程 say Hello...
// 第 3个线程 say Hello...
// 第 5个线程 say Hello...
// 第 5个线程 say Bye...
// 第 3个线程 say Bye...
// 第 4个线程 say Bye...
// 第 7个线程 say Hello...
// 第 6个线程 say Hello...
// 第 8个线程 say Hello...
// 第 6个线程 say Bye...
// 第 8个线程 say Bye...
// 第 7个线程 say Bye...
上面结果也侧面印证了一些事情:比如第9个任务抢在3~8之前被执行说明第9个任务启用了临时线程,第1和第2个任务是最先被执行的,说明核心线程数是2。345和678是分成了两组被执行,说明那段时间之内是由三个线程同时工作。尽管其中有一个是临时线程,但因临时线程那段时间并未处于空闲状态(或者说没有超时),所以临时线程同样在工作中。
扩展区域
扩展区域主体
这是一个没有实现的扩展。
上一篇:什么是死锁?怎么解决死锁?