RPC框架Pigeon简析(四)-- 服务端对请求的响应

接上文,在客户端发出请求后,首先处理的自然是netty,在IO处理之后,就进入业务处理NettyServerHandler。最终的处理任务就落在了RequestThreadPoolProcessor这个类身上,主要方法是doProcessRequest

public Future<InvocationResponse> doProcessRequest(final InvocationRequest request,
                                                       final ProviderContext providerContext) {
        requestContextMap.put(request, providerContext);
        doMonitorData(request, providerContext);
        Callable<InvocationResponse> requestExecutor = new Callable<InvocationResponse>() {

            @Override
            public InvocationResponse call() throws Exception {
                providerContext.getTimeline().add(new TimePoint(TimePhase.T));
                try {
                    ServiceInvocationHandler invocationHandler = ProviderProcessHandlerFactory
                            .selectInvocationHandler(providerContext.getRequest().getMessageType());
                    if (invocationHandler != null) {
                        providerContext.setThread(Thread.currentThread());
                        return invocationHandler.handle(providerContext);
                    }
                } catch (Throwable t) {
                    logger.error("Process request failed with invocation handler, you should never be here.", t);
                } finally {
                    requestContextMap.remove(request);
                }
                return null;
            }
        };
        final ThreadPool pool = selectThreadPool(request);
        try {
            checkRequest(pool, request);
            providerContext.getTimeline().add(new TimePoint(TimePhase.T));
            return pool.submit(requestExecutor);
        } catch (RejectedExecutionException e) {
            requestContextMap.remove(request);
            throw new RejectedException(getProcessorStatistics(pool), e);
        }

    }

requestExecutor是整个请求的执行器,内部和请求一样,通过chain-filter来处理。selectThreadPool方法是来选择一个线程池来处理请求。pigeon提供了方法级别的线程池、服务级别的线程池和用户自定义的线程池。默认是使用共享线程池和慢速线程池。为什么会有这么多种线程池呢?比如默认提供的两种,是考虑如果有请求比较缓慢,那么会将请求扔到慢速线程池,这样不会因为几个慢速task,堵死整个线程池。同理,开发人员也可以用自己的策略来使用不同的线程池处理task。
接下来,看看服务端的filter

                registerBizProcessFilter(new TraceFilter());
        if (Constants.MONITOR_ENABLE) {
            registerBizProcessFilter(new MonitorProcessFilter());
        }
        registerBizProcessFilter(new WriteResponseProcessFilter());
        registerBizProcessFilter(new ContextTransferProcessFilter());
        registerBizProcessFilter(new ExceptionProcessFilter());
        registerBizProcessFilter(new SecurityFilter());
        registerBizProcessFilter(new GatewayProcessFilter());
        registerBizProcessFilter(new BusinessProcessFilter());
        bizInvocationHandler = createInvocationHandler(bizProcessFilters);

TraceFilter和客户端类似。MonitorProcessFilter是用来处理监控的,这个要看监控开关是否打开。WriteResponseProcessFilter是利用netty将结果返回给客户端。ContextTransferProcessFilter是用来处理请求参数的。ExceptionProcessFilter是用来处理在调用过程中发生的异常,之前的版本是没有这个filter的,就会导致如果服务端的代码没有捕获异常直接抛出,异常信息没有办法正常的传递给客户端。SecurityFilter是安全相关的filter,包括黑名单、白名单、请求秘钥等等。GatewayProcessFilter处理了一些请求数拦截的事情,比如可以设置某个服务最多请求多少次之类的。BusinessProcessFilter核心就是通过反射直接调用请求的方法,完成远程方法调用。

以上两篇就是整个RPC的请求和应答处理的过程,其实比较简单,后面两遍针对一些特定的问题,讲述下pigeon是用了哪些方法解决的,主要有一些很有用的小技巧,赶脚看了源码之后学到不少。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 11,259评论 0 13
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,488评论 11 349
  • 第一章 Nginx简介 Nginx是什么 没有听过Nginx?那么一定听过它的“同行”Apache吧!Ngi...
    JokerW阅读 32,914评论 24 1,002
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,850评论 19 139
  • 我是日记星球的34号星宝宝,正在参加孙老师的日记星球21天蜕变之旅的写作训练。这是我的原创第159篇,我相信日积月...
    誉仔妈妈阅读 5,902评论 0 6

友情链接更多精彩内容