服务发布流程
- 检查配置参数
构造器调用链
- 提前初始化注册中心
- Server 的创建、初始化、注册和启动
- 注册配置变化监听器
- 注册服务到注册中心
ProviderProxyInvoker providerProxyInvoker = new ProviderProxyInvoker(providerConfig)
Invoker 接口包含服务端和客户端的两部分实现类。客户端的在《SOFARPC 源码解析 - 客户端调用链的设计与实现》进行分析。
- ProviderProxyInvoker:FilterChain 的代理类,用于构造 FilterChain 对象
- FilterChain:用于构建 FilterInvoker 执行链(链尾是真正的执行器 ProviderInvoker,执行链内的其他节点都是带有 Filter 实例的 FilterInvoker)
- FilterInvoker:包含有 Filter 实例的 Invoker,FilterInvoker 内部又包含 FilterInvoker,进而组成 FilterInvoker 链,链尾是 ProviderInvoker,不包含 Filter;其他节点是普通 FilterInvoker,包含 Filter 实例;
- ProviderInvoker:真正的执行器
执行图:
首先会构造调用链,然后当执行时,ProviderProxyInvoker 会调用 FilterChain,FilterChain 调用 FilterInvoker,FilterInvoker 判断如果存在 Filter,则执行 Filter;否则,直接执行 FilterInvoker;Filter 内部的实现就要看如何实现了,如果不在向后边的 FilterInvoker 进行调用,则 Filter 执行结束后,调用链就执行结束了;否则,继续执行下一个 FilterInvoker。最后,执行到链尾的 ProviderInvoker,ProviderInvoker 是一个没有 Filter 属性的特殊的 FilterInvoker 子类,在 ProviderInvoker 中,进行反射调用。
构造调用链
============================= Invoker =============================
public interface Invoker {
/**
* 执行调用
* @param request 请求
* @return SofaResponse 响应
* @throws SofaRpcException rpc异常
*/
SofaResponse invoke(SofaRequest request) throws SofaRpcException;
}
============================= ProviderProxyInvoker =============================
public class ProviderProxyInvoker implements Invoker {
// 服务端配置信息
private final ProviderConfig providerConfig;
// 调用器执行链
private final FilterChain filterChain;
// 构造执行链
public ProviderProxyInvoker(ProviderConfig providerConfig) {
this.providerConfig = providerConfig;
// 最底层是调用过滤器
this.filterChain = FilterChain.buildProviderChain(providerConfig, new ProviderInvoker(providerConfig));
}
// proxy拦截的调用
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
return filterChain.invoke(request);
}
}
============================= FilterChain =============================
public class FilterChain implements Invoker {
// 服务端自动激活的 {"alias":ExtensionClass}
private final static Map<String, ExtensionClass<Filter>> PROVIDER_AUTO_ACTIVES = Collections.synchronizedMap(new LinkedHashMap<String, ExtensionClass<Filter>>());
// 调用端自动激活的 {"alias":ExtensionClass}
private final static Map<String, ExtensionClass<Filter>> CONSUMER_AUTO_ACTIVES = Collections.synchronizedMap(new LinkedHashMap<String, ExtensionClass<Filter>>());
// 扩展加载器
private final static ExtensionLoader<Filter> EXTENSION_LOADER = buildLoader();
private static ExtensionLoader<Filter> buildLoader() {
// 每加载成功一个 Filter 的 ExtensionClass,执行一次加载监听器
// 如果 class 有 @AutoActive 注解,则根据配置添加到不同的激活映射中
return ExtensionLoaderFactory.getExtensionLoader(Filter.class, new ExtensionLoaderListener<Filter>() {
@Override
public void onLoad(ExtensionClass<Filter> extensionClass) {
Class<? extends Filter> implClass = extensionClass.getClazz();
AutoActive autoActive = implClass.getAnnotation(AutoActive.class);
if (autoActive != null) {
String alias = extensionClass.getAlias();
if (autoActive.providerSide()) {
PROVIDER_AUTO_ACTIVES.put(alias, extensionClass);
}
if (autoActive.consumerSide()) {
CONSUMER_AUTO_ACTIVES.put(alias, extensionClass);
}
}
}
});
}
// 调用链
private FilterInvoker invokerChain;
// 过滤器列表,从底至上排序; 用于对异步返回的 response 做 filter 链的操作
private List<Filter> loadedFilters;
// 构造执行链
protected FilterChain(List<Filter> filters, FilterInvoker lastInvoker, AbstractInterfaceConfig config) {
// 调用过程外面包装多层自定义filter
// 前面的过滤器在最外层,即优先级越小,越先执行
invokerChain = lastInvoker;
loadedFilters = new ArrayList<Filter>();
for (int i = filters.size() - 1; i >= 0; i--) {
Filter filter = filters.get(i);
if (filter.needToLoad(invokerChain)) {
invokerChain = new FilterInvoker(filter, invokerChain, config);
// cache this for filter when async respond
loadedFilters.add(filter);
}
}
}
/**
* 构造服务端的执行链
*
* @param providerConfig provider配置
* @param lastFilter 最后一个filter
* @return filter执行链
*/
public static FilterChain buildProviderChain(ProviderConfig<?> providerConfig, FilterInvoker lastFilter) {
return new FilterChain(selectActualFilters(providerConfig, PROVIDER_AUTO_ACTIVES), lastFilter, providerConfig);
}
/**
* 构造调用端的执行链
*
* @param consumerConfig consumer配置
* @param lastFilter 最后一个filter
* @return filter执行链
*/
public static FilterChain buildConsumerChain(ConsumerConfig<?> consumerConfig, FilterInvoker lastFilter) {
return new FilterChain(selectActualFilters(consumerConfig, CONSUMER_AUTO_ACTIVES), lastFilter, consumerConfig);
}
/**
* 判断是否需要排除自定义过滤器
*/
private static HashSet<String> parseExcludeFilter(List<Filter> customFilters) {
HashSet<String> excludeKeys = new HashSet<String>();
for (Filter filter : customFilters) {
if (filter instanceof ExcludeFilter) {
// 存在需要排除的过滤器
ExcludeFilter excludeFilter = (ExcludeFilter) filter;
String excludeName = excludeFilter.getExcludeName();
String excludeFilterName = startsWithExcludePrefix(excludeName) ? excludeName.substring(1) : excludeName;
excludeKeys.add(excludeFilterName);
customFilters.remove(filter);
}
}
return excludeKeys;
}
private static boolean startsWithExcludePrefix(String excludeName) {
char c = excludeName.charAt(0);
return c == '-' || c == '!';
}
/**
* 选择出真正的过滤器列表,按照 order 进行排序,order 设置在 @Extension 注解中
* order 由高到低:
* 1. 自定义的Filter
* 2. 高 order 的自动激活的系统内置过滤器
* 3. 低 order 的自动激活的系统内置过滤器
*
* 例如自动装载扩展 A(a),B(b),C(c) filter=[-a,d] filterRef=[new E, new Exclude(b)]
* 逻辑如下:
* 1.解析config.getFilterRef(),记录E和-b
* 2.解析config.getFilter()字符串,记录 d 和 -a,-b
* 3.再解析自动装载扩展,a,b被排除了,所以拿到c,d
* 4.对c d进行排序
* 5.拿到C、D实现类
* 6.加上自定义,返回C、D、E
*/
private static List<Filter> selectActualFilters(AbstractInterfaceConfig config, Map<String, ExtensionClass<Filter>> autoActiveFilters) {
// 1. 处理config.getFilterRef():用户通过自己new实例的方式注入的filter,优先级最高
List<Filter> customFilters = config.getFilterRef() == null ? new ArrayList<Filter>() : new CopyOnWriteArrayList<Filter>(config.getFilterRef());
// 2. 排除需要排除的自定义过滤器
HashSet<String> excludes = parseExcludeFilter(customFilters);
List<ExtensionClass<Filter>> extensionFilters = new ArrayList<ExtensionClass<Filter>>();
// 3. 处理config.getFilter():用户通过别名的方式注入的filter,根据别名进行 SPI 装载
List<String> filterAliases = config.getFilter();
for (String filterAlias : filterAliases) {
if (startsWithExcludePrefix(filterAlias)) { // 排除用的特殊字符
excludes.add(filterAlias.substring(1));
} else {
ExtensionClass<Filter> filter = EXTENSION_LOADER.getExtensionClass(filterAlias);
if (filter != null) {
extensionFilters.add(filter);
}
}
}
// 4. 解析自动加载的过滤器(配了-*和-default表示不加载内置的系统过滤器)
if (!excludes.contains(StringUtils.ALL) && !excludes.contains(StringUtils.DEFAULT)) {
for (Map.Entry<String, ExtensionClass<Filter>> entry : autoActiveFilters.entrySet()) {
if (!excludes.contains(entry.getKey())) {
extensionFilters.add(entry.getValue());
}
}
}
// 5. 按order从小到大排序
if (extensionFilters.size() > 1) {
Collections.sort(extensionFilters, new OrderedComparator<ExtensionClass<Filter>>());
}
List<Filter> actualFilters = new ArrayList<Filter>();
for (ExtensionClass<Filter> extensionFilter : extensionFilters) {
actualFilters.add(extensionFilter.getExtInstance());
}
// 6. 加入自定义的过滤器
actualFilters.addAll(customFilters);
return actualFilters;
}
@Override
public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
return invokerChain.invoke(sofaRequest);
}
// Do filtering when async respond from server
public void onAsyncResponse(ConsumerConfig config, SofaRequest request, SofaResponse response, Throwable throwable)
throws SofaRpcException {
for (Filter loadedFilter : loadedFilters) {
loadedFilter.onAsyncResponse(config, request, response, throwable);
}
}
}
SOFARPC Filter 链的设计,Filter 的优先级越小,即 order 越小,越先执行:
即执行顺序:低 order 的系统内嵌 Filter -> 高 order 的系统内嵌 Filter -> 用户自定义 Filter
@ThreadSafe
public class FilterInvoker implements Invoker {
// 过滤器
protected Filter nextFilter;
// 执行器
protected FilterInvoker invoker;
public FilterInvoker(Filter nextFilter, FilterInvoker invoker, AbstractInterfaceConfig config) {
this.nextFilter = nextFilter;
this.invoker = invoker;
}
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
return nextFilter == null ? invoker.invoke(request) : nextFilter.invoke(invoker, request);
}
}
FilterInvoker 判断如果存在 Filter,则执行 Filter;否则,直接执行 FilterInvoker
以 ProviderBaggageFilter Filter 为例
public class ProviderBaggageFilter extends Filter {
@Override
public boolean needToLoad(FilterInvoker invoker) {
return RpcInvokeContext.isBaggageEnable();
}
@Override
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
...
response = invoker.invoke(request); // 执行下一个 FilterInvoker
...
}
}
真正的执行器
public class ProviderInvoker<T> extends FilterInvoker {
private final ProviderConfig<T> providerConfig;
@Override
public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
SofaResponse sofaResponse = new SofaResponse();
try {
// 反射 真正调用业务代码
Method method = request.getMethod();
if (method == null) {
throw new SofaRpcException(RpcErrorType.SERVER_FILTER, "Need decode method first!");
}
Object result = method.invoke(providerConfig.getRef(), request.getMethodArgs());
// 设置响应
sofaResponse.setAppResponse(result);
} catch (IllegalArgumentException e) { // 非法参数,可能是实现类和接口类不对应
sofaResponse.setErrorMsg(e.getMessage());
} catch (IllegalAccessException e) { // 如果此 Method 对象强制执行 Java 语言访问控制,并且底层方法是不可访问的
sofaResponse.setErrorMsg(e.getMessage());
} catch (InvocationTargetException e) { // 业务代码抛出异常
sofaResponse.setAppResponse(e.getCause());
} finally {
...
}
return sofaResponse;
}
}
值得注意的是,此处的调用直接使用了反射调用,并未使用类似于 dubbo 的方式:使用 javassist 动态生成实现类的包装类来避免反射调用。Dubbo 的处理方式见 https://www.cnblogs.com/java-zhao/p/7625596.html。
SOFARPC 为了避免反射获取 Method,会在向 Server 注册请求调用器的时候,缓存 Mehod 到{service:{方法名#(参数列表):Method}}
映射中,在 BoltServerProcessor 根据 SofaRequest 传递来的服务名、方法名和参数列表从方法缓存映射中获取 Method,进而进行调用。
关于 Dubbo 的无反射调用和 SOFARPC 的反射调用对比:https://github.com/alipay/sofa-rpc/issues/329