上篇主要介绍了Server端的流程,这篇的关注点是Client端的流程,同样只列出核心主流程代码。
// SayHello is the client-side entry point for the Greeter SayHello call.
// It allocates an empty HelloReply and passes it, together with the request
// and call options, to the connection's Invoke under the method name
// "/helloworld.Greeter/SayHello". On error it returns nil and that error;
// otherwise it returns the reply that was handed to Invoke.
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	reply := &HelloReply{}
	if err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, reply, opts...); err != nil {
		return nil, err
	}
	return reply, nil
}
// invoke carries out one request/response exchange over a client stream.
// It opens the stream with newClientStream (using unaryStreamDesc), sends req
// via SendMsg, and then reads the response into reply via RecvMsg. The first
// error from any of these steps is returned unchanged.
// Per the original author's note, the messages travel over the HTTP/2
// transport layer and are serialized/deserialized along the way.
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
	stream, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
	if err != nil {
		return err
	}
	sendErr := stream.SendMsg(req)
	if sendErr != nil {
		return sendErr
	}
	return stream.RecvMsg(reply)
}
// newClientStream builds the core clientStream object for a call.
// In this abridged listing it defines a local newStream closure that forwards
// to newClientStreamWithParams, then calls that closure with the caller's
// context and a no-op done callback.
// NOTE(review): mc and onCommit are not defined in this excerpt — presumably
// they come from code omitted from the listing; confirm against the full source.
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error){
// Closure that creates the stream; done is passed through as the stream's doneFunc.
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}
// Invoke the closure directly with an empty done callback.
return newStream(ctx, func() {})
}
// newClientStreamWithParams assembles the call header and the clientStream,
// creates the first attempt, and opens the attempt's stream through the retry
// wrapper. If opening the stream fails, the clientStream is finished with that
// error and (nil, err) is returned.
// NOTE(review): this is an abridged excerpt — c, sh and trInfo are not defined
// here, the "..." lines stand for omitted fields, and the success-path return
// is not shown; consult the full source for those parts.
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
// Header describing the call: target authority, method name, content subtype
// and the done callback.
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
}
cs := &clientStream{
callHdr: callHdr,
...
...
}
// Create the attempt. Per the original author's note, the attempt is what
// actually performs sendMsg inside clientStream using a transport-layer
// stream, and it is the unit the retry mechanism operates on.
cs.newAttemptLocked(sh, trInfo)
// op opens the transport stream for a given attempt.
op := func(a *csAttempt) error { return a.newStream() }
// Run op through withRetry; the second callback buffers op for replay.
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
cs.finish(err)
return nil, err
}
}
// SendMsg prepares message m for the wire and sends it through the current
// attempt using the retry wrapper.
// NOTE(review): abridged excerpt — the error check after prepareMsg and the
// final return are not shown here; consult the full source.
func (cs *clientStream) SendMsg(m interface{}) (err error) {
// Pre-process the data: per the original note, the message is encoded and
// compressed to obtain hdr and data in preparation for building the HTTP/2 frame.
hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
// Use the retry mechanism: the csAttempt performs the actual send to the server.
op := func(a *csAttempt) error {
err := a.sendMsg(m, hdr, payload, data)
// m and data are cleared after the first send — presumably so that a replayed
// op does not carry them again; TODO confirm against the full source.
m, data = nil, nil
return err
}
// The second callback buffers op for replay, accounting for len(hdr)+len(payload) bytes.
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
}
// sendMsg hands hdr and payld to the attempt's transport via Write on the
// attempt's stream. Per the original author's note, this writes the prepared
// HTTP/2 data frame into the stream, i.e. sends the data to the server over HTTP/2.
// The Last option is set when the stream descriptor is not client-streaming.
// NOTE(review): abridged excerpt — cs is not defined here and the handling of
// Write's result / the return statement is omitted; consult the full source.
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})
}
// Write packages hdr and data into a dataFrame tagged with the stream's id
// and queues that frame on the transport's control buffer. opts.Last is
// copied into the frame's endStream flag. The error reported by
// controlBuf.put is returned as-is.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
	frame := new(dataFrame)
	frame.streamID = s.id
	frame.endStream = opts.Last
	frame.h = hdr
	frame.d = data
	return t.controlBuf.put(frame)
}
// RecvMsg receives a message into m by calling the current attempt's recvMsg
// through the retry wrapper; cs.commitAttemptLocked is passed as the second
// callback to withRetry.
// NOTE(review): abridged excerpt — recvInfo is not defined here and the
// handling/return of err is omitted; consult the full source.
func (cs *clientStream) RecvMsg(m interface{}) error {
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
}
// recvMsg delegates to recv, passing the attempt's parser, stream and
// decompressors together with the stream's codec and the configured maximum
// receive message size.
// NOTE(review): abridged excerpt — cs is not defined here and the handling of
// err / the return statement is omitted; consult the full source.
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
}
// recv reads and decompresses the incoming data via recvAndDecompress, then
// uses the supplied codec to unmarshal it into m — in this example, per the
// original author's note, m is the HelloReply struct.
// NOTE(review): abridged excerpt — the error checks after recvAndDecompress
// and Unmarshal, and the return statement, are omitted; consult the full source.
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
// d holds the received (and, if needed, decompressed) bytes.
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
// Decode the bytes into the caller's reply object.
c.Unmarshal(d, m);
}
上一篇:[GRPC源码实例解析(一)](https://www.jianshu.com/p/8bbc6dc36859)