SOFARPC 源码分析5 - 服务端调用链的设计与实现

服务发布流程

  1. 检查配置参数
  2. 构造器调用链
  3. 提前初始化注册中心
  4. Server 的创建、初始化、注册和启动
  5. 注册配置变化监听器
  6. 注册服务到注册中心
ProviderProxyInvoker providerProxyInvoker = new ProviderProxyInvoker(providerConfig)
image.png

Invoker 接口包含服务端和客户端的两部分实现类。客户端的在《SOFARPC 源码解析 - 客户端调用链的设计与实现》进行分析。

  • ProviderProxyInvoker:FilterChain 的代理类,用于构造 FilterChain 对象
  • FilterChain:用于构建 FilterInvoker 执行链(链尾是真正的执行器 ProviderInvoker,执行链内的其他节点都是带有 Filter 实例的 FilterInvoker)
  • FilterInvoker:包含有 Filter 实例的 Invoker,FilterInvoker 内部又包含 FilterInvoker,进而组成 FilterInvoker 链,链尾是 ProviderInvoker,不包含 Filter;其他节点是普通 FilterInvoker,包含 Filter 实例;
  • ProviderInvoker:真正的执行器

执行图:


image.png

首先会构造调用链,然后当执行时,ProviderProxyInvoker 会调用 FilterChain,FilterChain 调用 FilterInvoker,FilterInvoker 判断如果存在 Filter,则执行 Filter;否则,直接执行 FilterInvoker;Filter 内部的实现就要看如何实现了,如果不在向后边的 FilterInvoker 进行调用,则 Filter 执行结束后,调用链就执行结束了;否则,继续执行下一个 FilterInvoker。最后,执行到链尾的 ProviderInvoker,ProviderInvoker 是一个没有 Filter 属性的特殊的 FilterInvoker 子类,在 ProviderInvoker 中,进行反射调用。

构造调用链

============================= Invoker =============================
public interface Invoker {
    /**
     * 执行调用
     * @param request 请求
     * @return SofaResponse 响应
     * @throws SofaRpcException rpc异常
     */
    SofaResponse invoke(SofaRequest request) throws SofaRpcException;
}

============================= ProviderProxyInvoker =============================
public class ProviderProxyInvoker implements Invoker {
    // 服务端配置信息
    private final ProviderConfig providerConfig;
    // 调用器执行链
    private final FilterChain    filterChain;

    // 构造执行链
    public ProviderProxyInvoker(ProviderConfig providerConfig) {
        this.providerConfig = providerConfig;
        // 最底层是调用过滤器
        this.filterChain = FilterChain.buildProviderChain(providerConfig, new ProviderInvoker(providerConfig));
    }

    // proxy拦截的调用
    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        return filterChain.invoke(request);
    }
}

============================= FilterChain =============================
public class FilterChain implements Invoker {
    // 服务端自动激活的 {"alias":ExtensionClass}
    private final static Map<String, ExtensionClass<Filter>> PROVIDER_AUTO_ACTIVES = Collections.synchronizedMap(new LinkedHashMap<String, ExtensionClass<Filter>>());
    // 调用端自动激活的 {"alias":ExtensionClass}
    private final static Map<String, ExtensionClass<Filter>> CONSUMER_AUTO_ACTIVES = Collections.synchronizedMap(new LinkedHashMap<String, ExtensionClass<Filter>>());
    // 扩展加载器
    private final static ExtensionLoader<Filter> EXTENSION_LOADER = buildLoader();
    private static ExtensionLoader<Filter> buildLoader() {
        // 每加载成功一个 Filter 的 ExtensionClass,执行一次加载监听器
        // 如果 class 有 @AutoActive 注解,则根据配置添加到不同的激活映射中
        return ExtensionLoaderFactory.getExtensionLoader(Filter.class, new ExtensionLoaderListener<Filter>() {
            @Override
            public void onLoad(ExtensionClass<Filter> extensionClass) {
                Class<? extends Filter> implClass = extensionClass.getClazz();
                AutoActive autoActive = implClass.getAnnotation(AutoActive.class);
                if (autoActive != null) {
                    String alias = extensionClass.getAlias();
                    if (autoActive.providerSide()) {
                        PROVIDER_AUTO_ACTIVES.put(alias, extensionClass);
                    }
                    if (autoActive.consumerSide()) {
                        CONSUMER_AUTO_ACTIVES.put(alias, extensionClass);
                    }
                }
            }
        });
    }

