pigeon源码分析-处理请求结果

RPC调用处理请求结果可以分为两部分:

  • 获取 response
  • 关联 request 和 response

这么分看起来似乎很奇怪,不是直接等待处理完获取结果就行了吗?

我们说 RPC 调用都是在模拟这个动作: Result result = service.call(args); 但是远程调用毕竟不是本地调用(其实稍后可以看到还是有相似之处的),将请求写到网络之后,就无法命令远端做任何事了,这次请求就已经告一段落了。

pigeon client 只知道:

  • 向网络写数据;
    • 就是写 request
  • 处理网络写入的数据;
    • 处理成 response

于是不难理解为何有此一问:网络另一端写过来的数据,我怎么知道是哪个请求的返回值呢?

获取 response

其实从网络读取数据,转化成 Object。 pigeon 基于 netty,获取 response 就是处理网络写入。

具体实现在:

 // com.dianping.pigeon.remoting.netty.invoker.NettyClientHandler#messageReceived
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        CodecEvent codecEvent = (CodecEvent) e.getMessage();

        if (codecEvent.isValid() && codecEvent.getInvocation() != null) {
            client.processResponse((InvocationResponse) codecEvent.getInvocation());
        }
    }

从 netty 封装 MessageEvent 转换成 CodecEvent,再剥开一层就是 InvocationResponse,当然这还是一个比较泛化的返回值,com.dianping.pigeon.remoting.invoker.process.ResponseProcessor 将会进一步处理;

继续往下看:

// com.dianping.pigeon.remoting.invoker.process.threadpool.ResponseThreadPoolProcessor#doProcessResponse
public void doProcessResponse(final InvocationResponse response, final Client client) {
        Runnable task = new Runnable() {
            public void run() {
                ServiceInvocationRepository.getInstance().receiveResponse(response);
            }
        };
        try {
            responseProcessThreadPool.execute(task);
        } catch (RejectedExecutionException e) {
            String error = String.format("process response failed:%s, processor stats:%s", response,
                    getProcessorStatistics());
            throw new RejectedException(error, e);
        }
    }

这里封装成了一个 task,交给线程池处理。

再下一层:

// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#receiveResponse
public class ServiceInvocationRepository {
    // 略
    private static Map<Long, RemoteInvocationBean> invocations = new ConcurrentHashMap<Long, RemoteInvocationBean>();


    public void receiveResponse(InvocationResponse response) {
        RemoteInvocationBean invocationBean = invocations.get(response.getSequence());
        if (invocationBean != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("received response:" + response);
            }
            InvocationRequest request = invocationBean.request;
            try {
                Callback callback = invocationBean.callback;
                if (callback != null) {
                    Client client = callback.getClient();
                    if (client != null) {
                        ServiceStatisticsHolder.flowOut(request, client.getAddress());
                    }
                    callback.callback(response);
                    callback.run();
                }
            } finally {
                invocations.remove(response.getSequence());
            }
        }
    }
// 略
}

这里可以看到, RemoteInvocationBean invocationBean = invocations.get(response.getSequence()); invocations 维护一个 HashMap,key 是一个 long 型的 sequenceId,通过这种方式定位到 invocationBean,而 invocationBean 看实现可知持有 request 引用,以及一个处理返回值的 callback。

Callback 在不同调用模式(sync / future/ oneway/ callback) 下有不同实现类,比如 sync 模式下:

// public class CallbackFuture implements Callback, CallFuture {   
@Override
    public void callback(InvocationResponse response) {
        this.response = response;
    }

sync 和 future 调用,都是将 response 对象实例设置给相应的引用

何时得到真正的 returnValue?

看动态代理的逻辑,com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#invoke

com.dianping.pigeon.remoting.invoker.process.filter.InvocationInvokeFilter 挨个执行完之后,提取返回值

// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult
    public Object extractResult(InvocationResponse response, Class<?> returnType) throws Throwable {
        Object responseReturn = response.getReturn();
        // ...
}

拿到的是顶层父类实例 Object,具体的类型匹配需要客户端和服务端自行匹配

关联 request 和 response

从上面的分析可以看出,关键点就在于 sequence,每次调用应该有个唯一的 id 进行匹配

这个 sequence 是唯一的吗?

如果不唯一,就可能导致拿到错误的处理结果。

sequence 的生成位置

//com.dianping.pigeon.remoting.invoker.process.filter.ContextPrepareInvokeFilter#initRequest

private static AtomicLong requestSequenceMaker = new AtomicLong();

request.setSequence(requestSequenceMaker.incrementAndGet() * -1);

可以看到 这个 sequence 是全局唯一的,准确说是同一个 JVM 中是唯一的,而且是 long 类型,足够大;

Q:sequence 发生回绕怎么办?

A:long 类型,即使发生回绕,也需要足够长的时间,一般来说不会堆积有那么多的请求,导致两个相同的 sequenceId 实际对应不同请求;

分布式环境下,sequenceId 在多个机器上可能重复,会出错吗?

A:sequence 存储的 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#invocations也是同一个 JVM 唯一的,所以只需要担心会不会有这样的场景:

client A 调用 server A,client B 也调用 server A,但是 server A 把 client B的请求返回值处理之后发送到了 client A?

看看服务端的处理,写返回值:

//com.dianping.pigeon.remoting.provider.process.filter.WriteResponseProcessFilter#invoke
    public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext)
            throws Throwable {
        try {
            ProviderChannel channel = invocationContext.getChannel();
            InvocationRequest request = invocationContext.getRequest();
            InvocationResponse response = handler.handle(invocationContext);
            if (request.getCallType() == Constants.CALLTYPE_REPLY) {
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
                channel.write(invocationContext, response);
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
            }
        // ...

channel 就是对socket的封装,可以看成是 client / server 对对方的抽象。

那么只需要保证拿到正确的 channel 就对了:

//com.dianping.pigeon.remoting.netty.provider.NettyServerHandler#messageReceived
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
        CodecEvent codecEvent = (CodecEvent) (message.getMessage());

        if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
            return;
        }

        InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();

        ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
        //...
    }

也就是说,再 client 端 writeRequest() 之后,server 端读取网络数据的时候就从 context 中获取到 client 所在的 channel 了,简单来说,”从哪里来,到哪里去“。

总的来说,不会发生以上所述的 sequenceId 错乱的问题。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • pigeon源码分析-同步调用和异步调用 Pigeon是美团点评内部广泛使用的一个分布式服务通信框架(RPC),本...
    WhiteBase阅读 1,553评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,027评论 19 139
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 11,218评论 6 13
  • 1.感谢最强大脑,发现了自己观察视角的不同,会发现最强大脑观众席抓拍到的观众都没有化妆,是因为受众不同。 2.发现...
    张洪瑜阅读 177评论 0 0
  • 这还是高一时候的事。好吧,说我记仇也好,怀旧也罢。就这样在心里生了根,后来发了芽。 是的,我喜欢上一个人,故事太多...
    Grexogr阅读 184评论 0 1