一.使用线程池的好处
与“为每一个任务分配一个线程”相比,线程池有一些好处。
- 重用已经创建的线程,减少了创建、销毁线程的开销。
- 任务到达时,可能线程池中已经有创建好的线程供使用了,避免了等待线程创建的时间开销。
二.Java线程池实现原理
在Java中创建线程池可以使用Executors提供的四个静态方法创建适用于特定情况的几种线程池。但这些构造方法还是根据需求直接传入特定参数实例化了ThreadPoolExecutor类。所以,我们要想从原理上理解线程池,还是要先学习一下ThreadPoolExecutor的构造方法,看看都有哪些参数。这些参数其实可以理解为线程池的配置信息,根据自己的需求传入不同的参数就能构造出不同的线程池。
ThreadPollExector类有4个构造方法,其它三个的参数较少,使用了一些但归根结底是传入的7个关键参数决定了这个线程池是什么样的。所以关键是这七个参数。
2.1 ThreadPoolExecutor构造方法的7个参数
方法声明如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
corePoolSize :
核心线程数。当线程池刚刚创建时线程数是0,这时如果每来一个任务就会创建一个新的线程,直到已创建的线程数等于核心线程数后就不再创建了,而是把新任务放入阻塞队里去。
关于初始线程数:如果调用了prestartAllCoreThreads()会直接创建出corePoolSize个线程;如果调用了prestartCoreThread()则会提前创建出一个线程。
关于线程存活时间:核心线程不受keepAliveTime的影响,创建后会一直存在,直到线程池关闭。除非调用了allowCoreThreadTimeOut(true)方法
这里的疑问是,如果现有线程是有空闲的,但没达到核心线程数,来了新任务会创建新的线程吗?
答:这个疑问如果理解了线程池的工作过程就不会问了。详见下面的线程池工作流程。
maximumPoolSize
最大线程数。是线程池最多允许存在的线程总数。如果当前线程数已经达到corePoolSize。那么就将任务放入队列,如果队列也满了。就判断一下当前存在线程数是否小于maximumPoolSize,如果是,则创建新的线程执行任务。这里创建的线程处于核心线程池外,受keepAliveTime的影响,如果空闲到达执行时间就会销毁。
*keepAliveTime
控制非核心线程在空闲状态下的存活时间。如果调用allowCoreThreadTimedOut方法,核心线程也可以受它的影响。
unit
keepAliveTime的时间单位,有秒、毫秒、分钟、小时、天等。
workQueue
指定缓存队列
threadFactory
线程工厂,用来创建线程。
handler
表示当前拒绝处理任务时的策略,当缓存队列已满,线程数也达到maximumPoolSize时就按指定的策略处理新提交的任务。
主要有以下四种取值:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
2.2 线程池工作流程
- 当有新任务提交给线程池时,首先检查一下当前线程数是否达到了corePoolSize,如果没达到则新创建线程,将当前任务作为该线程的第一个任务执行。
- 如果已经到达了corePoolSize或超过了,则将任务放入缓存队列。
- 如果缓存队列已满,则判断当前线程数是否已经达到了maximumPoolSize,如果没到达则创建新线程执行任务
- 如果线程数达到了maximumPoolSize,则使用拒绝策略处理
2.3 ThreadPoolExecutor源码分析
2.3.1 线程池状态
线程池有running,shutdown,stop,tidying,termienated几种状态。在jdk1.8的实现中,复用了一个AtomicInteger对象来同时存储线程状态和当前线程数。具体代码不做展开,理解这种做法即可。
2.3.2 参数介绍
- ReentrantLock mainLook 一个锁,添加工作线程时要先获取锁
- HashSet<Worker> workers 存储工作线程
- int largesetPoolSize 记录曾经达到的最大线程数
- long completedTaskCount 记录已经完成的任务数量
其他还有构造方法传入的7个参数也都有相应的属性进行保存
2.3.3 execute(Runnable command)方法分析
通过这个方法将任务提交给线程池执行。这个方法基本是2.2节线程池工作流程执行的。
public void execute(Runnable command) {
if (command == null) //空指针异常
throw new NullPointerException();
//获取clt,这个AtomicInteger对象中存储着当前线程数和线程运行状态
int c = ctl.get();
//如果当前线程数小于核心线程数则执行添加线程动作
if (workerCountOf(c) < corePoolSize) {
//addWorker的第二个参数表示添加的是否是核心线程,这里是true
if (addWorker(command, true))
return;
//添加后重新获取状态值,因为线程数已经有变化了,线程池状态也可能变了
c = ctl.get();
}
//执行到这里,说明添加核心线程不成功,可能是数量达到corePoolSize或线程池shutdown了
//如果线程池还在运行尝试将任务加入缓冲队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果添加到队列后,线程池停止运行了,将任务从队列移除
if (! isRunning(recheck) && remove(command))
reject(command); //移除成功后使用拒绝策略处理任务
else if (workerCountOf(recheck) == 0) //分析1:如果核心线程池为空,添加一个非核心线程,处理队列中可能的任务
addWorker(null, false);
}
else if (!addWorker(command, false)) //队列满了,启动非核心线程执行任务
reject(command); //非核心线程启动失败,执行拒绝策略
}
2.3.4 addWorker(Runnable,Boolean)分析
addWorker方法用来给线程池中添加线程。第二个参数表示添加的是否是核心线程。下面来看一下它是怎么工作的吧!
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//分析1:注意这个if语句是和2.3.3中分析1相呼应的。
//调用shutdown()将空闲线程interrupt,正在执行的线程继续执行,将状态设为shutdown
//调用shutdownNow()将所有线程中断,不管有没有执行完。
//如果shutdown()后,核心线程都关闭了,队列中却还有元素,2.3.3分析1就添加了新的非核心线程处理,就是这里的!(rs==SHUTDOWN&&……)这种情况
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//超过执行线程数,返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//cas将线程计数加1,失败后重试
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//线程状态发生变化重新循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//下面是添加线程的过程
boolean workerStarted = false; //标识线程是否启动
boolean workerAdded = false; //标识线程是否被添加
Worker w = null;
try {
w = new Worker(firstTask); //新生成worker,线程池中的线程用它表达
final Thread t = w.thread; //获取实际的线程
if (t != null) {
//添加线程要先获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//获取当前状态
int rs = runStateOf(ctl.get());
//不是shutdown状态或者,处于shutdown状态但添加的tash是null,属于shutdown下添加线程处理队列中剩余任务的情况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); //添加到workers,workers持有线程
int s = workers.size();
if (s > largestPoolSize) //记录最大线程数记录
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //启动线程执行
workerStarted = true;
}
}
} finally {
if (! workerStarted) //没成功
addWorkerFailed(w);
}
return workerStarted;
}
2.3.5 worker的执行
添加线程后,worker就开始执行了,在它的执行方法run里会直接调用tast的run方法,执行要干的事情。
执行完成之后会去队列获取新的任务执行。
如果没有新任务执行呢?
Worker是ThreadPoolExecutor的一个内部类,它的实现了Runnable方法。这里有个问题是Runnable需要传递给Thread才能执行。Worker是如何做到的呢?
原来,Worker类持有了一个Thread类型的变量thread,并在初始化时使用Worker本身初始化了thread
Worker的构造方法:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Worker的run方法调用了runWoker(this)。Worker还继承了AQS,这一块还有待学习。
第一次启动会执行初始化传进来的任务firstTask;然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//第一次时执行外部传过来的task,后面从getTask获取,getTask从队列获取任务执行,
//如果队列为空则等待keepAliveTime这么长的时间
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
2.3.5 其他内容
下一步应该分析getTask是如何从队列获取任务的了,这里不再展开讲了。
此外,还应该有队列的实现和选择问题,拒绝策略的具体等,目前先不做如此多的分析了,待未来时机成熟了再完善。
二.使用Executors创建具有默认配置的线程池
java.util.concurrent.Executors中提供了4个静态方法,可以用来创建具有指定特性的线程池。都是实例化了ThreadPoolExecutor对象。
2.1 newFixedThreadPool()方法
返回一个带缓存的线程池,该池在必要的时候创建线程,在线程空闲60s之后终止线程.
下面看一下这个方法的源码实现:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2.2 newFixedThreadPool
创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,指导达到线程池的最大数量,这是线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.3newScheduledThreadPool
创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
2.4newSingleThreadExecutor
是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
Java并发之基础知识
Java并发之volatile关键字
Java并发之synchronized关键字
Java并发之原子类
Java并发之线程池
Java并发之并发工具类
Java并发之AQS原理
Java并发之ThreadLocal使用和源码分析