Java8源码阅读 - FutureTask

FutureTask是一个表示可异步计算任务的抽象,扩展了Runnable和Future接口,意味着拥有二者相结合的特性;该类提供了启动、取消计算、查询是否完成计算以及检索计算结果的方法。结果只有在计算完成后才可检索,如果计算还没有完成,检索方法将被阻塞直达完成。一旦任一线程完成了计算,就不能再次重新启动或取消计算。

Treiber Stack

Treiber Stack在 R. Kent Treiber在1986年的论文Systems Programming: Coping with Parallelism中首次出现。它是一种无锁并发栈,其无锁的特性是基于CAS原子操作实现的。

Java并发编程实战上面关于Treiber算法的实现

可以看到就是通过cas修改栈顶指针来保证线程安全性的,同时AtomicReference#compareAndSet可以提供可见性,与volaitle类型的变量有着相同的内存效果;

FutureTask的实现

private volatile int state;
private static final int NEW          = 0; // 新建FutureTask时所处的状态
private static final int COMPLETING   = 1; // 表示已经完成计算后的状态,过度状态
private static final int NORMAL       = 2; // 计算过程非异常,结束计算后的状态
private static final int EXCEPTIONAL  = 3; // 计算过程异常,结束计算后的状态
private static final int CANCELLED    = 4; // 取消状态,还未发生计算
private static final int INTERRUPTING = 5; // 表示已经遇到中断,过度状态
private static final int INTERRUPTED  = 6; // 中断的最终状态
// 状态转变可能的过程
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

FutureTask可处的状态有以上几种,除了COMPLETINGINTERRUPTING是中间过渡状态,其余的都是最终状态;

FutureTask<Integer> futureTask = new FutureTask<>(() -> {
    Thread.sleep(2000);
    System.out.println("Complete the calculation ... ");
    return new Random().nextInt(100);
});
new Thread(() -> {
    Thread.sleep(1000);
    System.out.println("Start task ... ");
    futureTask.run();
}).start();
new Thread(() -> {
    try {
        System.out.println("Blocking get until complete the calculation ... ");
        System.out.println("Thread1:" + futureTask.get());
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    }
}).start();
new Thread(() -> {
    try {
        System.out.println("Blocking get until complete the calculation ... ");
        System.out.println("Thread2:" + futureTask.get());
    } catch (InterruptedException e) {
    } catch (ExecutionException e) {
    }
}).start();
// 结果
Blocking get until complete the calculation ...
Blocking get until complete the calculation ...
Start task ...
Complete the calculation ...
Thread1:69
Thread2:69

FutureTask的一个使用场景,逐步拆分内部实现;

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // 确保callable的可见性
}
// 指定返回结果
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; 
}

提供了两个构造区,区别是其中一个在构造器中指定了计算的结果,并在每次初始化时将状态设置为NEW

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // 状态还是COMPLETING或者NEW时会进入阻塞状态
        s = awaitDone(false, 0L);
    return report(s);
}

FutureTask没有完成任务时获取结果会被阻塞,通过上面提到的Treiber Stack结构来维护阻塞队列;

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
private volatile WaitNode waiters; // 栈定指针

每个节点就是表示一个线程,入栈操作如下

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    // 是否需要超时处理
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        // 中断处理
        if (Thread.interrupted()) {
            removeWaiter(q); // 如果遇到中断,将q移除队列
            throw new InterruptedException(); // 向上抛出中断异常
        }
        int s = state;
        if (s > COMPLETING) {
            // 保证每个task只能被执行一次
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            // 其他线程已经完成计算,但是还是处理结果值
            // 该线程先尝试让出时间片,等其他线程处理完成
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            // 竞争失败或者第一次竞争资源的线程需要通过cas来竞争
            // 相当于Treiber Stack的入栈操作
            // q.next = waiters;
            // UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                // 超时时间过来将节点移除队列然后返回
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos); // 阻塞自己
        }
        else
            LockSupport.park(this); // 阻塞自己
    }
}

大致的逻辑就是自旋直到入栈成功后将自己阻塞,大概的逻辑还是很简单的,这里有个值得思考的地方就是为什么需要一个过渡状态COMPLETING,直接将状态由NEW转为最终状态不可行吗?

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) { 
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next; // q是当前正在遍历的节点
                if (q.thread != null) 
                    pred = q; // 该节点不需要被移除队列,向后遍历
                else if (pred != null) {
                    // q.thread == null, q节点需要被移除,并且q节点不是栈顶
                    pred.next = s; // 相当于 q.front.next = q.next
                    if (pred.thread == null) // 将节点衔接后重新检查q.front是否被移出队列
                        continue retry;
                }
                // q是栈顶且q.thread==null,将栈顶元素移除
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                    continue retry; // cas失败,重新遍历节点
            }
            break;
        }
    }
}

