Dubbo provider Filter链原理

开篇

 在dubbo的使用过程中会在<dubbo:service>标签中会配置filter变量,但是filter具体如何生效却不是特别清楚,这篇文章就是针对Filter的加载过程进行下分析,尝试描述清楚过程。

 在这篇文章中会尝试解释ProtocolFilterWrapper被调用过程,协议发布的时候都会走到ProtocolFilterWrapper,而这个类是Filter的加载入口,其核心方法在buildInvokerChain()当中。

 进而在buildInvokerChain()方法中通过ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension()获取所有的Filter(包括系统和自定义的Filter对象)的加载过程。


Filter调用链分析

  • Dubbo的调用链如下图,调用顺序按照ProtocolListenerWrapper => ProtocolFilterWrapper,包含关系是 ProtocolListenerWrapper包含ProtocolFilterWrapper。
  • 执行export()方法是会按照ProtocolListenerWrapper => ProtocolFilterWrapper的顺序执行,最终会调用ProtocolFilterWrapper的export()方法,进而进入buildInvokerChain()构造Filter链的逻辑。
InvocationChain构建链
Dubbo 调用链


  • RegistryProtocol => DubboProtocol的调用链如下图。
Reistry-Dubbo协议调用链
  • RegistryProtocol 中URL内容如下,所以Protocol@Adaptive访问RegistryProtocol对象。
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-api-provider&dubbo=2.0.2
&pid=50012&registry=zookeeper&timestamp=1571727131732
  • DubboProtocol 中URL如下,所以Protocol@Adaptive访问DubboProtocol对象。
dubbo://172.17.32.176:20880/org.apache.dubbo.demo.DemoService?
anyhost=true&application=dubbo-demo-api-provider&bind.ip=172.17.32.176
&bind.port=20880&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false
&interface=org.apache.dubbo.demo.DemoService&methods=sayHello
&pid=50012&release=&side=provider&timestamp=1571727131742


  • org.apache.dubbo.rpc.Protocol文件内容如下图,其中按照先后顺序是先filter后listener。
文件:
org.apache.dubbo.rpc.Protocol

内容:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper


  • Protocol对应ExtensionLoader对象中cachedWrapperClasses包含filter对象和listener对象,且顺序先filter后listener。
Protocol对应ExtensionLoader


Filter调用链源码分析

  • ProtocolListenerWrapper 包含一个Protocol类型的构造函数,会根据构造函数进行初始化。
  • ProtocolFilterWrapper 包含一个Protocol类型的构造函数,会根据构造函数进行初始化。
public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
}


public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
}


  • 适配器Protocol$Adaptive的export()方法会调用ExtensionLoader的getExtension()方法。
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) 
                 throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        
        if (arg0.getUrl() == null) throw new IllegalArgumentException(
                 "org.apache.dubbo.rpc.Invoker argument getUrl() == null");

        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException(
                    "Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" 
                   + url.toString() + ") use keys([protocol])");
        
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader
               .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        
        return extension.export(arg0);
    } 
}


  • getExtension()方法内部调用createExtension()创建扩展。
  • createExtension()内部主要包括四个核心步骤。
  • 步骤一:根据扩展名获取扩展类,getExtensionClasses().get(name)。
  • 步骤二:创建扩展类的实例对象,clazz.newInstance()。
  • 步骤三:根据set方法注入实例依赖的对象,injectExtension(instance)。
  • 步骤四:创建包装类对象并注入扩展类实例对象,wrapperClass.getConstructor(type).newInstance(instance)。
  • 在Protocol的ExtensionLoader的cachedWrapperClasses包括ProtocolFilterWrapper和ProtocolListenerWrapper。
  • 继续分析ProtocolFilterWrapper类。
public class ExtensionLoader<T> {

    public T getExtension(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        final Holder<Object> holder = getOrCreateHolder(name);
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }


    private T createExtension(String name) {
        // 根据扩展名获取扩展类clazz
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                // 创建扩展类对象
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }

            // 注入instance对象,这里假设为DubboProtocol对象作为例子
            // injectExtension方法内部遍历set方法并从上下文获取并set到对象当中
            injectExtension(instance);

            // 获取包装类WrapperClasses并挨个进行包装
            // 包装类以instance的type作为构造函数的参数
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {

                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }



    private T injectExtension(T instance) {

        if (objectFactory == null) {
            return instance;
        }

        try {
            for (Method method : instance.getClass().getMethods()) {
                if (!isSetter(method)) {
                    continue;
                }

                if (method.getAnnotation(DisableInject.class) != null) {
                    continue;
                }
                Class<?> pt = method.getParameterTypes()[0];
                if (ReflectUtils.isPrimitives(pt)) {
                    continue;
                }

                try {
                    // 获取set方法的属性property
                    String property = getSetterProperty(method);
                    // 这里的objectFactory为AdaptiveExtensionFactory
                    // objectFactory.getExtension执行
                    // SpiExtensionFactory的getExtension()方法
                    // 返回loader.getAdaptiveExtension()对象
                    // Protocol$Adaptive对象
                    Object object = objectFactory.getExtension(pt, property);
                    if (object != null) {
                        method.invoke(instance, object);
                    }
                } catch (Exception e) {
                }
            }
        } catch (Exception e) {
        }

        return instance;
    }
}
  • 补充一句,当injectExtension()的参数instance为RegistryProtocol,RegistryProtocol包含setProtocol()方法,执行反射调用method.invoke(instance, object)注入Protocol$Adaptive对象。比较抽象,需要好好debug理解


