今天看了部分线程池的代码,对于线程池运行机制又有了一些新的认识, 本文是结合占小狼简书 加了一些自己的理解和注释.
该文只是作为个人对于占小狼关于深入分析java线程池的实现原理文的自己一些补充,建议先看他的文章之后再来看该文。有理解不对的地方望提出来共同学习。
1.线程池内部状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
AtomicInteger变量ctl的功能非常强大:利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态:
1、RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS,即高3位为010;
5、TERMINATED: 3 << COUNT_BITS,即高3位为011;
2.任务执行
//得到c从而拿到内部状态
int c = ctl.get();
//拿到线程池中的线程数,如果线程数小于核心线程数,那么直接创建新的线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//会发现线程池中很多地方都会重复的得到内部状态,因为线程池主要是用CAS来保证的线程安全性,从而导致如果swap失败的一些情况需要再次获得最新的内部状态用之后做判断。
//这里如果加入创建新的线程执行任务的时候出现错误,重新获取最新的内部状态用于之后使用
c = ctl.get();
}
//workQueue.offer会跟根据不同的阻塞队列从而导致线程池出现不同的线程添加机制,我会在2.1中着重说不同阻塞队列的情况。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果在我们添加到阻塞队列之后,状态不是RUNNING状态,会将当前任务从阻塞队列移除,并拒绝这次任务
if (! isRunning(recheck) && remove(command))
reject(command);
//这种情况是由于corePoolSize允许为0,当corePoolSize为0时,第一次会运行到这步,并添加线程到线程池中。当corePoolSize等于0时,会相当于只在核心线程池中添加一个线程用于消费阻塞队列的任务,这里也会在2.1结合不同阻塞队列说
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
2.1 不同阻塞队列对于线程池添加的影响。(这里建议先看过上面推荐的博客,并了解了核心线程池中的workers是什么以及线程池的流程)
workers其实就是存储这里的MaxPool中包含的所有的工作线程的一个HashSet
workQueue.offer其实就是要将任务添加到阻塞队列中,如果能够添加则返回true并添加到阻塞队列中,否则返回false。
addWorker这个步骤其实就是要向workers中创建一个新的线程用于消费阻塞队列中的任务。
但是对于不同的阻塞队列,offer的行为是不同的。
LinkedBlockingQueue,当我们初始化的时候没有给他初始容量,那么这里,他每次offer都可以添加到我们的阻塞队列中,因为LinkedBlockingQueue是基于链表结构的无界阻塞队列。那么我们如果corePoolSize不是0,则相当于只有当前workers中只有CorePool,当workerCountOf(c) > corePoolSize的时候,我们只是向阻塞队列中添加任务,供之后线程消费,而不会再添加新的worker到workers了,所以这个时候的MaxPool和CorePool是一样大的,maxmumPoolSize参数也就没有了意义。如果corePoolSize是0,则相当于只有一个线程在线程池中,之后的任务都直接进入到阻塞队列
LinkedBlockingQueue赋予了初始化容量,那么我的理解是和ArrayBlockingQueue作用是一样的。当我们的数量达到了核心线程数,接下来会向阻塞队列中添加任务,当我们的阻塞队列也满了。则再创建新的worker加入到workers中,当达到最大线程数时,最后会reject。
当我们当前的线程池核心线程数大小小于corePoolSize的时候,每次都会创建新的woker来执行,当我们等于核心线程数的时候,如果这个时候存在空闲的worker,那么会直接使用空闲的worker执行,当没有空闲worker的时候会向阻塞队列中添加command
SynchronousQueue,扩展先阅读下http://ifeve.com/java-synchronousqueue/.SynchronouseQueue主要是用来做信号量方面的应用的,Excutors.newCachedThreadPool就是用的这种方式从而达到缓存线程池的作用。
这里的SynchronouseQueue用的很精妙.
offer不是每回都返回false,而是当我们当前的可用的worker队列中,如果不存在可用的worker的时候才会返回false,如果存在可用的worker的时候,会返回 true,这是因为当我们worker执行完当前work的任务的时候,我们回去在继续getTask,发现task不存在的时候,我们会去调用workQueue.take或poll,从而阻塞在这里,当我们有可用woker的时候,相当于阻塞队列存在一个待消费的take,这个时候offer会返回true
因为之前一直对不同队列对线程池运行有什么影响不是很理解,这一部分是我经过测试并看了一部分源码得出的一些理解,有问题希望大家能提出来。