移除等待队列的元素,核心逻辑是将元素的线程置为null,然后将元素前置节点和元素的后继节点衔接,这里有个疑问,为什么衔接完成后需要判断是否把前置节点的线程置为null,如果没有这个判断会怎么样?

可以总结下将节点的thread置为null的操作会发生的场景:

  • removeWaiter
  • finishCompletion
  • awaitDone

那么这个判断成立的场景有下面类似于下面

  1. 队列上面的节点都是处于等待中的
  2. 在awaitDone执行的过程中,其他线程已经完成或取消或中断了futureTask的计算任务;
  3. 多个线程执行removeWaiter,恰好把当前线程移出队列后又把前置节点也移出了队列;

根本的原因就是多线程下产生对同一个数据结构修改需要重新遍历列表,那么不要这个重新遍历列表行吗,首先没有这个重新遍历列表操作,那么会从当前节点继续向下遍历直到遍历完成,前置节点尽管已经没有实质作用,但是还是在队列中,有了这个检查就能尽早的将废弃节点移出队列,所以这个检查的作用是尽可能的缩小队列的长度,因为每一个移出元素操作都需要遍历一次节点,那么这个开销还是比较大的;

// s是任务的当前状态
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

get方法最后会通过report返回计算结果,计算结果通过outcome字段保存,除了是计算结果之外还有可能是异常;

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
        // state != NEW 或者runner已经被其他线程占用
        // 保证了每个task只能被执行一次
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); // 执行
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner必须是非空的直到最终的确定状态,以防止对run()的并发调用
        runner = null;
        // 可能在计算的过程中使用了cancel导致状态变成中断
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
    // 计算完成后先设置一个中间状态COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        // putOrderedInt延迟设置状态,作用是优化对volatile字段写的开销,
        // 副作用就是写入数据可能并不会被其他线程马上看到
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // lazy set
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    // 计算完成后先设置一个中间状态COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // lazy set
        finishCompletion();
    }
}

private void finishCompletion() {
    // 任务完成,将所有等待队列节点唤醒
    for (WaitNode q; (q = waiters) != null;) {
        // 将栈顶元素置null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // 唤醒阻塞线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    done();
    callable = null; 
}

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        // INTERRUPTING状态是个过渡状态
        while (state == INTERRUPTING)
            Thread.yield(); // 让出时间片直到cancel函数完成,状态变为INTERRUPTED
}

过程比较简单易懂,但是还是之前的那个问题,为什么需要一个中间状态COMPLETING来过渡,而不是直接由NEW转换到NORMALEXCEPTIONAL

首先run中已经保证了只有一个线程能执行call(),那么也就是一个task只能执行一次setException或者set,那么按道理来说UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)替换成UNSAFE.compareAndSwapInt(this, stateOffset, NEW, NORMAL)或者UNSAFE.compareAndSwapInt(this, stateOffset, NEW, EXCEPTIONAL)貌似也没有什么问题,如果改写成上面这个样子,那么相应的awaitDone可以变成

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    ... 
    for (;;) {
        ...
        int s = state;
        if (s  == NORMAL || s == EXCEPTIONAL || s == CANCELLED 
           || s == INTERRUPTING || s == INTERRUPTED) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            ...
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this); 
    }
}

所以推测增加COMPLETING状态只是为了让状态中转逻辑更加清晰,状态判断时代码可以更加整洁;

结尾

看源码的时候喜欢思考这段代码到底为什么这样写,很多时候甚是不能理解,难道其他的写法就不是最优解了吗,想要逐字逐句的理解每行代码的含义,遇到这种情况的时候可能是思考的角度错了,看源码的时候个人认为不应该钻牛角尖,可以试着从旁侧击,比如让自己来实现时会怎么处理这块逻辑,相比源码下有何优势和劣势,有些时候可能只是作者个人的写法偏好,看到最后无论是否真的理解,最重要的是看源码的时候自己有思考,还有就是怀疑的态度,因为怀疑往往就会强迫自己思考,也许最后发现是xjb乱想,但是思考的过程所带来的价值有时可能比结果更加重要;

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342