GRPC源码实例解析(二)——UnaryRPC Client 篇

上篇主要介绍了server端的流程,这篇的关注点是Client端的流程,同样只列出核心主流程代码。

// 调用接口
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}
// 执行接口,通过ClientStream 利用用transport层http2协议发送和接收消息被完成序列化和反序列化
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}
// 构建核心clientStream对象
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error){
    var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
        return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
    }
    return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
    callHdr := &transport.CallHdr{
        Host:           cc.authority,
        Method:         method,
        ContentSubtype: c.contentSubtype,
        DoneFunc:       doneFunc,
    }

    cs := &clientStream{
        callHdr:      callHdr,
        ...
        ...

    }
    // 构建attempt,attempt是在transport层用stream在clientStream中完成实际的sendMsg,并实现retry机制
    cs.newAttemptLocked(sh, trInfo)
    op := func(a *csAttempt) error { return a.newStream() }
    if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
        cs.finish(err)
        return nil, err
    }
}
func (cs *clientStream) SendMsg(m interface{}) (err error) {
    // 预处理数据, 将数据encode并压缩得到hdr、data为构建http 2 frame做准备
    hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
    // 利用Retry机制,通过csAttempt实际执行向server端发送消息
    op := func(a *csAttempt) error {
        err := a.sendMsg(m, hdr, payload, data)
        m, data = nil, nil
        return err
    }
    err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
}
// 通过csAttempt将构建好的http2 dataFrame写入stream中,等于通过http2方式向服务器发数据。
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
    a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})
}

// 将构建好的http2 dataFrame 写入流中
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {

    df := &dataFrame{
        streamID:  s.id,
        endStream: opts.Last,
        h:         hdr,
        d:         data,
    }
  return t.controlBuf.put(df)
}
// 利用retry机制,调用csAttempt recvMsg
func (cs *clientStream) RecvMsg(m interface{}) error {
    err := cs.withRetry(func(a *csAttempt) error {
        return a.recvMsg(m, recvInfo)
    }, cs.commitAttemptLocked)
}

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
      err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
}

// 实现数据解压,利用预定义Codec Unmarshal数据得到reply,在本例中就是HelloReply结构体
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
    d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
    c.Unmarshal(d, m);
}
``

GRPC源码实例解析(一)
https://www.jianshu.com/p/8bbc6dc36859
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容