什么是线程池
Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建和销毁将消耗大量的计算资源。针对这种情况,我们需要使用线程池来管理线程,带来的好处有3个:
① 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
② 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
③ 提高线程的可管理性。线程是稀缺资源,不能无限制创建,否则不但会消耗资源,还会降低系统的稳定性,而使用线程池可以进行统一分配、调优和监控。而这些离不开对线程池原理的深入了解。
我想了解的东西
1.线程池是如何做到线程复用的
2.线程复用是怎么让线程不被回收而去等待执行下一个任务的(如CacheThreadPool可以让线程保留60s)
线程池构造方法与参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:等待时间
unit:时间单位
workQueue:阻塞任务队列
handler:拒绝策略
当线程池被创建出来后,通过调用其execute(Runnable command)方法将继承了Runnable接口的任务提交到线程池中。
1.当任务数量不足corePoolSize时,直接创建线程执行任务即可。
2.当任务数量达到corePoolSize时,将任务放入阻塞队列等待执行。
3.当阻塞队列也满员了,这时候会尝试创建线程执行新任务,当前线程数量<maximumPoolSize,则创建线程成功,否则执行拒绝策略拒绝执行任务。
4.当一个线程执行完毕后会尝试从阻塞队列中读取新任务执行。(这里就是线程复用的关键)
5.keepAliveTime等待时间是当线程数量大于corePoolSize并且当前没有任务执行时生效,意义为非核心线程在没有任务执行时保持多久自动销毁。(核心线程一旦创建是不会被销毁的,但是线程池调用allowCoreThreadTimeOut(boolean)时,该参数对核心线程也会生效。)
java内置的线程池
java内置了一些线程池的实现通过工厂模式方便开发人员调用。通过Executors的静态方法,直接得到一些线程池的默认实现。
//CachedThreadPool
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
内置的线程池如下
1.newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个单线程的线程池。这个线程池只有一个线程在工作(核心/最大线程为1),也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
2.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建固定大小的线程池(核心=最大=n)。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
3.CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个可缓存的线程池(无核心线程,最大线程几乎无穷,由于没有核心线程,都是从任务队列里拉去任务执行,我们今天源码就分析它)。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
4.ScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
创建一个定长线程池,支持定时及周期性任务执行。
源码分析
重头戏来了,为了这一刻查阅好多东西,结合源码来看,终于弄懂了。
线程池提交任务execute分析
我们创建线程池之后,想让它实际运行,第一步就是向线程池提交任务。通过execute方法。源码如下:
public void execute(Runnable command) {
// (1)如果任务为null,抛出NullPointerException
if (command == null)
throw new NullPointerException();
//(2)获取当前线程池的状态和线程池中运行的线程个数
int c = ctl.get();
//(3)判断当前运行的线程是否小于核心线程数?
if (workerCountOf(c) < corePoolSize) {
//(3.1)小于核心线程数,执行addWorker(command, true)方法,让worker去执行,成功执行则返回,没成功说明在并发状态下,其他线程执行了execute方法导致核心线程满了,我们再次更新c,即当前运行的线程数。
if (addWorker(command, true))
return;
c = ctl.get();
}
//(4)判断线程池的状态(主要看有没有被shundown),正常的runnable则添加任务到阻塞队列(因为(3)没有命中,说明此时核心线程满了)
if (isRunning(c) && workQueue.offer(command)) {
//(4.1)再次检查线程池状态,没啥好说的
int recheck = ctl.get();
//(4.2)如果当前线程池状态不是runnable了,从队列中删除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//(5)(4也未命中说明阻塞队列也满了,这时候尝试创建新线程,调用addWorker方法,注意此时参数未false,代表此线程为非核心线程)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//(6)(5也未命中说明当前线程数已经达到最大线程数了,直接执行拒绝策略)
else if (!addWorker(command, false))
reject(command);
}
我们再进一步,进入addWorker方法中一探究竟,因为在execute就是用它来执行的创建线程执行任务的。
源码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//(1)第一层死循环,检查线程池状态,不为runnable返回false
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//(2)第二层死循环,通过CAS操作增加线程个数
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加线程个数,同时执行的只有一个成功
if (compareAndIncrementWorkerCount(c))
break retry;
//CAS失败了,先看线程池状态,变化了就跳到外层循环去检查目前线程池状态,未变化则重新进入下一轮循环去CAS增加线程池个数。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//(3)到这里代表CAS成功了
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个Worker(worker是任务的实际执行者,等会要重点分析的源码,注意它的属性),并将任务firstTask作为参数给它(firstTask就是execute提交来的command)
w = new Worker(firstTask);
//获得worker里面的一个属性thread(它就是我们线程池里面的主角,线程池里的线程都是它)
final Thread t = w.thread;
//如果线程不为空,直接用该线程执行任务
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//(4)添加任务
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//(5)执行线程,执行任务
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
t.start()方法调用了t的run,而run是执行了runWorker(Worker)方法。
public void run() {
runWorker(this);
}
不明白?往下看
这是我们的线程池的实际执行者类Worker的构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;//任务最终到了Worker的属性中
this.thread = getThreadFactory().newThread(this);//创建一个线程(所以在addWorker中的t不会为空的,因为在构造方法中就新建了)
}
最终是我们的执行者runWorker登场!
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//firstTask给task
Runnable task = w.firstTask;
//将worker中的firstTask设为空,为什么?往下看
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// (1)如果当前task为空(task执行完了),或从阻塞队列中读取为空(无任务),跳出while循环
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//(2)真正的执行任务!为什么是run?start在那?还记得addWorker中得t.start吗,
//t就是worker中的那个thread属性啊!!这里的run当作普通方法调用。t只有那一个,
//所以是一个线程执行了所有该线程接到任务的run()!是不是复用了线程!
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
关于1和2在注释中都说完了,还有一点,为什么cachepool能让线程等待60秒再销毁(同样,为啥threadpool的核心线程在创建后能一直维持而不被销毁,我们的任务都执行完了啊。)答案就在1上,阻塞队列,我们在线程执行下一个提交任务的时候是看当前的task是否执行完了,还有阻塞队列中是否有任务!而阻塞队列是可以让线程阻塞在while的判断中!!大胆猜测,cacheThreadpool用读取任务方式是BlockingQueue中的poll(60,TimeUnit.SECONDS)!
至此,整个线程池的工作原理剖析完了,问题也说明了,舒服,之后会补充个线程池工作流程图。
补充:
源码中一直出现的int c = ctl.get();是在干什么?
要说明这个问题首先我们要看线程池想要记录什么,我们的源码一直在程序中判断,线程池有没有被shundown?有没有超过核心/最大线程数?
所以线程池要记录两个数据,线程个数,线程池状态。
ctl就是用来记录这两个数据的,它是一个integer的原子变量,32位2进制表示,其中高三位表示线程状态,低29位表示线程个数。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
111 running
000 shundown
001 stop
010 tidying
011 terminated
获取前三位:
private static int runStateof(int c)
获取后29位
private static int workerCountof(int c)