    // 调用链
    private FilterInvoker invokerChain;
    // 过滤器列表,从底至上排序; 用于对异步返回的 response 做 filter 链的操作
    private List<Filter> loadedFilters;

    // 构造执行链
    protected FilterChain(List<Filter> filters, FilterInvoker lastInvoker, AbstractInterfaceConfig config) {
        // 调用过程外面包装多层自定义filter
        // 前面的过滤器在最外层,即优先级越小,越先执行
        invokerChain = lastInvoker;
        loadedFilters = new ArrayList<Filter>();
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            if (filter.needToLoad(invokerChain)) {
                invokerChain = new FilterInvoker(filter, invokerChain, config);
                // cache this for filter when async respond
                loadedFilters.add(filter);
            }
        }
    }

    /**
     * 构造服务端的执行链
     *
     * @param providerConfig provider配置
     * @param lastFilter     最后一个filter
     * @return filter执行链
     */
    public static FilterChain buildProviderChain(ProviderConfig<?> providerConfig, FilterInvoker lastFilter) {
        return new FilterChain(selectActualFilters(providerConfig, PROVIDER_AUTO_ACTIVES), lastFilter, providerConfig);
    }

    /**
     * 构造调用端的执行链
     *
     * @param consumerConfig consumer配置
     * @param lastFilter     最后一个filter
     * @return filter执行链
     */
    public static FilterChain buildConsumerChain(ConsumerConfig<?> consumerConfig, FilterInvoker lastFilter) {
        return new FilterChain(selectActualFilters(consumerConfig, CONSUMER_AUTO_ACTIVES), lastFilter, consumerConfig);
    }

    /**
     * 判断是否需要排除自定义过滤器
     */
    private static HashSet<String> parseExcludeFilter(List<Filter> customFilters) {
        HashSet<String> excludeKeys = new HashSet<String>();
        for (Filter filter : customFilters) {
            if (filter instanceof ExcludeFilter) {
                // 存在需要排除的过滤器
                ExcludeFilter excludeFilter = (ExcludeFilter) filter;
                String excludeName = excludeFilter.getExcludeName();
                String excludeFilterName = startsWithExcludePrefix(excludeName) ? excludeName.substring(1) : excludeName;
                excludeKeys.add(excludeFilterName);
                customFilters.remove(filter);
            }
        }
        return excludeKeys;
    }

    private static boolean startsWithExcludePrefix(String excludeName) {
        char c = excludeName.charAt(0);
        return c == '-' || c == '!';
    }

    /**
     * 选择出真正的过滤器列表,按照 order 进行排序,order 设置在 @Extension 注解中
     * order 由高到低:
     * 1. 自定义的Filter
     * 2. 高 order 的自动激活的系统内置过滤器
     * 3. 低 order 的自动激活的系统内置过滤器
     * 
     * 例如自动装载扩展 A(a),B(b),C(c)  filter=[-a,d]  filterRef=[new E, new Exclude(b)]
     * 逻辑如下:
     * 1.解析config.getFilterRef(),记录E和-b
     * 2.解析config.getFilter()字符串,记录 d 和 -a,-b
     * 3.再解析自动装载扩展,a,b被排除了,所以拿到c,d
     * 4.对c d进行排序
     * 5.拿到C、D实现类
     * 6.加上自定义,返回C、D、E
     */
    private static List<Filter> selectActualFilters(AbstractInterfaceConfig config, Map<String, ExtensionClass<Filter>> autoActiveFilters) {

        // 1. 处理config.getFilterRef():用户通过自己new实例的方式注入的filter,优先级最高
        List<Filter> customFilters = config.getFilterRef() == null ? new ArrayList<Filter>() : new CopyOnWriteArrayList<Filter>(config.getFilterRef());
        // 2. 排除需要排除的自定义过滤器
        HashSet<String> excludes = parseExcludeFilter(customFilters);

        List<ExtensionClass<Filter>> extensionFilters = new ArrayList<ExtensionClass<Filter>>();
        // 3. 处理config.getFilter():用户通过别名的方式注入的filter,根据别名进行 SPI 装载
        List<String> filterAliases = config.getFilter();
        for (String filterAlias : filterAliases) {
            if (startsWithExcludePrefix(filterAlias)) { // 排除用的特殊字符
                excludes.add(filterAlias.substring(1));
            } else {
                ExtensionClass<Filter> filter = EXTENSION_LOADER.getExtensionClass(filterAlias);
                if (filter != null) {
                    extensionFilters.add(filter);
                }
            }
        }
        // 4. 解析自动加载的过滤器(配了-*和-default表示不加载内置的系统过滤器)
        if (!excludes.contains(StringUtils.ALL) && !excludes.contains(StringUtils.DEFAULT)) {
            for (Map.Entry<String, ExtensionClass<Filter>> entry : autoActiveFilters.entrySet()) {
                if (!excludes.contains(entry.getKey())) {
                    extensionFilters.add(entry.getValue());
                }
            }
        }
        // 5. 按order从小到大排序
        if (extensionFilters.size() > 1) {
            Collections.sort(extensionFilters, new OrderedComparator<ExtensionClass<Filter>>());
        }
        List<Filter> actualFilters = new ArrayList<Filter>();
        for (ExtensionClass<Filter> extensionFilter : extensionFilters) {
            actualFilters.add(extensionFilter.getExtInstance());
        }
        // 6. 加入自定义的过滤器
        actualFilters.addAll(customFilters);
        return actualFilters;
    }

    @Override
    public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
        return invokerChain.invoke(sofaRequest);
    }

    // Do filtering when async respond from server
    public void onAsyncResponse(ConsumerConfig config, SofaRequest request, SofaResponse response, Throwable throwable)
            throws SofaRpcException {
        for (Filter loadedFilter : loadedFilters) {
            loadedFilter.onAsyncResponse(config, request, response, throwable);
        }
    }
}

