Future、CompletableFuture与dubbo异步调用

Future

在java 8之前,我们可以使用Callable+Future来异步执行任务和获取结果,比如

ExecutorService service = new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
Future<String> f = service.submit(()->{
                    Thread.sleep(200);
                    return "helloWorld";
                }
        );
        System.out.println(f.get(300,TimeUnit.MILLISECONDS));

其获取结果,get方法实现本质是轮询校验结果状态积,阻塞实现依赖的是LockSupport.park()方法。
那么在dubbo交给Apache进行孵化之前的版本中,比如2.6.1版本中,其异步调用机制ResponseFuture的实现就借鉴了jdk的Future的模式,以DubboInvoker#doInvoke方法为例

if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }

可以看到,同步与异步的本质区别就是调用get()方法的时机不同,同步调用的话,请求的同时由dubbo线程直接调用get方法阻塞,获取结果;而异步调用,dubbo直接返回RpcResult,后续由业务线程再来调用get方法获取结果。

dubbo虽然借鉴了jdk的Future,但是代码全部是自己写的,以DefaultFuture#get()为例

public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

可以看到,dubbo的DefaultFuture实现,主要依赖lock+condition的模式,不是jdk Future的LockSupport.park()模式。
这种模式的缺点有很多,最大的缺点就是结果获取是阻塞的。

CompletableFuture

在java 8之后,jdk引入了CompletableFuture类,可以看到其实现了Future和CompletionStage,所以我们可以继续像使用Future一样使用CompletableFuture。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 

那么CompletionStage 是做什么的呢,用类文件注释的第一句话说,其代表一种异步阶段,执行一些行为或者计算,执行完毕后,会触发其他CompletionStage的执行。

A stage of a possibly asynchronous computation, that performs an
 action or computes a value when another CompletionStage completes.

相较于Future,CompletableFuture提供的很多新特性都依赖与这个CompletionStage,这里主要介绍其在dubbo异步调用中的应用,其他特性不多介绍,重点介绍下其回调机制,先看用法

CompletableFuture<String> f = new CompletableFuture();
        try {
            f.whenComplete((v,t)->{
                if(t!=null){
                    System.out.println("Exception");

                }else{
                    System.out.println(v);
                }

            });
            f.complete("HelloWorld");

当CompletableFuture拿到结果的时候,会回调whenComplete方法注册的回调逻辑,其核心实现见CompletableFuture#postComplete, 用注释的话说,每一步,这个stack会pop and run。回调也是基于此实现(Doug Lea大神的作品不是简单能说明白的,后续再开一文研究)

/**
     * Pops and tries to trigger all reachable dependents.  Call only
     * when known to be done.
     */
    final void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */
        CompletableFuture<?> f = this; Completion h;
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
            if (f.casStack(h, t = h.next)) {
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }

那么dubbo的异步调用是怎么利用这个回调机制的呢?见DubboInvoker#doInvoke (2.7.3版本)

 if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                asyncRpcResult.subscribeTo(responseFuture);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(responseFuture);
                return asyncRpcResult;
            }

之前2.6.1版本中,同步异步的区别是谁来调get()方法,那么在2.7.3版本,DubboInvoker对同步异步调用的处理直接统一了,都会返回一个AsyncRpcResult, 这个AsyncRpcResult本身就继承自CompletableFuture,同时其会subscribe一个响应的CompletableFuture,这里就有了两个CompletableFuture;那么subscribe做了什么呢?

public void subscribeTo(CompletableFuture<?> future) {
        future.whenComplete((obj, t) -> {
            if (t != null) {
                this.completeExceptionally(t);
            } else {
                this.complete((Result) obj);
            }
        });
    }

subscribe会对响应CompletableFuture注册了一个回调,响应完成时,触发这个回调;这个回调逻辑就是执行AsyncRpcResult自身的complete方法,那么如果AsyncRpcResult也有注册回调,此时就会被链式触发。
新版本的dubbo既然在DubboInvoker这里对于同步异步的处理是一样的,都是直接返回一个AsyncRpcResult,那么对于我们使用者来说,怎么来区别同步和异步呢?其实关键就在于怎么用这个AsyncRpcResult。如果我们拿到AsyncRpcResult直接get,可以认为这就是同步调用,如果我们拿到AsyncRpcResult,不去调用get,而是去注册一个回调函数,等待链式触发,用回调的方式拿结果,那么这就是异步。

总结:老版本dubbo的异步调用可以认为是假异步,因为结果的获取是阻塞的,新版本随着jdk引入CompletableFuture,由于回调机制的存在,我们业务代码使用dubbo时候,也可以注册回调,实现真正的异步非阻塞。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容