线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。
- FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
- CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
// maximumPoolSize 必须大于 0,且必须大于 corePoolSize
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
第 1 个参数:corePoolSize 表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。
第 2 个参数:maximumPoolSize 表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。
第 3 个参数:keepAliveTime 表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。
第 4 个参数:unit 表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。
第 5 个参数:workQueue 表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。
第 6 个参数:threadFactory 表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程。
从源码的角度分析ThreadPoolExecutor的使用
添加任务的过程是怎么样的;
核心线程怎么保持不被销毁的;
核心线程、非核心线程、工作队列之间的协调。
从execute()
方法开始,从该方法看出线程池新建线程执行任务的策略:
- 当前线程数小于核心线程数的时候直接新建线程执行任务;
- 如果队列能够添加任务,则优先将任务添加到队列,而不是新建进程;
- 在队列已满的时候,如果线程数小于最大线程数,则新建非核心线程执行任务,否则报错。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当前正在工作的线程小于核心的线程数,跳转到addWorker()方法,新建线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//大于等于核心线程数:当前线程在运行状态,且添加到任务队列成功了。
//对于LinkedBlockingQueue队列,添加任务总会成功(它的任务队列长度很大),所以代码走这里,
//对于SynchronousQueue因为添加失败(不能添加任务到队列),所以不会从这个分支走。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//重新检查状态,可能这个时候线程池被停止了,要remove新添加的任务
if (! isRunning(recheck) && remove(command))
reject(command);
//判断当前线程数是不是0,如果是,则新建一个线程执行新加到队列里面的任务,否则会没有线程去执行新添加进阻塞队列的任务(command)。
//如果当前线程数大于0,则不需要添加,因为有线程在运行,在线程执行完成之后,后唤醒阻塞队列里面的任务,新加到阻塞队列里的任务还是能够被执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//对于SynchronousQueue会走到这里,所以,在线程数小于最大线程数的时候,会新建线程执行任务,当大于最大线程数时,会直接报错reject(),此时addWorker()会失败
else if (!addWorker(command, false))
reject(command);
}
执行流程图如下:
addWorker()
源码分析:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//在SHUTDOWN状态下,如果队列不为空,且firstTask == null,阻塞队列里面还有未完成的任务,则还是会继续执行后面的操作,
//说明了在该状态下,能够执行阻塞队列里面的任务,firstTask为空说明了是添加了一个执行阻塞队列里面任务的线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//获取当前正在执行的线程数,判断添加进来的任务是不是核心任务,如果是,则正在执行线程数要<=corePoolSize ,
//如果不是,则正在执行线程数要<=maximumPoolSize。否则新建线程失败。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//线程数增加1,添加成功,跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新建一个Worker,下面说明这是个什么东西
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//当前线程池的正在运行或者是SHUTDOWN状态且firstTask为空,则要新建线程去执行任务。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//会执行Worker的run()方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}