一、核心成员变量
1. task状态
private volatile int state;
// 任务的初始状态都是NEW, 这一点是构造函数保证的
private static final int NEW = 0;
// 任务结束,正在设置任务结果,这是任务可能正常执行完成,可能抛异常了,可能被取消了,反正任务已经不在跑了
private static final int COMPLETING = 1;
// 任务正常执行完毕
private static final int NORMAL = 2;
// 任务执行过程中发生异常
private static final int EXCEPTIONAL = 3;
// 任务被取消
private static final int CANCELLED = 4;
// 正在中断运行任务的线程
private static final int INTERRUPTING = 5;
// 任务被中断
private static final int INTERRUPTED = 6;
- 总共有7种状态:包括了1个初始态,2个中间态和4个终止态
- 任务的初始状态都是NEW, 这一点是构造函数保证的
- 任务的中间状态有2种
- COMPLETING 正在设置任务结果
- INTERRUPTING 正在中断运行任务的线程
- 任务的终止状态有4种
- NORMAL:任务正常执行完毕
- EXCEPTIONAL:任务执行过程中发生异常
- CANCELLED:任务被取消
- INTERRUPTED:任务被中断
- 任务的中间态并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果,其实只要state不处于 NEW 状态,就说明任务已经执行完毕
- 可能的状态变化
- NEW -> COMPLETING -> NORMAL:任务正常完成
- NEW -> COMPLETING -> EXCEPTIONAL:执行任务时抛出了异常
- NEW -> CANCELLED: 执行任务时被取消且mayInterruptIfRunning参数为false
- NEW -> INTERRUPTING -> INTERRUPTED:执行任务时被取消且mayInterruptIfRunning参数为true
2. 其它
// 任务本身
private Callable<V> callable;
// 任务的结果
private Object outcome; // non-volatile, protected by state reads/writes
// 任务的执行者,指得是线程
private volatile Thread runner;
- callable属性代表了要执行的任务本身,即FutureTask中的“Task”部分,为Callable类型,这里之所以用Callable而不用Runnable是因为FutureTask实现了Future接口,需要获取任务的执行结果
- outcome属性代表了任务的执行结果或者抛出的异常,为Object类型,也就是说outcome可以是任意类型的对象,所以当我们将正常的执行结果返回给调用者时,需要进行强制类型转换,返回由Callable定义的V类型
- runner属性代表执行任务的线程,在执行run方法时会赋值
二、内部类WaitNode
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
- WaitNode是个单向链表,用来保存所有等待任务执行完毕的线程的集合
- WaitNode只包含了一个记录线程的thread属性和指向下一个节点的next属性
- WaitNode是当做Treiber栈来使用的,同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,则这些线程就要被包装成WaitNode扔到Treiber栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用CAS操作保证入栈的线程安全,对于出栈的情况也是同理
三、重要方法
1. run方法
public void run() {
// state != NEW 表示任务已经跑完了;
// !RUNNER.compareAndSet(this, null, Thread.currentThread())表示已经有线程在执行这个任务了
// 满足上面其中一个条件就没必要再执行这个任务了,直接返回
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 任务不为空,且状态是NEW,那么就执行任务
// 这里又判断了state == NEW是防止任务被外部线程中断或者取消
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 执行任务并返回结果
result = c.call();
// 执行完成
ran = true;
// 执行任务时抛出了异常
} catch (Throwable ex) {
// 执行结果为空
result = null;
// 执行失败
ran = false;
// 设置异常
setException(ex);
}
if (ran)
// 如果执行成功,设置执行结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 最后将执行者置为空
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果线程是中间状态正在被打断,将while循环至中间状态被变成最终状态
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 将runner属性设置成当前正在执行run方法的线程
- 调用callable成员变量的call方法来执行任务
- 设置执行结果outcome, 如果执行成功, 则outcome保存的就是执行结果;如果执行过程中发生了异常, 则outcome中保存的就是异常,设置结果之前,先将state状态设为中间态
- 对outcome的赋值完成后,设置state状态为终止态(NORMAL或者EXCEPTIONAL)
- 唤醒Treiber栈中所有等待的线程
- 善后清理(waiters, callable,runner设为null)
- 检查是否有遗漏的中断,如果有,等待中断状态完成
protected void set(V v) {
// 如果是NEW状态,将状态改为COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 设置返回结果
outcome = v;
// 将状态改为NORMAL
STATE.setRelease(this, NORMAL); // final state
// 主要是唤醒waiters里的线程
finishCompletion();
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 这里是把异常对象赋给outcome
outcome = t;
// 将状态变成EXCEPTIONAL
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
// 遍历等待获取任务执行结果的线程
for (WaitNode q; (q = waiters) != null;) {
// 将栈顶元素先置为空
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
// 获取等待的线程
Thread t = q.thread;
// 如果线程不为空,将线程置为空,并unpark
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
// 获取下一个结点
WaitNode next = q.next;
// 如果下一个结点为空,直接退出里面的循环,
// 由于队列的当前结点已经置为空,所以外面的循环也退出了
if (next == null)
break;
// 将当前结点的下一个结点置为空
q.next = null; // unlink to help gc
// 下一个结点作为新的栈顶结点
q = next;
}
break;
}
}
// 空方法,留给子类实现
done();
// 任务置为空
callable = null; // to reduce footprint
}
2. get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 返回任务运行的状态
s = awaitDone(false, 0L);
// 返回任务运行的结果
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
// 代表即将被放入栈中的结点
WaitNode q = null;
// 是否放入栈中,一开始为false
boolean queued = false;
for (;;) {
// 获取任务的状态
int s = state;
// 大于COMPLETING代表任务执行完成
if (s > COMPLETING) {
// 如果已经创建了WaitNode结点,将等待的线程置为空,最后返回任务运行状态
if (q != null)
q.thread = null;
return s;
}
// 任务是COMPLETING状态,说明正在处理结果,让出线程调度,等下一次循环再判断状态
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
// 如果等待的线程被打断了,从等待队列里删除该队列,并抛出异常
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 走到这,说明任务还未完成,再判断是否有等待时间,如果等待时间到了,返回状态,如果没到,创建结点
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
// 走到这,说明任务还是未完成,且等待结点已经创建了,那么将结点放入栈中
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
// 如果有等待时间
else if (timed) {
final long parkNanos;
// 还没初始化等待的一开始时间
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
// 防止虚假唤醒,重新计算park时间
} else {
long elapsed = System.nanoTime() - startTime;
// 如果等待时间到了,那么将结点从栈中移除
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
// park指定时长
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
// 将当前线程阻塞
else
LockSupport.park(this);
}
}
- 如果任务已经进入终止态(s > COMPLETING),我们就直接返回任务的状态;
- 否则,如果任务正在设置执行结果(s == COMPLETING),我们就让出当前线程的CPU资源继续等待
- 否则,就说明任务还没有执行,或者任务正在执行过程中,那么这时,如果q现在还为null, 说明当前线程还没有进入等待队列,于是我们新建了一个WaitNode, WaitNode的构造函数我们之前已经看过了,就是生成了一个记录了当前线程的节点;
- 如果q不为null,说明代表当前线程的WaitNode已经被创建出来了,则接下来如果queued=false,表示当前线程还没有入队,执行入栈操作
- 如果以上的条件都不满足,则再接下来因为现在是不带超时机制的get,timed为false,则else if代码块跳过,然后来到最后一个else, 把当前线程挂起,此时线程就处于阻塞等待的状态
- 什么时候唤醒被挂起的线程
- 任务执行完毕了,在finishCompletion方法中会唤醒所有在Treiber栈中等待的线程
- 等待的线程自身因为被中断等原因而被唤醒
private void removeWaiter(WaitNode node) {
if (node != null) {
// 现将线程置为空
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 因为已经将待删除结点的thread置为空了,这里不为空说明还没遍历到待删除的结点
if (q.thread != null)
pred = q;
// 走到这里,说明找到了待删除结点,且待删除结点不在栈顶
else if (pred != null) {
// 将待删除结点的前一个结点的下一个结点指向待删除结点的下一个结点
pred.next = s;
// removeWaiter是可以并发执行的,如果有其它线程将pred结点的thread置为空,再往后遍历已经无意义了,所以再重头遍历
if (pred.thread == null) // check for race
continue retry;
}
// 说明删除的结点是在栈顶,将栈顶替换成下一个结点
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}
- 传入一个需要移除的节点,我们会将这个节点的thread属性设置成null,以标记该节点
- 无论如何,会遍历整个链表,清除那些被标记的节点(只是简单的将节点从链表中剔除)
- 如果要清除的节点就位于栈顶,则还需要注意重新设置waiters的值,指向新的栈顶节点
- 虽说removeWaiter方法传入了需要剔除的节点,但是事实上它可能剔除的不止是传入的节点,而是所有已经被标记了的节点,这样不仅清除操作容易了些(不需要专门去定位传入的node在哪里),而且提升了效率(可以同时清除所有已经被标记的节点)
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任务正常完成,返回结果值
if (s == NORMAL)
return (V)x;
// 任务未正常完成,抛出异常
if (s >= CANCELLED)
throw new CancellationException();
// 抛出异常
throw new ExecutionException((Throwable)x);
}
3. cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果任务状态不是NEW,说明任务完成,直接返回false,无法取消
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 如果mayInterruptIfRunning为真,打断执行任务的线程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
- 任务完成了,也就是非NEW状态,或者状态更新失败,那么返回false,也就是取消失败,也就是说如果任务完成了或者任务未完成但是无法终止,就会取消失败
- 其它情况返回true,也就是取消成功,但是cancel操作返回true并不代表任务真的就是被取消了
- 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
- 如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:
- 如果mayInterruptIfRunning 为true, 则当前在执行的任务会被中断
- 如果mayInterruptIfRunning 为false, 则可以允许正在执行的任务继续运行,直到它执行完
- cancel方法实际上完成以下两种状态转换之一
- NEW -> CANCELLED (对应于mayInterruptIfRunning=false),这条路径,虽说cancel方法最终返回了true,但它只是简单的把state状态设为CANCELLED,并不会中断线程的执行。但是这样带来的后果是,任务即使执行完毕了,也无法设置任务的执行结果,因为前面分析run方法的时候我们知道,设置任务结果有一个中间态,而这个中间态的设置,是以当前state状态为NEW为前提的
- NEW -> INTERRUPTING -> INTERRUPTED (对应于mayInterruptIfRunning=true)