参数概述
ExecutorService使用线程池中的线程执行每个提交的任务,从它的构造函数可以看到,创建一个线程池可以指定它的核心线程数量,最大线程数量,线程存活时间,阻塞队列,线程工厂和拒绝策略。
默认情况创建时线程池中线程数量为0,当向线程池提交一个任务,如果线程池中线程数量少于corePoolSize,那么线程池会用threadFactory创建一个新线程来处理该请求。如果线程数量已经达到corePoolSize,线程池会将任务放入workQueue中等待处理。当workQueue已满且此时线程池又接受到一个新的任务,如果corePoolSize<maximumPoolSize,线程池会创建额外的线程执行任务,该线程在线程池中存活时间由keepAliveTime决定。当线程数量等于maximumPoolSize时,再出现新的任务,线程池使用拒绝策略。
BlockingQueue
BlockingQueue用于存放等待执行的任务,主要有三种队列策略:
1.Direct handoffs 直接握手队列
使用容量为0的SynchronousQueue,它直接将任务交给线程而不会保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。
2.Unbounded queues 无界队列
当所有corePoolSize线程繁忙时,使用无界队列(没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。
3.Bounded queues 有界队列
一个有界的队列(一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽。让队列大小和maximumPoolSizes需要相互权衡。
Rejected tasks
当线程池关闭或任务队列已满且线程数量等于maximumPoolSizes时,线程池会执行拒绝策略拒绝新添加的任务,拒绝策略可以自定义,J.U.C预定义了四种处理策略:
AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;
CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
DiscardPolicy:直接丢弃新提交的任务;
DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);
Executors中预置的线程池
FixedThreadPool
FixedThreadPool是一个固定大小的线程池,corePoolSize与maximumPoolSize相等,即其线程全为核心线程,由于keepAliveTime默认对核心线程无效该参数设置为0,workQueue 是一个LinkedBlockingQueue实现的无界阻塞队列。FixedThreadPool线程池的线程数量增加到corePoolSize后就会固定,提交的任务要么被线程执行,要么在BlockingQueue中等待。
CachedThreadPool
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即不限制线程池中的线程数,但没有核心线程,线程空闲60s后自动结束。workQueue 为 SynchronousQueue 同步队列,因此不会有任务在队列中等待,提交的任务直接交给线程执行。线程池中的线程数量会有很大的波动。
SingleThreadExecutor
SingleThreadExecutor可以看做线程数为1的FixedThreadPool。区别在于FixedThreadPool创建后可以动态修改核心线程数,而SingleThreadExecutor创建后无法被修改。
源码分析
成员变量
ThreadPoolExecutor的成员变量很多,但很多变量都是用于设置参数,真正作为数据结构使用的变量并不多。
ctl
线程池使用一个AtomicInteger的ctl变量将 workerCount(工作线程数量)和 runState(运行状态)两个字段压缩在一起 (类似ReentrantReadWriteLock的state变量,高低16位分别存储读锁数量和写锁数量),ctl前3位表示runState,剩下位表示workerCount,线程池每创建一个线程执行新的任务,或每一个线程消亡,都会CAS修改workerCount的值。
runState有五种状态,可以通过线程池的shutdown和shutdownNow方法改变线程池状态。
Works:
当线程池创建一个新的线程执行提交的任务时,线程池会将线程和任务封装到内部类Work,加入到works中。一个work对象可以看做是一个线程对象的封装,work执行完当前任务后会在等待队列等待新的任务执行,直到线程池关闭或超时,work才会死亡。
内部类Work
Worker类继承了AQS,在AQS的基础上封装了线程和任务,并用completedTasks记录了当前线程执行的任务数。
Worker实现的AQS非常简单,就是一个不可重入的互斥锁。Worker实现AQS不是为了线程安全,主要是为了控制线程是否可interrupt,以及判断当前线程是否正在执行任务。
核心方法
execute方法
execute方法的核心功能就是调用addWorker方法执行接收的任务,这里分为3种情况。
1.当前线程<核心线程,调用addWorker(command, true) 创建核心线程执行当前任务(addWorker第二个参数表示是否创建核心线程)
2.核心线程数已满,调用workQueue.offer方法将任务加入阻塞队列。注意,加入成功后要做一个判断,如果当前线程数量为0,即进入阻塞队列的任务没有线程执行,需要调用addWorker(null, false): 创建一个非核心线程,直接去执行队列中的任务。
3.队列已满,加入队列失败,调用addWorker(command, false):创建非核心线程执行当前任务。
addWorker方法
addWorker核心逻辑就是创建线程执行任务,源码比较长,下面只摘取核心代码分析。
首先程序判断是否需要创建线程
如果线程池已经关闭,直接返回false,这里存在上面描述过的例外情况,即当前线程数量为0而队列中存在任务等待执行,这种情况即使线程池关闭也要创建线程。
如果线程数超过上限(超过核心线程或最大线程),返回false。
确定需要创建线程后,首先CAS让WorkerCount加1,然后新创建一个worker对象加入到works中,最后启动work中的线程执行任务。
runWorker方法
worker启动后会调用runWorker方法,runWorker方法的核心就在这个while循环语句中。
runWorker方法会先执行分配的第一个任务,执行完毕后就调用getTask()方法在等待队列中寻找任务执行,在getTask方法中可能会因为队列空让当前线程阻塞。
在任务执行中会有一段代码处理中断,如果线程池状态为stop,即调用shutdownNow,那么就调用Interrupt方法中断正在执行的线程。如果不是stop状态,那么调用interrupted方法取消中断状态,保证线程不会被中断。
getTask方法
getTask方法逻辑就是让当前线程从队列中获取任务,并且处理线程池关闭或超时时线程的清除。
从源码中可以看到线程池中线程删除的逻辑:
1.线程池处于stop状态时,所有线程都不再接受任务,直接死亡
2.线程池处于shutdown状态时,线程会继续执行等待队列中的任务,直到队列为空,线程死亡
3.线程池中没有区别核心线程和非核心线程,只要当前线程数超过核心线程数(timed为true)并且超时,当前线程就会死亡。
ScheduledThreadPoolExecutor
在 ThreadPoolExecutor 的基础上扩展了线程周期调度功能,阻塞队列使用的是DelayedWorkQueue。
具体参考
https://www.jianshu.com/p/5d994ee6d4ff
线程池关闭
线程池自动关闭的两个条件:1、线程池的引用不可达;2、线程池中没有线程;
因此关闭线程池时必须调用shutdown/shutdownNow方法,否则线程池无法被回收造成内存泄漏。