Dubbo中 Filter 探究

Filter 机制也称拦截器机制,在众多框架或者语言中很常见,可以实现登录鉴权,网关拦截、封装全局状态返回等,博主文章以下几个问题展开:

  1. Filter 的例子

  2. Dubbo中内置的Filter是怎样的?Consumer 和Provider 默认使用的 Filter 有哪些?

  3. Filter 何时初始化?

  4. Filter 何时会被调用?

  5. Filter 中 ListenableFilter 有何作用?

  6. Consumer 和Provider 如何使用Dubbo 内置Filter?

  7. 从Filter 看 自定义SPI 在哪些地方会被使用

以上问题看完文章后相信大家就可以清楚,若有疑问,关注博主公众号:六点A君,回复标题获取最新答案><

Filter 是Dubbo 的一个扩展点,可以理解为拦截作用,Dubbo 本身大多功能都基于该扩展点实现。

Filter 例子

先说说Dubbo 中Filter 的使用,以下摘抄自Dubbo官网:

  1. 用户自定义 filter 默认在内置 filter 之后。

  2. 特殊值 default,表示缺省扩展点插入的位置。比如:filter="xxx,default,yyy",表示 xxx 在缺省 filter 之前,yyy 在缺省 filter 之后。

  3. 特殊符号 -,表示剔除。比如:filter="-foo1",剔除添加缺省扩展点 foo1。比如:filter="-default",剔除添加所有缺省扩展点。

  4. providerservice 同时配置的 filter 时,累加所有 filter,而不是覆盖。比如:<dubbo:provider filter="xxx,yyy"/><dubbo:service filter="aaa,bbb" />,则 xxx,yyy,aaa,bbb 均会生效。如果要覆盖,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />

Dubbo 文档:http://dubbo.apache.org/zh-cn/docs/dev/impls/filter.html

Dubbo 中使用 Filter 有两种途径,可以使用Dubbo内置的Filter,或者可以使用自定义的Filter

以自定义Filter 为例,其中带入

  1. 定义类实现 org.apache.dubbo.rpc.Filter

  2. META-INF/dubbo 下增加 增加 SPI 文件 org.apache.dubbo.rpc.Filter,填写SPI 映射,例如 traceConsumer=com.anla.rpc.filter.consumer.filter.ConsumerTraceFilter

  3. 在 Consumer 端 的 dubbo:consumerdubbo:reference 进行声明,例如 <dubbo:consumer filter="traceConsumer"/>,同理可以在 Provider 端的 dubbo:providerdubbo:service 进行声明。

当然在这一步可以使用 Dubbo内置的Filter 进行使用。也可以通过 - (减号) 进行内置Filter 删除。

具体Filter 例子可以看博主写的例子:https://github.com/anLA7856/dubbolearn/tree/master/filter

Dubbo 内置的Filter

如何找Dubbo内置Filter呢?

