Java Future的实现原理

前阵子在用C++ 98(是比较落后了,嗯,C++11原生支持Future)开发的时候,对脱离业务的公共逻辑抽象出来了一个简单的任务执行框架,里面主要是线程池和一些同步异步的任务。在开发异步任务的时候,为了实现类似java Future模式的能力,对实现方式考量了好久,最终使用了信号量这么重的东西来实现了Future的能力,同时也不禁对java的Future实现产生兴趣,java的Future是怎么实现的呢?

concurrent 包

java Future相关的代码基本都在java.util.concurrent的包里面,Future.java是一个接口,定义了最基本的一些任务操作和状态判断。

// Future.java

boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit);

我们从FutureTask.java去了解Future的内部机制,FutureTaskFuture有比较接地气的实现,其他的实现或多或少都加入了新的一些特性,对了解原理没太大帮助。

FutureTask.java是对FutreRunnable最简单的实现,所以FutureTask是一个可执行的异步对象。

FutureTask的状态

FutureTask定义了7种状态,这7种状态里面包含了简单的状态机,使用了一个用volatile修饰的int来记录状态。如下:

/** Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

状态的变换基本在FutureTask.java的所有函数中都有体现,我们来看看几个典型的状态变换:

  1. 构造函数

    构造函数完成时,将状态置为NEW

  2. 取消操作

    取消操作对任务状态进行判断。

    1. 如果任务正在执行但没有完成时,发出中断,并将任务状态置为中断中状态,并在执行线程完成后,置为中断完成状态NEW -> INTERRUPTING -> INTERRUPTED

    2. 当任务还没有执行,则直接置为取消状态 NEW -> CANCELLED

  3. 执行操作

    执行分执行成功和执行异常两种,状态转换路径是这两个。

    NEW -> COMPLETING -> NORMALNEW -> COMPLETING -> EXCEPTIONAL

FutureTask的执行

FutureTask因为封装了Runnable的接口,实现了run函数,所以可以直接执行,直接执行使用主线程;FutureTask的另外一种执行方式是提交到线程池去执行,由线程池去分配执行线程;只有提交到线程池去执行才能体现异步的特性。不过我们不关注执行的方式,我们关注执行的逻辑。

FutureTask的构造函数传入了CallableRunnable对象,也即是需要执行的业务逻辑,他是业务逻辑的基本表现形式,保存在类属性callable,在run函数里面,调用callalbe.call()来执行业务逻辑。run函数主要完成以下几个操作。

  1. 判断当前状态是否为NEW,如果不是,说明任务被执行过,或者已被取消,直接返回。
  2. 如果状态为NEW接着会通过unsafe类把任务执行线程保存在runner字段中,如果保存失败,则直接返回
  3. 执行任务
  4. 任务执行成功,set保存结果,不成功setException保存异常信息,任务的状态在这里改变。

以下是run函数的逻辑。

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread())) // 1, 2
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); // 3
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex); // 4
            }
            if (ran)
                set(result); // 4
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

由于在任务执行的过程中,可能被取消,所以在finally块里,会任务的根据状态来做一些善后的工作。

FutureTask的取消

任务的取消比较简单,我们知道,在执行的时候,执行任务的线程会保存在runner属性中,所以对于正在执行的任务,取消的本质就是将执行的线程取出来,向该线程发出interupt信号。但对于一个较为完备的取消动作,cancel做了一下几个动作。

  1. 判断任务当前执行状态,如果任务状态不为NEW,则说明任务或者已经执行完成,或者执行异常,不能被取消,直接返回false表示执行失败。

  2. 判断需要中断任务执行线程,则

    1. 把任务状态从NEW转化到INTERRUPTING。这是个中间状态。
    2. 中断任务执行线程。
    3. 修改任务状态为INTERRUPTED
  3. 如果不需要中断任务执行线程,直接把任务状态从NEW转化为CANCELLED。如果转化失败则返回false表示取消失败。

  4. 调用finishCompletion

    这个函数将阻塞在等待这个任务完成的线程唤醒,具体操作是LockSupport.unpark(t),这些线程都是在awaitDone函数内的LockSupport.park(this)中阻塞的,关于awaitDone函数的作用后文还会继续介绍。finishCompletion除了在这里有调用以外,在setsetException中也有调用。

    以下是finishCompletion的实现

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;        // to reduce footprint
    }
    

以下是cancel函数的实现。

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 1, 2
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // 3
            }
        }
    } finally {
        finishCompletion(); // 4
    }
    return true;
}

FutureTask的结果

异步任务提交到线程池去执行后,由于无法预知什么时候结束,所以必须得提供接口来获取任务执行的结果,在FutureTask中,get函数用来获取任务执行的结果。该函数有了设置超时和不超时的两种实现。

除去超时的差异,get操作对任务的状态进行判断,当状态还没有完成的时候,调用awaitDone函数来等待完成,我们在上面提到过这个函数,对于未完成的任务awaitDone阻塞,而finishCompletion唤醒阻塞。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

我们来详细看看awaitDone函数做了什么操作,它是如何阻塞的,它是怎么设置超时和完成返回的。

awaitDone主题是一个死循环,轮询判断任务的状态。

  1. 当执行的线程被中断时,调用removeWaiter移除等待节点WaitNode,抛出中断异常

  2. 当状态为已经完成,直接返回

  3. 当状态为完成中,通过Thread.yield()让出CPU时间

  4. 如果当前线程还没有创建WaitNode等待节点保存到等待队列里面去,则新建一个等待节点,插入到等待链表,表明当前线程也准备进入等待该任务完成的队列中去。

  5. 最后是进入阻塞的动作,通过LockSupport.park,如果设置了超时的时间,则将时间作为参数传递到park中去。

    综上,awaitDone函数除了对状态的判断以外,核心就是LockSupport的阻塞等待完成的操作了。我们后面还会探讨一下LockSupport

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    

awaitDone的阻塞完成以后,就会将结果返回,将结果返回是通过report函数来实现的,返回的是执行完成的结果或者是执行中获得的异常信息。

LockSupport

异步执行任务,在获取任务状态时,阻塞是必然的,在开头引子我简单的提到了在自己实现的那个简陋的框架内,使用了信号量的方式来设置异步任务的阻塞和执行状态。在这里是通过LockSupport来实现的,阻塞(park)和唤醒(unpark)。

public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
}

可以看到LockSupport则是通过UNSAFE的同名函数来实现的。java不能直接访问操作系统底层,而是通过本地方法来访问。Unsafe类提供了硬件级别的原子操作。

Unsafe类是在sun.misc包下,不属于java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Cassandra、Hadoop、Kafka等。Unsafe类在提升Java运行效率,增强Java语言底层操作能力方面起了很大的作用。Unsafe类使Java拥有了像C语言的指针一样操作内存空间的能力,同时也带来了指针的问题。过度的使用Unsafe类会使得出错的几率变大,因此Java官方并不建议使用的,官方文档也几乎没有。

所以在这里java也是使用了C级别的线程同步机制来完成这些操作的,在这就不再展开了。


为保持文章内容的连贯性,部分内容参考自:

  1. https://www.skyreal.me/future-task-yuan-ma-jie-xi/
  2. http://beautyboss.farbox.com/post/study/shen-ru-xue-xi-futuretask

原文链接

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容