FutureTask源码解析

概述

future-task.png

   FutureTask实现了RunnableFuture接口,它既可以作为Runnable被提交给Executor去执行,又可以作为Future获取异步任务的执行结果,或者取消异步任务。一句话定义,FuntureTask代表的是一个可取消的异步任务。

FutureTask的状态机模型

  JDK1.8中,FutureTask的实现采用了状态机模型,并且不同状态的数值是经过精心设计的。FutureTask内部维护了一个状态变量,用来表示任务执行的各个阶段。

    // 使用volatile禁用线程、cpu缓存
    // 以便其它线程能立即观测到状态的改变
    private volatile int state;

    // 初始状态
    private static final int NEW          = 0;
    // 等待完成中
    private static final int COMPLETING   = 1;
    // 正常完成
    private static final int NORMAL       = 2;
    // 异常结束
    private static final int EXCEPTIONAL  = 3;
    // 已取消
    private static final int CANCELLED    = 4;
    // 响应打断中
    private static final int INTERRUPTING = 5;
    // 已打断
    private static final int INTERRUPTED  = 6;

其中,COMPLETINGINTERRUPTING是中间状态,NORMALEXCEPTIONALCANCELLEDINTERRUPTED是终态,一旦到达终态后续就不能再发生状态转换了。同时状态的转换并不是任意的,比如不能从COMPLETING -> INTERRUPTING,可能的状态转换是以下几种情形之一:

    /**
     * Possible state transitions:
     *
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */

FutureTask的实例变量

    /** 实际被执行的Callable,因为Runnable不能返回结果 */
    private Callable<V> callable;
    /** 任务的执行结果,或者执行过程中出现的异常 */
    private Object outcome;
    /** 执行Callable的线程 */
    private volatile Thread runner;
    /** 
     * Treiber stack构成的等待线程栈,
     * 因为可能会有多个线程调用Future#get()
     * 来获取任务执行结果
     */
    private volatile WaitNode waiters;

        /** 单链表实现的Treiber stack */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

  Treiber stack这个词在整个java.util.concurrent包中出现的次数特别多,那么它到底是怎样的一个数据结构呢?简单来说,Treiber stack是利用细粒度并发原语(CAS)实现的一种无锁结构。Treiber stack内部维护链表头结点,在pushpop时通过CAS来保证线程安全,下面是一个简单的Treiber stack实现。

public class TreiberStack<E> {

    /**
     * 使用AtomicReference维护栈顶元素,用于后续CAS操作
     */
    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    /**
     * 入栈时首先检查栈顶元素,只有在确定其它线程
     * 没有修改时才能入栈成功,否则进入重试
     */
    public void push(E e) {
        Node<E> old;
        Node<E> cur = new Node<>(e);
        do {
            old = top.get();
            cur.next = old;
        } while (!top.compareAndSet(old, cur));
    }

    /**
     * 出栈时也需要检查栈顶元素,其它线程没有修改时
     * 才能出栈,否则进入重试
     */
    public E pop() {
        Node<E> old;
        Node<E> cur;
        do {
            old = top.get();
            if (old == null) {
                return null;
            }
            cur = old.next;
        } while (!top.compareAndSet(old, cur));
        return old.value;
    }

    private static class Node<E> {
        final E value;
        Node<E> next;
        Node(E value) {
            this.value = value;
        }
    }
}

构造函数

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

  FutureTask实际上是通过将Runnable包装成Callable来获取异步任务的执行结果,而显示初始化state则是利用了Happens-Before中的程序次序规则和volatile变量规则来保证Callable的线程可见性:

  • 程序次序规则:在一个线程内,按照控制流顺序,书写在前面的操作Happens-Before于书写在后面的操作。
  • volatile变量规则:对一个volatile变量的写操作Happens-Before于后面对这个变量的读操作,这里的"后面"指时间上的先后顺序。

核心方法

run
/**
 * run方法保证任务只会被执行一次
 */
public void run() {
    // 1. 任务必须处于初始状态NEW
    // 2. 当前没有分配线程来执行底层的callable
    // 只有同时满足这两个条件,才能启动任务
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // callable会在任务执行完成之后置空
        // state可能在#runAndReset()中恢复到初始状态
        // 这一步检查也是为了确保确实有必要继续执行
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                // 正常执行完成
                ran = true;
            } catch (Throwable ex) {
                result = null;
                // 执行过程中遇到异常
                ran = false;
                // 转换状态到EXCEPTIONAL
                setException(ex);
            }
            // 如果正常执行完成
            if (ran)
                // 转换状态到NORMAL
                set(result);
        }
    } finally {
        // 整个任务执行期间,runner都是不为null的
        // 保证了run()方法不会被多个线程同时执行
        runner = null;
        // run()方法结束之后,state必须到达终态
        // 如果在执行过程中遭遇打断,此时状态可能
        // 还是INTERRUPTING,没有到达INTERRUPTED
        int s = state;
        if (s >= INTERRUPTING)
            // 自旋等待state到达终态INTERRUPTED
            handlePossibleCancellationInterrupt(s);
    }
}
   /**
    * #run()正常结束
    */
     protected void set(V v) {
        // CAS切换到中间状态COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 设置返回值
            outcome = v;
            // 设置为正常结束
            // 到达终态后不能再继续转换,因此可以使用lazySet
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            // 唤醒执行期间因#get()阻塞的线程
            finishCompletion();
        }
    }

    /**
     * #run()异常结束
     */
    protected void setException(Throwable t) {
        // CAS切换到中间状态COMPLETING
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            // 设置异常信息
            outcome = t;
            // 设置为异常结束
            // 到达终态后不能再继续转换,因此可以使用lazySet
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            // 唤醒执行期间因#get()阻塞的线程
            finishCompletion();
        }
    }
    /**
     * 公共的完成逻辑:清除并唤醒执行期间因#get()阻塞的线程
     */
        private void finishCompletion() {
        // assert state > COMPLETING;
        // 获取链表头节点
        for (WaitNode q; (q = waiters) != null;) {
            // 清除表头,除了局部变量q再没有其它引用指向它了
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 循环处理每一个链表节点
                for (;;) {
                    // 唤醒因#get()阻塞的线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    // 检查到达尾节点
                    WaitNode next = q.next;
                    if (next == null)
                        // 是则表示已完成
                        break;
                    // 断掉引用
                    // 这样GC时可以及时发现并清理
                    q.next = null; // unlink to help gc
                    // 转到下一个节点
                    q = next;
                }
                break;
            }
        }
        
                // 调用钩子方法
        done();
        // callable在结束时置空
        callable = null;        // to reduce footprint
        // 方法退出时,waiters实例变量为null,局部变量q超出作用域
        // 这样就完成了对整个Treiber stack的清理
    }
    /**
     * 确保在#run()退出时state到达终态
     */
    private void handlePossibleCancellationInterrupt(int s) {
            // 如果确实处于中间状态INTERRUPTING
        if (s == INTERRUPTING)
            // 那么就自旋等待
            while (state == INTERRUPTING)
                // 告诉调度器出让cpu时间
                Thread.yield(); // wait out pending interrupt
                // 按照约定,INTERRUPTING之后必然是INTERRUPTED
        // assert state == INTERRUPTED;
    }
