面试几乎必问的问题就多线程的,一般都是以下面几个开始逐个深入:
1.使用多线程吗?
2.多线程有哪些优势和缺点?
3.线程池的了解过吗?还有挖坑问题JDK有几种线程城池,你们生产环境中用的哪个一个?
4.线程池都有哪些参数呀?跟着会问有哪些队列类型,拒绝策略都有哪些?
5.线程池的线程复用原理是什么?
为了回答好这些问题,这几天认真的读了一些线程池的源码和《深入了解Java虚拟机》书,来记录以下自己的学习;
首先线程分为用户态线程和内核态线程,用户线程串行执行,是依托于进程来调用cpu执行指令,并且数据相互之间不可见;而内核线程是依附于内核创建的,每个内核进程都可以调用cpu,jvm是内核线程;但是Java中的sun虚拟机中的是采用应用自己创建出来的线程是用户线程;用户线程通过轻量级进程来实现对内核线程的调用的;如图:
当java thread1的时间片执行完后切换到java thead 2时,由于两个线程之间数据不可见,需要把java thead 1中的寄存器/缓存/指令/中间数据刷新到主内存中;线程的创建和销毁是比重,Java的线程都会依赖于内核线程,创建线程都需要操作系统状态切换;为了避免过度的资源消耗就诞生了线程池;线程池就是一个线程的缓存,负责对线程统一分配调用和监控;
下面开始介绍线程池中类的关系图;
从ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:这下就弄明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;然后ThreadPoolExecutor继承了类AbstractExecutorService。在ThreadPoolExecutor类中有几个非常重要的方法:
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()和shutdownNow()是用来关闭线程池的。
ThreadPoolExecutor类概述
````
//间接调用最后一个构造函数,采用默认的拒绝策略AbortPolicy和默认的线程工厂
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue)
//间接调用最后一个构造函数,采用默认的默认的线程工厂
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, RejectedExecutionHandler)
//间接调用最后一个构造函数,采用默认的拒绝策略AbortPolicy
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory)
//前面三个分别调用了最后一个
ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory, RejectedExecutionHandler)
//最后一个构造函数的具体实现
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//参数合法性检验,核心线程数目、最大线程数目、线程空闲回收时间不得小于0,最大线程池不得小于核心线程数数目
if (corePoolSize <0 ||maximumPoolSize <=0 ||maximumPoolSize < corePoolSize ||keepAliveTime <0)
throw new IllegalArgumentException();
//参数合法性检验,阻塞队列,线程工厂,决绝策略不得为空
if (workQueue ==null || threadFactory ==null || handler ==null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;//核心线程数大小
this.maximumPoolSize = maximumPoolSize;//最大线程数目
this.workQueue = workQueue;//阻塞队列
this.keepAliveTime = unit.toNanos(keepAliveTime);//空闲回收时间
this.threadFactory = threadFactory;//线程工厂
this.handler = handler;//拒绝策略
}
````
线程池主要几个参数说明
corePoolSize:(volatile int)类型,表示线程池核心池的大小。
maximumPoolSize:(volatile int)类型,表示线程池最多创建的线程数目。
注:Java线程池分两部分,一块是核心池,一块是临时池,当核心池的线程满了且阻塞队列中任务满了,再有任务提交执行时,则创建"临时线程",可创建的临时线程的数目为maximumPoolSize-corePoolSize。当线程总数等于maximumPoolSize且阻塞队列满了,再有任务提交时,采取拒绝策略。
workQueue:阻塞队列。常用的有ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue。
keepAliveTime:线程空闲回收时间。
threadFactory:生成线程的工厂类。默认为:Executors.defaultThreadFactory();
handle:拒绝策略。默认为defaultHandler = new AbortPolicy();
注:常用拒绝策略有以下几种:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
```
/*AtomicInteger类型,用来标识线程池的状态,以及线程池里面线程的数量,
初始值为1110 0000 0000 0000 0000 0000 0000 0000 前三位是线程池的状态,其中:
000 SHUTDOWN 不接受新任务但是处理阻塞队列中的任务
010 TIDYING 所有任务都被终止,工作线程为0
001 STOP 不接受新任务也不处理阻塞队列中的任务并且中断所有线程池中正在运行的任务
011 TERMINATED 不接受新任务也不处理阻塞队列中的任务并且中断所有线程池中正在运行的任务
111 RUNNING 接受新的任务并处理阻塞队列中的任务
注:关于阻塞队列,后续会说,暂且理解为一个存放还未执行线程的队列就好。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//在某些情况下用来存储任务,并将任务提供给线程池中的工作线程
private final BlockingQueue<Runnable> workQueue;
//用来对pooSize、corePoolSize、maximumPoolSize、runState、workers修改时候同步
private final ReentrantLock mainLock = new ReentrantLock();
//线程池中所有线程的集合,访问和修改需要mainLock的配合
private final HashSet<Worker> workers = new HashSet<Worker>();
//用来支持waitTemination
private final Condition termination = mainLock.newCondition();
//跟踪线程池中线程的最大值,具体的猜测是为了矫正poolsize,访问和修改需要配合mainLock
private int largestPoolSize;
//已完成任务的数量,在任务处于Terminate状态时才更新,访问和修改需要mainLock的配合
private long completedTaskCount;
/*
* 一下参数都是用户控制的,全部被声明为了Volatile类型的值,这样能够确保在多线程下,每个
* 线程都能够获取到最新值。
*/
//线程工厂,用户可以自定义,以便在想线程池创建线程时附加一些个人操作
private volatile ThreadFactory threadFactory;
//当线程池处于shutdown或者处于饱和时执行的拒绝策略
private volatile RejectedExecutionHandler handler;
//设置线程池中空闲线程等待多时毫秒被回收
private volatile long keepAliveTime;
//指定线程池中的空闲线程是否一段时间被回收,false一直存活
private volatile boolean allowCoreThreadTimeOut;
//核心线程池大小,若allowCoreThreadTimeOut被设置,全部空闲超时被回收的情况下会为0
private volatile int corePoolSize;
//最大线程池大小,不得超过CAPACITY
private volatile int maximumPoolSize;
```
从代码中能看出是将线程状态和线程数放在了一个int值里面,高三位代表线程状态,低29位表示线程数目。
其中clt(AtomicInteger)可以理解为线程安全的整数。关于clt常用的几个操作,即对线程状态和线程数的操作:
runStateOf(int c) 是通过与的方式,在clt字段中获取到clt的前三位,也就是线程池的状态标识。
workerCountOf(int c)是通过与的方式,在clt字段中获取到clt的后29位,也就是线程池中的线程数量。
ctlOf(int rs, int wc) 是通过或的方式,将修改后的线程池状态rs和线程池中线程数量打包成clt。
isRunning(int c) SHUTDOWN的状态是0左移29为得到的,比他大的均是线程池停止或销毁状态
核心方法
线程池初始化之后,可以通过调用submit和execute方法来执行任务。其实submit内部也是调用execute方法,源码如下:
```
//sumit函数是在ThreadPoolExecute的父类AbstractExecutorService实现的,最终还是调用的子类的execute方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
```
接下来看execute的源码
```
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();//AtomicInteger
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
```
执行流程为下图,图片来至https://blog.csdn.net/u011637069/article/details/79591330
AddWorker 源码 通过源码发现addWorker实现了
1)才用循环CAS操作来将线程数加1;2)新建一个线程并启用。
```
private boolean addWorker(Runnable firstTask, boolean core) {
//(1)循环CAS操作,将线程池中的线程数+1.
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//core true代表是往核心线程池中增加线程 false代表往最大线程池中增加线程
//线程数超标,不能再添加了,直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS修改clt的值+1,在线程池中为将要添加的线程流出空间,成功退出cas循环,失败继续
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果线程池的状态发生了变化回到retry外层循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//(2)新建线程,并加入到线程池workers中。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//对workers操作要通过加锁来实现
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//细化锁的力度,防止临界区过大,浪费时间
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
//判断线程池的状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//判断添加的任务状态,如果已经开始丢出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将新建的线程加入到线程池中
workers.add(w);
int s = workers.size();
//修正largestPoolSize的值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//线程添加线程池成功,则开启新创建的线程
if (workerAdded) {
t.start();//(3)
workerStarted = true;
}
}
} finally {
//线程添加线程池失败或者线程start失败,则需要调用addWorkerFailed函数,
//如果添加成功则需要移除,并回复clt的值
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
```
执行流程如下:
再看一下worker内部类
继承自AQS,具有锁的功能,实现了Runable接口,具有线程的功能
```
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
//线程池中正真运行的线程。通过我们指定的线程工厂创建而来
final Thread thread;
//线程包装的任务。thread 在run时主要调用了该任务的run方法
Runnable firstTask;
//记录当前线程完成的任务数
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//利用我们指定的线程工厂创建一个线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
```
从源码可以看出来,Worker类的run方法实际上调用的还是ThreadPoolExecutor的runworker方法。下面将看一下ThreadPoolExecutor的runworker源代码和注释解析。
```
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//线程池处于stop状态或者当前线程被中断时,线程池状态是stop状态。
//但是当前线程没有中断,则发出中断请求
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted()) {
wt.interrupt();
}
try {
//开始执行任务前的Hook,类似回调函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务执行后的Hook,类似回调函数
afterExecute(task, thrown);
}
} finally {
//执行完毕后task重置,completedTasks计数器++,解锁
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//线程空闲达到我们设定的值时,Worker退出销毁。
processWorkerExit(w, completedAbruptly);
}
}
```
大概意思就是当前任务不为null或者从队列中取的任务不为null时,worker线程就一直去执行任务。当无要执行的任务时,尝试回收线程。
runWorker函数中最重要的是getTask(),他不断的从阻塞队列中取任务交给线程执行。下面分析一下:
```
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果线程池处于shutdown状态,
//并且队列为空,或者线程池处于stop或者terminate状态,
//在线程池数量-1,返回null,回收线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//标识当前线程在空闲时,是否应该超时回收
boolean timed;
for (;;) {
int wc = workerCountOf(c);
//如果allowCoreThreadTimeOut 为ture
//或者当前线程数量大于核心线程池数目,
//则需要超时回收
timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(1)
//如果线程数目小于最大线程数目,
//且不允许超时回收或者未超时,
//则跳出循环,继续去阻塞队列中取任务(2)
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
//如果上面if没有成立,则当前线程数-1,返回null,回收该线程
if (compareAndDecrementWorkerCount(c))
return null;
//如果上面if没有成立,则CAS修改ctl失败,重读,cas循环重新尝试修改
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
(2)
try {
//如果允许空闲回收,则调用阻塞队列的poll,
//否则take,一直等到队列中有可取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//取到任务,返回任务,
//否则超时timedOut = true;进入下一个循环,
//并且在(1)处会不成立,进而进入到cas修改ctl的程序中
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
```