并发编程—— FutureTask 源码分析

1. 前言

当我们在 Java 中使用异步编程的时候,大部分时候,我们都会使用 Future,并且使用线程池的 submit 方法提交一个 Callable 对象。然后调用 Future 的 get 方法等待返回值。而 FutureTask 是 Future 的一个实现,也是我们今天的主角。

我们就从源码层面分析 FutureTask.

2. FutureTask 初体验

我们一般接触的都是 Future ,而不是 FutureTask , Future 是一个接口, FutureTask 是一个标准的实现。在我们向线程池提交任务的时候,线程池会创建一个 FutureTask 返回。

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

newTaskFor 方法就是创建一个了一个 FutureTask 返回。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

而线程池就会执行 FutureTask 的 run 方法。

那么,我们看看 FutureTask 的 UML。

image.png

可以看出,FutureTask 实现了 Runnable,Future 。Runnable 就不必说了,一个 run 方法,那 Future 呢?

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;   

主要是这 5 个方法撑起了 Future,功能相对而言比较薄弱,毕竟这只是一个 Future ,而不是 Promise。

FutureTask 还有一个内部类,WaitNode ,结构如下:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

看起来是不是和 AQS 的节点似曾相识呢?

FutureTask 内部维护了一个栈结构,和 AQS 的队列有所区别。

实际上,在之前的版本中,FutureTask 确实直接使用的 AQS ,但是 Doug lea 又对该类进行了优化,优化的目的是 :

主要是为了避免有些用户在取消竞争期间保留中断状态。

而内部依然使用了一个 volatile 的 state 变量来控制状态,同时使用了一个栈结构来保存等待的线程。

至于原因,当然是 FutureTask 的 get 方法是支持并发的,多个线程可以获取到同一个 FutureTask 的同一个结果,而这些线程在 get 的阻塞过程中必然是要挂起自己等待的。

知道了 FutureTask 的结构。我们知道,线程池肯定会执行 FutureTask 的 run 方法,所以,我们到他的 run 方法看看。

同时,我们也要看看关键方法 —— get 方法。

3. FutureTask 的 get 方法

代码如下:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

首先判断状态,然后挂起自己等待,最后,返回结果,代码很简单。

注意:FutureTask 中有 7 种状态:

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

构造时,状态就是 NEW,当任务完成中,状态变成 COMPLETING。当任务彻底完成,状态变成 NORMAL。

我们重点看看 awaitDone 和 report 方法。

awaitDone 方法代码:

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

上面的方法相对于 JUC 其他的类,还是比较简单的。需要注意一个点:get 方法是可以并发访问的,当并发访问的时候,需要将这些线程保存在 FutureTask 内部的栈中。

简单说说方法步骤:

  1. 如果线程中断了,删除节点,并抛出异常。
  2. 如果字体大于 COMPLETING ,说明任务完成了,返回结果。
  3. 如果等于 COMPLETING,说明任务快要完成了,自旋一会。
  4. 如果 q 是 null,说明这是第一次进入,创建一个新的节点。保存当前线程引用。
  5. 如果还没有修改过 waiters 变量,就使用 CAS 修改当前 waiters 为当前节点,这里是一个栈的结构。
  6. 根据时间策略挂起当前线程。
  7. 当线程醒来后,继续上面的判断,正常情况下,返回数据。

再看看 report 方法:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

也还是很简单的,拿到结果,判断状态,如果状态正常,就返回值,如果不正常,就抛出异常。

总结一下 get 方法:

FutureTask 通过挂起自己等待异步线程唤醒,然后拿去异步线程设置好的数据。

4. FutureTask 的 run 方法

上面总结说,FutureTask 通过挂起自己等待异步线程唤醒,然后拿去异步线程设置好的数据。

那么这个过程在哪里呢?答案就是在 run 方法里。我们知道,线程池在执行 FutureTask 的时候,肯定会执行他的 run 方法。所以,我们看看他的 run 方法:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

方法逻辑如下:

  1. 判断状态。
  2. 执行 callable 的 call 方法。
  3. 设置结果并唤醒等待的所有线程。

看看 set 方法是如何设置结果的:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

先将状态变成 COMPLETING,然后设置结果,再然后设置状态为 NORMAL,最后执行 finishCompletion 方法唤醒等待线程。

finishCompletion 代码如下:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

该方法先将 waiters 修改成 null,然后遍历栈中所有节点,也就是所有等待的线程,依次唤醒他们。

最后执行 done 方法。这个方法是留个子类扩展的。FutureTask 中是个空方法。比如 Spring 的 ListenableFutureTask 就扩展了该方法。还有 JUC 里的 QueueingFuture 类也扩展了该方法。

如果异常了就将状态改为 EXCEPTIONAL。

如果用户执行了 cancel(true)方法。该方法 Java doc 如下:

试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。

也就是说,这个 mayInterruptIfRunning 决定当任务已经在执行了,还要终止这个任务。如果 mayInterruptIfRunning 是 true ,就会先将状态改成 INTERRUPTING,然后调用线程的 interrupt 方法,最后,设置状态为 INTERRUPTED。

在 run 方法的 finally 块中,对 INTERRUPTING 有判断,也就是说,在 INTERRUPTING 和 INTERRUPTED 的这段时间,会执行 finally 块,那么这个时候,就需要自旋等待状态变成 INTERRUPTED。

具体代码如下:

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

5. 总结

关于 FutureTask 就介绍完了,该类最重要的就是 get 方法和 run 方法,run 方法负责执行 callable 的 call 方法并设置返回值到一个变量中, get 方法负责阻塞直到 run 方法执行完毕任务唤醒他,然后 get 方法回去结果。

同时,FutureTask 为了多线程可以并发调用 get 方法,使用了一个栈结构保存所有等待的线程。也就是说,所有的线程都等得到 get 方法的结果。

虽然 FutureTask 的设计很好,但我仍然觉得使用异步是更好的选择,效率更高。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容