Filter 机制也称拦截器机制,在众多框架或者语言中很常见,可以实现登录鉴权,网关拦截、封装全局状态返回等,博主文章以下几个问题展开:
Filter 的例子
Dubbo中内置的Filter是怎样的?Consumer 和Provider 默认使用的 Filter 有哪些?
Filter 何时初始化?
Filter 何时会被调用?
Filter 中 ListenableFilter 有何作用?
Consumer 和Provider 如何使用Dubbo 内置Filter?
从Filter 看 自定义SPI 在哪些地方会被使用
以上问题看完文章后相信大家就可以清楚,若有疑问,关注博主公众号:六点A君,回复标题获取最新答案><
Filter 是Dubbo 的一个扩展点,可以理解为拦截作用,Dubbo 本身大多功能都基于该扩展点实现。
Filter 例子
先说说Dubbo 中Filter 的使用,以下摘抄自Dubbo官网:
用户自定义 filter 默认在内置 filter 之后。
特殊值
default
,表示缺省扩展点插入的位置。比如:filter="xxx,default,yyy"
,表示xxx
在缺省filter
之前,yyy
在缺省filter
之后。特殊符号
-
,表示剔除。比如:filter="-foo1"
,剔除添加缺省扩展点foo1
。比如:filter="-default"
,剔除添加所有缺省扩展点。provider
和service
同时配置的filter
时,累加所有filter
,而不是覆盖。比如:<dubbo:provider filter="xxx,yyy"/>
和<dubbo:service filter="aaa,bbb" />
,则xxx,yyy,aaa,bbb
均会生效。如果要覆盖,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />
Dubbo
文档:http://dubbo.apache.org/zh-cn/docs/dev/impls/filter.html
Dubbo
中使用 Filter
有两种途径,可以使用Dubbo
内置的Filter
,或者可以使用自定义的Filter
。
以自定义Filter 为例,其中带入
定义类实现
org.apache.dubbo.rpc.Filter
在
META-INF/dubbo
下增加 增加 SPI 文件org.apache.dubbo.rpc.Filter
,填写SPI 映射,例如traceConsumer=com.anla.rpc.filter.consumer.filter.ConsumerTraceFilter
在 Consumer 端 的
dubbo:consumer
或dubbo:reference
进行声明,例如<dubbo:consumer filter="traceConsumer"/>
,同理可以在 Provider 端的dubbo:provider
或dubbo:service
进行声明。
当然在这一步可以使用 Dubbo内置的Filter 进行使用。也可以通过 -
(减号) 进行内置Filter 删除。
具体Filter 例子可以看博主写的例子:https://github.com/anLA7856/dubbolearn/tree/master/filter
Dubbo 内置的Filter
如何找Dubbo内置Filter呢?
由于Dubbo 提供SPI 机制,所以我们可以沿着这条路往下找,即找到 `org.apache.dubbo.rpc.Filter`` 在找到其有哪些子类就可以了。
上面图片看起来过于眼花缭乱,下面博主整理表格或许来的更加直观(起始下面更眼花缭乱,各位看官直接过了就好,需要再来细看)
| 类名 | @Adaptive | 是否默认Consumer| 是否默认Provider|排序| 作用|
|:------:| :-------------:|:-------------:|:-------------:|:-------------:|:----------:|
|ClassLoaderFilter
|@Activate(group = CommonConstants.PROVIDER, order = -30000)
|否|是,默认存在|-30000|用于添加invoker的ClassLoader 到本地线程中|
|DeprecatedFilter
|@Activate(group = CommonConstants.CONSUMER, value = DEPRECATED_KEY)
|是,需要指定 值为 deprecated|无|无|用于提醒错误如果接口配置了deprecated,则会打印错误|
|EchoFilter
|@Activate(group = CommonConstants.PROVIDER, order = -110000)
|否|是,默认有|-110000|用于在服务端提供基于EchoService的功能链路相应功能|
|MetricsFilter
|无|否|否|无|用于一些统计接口监控统计,提供report 功能|
|TpsLimitFilter
|@Activate(group = CommonConstants.PROVIDER, value = TPS_LIMIT_RATE_KEY)
|否|是,需要指定,值为tpc|无|限流作用,用于限制tps,用户可自行配置|
|AccessLogFilter
|@Activate(group = PROVIDER, value = ACCESS_LOG_KEY)
|否|是,需要指定,值为accesslog|无|用于打印接口log|
|ValidationFilter
|@Activate(group = {CONSUMER, PROVIDER}, value = VALIDATION_KEY, order = 10000)
|是,需要指定|是,需要指定|10000|在配置完validation后,在具体调用前既可以使用,用户可以基于org.apache.dubbo.validation.Validation
的 SPI方式实现新Validation
|
|DubboAppContextFilter
|@Activate(group = "consumer")
|是,默认有|否|无|用于将当前Conusmer 的applicationName 放入attachment中|
|CacheFilter
|@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)
| 是,需要指定,值为cache|是,需要指定,值为cache|无|为Dubbo的核心组件,支持 service,method,consumer or provider
四种粒度缓存|
|TokenFilter
|@Activate(group = CommonConstants.PROVIDER, value = TOKEN_KEY)
|否|有,需要指定,value为token|无|用于鉴权访问|
|TraceFilter
|@Activate(group = CommonConstants.PROVIDER)
|否|有,默认存在|无|用于链路跟踪,放入相应信息|
| SentinelDubboProviderFilter
| @Activate(group = "provider")
|否|仅Provder,默认有|无|为Sentinel 提供的 服务端Filter|
|GenericImplFilter
|@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)
|是,需要指定,值为generic |否| 20000|用于支持泛化generic 调用|
|MonitorFilter
|@Activate(group = {PROVIDER, CONSUMER})
|是,默认有|是,默认有|无|用于监控接口调用|
|ContextFilter
|@Activate(group = PROVIDER, order = -10000)
|否|是,默认有|-10000|在invoker中设置服务端的RpcContext|
|ExceptionFilter
|@Activate(group = CommonConstants.PROVIDER)
|否|是,默认有|无|unexpect 异常将会在provider 中error层级记录|
|CompatibleFilter
|无|否|否|无|为版本兼容所有,让rpc调用返回值可以兼容旧版本的invoker 所需|
|GenericFilter
|@Activate(group = CommonConstants.PROVIDER, order = -20000)
| 无|有,默认有|-20000|用于实现和 generic 泛化调用相关逻辑|
|FutureFilter
|@Activate(group = CommonConstants.CONSUMER)
|有,默认有|无|无|属于一个事件的Filter,待研究|
|TimeoutFilter
|@Activate(group = CommonConstants.PROVIDER)
|无|有|无|记录 超时的 invocation,但是不阻止该invoker 运行,也就是会照常运行|
|ActiveLimitFilter
| @Activate(group = CONSUMER, value = ACTIVES_KEY)
| 有,需要指定|无 |无|用于限制 client 端的调用量 |
|ConsumerContextFilter
|@Activate(group = CONSUMER, order = -10000)
|有,默认有|无|-10000|用于设定RpcContext中一些属性,例如invoker,invocation等。|
|ExecuteLimitFilter
|@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
|无|有,需要指定 executes|无| 用于限制,单method 下 ,最大并行请求数限制 |
从上面分析来看,当程序没有显示指定Filter,那么Consumer 和 Provider 存在的Filter 如下:
| Provider | Consumer |
|:-------------:| :--------:|
| EchoFilter
,ClassLoaderFilter
, GenericFilter
, ContextFilter
, TraceFilter
, TimeoutFilter
, MonitorFilter
, ExceptionFilter
, |SentinelDubboProviderFilter
ConsumerContextFilter
, FutureFilter
,MonitorFilter
, |DubboAppContextFilter
以上得出 Consumer
和 Provider
可以由 上面表格推到出,一方面也可以看到 @Activate
注解用途:
如果在
@Adaptive
有指定 value,则需要配置才能被加载。如果没有
@Adaptive
,则无法被加载Filter 中 排序按照 order 来的,倒序排。
最后有删除的
SentinelDubboProviderFilter
实际上不属于Dubbo 的包,它的@Adaptive
注解本应被加载,但是如果没有引入相应starter 文件,不回被加载。。只有当被用到时,才会被加载,才会 sentinel 的 SPI 文件才会被加载,才会被加入到默认的SPI 中。所以在编写代码是,如果使用了
@Adaptive
注解,满足条件后,即使不配置filter
,也会生效,就是这个道理。
Filter 何时初始化
Filter 初始化是伴随着 ProtocolFilterWrapper
使用而初始化。而通过前面文章可知,ProtocolFilterWrapper
作为包装类,当有Protocol
初始化,它就会初始化,并且由链路调用一层一层下去。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
当 Provider 暴露服务会调用 export
方法,而 Consumer 则会调用refer 方法,上述方法都做了以下同样的事:
首先初始化注册中心协议
调用
buildInvokerChain
构造Filter
链。
下面看看 buildInvokerChain
:
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 获取 group 和key下所有的 Filter 的 SPI 类文件
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
// 构建Filter 链
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 直接执行filter 的invoke
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
return asyncResult;
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
上面方法看起来也容易理解:
通过SPI 中,加载 Filter 的扩展类。具体加载规则可以看博主前面文章以及上文表格分析
将每个Filter链起来,即包装每一个Filter。这样当有请求时候,就会依次执行每一个链起来的Filter
最后返回
CallbackRegistrationInvoker
的封装对象
在 Filter
中有个特殊的子类:ListenableFilter
:
public abstract class ListenableFilter implements Filter {
protected Listener listener = null;
public Listener listener() {
return listener;
}
}
而 Listener
则 为 Filter
中的内部接口,依附 Filter
存在:
interface Listener {
// 当结果返回时调用
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
// 当调用方出现异常时调用
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
有部分Filter
是 实现自 Listener
,故其有监听作用。
在 buildInvokerChain
中绑定 invoke
链时,则使用了 onError
,当Filter
出错时,则会记录并调用 onError
方法:
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// onError callback
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onError(e, invoker, invocation);
}
}
throw e;
}
而再看 buildInvokerChain
返回的 CallbackRegistrationInvoker
时,在其invoke方法中可以看到,在 Filter链 中最后一个 调用 完之后,则会进行异步封装:
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 异步封装
Result asyncResult = filterInvoker.invoke(invocation);
asyncResult.thenApplyWithContext(r -> {
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
// 告知 ListenableFilter 的子类
if (filter instanceof ListenableFilter) {
Filter.Listener listener = ((ListenableFilter) filter).listener();
if (listener != null) {
listener.onResponse(r, filterInvoker, invocation);
}
} else {
filter.onResponse(r, filterInvoker, invocation);
}
}
return r;
});
return asyncResult;
}
并且对 asyncResult
异步调用完结果进行监听,并且循环filter,依次调用他们的onResponse
方法。
如何 定位 SPI 的使用点
本篇以Filter 这个SPI 类型 为例,并且分析了 @Adaptive
注解 对于加载SPI 文件区别。因而总结下
SPI 使用点:
查找其基接口,看何处被
ExtensionLoader
加载分析
ExtensionLoader
中使用 什么方法加载初 其相关联的SPI文件,方法可能为:getActivateExtension
,getActivateExtension
,getExtension
...,不同方法加载出的SPI扩展类是不一样的,都是围绕 SPI配置文件,@Adaptive注解,@SPI 注解进行。获取完具体SPI扩展类之后,就是对类的操作了,这个要看具体逻辑代码了。
觉得博主写的有用,不妨关注博主公众号: 六点A君。
哈哈哈,Dubbo小吃街不迷路: