Dubbo源码分析----过滤器之FutureFilter

FutureFilter主要是用来处理事件通知的过滤器,这么讲可能不太清楚,先看下下面的demo
先定一个是事件通知的类:

public class Notify {
    public void oninvoke(String msg){
        System.out.println("oninvoke:" + msg);
    }
    public void onreturn(String msg) {
        System.out.println("onreturn:" + msg);
    }
    public void onthrow(Throwable e) {
        System.out.println("onthrow:" + e);
    }
}

然后xml配置如下:

    <bean id="notify" class="com.alibaba.dubbo.demo.consumer.Notify"/>

    <dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" check="false" timeout="5000">
        <dubbo:method name="sayHello" onreturn="notify.onreturn"/>
    </dubbo:reference>

注意dubbo:method的配置,有个onreturn属性,意思是在return结果的时候调用Notify的onreturn方法,onthrow和oninvoke同理,实现这个处理的逻辑就在FutureFilter中,看下实现

@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
//....
    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);// 判断dubbo:method中async属性的值,即是否异步
        
        fireInvokeCallback(invoker, invocation);// oninvoke方法的处理
        //需要在调用前配置好是否有返回值,已供invoker判断是否需要返回future.
        Result result = invoker.invoke(invocation);
        if (isAsync) {
            asyncCallback(invoker, invocation);// 异步回调oninvoke和onthrow
        } else {
            syncCallback(invoker, invocation, result);// oninvoke和onthrow的处理
        }
        return result;
    }
//....
}

fireInvokeCallback方法如下:

    private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
        //onInvokeMethod 即为java的一个Method对象,代表Notify的onInvoke方法,这个很好理解
        final Method onInvokeMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
        // Notify对象
        final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
        // 没有设置的话就直接返回
        if (onInvokeMethod == null  &&  onInvokeInst == null ){
            return ;
        }
        if (onInvokeMethod == null  ||  onInvokeInst == null ){
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onreturn callback config , but no such "+(onInvokeMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
        }
        
        if (onInvokeMethod != null && ! onInvokeMethod.isAccessible()) {
            onInvokeMethod.setAccessible(true);
        }
        
        Object[] params = invocation.getArguments();
        try {
            onInvokeMethod.invoke(onInvokeInst, params);//反射调用
        } catch (InvocationTargetException e) {// 异常情况调用onthrow配置的方法
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }

syncCallback方法如下:

    private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
        if (result.hasException()) {//异常情况触发onthrow
            fireThrowCallback(invoker, invocation, result.getException());
        } else {//否则触发onreturn
            fireReturnCallback(invoker, invocation, result.getValue());
        }
    }

fireReturnCallback方法如下:

    private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
        //....和oninvoke处理一样
        
        Object[] args = invocation.getArguments();
        Object[] params ;
        Class<?>[] rParaTypes = onReturnMethod.getParameterTypes() ;
        if (rParaTypes.length >1 ) {// 如果Notify方法的参数有多个
            // 有两个参数,且第二个参数为Object或者Object的数组
            if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)){
                // 构造两个参数的数组,一个为结果result,一个为请求入参
                params = new Object[2];
                params[0] = result;
                params[1] = args ;
            }else {
                // 这种情况,假设Notify有3个参数,如果本来方法入参有2个
                // 那么第一个为结果,后面为入参,如果入参只有1个,那么会导致异常,因为参数不匹配
                params = new Object[args.length + 1];
                params[0] = result;
                System.arraycopy(args, 0, params, 1, args.length);
            }
        } else {
            params = new Object[] { result };
        }
        try {
            onReturnMethod.invoke(onReturnInst, params);
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }

这种情况下和onvoke多了参数的一些判断

asyncCallback方法如下:

    private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
        Future<?> f = RpcContext.getContext().getFuture();
        if (f instanceof FutureAdapter) {
            ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
            future.setCallback(new ResponseCallback() {
                public void done(Object rpcResult) {
                    //....
                    Result result = (Result) rpcResult;
                    if (result.hasException()) {
                        fireThrowCallback(invoker, invocation, result.getException());
                    } else {
                        fireReturnCallback(invoker, invocation, result.getValue());
                    }
                }
                public void caught(Throwable exception) {
                    fireThrowCallback(invoker, invocation, exception);
                }
            });
        }
    }

如果是异步的方法,那么返回的就是一个future了,这时候在future上注册一个回调, 在future已经完成的情况下触发配置好的回调

注意:低版本的有个BUG,在使用oninvoke的时候会报找不到bean的错误,这是因为在解析method标签的时候,没有处理oninvoke这个节点,导致失败,具体代码在com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser#parse中,其中处理了onthrow和onreturn,但是少了oninvoke

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容