由于Dubbo 提供SPI 机制,所以我们可以沿着这条路往下找,即找到 `org.apache.dubbo.rpc.Filter`` 在找到其有哪些子类就可以了。

在这里插入图片描述

上面图片看起来过于眼花缭乱,下面博主整理表格或许来的更加直观(起始下面更眼花缭乱,各位看官直接过了就好,需要再来细看)

| 类名 | @Adaptive | 是否默认Consumer| 是否默认Provider|排序| 作用|

|:------:| :-------------:|:-------------:|:-------------:|:-------------:|:----------:|

|ClassLoaderFilter|@Activate(group = CommonConstants.PROVIDER, order = -30000)|否|是,默认存在|-30000|用于添加invoker的ClassLoader 到本地线程中|

|DeprecatedFilter|@Activate(group = CommonConstants.CONSUMER, value = DEPRECATED_KEY)|是,需要指定 值为 deprecated|无|无|用于提醒错误如果接口配置了deprecated,则会打印错误|

|EchoFilter|@Activate(group = CommonConstants.PROVIDER, order = -110000)|否|是,默认有|-110000|用于在服务端提供基于EchoService的功能链路相应功能|

|MetricsFilter|无|否|否|无|用于一些统计接口监控统计,提供report 功能|

|TpsLimitFilter|@Activate(group = CommonConstants.PROVIDER, value = TPS_LIMIT_RATE_KEY)|否|是,需要指定,值为tpc|无|限流作用,用于限制tps,用户可自行配置|

|AccessLogFilter|@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)|否|是,需要指定,值为accesslog|无|用于打印接口log|

|ValidationFilter|@Activate(group = {CONSUMER, PROVIDER}, value = VALIDATION_KEY, order = 10000)|是,需要指定|是,需要指定|10000|在配置完validation后,在具体调用前既可以使用,用户可以基于org.apache.dubbo.validation.Validation 的 SPI方式实现新Validation |

|DubboAppContextFilter|@Activate(group = "consumer") |是,默认有|否|无|用于将当前Conusmer 的applicationName 放入attachment中|

|CacheFilter|@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)| 是,需要指定,值为cache|是,需要指定,值为cache|无|为Dubbo的核心组件,支持 service,method,consumer or provider 四种粒度缓存|

|TokenFilter|@Activate(group = CommonConstants.PROVIDER, value = TOKEN_KEY)|否|有,需要指定,value为token|无|用于鉴权访问|

|TraceFilter|@Activate(group = CommonConstants.PROVIDER)|否|有,默认存在|无|用于链路跟踪,放入相应信息|

| SentinelDubboProviderFilter | @Activate(group = "provider") |否|仅Provder,默认有|无|为Sentinel 提供的 服务端Filter|

|GenericImplFilter|@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)|是,需要指定,值为generic |否| 20000|用于支持泛化generic 调用|

|MonitorFilter|@Activate(group = {PROVIDER, CONSUMER})|是,默认有|是,默认有|无|用于监控接口调用|

|ContextFilter|@Activate(group = PROVIDER, order = -10000)|否|是,默认有|-10000|在invoker中设置服务端的RpcContext|

|ExceptionFilter|@Activate(group = CommonConstants.PROVIDER)|否|是,默认有|无|unexpect 异常将会在provider 中error层级记录|

|CompatibleFilter|无|否|否|无|为版本兼容所有,让rpc调用返回值可以兼容旧版本的invoker 所需|

|GenericFilter|@Activate(group = CommonConstants.PROVIDER, order = -20000)| 无|有,默认有|-20000|用于实现和 generic 泛化调用相关逻辑|

|FutureFilter|@Activate(group = CommonConstants.CONSUMER) |有,默认有|无|无|属于一个事件的Filter,待研究|

|TimeoutFilter|@Activate(group = CommonConstants.PROVIDER)|无|有|无|记录 超时的 invocation,但是不阻止该invoker 运行,也就是会照常运行|

|ActiveLimitFilter| @Activate(group = CONSUMER, value = ACTIVES_KEY) | 有,需要指定|无 |无|用于限制 client 端的调用量 |

|ConsumerContextFilter|@Activate(group = CONSUMER, order = -10000)|有,默认有|无|-10000|用于设定RpcContext中一些属性,例如invoker,invocation等。|

|ExecuteLimitFilter|@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)|无|有,需要指定 executes|无| 用于限制,单method 下 ,最大并行请求数限制 |

从上面分析来看,当程序没有显示指定Filter,那么Consumer 和 Provider 存在的Filter 如下:

| Provider | Consumer |

|:-------------:| :--------:|

| EchoFilter,ClassLoaderFilter, GenericFilter, ContextFilter, TraceFilter, TimeoutFilter, MonitorFilter, ExceptionFilter ,SentinelDubboProviderFilter |ConsumerContextFilter, FutureFilter,MonitorFilter, DubboAppContextFilter |

以上得出 ConsumerProvider 可以由 上面表格推到出,一方面也可以看到 @Activate 注解用途:

  1. 如果在 @Adaptive 有指定 value,则需要配置才能被加载。

  2. 如果没有 @Adaptive,则无法被加载

  3. Filter 中 排序按照 order 来的,倒序排。

  4. 最后有删除的 SentinelDubboProviderFilter 实际上不属于Dubbo 的包,它的@Adaptive 注解本应被加载,但是如果没有引入相应starter 文件,不回被加载。。只有当被用到时,才会被加载,才会 sentinel 的 SPI 文件才会被加载,才会被加入到默认的SPI 中。

  5. 所以在编写代码是,如果使用了 @Adaptive注解,满足条件后,即使不配置filter,也会生效,就是这个道理。

Filter 何时初始化

Filter 初始化是伴随着 ProtocolFilterWrapper 使用而初始化。而通过前面文章可知,ProtocolFilterWrapper 作为包装类,当有Protocol 初始化,它就会初始化,并且由链路调用一层一层下去。


    @Override

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {

        if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {

            return protocol.export(invoker);

        }

        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));

    }

    @Override

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {

        if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {

            return protocol.refer(type, url);

        }

        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);

    }

当 Provider 暴露服务会调用 export 方法,而 Consumer 则会调用refer 方法,上述方法都做了以下同样的事:

  1. 首先初始化注册中心协议

  2. 调用 buildInvokerChain 构造Filter 链。

下面看看 buildInvokerChain


  private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {

        Invoker<T> last = invoker;

        // 获取 group 和key下所有的 Filter 的 SPI 类文件

        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

// 构建Filter 链

        if (!filters.isEmpty()) {

            for (int i = filters.size() - 1; i >= 0; i--) {

                final Filter filter = filters.get(i);

                final Invoker<T> next = last;

                last = new Invoker<T>() {

                    @Override

                    public Class<T> getInterface() {

                        return invoker.getInterface();

                    }

                    @Override

                    public URL getUrl() {

                        return invoker.getUrl();

                    }

                    @Override

                    public boolean isAvailable() {

                        return invoker.isAvailable();

                    }

                    @Override

                    public Result invoke(Invocation invocation) throws RpcException {

                        Result asyncResult;

                        try {

                        // 直接执行filter 的invoke

                            asyncResult = filter.invoke(next, invocation);

                        } catch (Exception e) {

                            // onError callback

                            if (filter instanceof ListenableFilter) {

                                Filter.Listener listener = ((ListenableFilter) filter).listener();

                                if (listener != null) {

                                    listener.onError(e, invoker, invocation);

                                }

                            }

                            throw e;

                        }

                        return asyncResult;

                    }

                    @Override

                    public void destroy() {

                        invoker.destroy();

                    }

                    @Override

                    public String toString() {

                        return invoker.toString();

                    }

                };

            }

        }

        return new CallbackRegistrationInvoker<>(last, filters);

    }

上面方法看起来也容易理解:

  1. 通过SPI 中,加载 Filter 的扩展类。具体加载规则可以看博主前面文章以及上文表格分析

  2. 将每个Filter链起来,即包装每一个Filter。这样当有请求时候,就会依次执行每一个链起来的Filter

  3. 最后返回 CallbackRegistrationInvoker 的封装对象

Filter 中有个特殊的子类:ListenableFilter


public abstract class ListenableFilter implements Filter {

    protected Listener listener = null;

    public Listener listener() {

        return listener;

    }

}

Listener则 为 Filter 中的内部接口,依附 Filter 存在:


    interface Listener {

// 当结果返回时调用

        void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);

// 当调用方出现异常时调用

        void onError(Throwable t, Invoker<?> invoker, Invocation invocation);

    }

有部分Filter是 实现自 Listener,故其有监听作用。

buildInvokerChain 中绑定 invoke 链时,则使用了 onError,当Filter 出错时,则会记录并调用 onError 方法:


                        try {

                            asyncResult = filter.invoke(next, invocation);

                        } catch (Exception e) {

                            // onError callback

                            if (filter instanceof ListenableFilter) {

                                Filter.Listener listener = ((ListenableFilter) filter).listener();

                                if (listener != null) {

                                    listener.onError(e, invoker, invocation);

                                }

                            }

                            throw e;

                        }

而再看 buildInvokerChain 返回的 CallbackRegistrationInvoker 时,在其invoke方法中可以看到,在 Filter链 中最后一个 调用 完之后,则会进行异步封装:


        @Override

        public Result invoke(Invocation invocation) throws RpcException {

        // 异步封装

            Result asyncResult = filterInvoker.invoke(invocation);

            asyncResult.thenApplyWithContext(r -> {

                for (int i = filters.size() - 1; i >= 0; i--) {

                    Filter filter = filters.get(i);

                    // 告知 ListenableFilter 的子类

                    if (filter instanceof ListenableFilter) {

                        Filter.Listener listener = ((ListenableFilter) filter).listener();

                        if (listener != null) {

                            listener.onResponse(r, filterInvoker, invocation);

                        }

                    } else {

                        filter.onResponse(r, filterInvoker, invocation);

                    }

                }

                return r;

            });

            return asyncResult;

        }

并且对 asyncResult 异步调用完结果进行监听,并且循环filter,依次调用他们的onResponse方法。

如何 定位 SPI 的使用点

本篇以Filter 这个SPI 类型 为例,并且分析了 @Adaptive 注解 对于加载SPI 文件区别。因而总结下

SPI 使用点:

  1. 查找其基接口,看何处被 ExtensionLoader 加载

  2. 分析 ExtensionLoader 中使用 什么方法加载初 其相关联的SPI文件,方法可能为:getActivateExtensiongetActivateExtensiongetExtension ...,不同方法加载出的SPI扩展类是不一样的,都是围绕 SPI配置文件@Adaptive注解,@SPI 注解进行。

  3. 获取完具体SPI扩展类之后,就是对类的操作了,这个要看具体逻辑代码了。

觉得博主写的有用,不妨关注博主公众号: 六点A君。

哈哈哈,Dubbo小吃街不迷路:

在这里插入图片描述
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,313评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,369评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,916评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,333评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,425评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,481评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,491评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,268评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,719评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,004评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,179评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,832评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,510评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,153评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,402评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,045评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,071评论 2 352

推荐阅读更多精彩内容