cancel
    /**
     * 取消任务
     * @param mayInterruptIfRunning 如果已经开始执行,是否需要打断执行中的任务
     */
        public boolean cancel(boolean mayInterruptIfRunning) {
        // 只有未达到终态的任务才能取消
        if (!(state == NEW &&
              // 如果已经开始执行,根据参数决定是否打断任务的执行
              // 当然,这里也是使用CAS控制并发
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            // 如果需要打断的话
            if (mayInterruptIfRunning) {
                try {
                    // 就打断它
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    // 打断完成进入终态
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 同样,取消成功以后需要进行清理
            finishCompletion();
        }
        return true;
    }
get
    /**
     * 获取任务执行结果
     */
        public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // 未开始或执行中的任务才有可能获取到结果
        if (s <= COMPLETING)
            // 阻塞当前线程,直到终态
            s = awaitDone(false, 0L);
        // 根据状态获取结果
        return report(s);
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // 未开始或执行中的任务才有可能获取到结果
        if (s <= COMPLETING &&
            // 等待结束后,如果仍未到达终态,则为超时
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        // 根据状态获取结果
        return report(s);
    }
    /**
     * 等待任务产生执行结果(阻塞式),或者等待过程中被打断的话,抛出异常
     * @param timed 是否是带有时间限制的等待
     * @param nanos 最长等待时间
     */
        private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 计算等待的截止时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        // 根据LockSupport.parkXXX系列方法的文档
        // park系列方法会因为
        //  1. 其它线程调用unpark()
        //  2. 到达parkXXX指定时间
        //  3. 线程被打断
        //  4. 无理由
        // 以上4种情况返回,因此park系列方法需要保证在循环中,在方法返回时再次进行条件检测
        for (;;) {
                // 检测是否因为线程打断返回
            if (Thread.interrupted()) {
                // 是的话进行节点清理
                removeWaiter(q);
                // 抛出异常
                throw new InterruptedException();
            }
                        // 读取当前状态
            int s = state;
            // 状态机的值是经过精心设计的
            // COMPLETING之后的状态要么是终态
            // 要么是打断,而打断已经处理过了
            // 因此大于COMPLETING也就是到达了终态
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 处于COMPLETING状态,说明任务很快就会结束了
            // 这里的处理是不进行阻塞等待,而是调用Thread.yield()
            // 出让cpu时间,等待任务执行完成,从而由上一步s > COMPLETING处理
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            // 来到这里,说明s == NEW,这是真正需要进行阻塞等待的状态
            // 如果还没有节点,新建之
            else if (q == null)
                q = new WaitNode();
            // 还没有入栈的话,入栈之
            // 这两步就是一个普通的Treiber stack入栈操作了
            // 经过这两步之后,调用#get()的线程就成功加入等待队列了
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                // 计算需要等待的时间
                nanos = deadline - System.nanoTime();
                // 小于0表示等待已经结束了
                if (nanos <= 0L) {
                    // 清理节点
                    // 对应Treiber stack出栈操作
                    removeWaiter(q);
                    return state;
                }
                // 带有时间限制的等待
                LockSupport.parkNanos(this, nanos);
            }
            else
                // 无限期等待
                LockSupport.park(this);
        }
    }

        /**
         * 报告任务的执行结果
         */
        private V report(int s) throws ExecutionException {
        Object x = outcome;
        // NORMAL表示正常执行完成
        if (s == NORMAL)
            // outcome携带的是callable的返回值
            return (V)x;
        // 已取消的话,抛出异常
        if (s >= CANCELLED)
            throw new CancellationException();
        // 剩下的就是执行异常的情况了
        throw new ExecutionException((Throwable)x);
    }
isCancelled/isDone
    public boolean isCancelled() {
        // 比CANCELLED大的只有INTERRUPTING和INTERRUPTED
        // 而INTERRUPTING和INTERRUPTED同样是取消的状态之一
        // 它表示任务已经开始执行的情况下的强制取消
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

  得益于状态机数值的精心设计,isCancelledisDone可以根据状态简单的判定。注意,isDone并不仅仅表示任务正常执行结束或者执行遇到异常,任务被取消或者被打断同样也计入isDoneFutureTask的源码就分析到这里了,下次再分享一下线程池的源码分析吧。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容