ProtocolFilterWrapper分析

  • 导出Dubbo协议过程中ProtocolFilterWrapper的Protocol为DubboProtocol对象。
  • 执行ProtocolFilterWrapper的export()的参数为buildInvokerChain()包装后包含过滤器的invoker对象
  • buildInvokerChain()方法包含两个核心步骤,获取Filter列表和组装Filter列表。
  • 先关注Filter列表的过程,获取过程后面继续分析
  • 假设Filter链为"A,B,C",那么实际串联后的顺序为A => B => C => Invoker。
  • 串联的逻辑代码在注释中标注了,逻辑是通过next记录上一次对象并保存当前Filter当中。
public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }



    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        // 保存引用,后续用于把真正的调用者保存到过滤器链的最后
        Invoker<T> last = invoker;
        // 获取系统和自定义的Filter过滤器
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
                           .getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            // 用过滤器包装真正的invoker,过滤器的包装顺序是从尾到头,按照顺序逆向包装。
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                // 每次把last对象设置为next,挂到当前Filter的next当中,实现串联。
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    // 只关注重点代码
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            // onError callback
                            if (filter instanceof ListenableFilter) {
                                Filter.Listener listener = ((ListenableFilter) filter).listener();
                                if (listener != null) {
                                    listener.onError(e, invoker, invocation);
                                }
                            }
                            throw e;
                        }
                        return asyncResult;
                    }
                };
            }
        }

        return new CallbackRegistrationInvoker<>(last, filters);
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
}


  • Dubbo原生的filter定义在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.filter文件。
echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter


  • Filter的类图是下图所示,其中接口Filter包含了SPI注解。
Filter类图


  • 以EchoFilter作为例子,我们可以看到Filter的实现都带有@Activate注解,加载类的时候会把带有@Activate注解的类进行单独缓存。
@Activate(group = CommonConstants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
            return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
        }
        return invoker.invoke(inv);
    }

}


  • ExtensionLoader的getActivateExtension()内部主要执行获取系统Filter和用户自定义Filter两步骤。
  • 获取系统Filter当中,如果filter标签不带有"-default"字段,就会执行获取系统Filter对象。
  • 获取系统Filter对象是从cachedActivates获取的,而cachedActivates是在ExtensionLoader加载扩展类的时候缓存带有@Activate的类,也就是@Activate的Filter会在加载扩展类的时被缓存。
  • 系统Filter保存在exts并按照一定规则进行排序。
  • 获取自定义Filter根据filter标签定义的Filter,加载过程中不加载带有排除符号"-"的Filter,在关键字"default"之前的Filter会被优先加载。
  • 自定义Filter一般不带@Activate标签,如果带@Activate标签就会进入系统Filter的加载流程。
public class ExtensionLoader<T> {

    public List<T> getActivateExtension(URL url, String key, String group) {
        String value = url.getParameter(key);
        return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
    }

    
    public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<>();
        List<String> names = values == null ? new ArrayList<>(0) : Arrays.asList(values);
        // 如果Filter中不带有"-default"字段,就会加载系统扩展Filter对象。
        if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
            // 获取所有的扩展类
            getExtensionClasses();
            for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Object activate = entry.getValue();

                String[] activateGroup, activateValue;

                if (activate instanceof Activate) {
                    activateGroup = ((Activate) activate).group();
                    activateValue = ((Activate) activate).value();
                } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                    activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                    activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
                } else {
                    continue;
                }
                if (isMatchGroup(group, activateGroup)
                        && !names.contains(name)
                        && !names.contains(REMOVE_VALUE_PREFIX + name)
                        && isActive(activateValue, url)) {
                    exts.add(getExtension(name));
                }
            }
            // 按照一定的比较规则进行排序
            exts.sort(ActivateComparator.COMPARATOR);
        }
        // 加载用户自定义扩展Filter对象
        List<T> usrs = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            // 带有排除符号"-"的Filter不加载
            if (!name.startsWith(REMOVE_VALUE_PREFIX)
                    && !names.contains(REMOVE_VALUE_PREFIX + name)) {
                if (DEFAULT_KEY.equals(name)) {
                    // 在default之前的Filter会被系统Filter之前被加载
                    if (!usrs.isEmpty()) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    usrs.add(getExtension(name));
                }
            }
        }
        if (!usrs.isEmpty()) {
            exts.addAll(usrs);
        }
        return exts;
    }
}
  • 总体来说,针对Filter加载的过程需要搞清楚两个问题,一个问题是ProtocolFilterWrapper对象的调用链路,另外一个是buildInvokerChain的执行过程,两个问题在上面已经都提到并进行了一定的解释。


参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容