SOFARPC Filter 链的设计,Filter 的优先级越小,即 order 越小,越先执行:
即执行顺序:低 order 的系统内嵌 Filter -> 高 order 的系统内嵌 Filter -> 用户自定义 Filter

@ThreadSafe
public class FilterInvoker implements Invoker {
    // 过滤器
    protected Filter                  nextFilter;
    // 执行器
    protected FilterInvoker           invoker;
    
    public FilterInvoker(Filter nextFilter, FilterInvoker invoker, AbstractInterfaceConfig config) {
        this.nextFilter = nextFilter;
        this.invoker = invoker;
    }

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        return nextFilter == null ? invoker.invoke(request) : nextFilter.invoke(invoker, request);
    }
}

FilterInvoker 判断如果存在 Filter,则执行 Filter;否则,直接执行 FilterInvoker
以 ProviderBaggageFilter Filter 为例

public class ProviderBaggageFilter extends Filter {
    @Override
    public boolean needToLoad(FilterInvoker invoker) {
        return RpcInvokeContext.isBaggageEnable();
    }

    @Override
    public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
        ...
        response = invoker.invoke(request); // 执行下一个 FilterInvoker
        ...
    }
}

真正的执行器

public class ProviderInvoker<T> extends FilterInvoker {
    private final ProviderConfig<T> providerConfig;

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse sofaResponse = new SofaResponse();
        try {
            // 反射 真正调用业务代码
            Method method = request.getMethod();
            if (method == null) {
                throw new SofaRpcException(RpcErrorType.SERVER_FILTER, "Need decode method first!");
            }
            Object result = method.invoke(providerConfig.getRef(), request.getMethodArgs());
            // 设置响应
            sofaResponse.setAppResponse(result);
        } catch (IllegalArgumentException e) { // 非法参数,可能是实现类和接口类不对应
            sofaResponse.setErrorMsg(e.getMessage());
        } catch (IllegalAccessException e) { // 如果此 Method 对象强制执行 Java 语言访问控制,并且底层方法是不可访问的
            sofaResponse.setErrorMsg(e.getMessage());
        } catch (InvocationTargetException e) { // 业务代码抛出异常
            sofaResponse.setAppResponse(e.getCause());
        } finally {
            ...
        }
        return sofaResponse;
    }
}

值得注意的是,此处的调用直接使用了反射调用,并未使用类似于 dubbo 的方式:使用 javassist 动态生成实现类的包装类来避免反射调用。Dubbo 的处理方式见 https://www.cnblogs.com/java-zhao/p/7625596.html
SOFARPC 为了避免反射获取 Method,会在向 Server 注册请求调用器的时候,缓存 Mehod 到 {service:{方法名#(参数列表):Method}} 映射中,在 BoltServerProcessor 根据 SofaRequest 传递来的服务名、方法名和参数列表从方法缓存映射中获取 Method,进而进行调用。

关于 Dubbo 的无反射调用和 SOFARPC 的反射调用对比:https://github.com/alipay/sofa-rpc/issues/329

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容