Future和CompletableFutrue

作为一个java为主语言的后端开端,实际使用异步的场景也不多,对于Futrue等仅仅在会使用的阶段,内部的原理一知半解,借着学习CompletableFutrue的机会,重新整理自己的知识体系

一、Future

参考资料
Future、FutureTask实现原理浅析[https://blog.csdn.net/u012881584/article/details/85121144]

1.1 Future和FutureTask

Future只是一个接口,且只有一个get方法
FutureTask是真正的内部实现,他是一个实现Runnable, Future<V>的类

  • 本质上是调用线程,调用后,会park住,而真正的执行线程完成执行后,会将这些线程全部唤醒

1.2 Future.get时发生了什么?

对于使用者来说,我们基本上会用future.get即可,那么此时到底发什么了什么?
查看FutureTask.get方法

public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

大致上,可以看到FutureTask有一个state的标记位,当state满足要求时,返回结果。
当不满足时,会awaitDown,等待执行完成或超时

  • awaitDown是如何实现的
    FutureTask内部有一个单向链表,可以认为value为当前的线程
  static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

image.png

image.png

可以得知,主流程就是第一次进来创建一个节点,然后将当前线程赋值,如果状态还是执行中,就调用park方法将当前线程暂停。如果有多个线程还要get,则将这些线程放到单向列表中

这就起到了阻塞的作用

1.3 FutureTask如何异步获得结果

上文的get方法最终会阻塞,那么肯定会有一个方法执行后去唤醒暂停的线程
查看FutureTask的run方法,前面都是常规的调用实际方法,获得result,关键看获得result后干了什么


image.png

image.png

这里有一个状态的改变state从NEW->COMPLETING


image.png

可以看到,这里会通过一个for循环,将所有waitnode里的线程全部唤醒,异步调用完成

二、CompletableFuture

了解了Future的使用,这里就要谈谈Future的局限性。Future很难直接表述多个Future 结果之间的依赖性,开发中,我们经常需要达成以下目的:

Future vs CompletableFuture
Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future的主要缺点如下:

  • 不支持链式调用
    这个指的是对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。
  • 不支持多个Future合并
    比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。
  • 不支持异常处理
    Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。

2.1 CompletableFuture的使用

  • 1 .因为CompletableFuture实现了Future接口,所以你自然可以把他当future使用,实现原理基本相同,只是没有利用futuretask,而是在类自身就完成了

  • 2.因为CompletableFuture实现了CompletionStage,所以可以完成链式调用

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("hello world");
            return "result";
        }).thenApply(r -> {
            System.out.println("r");
            return r + "2";
        });
        System.out.println(a.get());
    }

2.2 CompletableFuture的实现

异步执行的思路主要是以下3步:
1.执行任务
2.添加任务完成之后的动作(回调方法)
3.执行回调

2.2.1执行任务

和future相同,supplyAsync

 public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    //执行任务的代码
                    u = fn.get();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }

2.2.2执行回调

再看这个肯定会有一个执行回调的方法,就是在d.internalComplete(u, ex)里最终调用的postComplete,顾名思义就是在执行完成后调用

/**
     * Removes and signals all waiting threads and runs all completions.
     */
    final void postComplete() {
        WaitNode q; Thread t;
        while ((q = waiters) != null) {
            if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }

        !从任务链的头部开始执行回调任务,这里的语法比较精炼,nodes不断遍历,h只是一个保存中间状态的节点
        CompletionNode h; Completion c;
        while ((h = completions) != null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
                (c = h.completion) != null)
                c.run();
        }
    }

可以看到waitNode和future是一样的,唤醒所有调用get方法park住的线程。但后面还有个CompletionNode ,这个node就是一个任务链,属性completion实现了Runnable,是一个可以调度的任务

static final class CompletionNode {
        final Completion completion;
        volatile CompletionNode next;
        CompletionNode(Completion completion) { this.completion = completion; }
    }

    // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
    @SuppressWarnings("serial")
    abstract static class Completion extends AtomicInteger implements Runnable {
    }

2.2.3添加回调任务

