grpc-go源码阅读(3)grpc invoke一次请求的过程

1.grpc.invoke

// Invoke sends the RPC request on the wire and returns after response is received.
// Invoke is called by generated code. Also users can call Invoke directly when it
// is really needed in their use cases.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    //如果在dial的时候WithUnaryInterceptor  则会先调用拦截器(可以做一些传递数据,日志等等事情)
    if cc.dopts.unaryInt != nil {
        return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
    }
    return invoke(ctx, method, args, reply, cc, opts...)
}

invoke

func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
    c := defaultCallInfo //默认的callinfo配置
    if mc, ok := cc.getMethodConfig(method); ok {//获取methodConfig的配置
        c.failFast = !mc.WaitForReady
        if mc.Timeout > 0 {
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, mc.Timeout)
            defer cancel()
        }
    }
    for _, o := range opts {
        if err := o.before(&c); err != nil { //调用之前的处理
            return toRPCErr(err)
        }
    }
    defer func() {
        for _, o := range opts { //调用之后的处理
            o.after(&c)
        }
    }()
    if EnableTracing {
        c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
        defer c.traceInfo.tr.Finish()
        c.traceInfo.firstLine.client = true
        if deadline, ok := ctx.Deadline(); ok {
            c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
        }
        c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
        // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
        defer func() {
            if e != nil {
                c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
                c.traceInfo.tr.SetError()
            }
        }()
    }
    sh := cc.dopts.copts.StatsHandler//通过WithStatsHandler来设置的, 对调用前后的状态进行掌控
    if sh != nil {
        ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})//附加一些信息到ctx
        begin := &stats.Begin{
            Client:    true,
            BeginTime: time.Now(),
            FailFast:  c.failFast,
        }
        sh.HandleRPC(ctx, begin)
    }
    defer func() {
        if sh != nil {
            end := &stats.End{
                Client:  true,
                EndTime: time.Now(),
                Error:   e,
            }
            sh.HandleRPC(ctx, end)
        }
    }()
    topts := &transport.Options{
        Last:  true,
        Delay: false,
    }
    for {//进入循环
        var (
            err    error
            t      transport.ClientTransport
            stream *transport.Stream
            // Record the put handler from Balancer.Get(...). It is called once the
            // RPC has completed or failed.
            put func()
        )
        // TODO(zhaoq): Need a formal spec of fail-fast.
        callHdr := &transport.CallHdr{
            Host:   cc.authority,
            Method: method,
        }
        if cc.dopts.cp != nil {//发送的时候的数据压缩算法
            callHdr.SendCompress = cc.dopts.cp.Type()
        }

        gopts := BalancerGetOptions{
            BlockingWait: !c.failFast,// 默认failFast=true, 所以Balancer Get不到有效的地址的时候不会block
        }
        //获取一个连接
        t, put, err = cc.getTransport(ctx, gopts)
        if err != nil {
            // TODO(zhaoq): Probably revisit the error handling.
            if _, ok := err.(*rpcError); ok {
                return err
            }
            if err == errConnClosing || err == errConnUnavailable {
                if c.failFast {//默认是true
                    return Errorf(codes.Unavailable, "%v", err)
                }
                continue
            }
            // All the other errors are treated as Internal errors.
            return Errorf(codes.Internal, "%v", err)
        }
        if c.traceInfo.tr != nil {
            c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
        }
        //发送请求
        stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts)
        if err != nil {
            if put != nil {
                put()
                put = nil
            }
            // Retry a non-failfast RPC when
            // i) there is a connection error; or
            // ii) the server started to drain before this RPC was initiated.
            if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
                if c.failFast {//默认是true
                    return toRPCErr(err)
                }
                continue
            }
            return toRPCErr(err)
        }
        //接受响应数据,里面是阻塞等待的
        err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
        if err != nil {
            if put != nil {
                put()
                put = nil
            }
            if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
                if c.failFast {
                    return toRPCErr(err)
                }
                continue
            }
            return toRPCErr(err)
        }
        if c.traceInfo.tr != nil {
            c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
        }
        t.CloseStream(stream, nil)
        if put != nil {
            put()
            put = nil
        }
        return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
    }
}

默认的defaultCallInfo
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
    failFast  bool
    headerMD  metadata.MD
    trailerMD metadata.MD
    peer      *peer.Peer
    traceInfo traceInfo // in trace.go
}

var defaultCallInfo = callInfo{failFast: true}

2.(cc *ClientConn) getTransport

func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
    var (
        ac  *addrConn
        ok  bool
        put func()
    )
    if cc.dopts.balancer == nil { //如果没有设置balancer,只会有一个地址,直接返回
        // If balancer is nil, there should be only one addrConn available.
        cc.mu.RLock()
        if cc.conns == nil {
            cc.mu.RUnlock()
            return nil, nil, toRPCErr(ErrClientConnClosing)
        }
        for _, ac = range cc.conns {
            // Break after the first iteration to get the first addrConn.
            ok = true
            break
        }
        cc.mu.RUnlock()
    } else { //如果有设置balancer(etcd等),根据策略选一个(默认是轮询)
        var (
            addr Address
            err  error
        )
        //得到一个地址,如果!BlockingWait即failfast(默认),不保证这个地址一定是有效的;反之,则能保证
        addr, put, err = cc.dopts.balancer.Get(ctx, opts) //(rr *roundRobin) Get
        if err != nil {
            return nil, nil, toRPCErr(err)
        }
        cc.mu.RLock()
        if cc.conns == nil {
            cc.mu.RUnlock()
            return nil, nil, toRPCErr(ErrClientConnClosing)
        }
        ac, ok = cc.conns[addr]
        cc.mu.RUnlock()
    }
    if !ok { //如果这个地址不在cc.conns里,说明这个地址已经被删了(比如etcd中掉了)
        if put != nil {
            put()
        }
        return nil, nil, errConnClosing
    }
    //等待一个连接,默认情况下,这里是保证连接ok,如果不ok会返回错误
    t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
    if err != nil {
        if put != nil {
            put()
        }
        return nil, nil, err
    }
    return t, put, nil
}

3.(ac *addrConn) wait

//等待failfast默认是true)
//默认情况下 ac.state=Shutdown,ready,TransientFailure,直接返回
//否则,ac.state=Connecting等,则等待ready(成功或者失败),直到ctx超时
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
    for {
        ac.mu.Lock()
        switch {
        case ac.state == Shutdown:
            if failfast || !hasBalancer {
                // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
                err := ac.tearDownErr
                ac.mu.Unlock()
                return nil, err
            }
            ac.mu.Unlock()
            return nil, errConnClosing
        case ac.state == Ready:
            ct := ac.transport
            ac.mu.Unlock()
            return ct, nil
        case ac.state == TransientFailure:
            if failfast || hasBalancer {
                ac.mu.Unlock()
                return nil, errConnUnavailable
            }
        }
        //ac.state=Connecting,默认情况下,走到这里,表示正在连接~~,所以等一等~~
        ready := ac.ready
        if ready == nil {
            ready = make(chan struct{})
            ac.ready = ready
        }
        ac.mu.Unlock()
        select {
        case <-ctx.Done(): //这个请求被cancel了(超时等等)
            return nil, toRPCErr(ctx.Err())
        // Wait until the new transport is ready or failed.
        //不管成功还是失败,都会close(ac.ready)
        case <-ready:
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容