filter其实是一种责任链模式,每个filter只负责完成自己职责的部分,解除耦合,这种设计模式很利于扩展。
大家可能对Dubbo的filter不太熟悉,但是应该都写过Servlet的filter,让我们先来回顾一下Servlet的Filter:
public interface Filter {
/**
* Called by the web container to indicate to a filter that it is being placed into
* service. The servlet container calls the init method exactly once after instantiating the
* filter. The init method must complete successfully before the filter is asked to do any
* filtering work. <br><br>
* The web container cannot place the filter into service if the init method either<br>
* 1.Throws a ServletException <br>
* 2.Does not return within a time period defined by the web container
*/
public void init(FilterConfig filterConfig) throws ServletException;
/**
* The <code>doFilter</code> method of the Filter is called by the container
* each time a request/response pair is passed through the chain due
* to a client request for a resource at the end of the chain. The FilterChain passed in to this
* method allows the Filter to pass on the request and response to the next entity in the
* chain.<p>
* A typical implementation of this method would follow the following pattern:- <br>
* 1. Examine the request<br>
* 2. Optionally wrap the request object with a custom implementation to
* filter content or headers for input filtering <br>
* 3. Optionally wrap the response object with a custom implementation to
* filter content or headers for output filtering <br>
* 4. a) <strong>Either</strong> invoke the next entity in the chain using the FilterChain object (<code>chain.doFilter()</code>), <br>
** 4. b) <strong>or</strong> not pass on the request/response pair to the next entity in the filter chain to block the request processing<br>
** 5. Directly set headers on the response after invocation of the next entity in the filter chain.
**/
public void doFilter ( ServletRequest request, ServletResponse response, FilterChain chain ) throws IOException, ServletException;
/**
* Called by the web container to indicate to a filter that it is being taken out of service. This
* method is only called once all threads within the filter's doFilter method have exited or after
* a timeout period has passed. After the web container calls this method, it will not call the
* doFilter method again on this instance of the filter. <br><br>
*
* This method gives the filter an opportunity to clean up any resources that are being held (for
* example, memory, file handles, threads) and make sure that any persistent state is synchronized
* with the filter's current state in memory.
*/
public void destroy();
}
我们重写doFilter,在chain.doFilter(request, response) 前后做一些切面的工作,比如防XSS攻击、CROS跨域请求处理、记录相关日志等,调用逻辑可以用下图来概括:
类似于Servlet中的filter,Dubbo也可以通过扩展filter来增强功能,Dubbo服务提供方和服务消费方均支持调用过程拦截,并且Dubbo 自身的大多功能均基于此扩展点实现,下面例举部分filter:
EchoFilter -> 用于provider的回声测试,检测服务是否正常
ContextFilter -> 用于provider接收RpcContext的参数
ConsumerContextFilter -> 用于consumer传递RpcContext的参数
ExecuteLimitFilter -> 用于provider的限流
ActiveLimitFilter -> 用于consumer的限流
ExceptionFilter -> 用于provider对异常进行封装
GenericFilter -> 用于provider的泛化调用,可用于集成通用服务测试框架或为其他语言调用服务提供Restful接口的支持
AccessLogFilter -> 用于provider 的access log记录
ClassLoaderFilter -> 用于provider切换当前的ClassLoader
MonitorFilter -> 用于dubbo monitor模块对consumer和provider进行监控
Dubbo filter的调用逻辑可以用下图来概括:
那么Dubbo是怎么将多个filter串起来的呢?
答案就位于ProtocolFilterWrapper这个类的buildInvokerChain方法。
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol){
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
public int getDefaultPort() {
return protocol.getDefaultPort();
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
public void destroy() {
protocol.destroy();
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
看明白这里首先要理解Dubbo的SPI扩展点机制,List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);这一行是获取Filter接口的所有被标注为@Activate的扩展点,然后基于回调让前一个filter调用后一个filter从而串成一个调用链,调用的先后顺序是由每个filter定义的order属性决定的(不声明默认为0),order值越小则调用优先级越高。
了解了Dubbo filter的作用和原理,那让我们来看看如何扩展:
Maven 项目结构:
src
|-main
|-java
|-com
|-xxx
|-XxxFilter.java (实现Filter接口)
Dubbo自身的过滤器配置都放在resources/META-INF/dubbo/internal下,我们扩展的过滤器一版放在resources/META-INF/dubbo/下:
|-resources
|-META-INF
|-dubbo
|-com.alibaba.dubbo.rpc.Filter (纯文本文件,内容为:xxx=com.xxx.XxxFilter)
XxxFilter.java:
package com.xxx;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
public class XxxFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// before filter ...
Result result = invoker.invoke(invocation);
// after filter ...
return result;
}
}
META-INF/dubbo/com.alibaba.dubbo.rpc.Filter:
xxx=com.xxx.XxxFilter
我司生产环境中利用Dubbo filter扩展来记录服务调用日志和服务调用链追踪。
1.服务调用日志记录:
服务调用日志记录分为provider日志和consumer日志两部分,provider日志记录的是当前工程作为provider的服务提供日志,consumer日志记录的是当前工程作为consumer的服务消费日志,以下是部分consumer日志内容:
2018-06-07 18:15:22,971 [main] INFO consumerMonitor - [DUBBO] consumer[null,192.168.15.215:9015,2018-06-07 18:15:22.960] -> provider[nil,172.16.11.120:20942,2018-06-07 18:15:22.971] - com.bj58.qf.service.GoodsSkuService findSkuItemById(java.lang.Long) elapse:11ms SUCCESS [2291038187773442], dubbo version: 2.5.3, current host: 192.168.15.215
2018-06-07 18:15:22,982 [main] INFO consumerMonitor - [DUBBO] consumer[null,192.168.15.215:9015,2018-06-07 18:15:22.971] -> provider[nil,172.16.11.120:20942,2018-06-07 18:15:22.982] - com.bj58.qf.service.GoodsSkuService findSkuItemById(java.lang.Long) elapse:11ms SUCCESS [2291414190906882], dubbo version: 2.5.3, current host: 192.168.15.215
2018-06-07 18:15:22,995 [main] INFO consumerMonitor - [DUBBO] consumer[null,192.168.15.215:9015,2018-06-07 18:15:22.983] -> provider[nil,172.16.11.120:20942,2018-06-07 18:15:22.995] - com.bj58.qf.service.ShopGoodsService queryStorageBySkuId(java.lang.Long) elapse:12ms SUCCESS [2290217867189761], dubbo version: 2.5.3, current host: 192.168.15.215
日志会记录每一次调用的consumer的ip、端口、调用时间、provider的ip、端口、接口请求的时间、调用的方法、调用耗时、调用结果(成功或失败,失败则打印异常)、方法入参(可选)、返回值(可选)等。
由于每次调用的入参和返回值的内容比较多,所以方法入参和返回值是否打印都是可以配置的,filter会根据当前配置的日志等级去打印。
2.调用链追踪:
Dubbo Filter结合brave + zipkin实现RPC调用链追踪和梳理项目间的依赖关系,filter中用brave向zipkin服务器异步发送http请求(也可以用kafka),zipkin服务器对数据进行分析汇总,很容易分析出性能的瓶颈在哪,效果如下:
如果有人对这些具体实现感兴趣,笔者会开放源码。
最后提醒一点,filter的实现中不要写太耗时的方法,会影响性能。