既然已经看懂了回调任务的执行,那么肯定会有一个环节添加任务完成之后的动作
例子中thenApply()就是一个异步任务执行完后需要完成的动作,即回调,我们可以看看内部实现,是什么时候添加进去的

 private <U> CompletableFuture<U> doThenApply
        (Function<? super T,? extends U> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();

        !ThenApply是一个继承Completion的任务
        ThenApply<T,U> d = null;

        Object r;
        !如果当前任务没完成,则创建一个任务节点,添加任务,
        !将任务节点追加的当前completableFuture的任务链中,可以看到这里追加是追加在头部的
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new ThenApply<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        !如果当时任务已经完成了,我们主要看没有报错的情况,直接执行 u = fn.apply(t);直接当前线程就执行了
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        execAsync(e, new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        return dst;
    }

从以上分析可以看出,其实你的任务真正是哪个线程执行的,还和你前序任务的执行时间有关,我们做个测试

  • 第一种情况,前序的任务执行很快
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("ThreadName:"+Thread.currentThread().getName());
            return "result";
        }).thenApply(r -> {
            System.out.println("ThreadName:"+Thread.currentThread().getName());
            return r + "2";
        });
        System.out.println(a.get());
    }

结果为,可以看到是主线程执行的

ThreadName:ForkJoinPool.commonPool-worker-1
ThreadName:main
result2
  • 第二种情况,前序的任务执行比较慢
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("ThreadName:"+Thread.currentThread().getName());
            try {
                Thread.sleep(3*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "result";
        }).thenApply(r -> {
            System.out.println("ThreadName:"+Thread.currentThread().getName());
            return r + "2";
        });
        System.out.println(a.get());
    }

结果为,可以看到是第一个异步任务的线程执行了

ThreadName:ForkJoinPool.commonPool-worker-1
ThreadName:ForkJoinPool.commonPool-worker-1
result2

2.3 CompletableFuture的异常处理

多线程的异常总是比较复杂的,当你真正要在生产运行时,你肯定得考虑到各种异常分支,特别是当你大量使用链式调用后,此时如果某一个任务出现了异常,后续会如何处理?
先看执行结果为result,可以看到要么是真正的result,要么是一个 AltResult,里面其实就是exception

    volatile Object result;    // Either the result or boxed AltResult
static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }
  • 当程序出现异常发生时,result会变成什么?即最终的CompletableFuture会变成什么?
    这里三元运算符比较多,得慢慢理。我们来理一下有异常,即ex!=null
    1.先判断ex是否为null,此处不为null
    2.取 new AltResult((ex instanceof CompletionException) ? ex :
    new CompletionException(ex)));
    3.此处ex假设不是CompletionException,则转化为CompletionException
    4.最终包装成AltResult
/**
     * Triggers completion with the encoding of the given arguments:
     * if the exception is non-null, encodes it as a wrapped
     * CompletionException unless it is one already.  Otherwise uses
     * the given result, boxed as NIL if null.
     */
    final void internalComplete(T v, Throwable ex) {
        if (result == null)
            UNSAFE.compareAndSwapObject
                (this, RESULT, null,
                 (ex == null) ? (v == null) ? NIL : v :
                 new AltResult((ex instanceof CompletionException) ? ex :
                               new CompletionException(ex)));
        postComplete(); // help out even if not triggered
    }
  • 现在我们知道了异常最终也会包含在CompletaFuture中,那么实际业务代码要如何编写呢?
public T get() throws InterruptedException, ExecutionException {
        Object r; Throwable ex, cause;
        if ((r = result) == null && (r = waitingGet(true)) == null)
            throw new InterruptedException();
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

当你最后调用get方法时,会抛出异常。这时候你可以通过handle进行处理,他的功能时最强大的
参考文档:https://mincong.io/2020/05/30/exception-handling-in-completable-future/
如:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("ThreadName:" + Thread.currentThread().getName());
            int b=1/0;
            return "result";
        }).thenApply(r -> {
            System.out.println("ThreadName:" + Thread.currentThread().getName());
            return r + "2";
        }).handle((result,ex)->{
           if (ex!=null){
               System.out.println("Error:"+ex.toString());
               return null;
           }else {
               System.out.println("success:"+result);
               return result;
           }
        });
        System.out.println(a.get());
        Thread.sleep(10*1000);
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容