grpc-go彻底弄清http2协议如何解析

大家好,我是dandyhuang。最近有个朋友,问http2协议解析的时候。request body获取的时候是否可以避免压缩。因为他们的业务是proxy,不希望解析body部分。借此看了一下grpc-go解析。之前就一直听说h2h1好,并且有很多优势。

HTTP/2相较于HTTP/1.1改进

  • 二进制分帧

  • 头部压缩

  • 数据流

  • 多向请求与响应

  • 请求优先级

  • 流量控制

  • 服务器推送

HTTP 2.0 协议详解

可以先了解h2基础,对后续理解代码也会更有帮忙,可以查阅HTTP 2.0 协议详解

grpc-go是如何实现的

func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
  ...
    // Finish handshaking (HTTP2)
    st := s.newHTTP2Transport(rawConn)
    rawConn.SetDeadline(time.Time{})
    if st == nil {
        return
    }

    if !s.addConn(lisAddr, st) {
        return
    }
    go func() {
        s.serveStreams(st)
        s.removeConn(lisAddr, st)
    }()
}
  • server.go中,handleRawConn中获取client发送的tcp数据。newHTTP2Transport中获取完成了http2中,前期SETTINGS帧的发送确认,组成Connection Preface(连接序言)。

Connection Preface连接序言

newHTTP2Transport中,transport.NewServerTransport创建。这里实现了http2的握手过程,可以理解为在tcp基础上,h2自身又实现了一次握手。

func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
    var authInfo credentials.AuthInfo
    rawConn := conn
    if config.Credentials != nil {
        ...
    }
    writeBufSize := config.WriteBufferSize
    readBufSize := config.ReadBufferSize
    maxHeaderListSize := defaultServerMaxHeaderListSize
    if config.MaxHeaderListSize != nil {
        maxHeaderListSize = *config.MaxHeaderListSize
    }
  // framer初始化
    framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
    // Send initial settings as connection preface to client.
    isettings := []http2.Setting{{
        ID:  http2.SettingMaxFrameSize,
        Val: http2MaxFrameLen,
    }}
    // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
    // permitted in the HTTP2 spec.
    maxStreams := config.MaxStreams
    if maxStreams == 0 {
        maxStreams = math.MaxUint32
    } else {
        isettings = append(isettings, http2.Setting{
            ID:  http2.SettingMaxConcurrentStreams,
            Val: maxStreams,
        })
    }
    ... // setting帧的设置
  // endWrite->调用的write是newBufWriter中的。还没有发送给client端
    if err := framer.fr.WriteSettings(isettings...); err != nil {
        return nil, connectionErrorf(false, err, "transport: %v", err)
    }
    // Adjust the connection flow control window if needed.
    if delta := uint32(icwz - defaultWindowSize); delta > 0 {
        if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
            return nil, connectionErrorf(false, err, "transport: %v", err)
        }
    }
    kp := config.KeepaliveParams
    if kp.MaxConnectionIdle == 0 {
        kp.MaxConnectionIdle = defaultMaxConnectionIdle
    }
    if kp.MaxConnectionAge == 0 {
        kp.MaxConnectionAge = defaultMaxConnectionAge
    }
    // Add a jitter to MaxConnectionAge.
    kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
    if kp.MaxConnectionAgeGrace == 0 {
        kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
    }
    if kp.Time == 0 {
        kp.Time = defaultServerKeepaliveTime
    }
    if kp.Timeout == 0 {
        kp.Timeout = defaultServerKeepaliveTimeout
    }
    kep := config.KeepalivePolicy
    if kep.MinTime == 0 {
        kep.MinTime = defaultKeepalivePolicyMinTime
    }

    done := make(chan struct{})
    t := &http2Server{
        ctx:               setConnection(context.Background(), rawConn),
        done:              done,
        conn:              conn,
        remoteAddr:        conn.RemoteAddr(),
        localAddr:         conn.LocalAddr(),
        authInfo:          authInfo,
        framer:            framer,
        readerDone:        make(chan struct{}),
        writerDone:        make(chan struct{}),
        maxStreams:        maxStreams,
        inTapHandle:       config.InTapHandle,
        fc:                &trInFlow{limit: uint32(icwz)},
        state:             reachable,
        activeStreams:     make(map[uint32]*Stream),
        stats:             config.StatsHandler,
        kp:                kp,
        idle:              time.Now(),
        kep:               kep,
        initialWindowSize: iwz,
        czData:            new(channelzData),
        bufferPool:        newBufferPool(),
    }
  // 后续窗口大小调整使用
    t.controlBuf = newControlBuffer(t.done)
    if dynamicWindow {
        t.bdpEst = &bdpEstimator{
            bdp:               initialWindowSize,
            updateFlowControl: t.updateFlowControl,
        }
    }
    if t.stats != nil {
        t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
            RemoteAddr: t.remoteAddr,
            LocalAddr:  t.localAddr,
        })
        connBegin := &stats.ConnBegin{}
        t.stats.HandleConn(t.ctx, connBegin)
    }
    if channelz.IsOn() {
        t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
    }

    t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
    // 套接字发送给client端,settting帧信息
    t.framer.writer.Flush()

    defer func() {
        if err != nil {
            t.Close()
        }
    }()

    // Check the validity of client preface.
    preface := make([]byte, len(clientPreface))
  // 接收client发送的preface数据
    if _, err := io.ReadFull(t.conn, preface); err != nil {
        // In deployments where a gRPC server runs behind a cloud load balancer
        // which performs regular TCP level health checks, the connection is
        // closed immediately by the latter.  Returning io.EOF here allows the
        // grpc server implementation to recognize this scenario and suppress
        // logging to reduce spam.
        if err == io.EOF {
            return nil, io.EOF
        }
        return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
    }
  // 校验数据的合法性 连接序言以字符串 "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" 开始
    if !bytes.Equal(preface, clientPreface) {
        return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
    }
    // 再次读取client端发送的setting帧
    frame, err := t.framer.fr.ReadFrame()
    if err == io.EOF || err == io.ErrUnexpectedEOF {
        return nil, err
    }
    if err != nil {
        return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
    }
    atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
    sf, ok := frame.(*http2.SettingsFrame)
    if !ok {
        return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
    }
  // setting帧ack回复
    t.handleSettings(sf)
    // 后续server端发送的数据帧,都是通过这个loopWriter来发送的
    go func() {
        t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
        t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
        if err := t.loopy.run(); err != nil {
            if logger.V(logLevel) {
                logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
            }
        }
        t.conn.Close()
        t.controlBuf.finish()
        close(t.writerDone)
    }()
  // 保活
    go t.keepalive()
    return t, nil
}
  • tcp链接建立成功后,server是先发送setting帧,包括服务端的初始设置,参数发送给client端。

  • 后在接收client的数据。收到的数据为连接序言以字符串 "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" 开始。这个序列后面必须跟一个可以为空的SETTINGS帧。这之后,就可以理解为完成一次http2的初始握手了。

  • handleSettings为settting帧ack回复的消息,发送给client端。并且是通过loopWriter去做发送的。

  • 另外这里启动了一个协程loopWriter。去处理后续server需要发送的二进制分帧的数据。

serveStreams解析二进制分帧包

func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
    defer close(t.readerDone)
    for {
    // 流量控制
        t.controlBuf.throttle()
    // 读取包头数据
        frame, err := t.framer.fr.ReadFrame()
        atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
        if err != nil {
            if se, ok := err.(http2.StreamError); ok {
                if logger.V(logLevel) {
                    logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
                }
                t.mu.Lock()
                s := t.activeStreams[se.StreamID]
                t.mu.Unlock()
                if s != nil {
                    t.closeStream(s, true, se.Code, false)
                } else {
                    t.controlBuf.put(&cleanupStream{
                        streamID: se.StreamID,
                        rst:      true,
                        rstCode:  se.Code,
                        onWrite:  func() {},
                    })
                }
                continue
            }
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                t.Close()
                return
            }
            if logger.V(logLevel) {
                logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
            }
            t.Close()
            return
        }
    // 判断帧类型
        switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
      // 继续解析包头Meta、filed等数据
            if t.operateHeaders(frame, handle, traceCtx) {
                t.Close()
                break
            }
      // http body数据
        case *http2.DataFrame:
            t.handleData(frame)
      // rst帧
        case *http2.RSTStreamFrame:
            t.handleRSTStream(frame)
      // setting帧
        case *http2.SettingsFrame:
            t.handleSettings(frame)
        case *http2.PingFrame:
            t.handlePing(frame)
        case *http2.WindowUpdateFrame:
            t.handleWindowUpdate(frame)
        case *http2.GoAwayFrame:
            // TODO: Handle GoAway from the client appropriately.
        default:
            if logger.V(logLevel) {
                logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
            }
        }
    }
}

当时看读HandleStreams这段代码的时候,一直没有想明白,为什么body的数据还没解析获取到,就开始处理业务逻辑的。

  • 先获取到握手setting帧ack信息,之后接收其他帧数据。

  • h2协议中,包头HEADERS是先到的, DATA帧类是后到的。内部顺序是由tcp肯定是能保证成功的。

  • 解析operateHeaders过程:handle->handleStream()->processUnaryRPC()当时看到这里。HEADERS解析完,直接去处理RPC业务逻辑processUnaryRPC。而没有等待data帧的解析再去处理业务。继续深入

processUnaryRPC

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
    sh := s.opts.statsHandler
    ...

    binlog := binarylog.GetMethodLogger(stream.Method())
    if binlog != nil {
        ... // binlog
    }

    var comp, decomp encoding.Compressor
    var cp Compressor
    var dc Decompressor
  // 压缩类型
    if s.opts.cp != nil {
        cp = s.opts.cp
        stream.SetSendCompress(cp.Type())
    } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
        // Legacy compressor not specified; attempt to respond with same encoding.
        comp = encoding.GetCompressor(rc)
        if comp != nil {
            stream.SetSendCompress(rc)
        }
    }

    var payInfo *payloadInfo
    if sh != nil || binlog != nil {
        payInfo = &payloadInfo{}
    }
  // 获取body数据
    d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
    if err != nil {
        if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
            channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status %v", e)
        }
        return err
    }
    if channelz.IsOn() {
        t.IncrMsgRecv()
    }
    df := func(v interface{}) error {
    // decode数据
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
            return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
        }
        if sh != nil {
            sh.HandleRPC(stream.Context(), &stats.InPayload{
                RecvTime:   time.Now(),
                Payload:    v,
                WireLength: payInfo.wireLength + headerLen,
                Data:       d,
                Length:     len(d),
            })
        }
        if binlog != nil {
            binlog.Log(&binarylog.ClientMessage{
                Message: d,
            })
        }
        if trInfo != nil {
            trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
        }
        return nil
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  // rpc业务逻辑处理
    reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
    if appErr != nil {
        ...
        return appErr
    }
    // 回包处理
    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        ...
  }
    return err
}

  • 先获取压缩类型,recvAndDecompress获取body的数据。

  • protocol-http2协议详解中,body部分数据解析

  • 获取到body后,我们就是对起进行handler处理,decode-body数据。

  • 这里抛出疑惑,data数据帧我们是还没收到的,那是如何获取的呢

如何获取h2DATA数据

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
  // 获取data数据
    pf, d, err := p.recvMsg(maxReceiveMessageSize)
    if err != nil {
        return nil, err
    }
    if payInfo != nil {
        payInfo.wireLength = len(d)
    }
    // payload校验
    if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
        return nil, st.Err()
    }

    var size int
  // 是否压缩
    if pf == compressionMade {
        // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
        // use this decompressor as the default.
        if dc != nil {
            d, err = dc.Do(bytes.NewReader(d))
            size = len(d)
        } else {
            d, size, err = decompress(compressor, d, maxReceiveMessageSize)
        }
        if err != nil {
            return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
        }
        if size > maxReceiveMessageSize {
            // TODO: Revisit the error code. Currently keep it consistent with java
            // implementation.
            return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
        }
    }
    return d, nil
}

func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) {
  // 获取5字节的body包头数据
    if _, err := p.r.Read(p.header[:]); err != nil {
        return 0, nil, err
    }
    //是否压缩
    pf = payloadFormat(p.header[0])
  // body长度,大端序模式
    length := binary.BigEndian.Uint32(p.header[1:])

    if length == 0 {
        return pf, nil, nil
    }
    if int64(length) > int64(maxInt) {
        return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
    }
    if int(length) > maxReceiveMessageSize {
        return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
    }
    // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
    // of making it for each message:
    msg = make([]byte, int(length))
  // 读取真正的body数据
    if _, err := p.r.Read(msg); err != nil {
        if err == io.EOF {
            err = io.ErrUnexpectedEOF
        }
        return 0, nil, err
    }
    return pf, msg, nil
}
  • recvAndDecompress中p.recvMsg是如何获取到的呢。

  • read嵌套很深:从parser.recvMsg->Stream.Read->io.ReadFull()->ReadAtLeast()->transportReader.Read()->recvBufferReader->read管道recv.get()阻塞住了。

  • 之前解析中,handleStream已经启动协程去处理handle回调了。所以即使这里阻塞住了,也不影响后续data帧的接收。

  • 后续读取channel中的数据。先读取5个字节的包头的数据,第一位为是否压缩,后四个字节为data长度,大端序模式。这里我们也可以看到request body的数据怎么避免压缩,如何去解决朋友说的这个问题。

DATA帧解析

func (t *http2Server) handleData(f *http2.DataFrame) {
  // 帧长度
    size := f.Header().Length
    // Select the right stream to dispatch.
  // stream信息
    s, ok := t.getStream(f)
    if !ok {
        return
    }
  // stream是否已经读取完毕
    if s.getState() == streamReadDone {
        t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
        return
    }
    if size > 0 {
        if err := s.fc.onData(size); err != nil {
            t.closeStream(s, true, http2.ErrCodeFlowControl, false)
            return
        }
        if f.Header().Flags.Has(http2.FlagDataPadded) {
            if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
                t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
            }
        }
        if len(f.Data()) > 0 {
            buffer := t.bufferPool.get()
            buffer.Reset()
            buffer.Write(f.Data())
      // 数据写入
            s.write(recvMsg{buffer: buffer})
        }
    }
  // 判断是否是结束帧
    if f.StreamEnded() {
        // Received the end of stream from the client.
        s.comp
    areAndSwapState(streamActive, streamReadDone)
        s.write(recvMsg{err: io.EOF})
    }
}
  • 这里s.write(recvMsg{buffer: buffer}),将buf的数据写入Stream->buf.recvBuffer->put进c中的recvMsg的channel中。和上面的read阻塞获取data帧数据。刚好一一对应,一个写入,一个读取。

总结

http2.png
  • 大家如果理解了tcp协议的大致流程,那么h2协议就不难理解

  • 从最开始学习http2的协议,了解client端是先发送HEAD帧,在发送DATA帧数据。

  • 一开始可能一直陷入为什么DATA的数据没有接受完,就可以处理业务逻辑。并且回调启协程没有太注意看。recvMsg中read的嵌套非常深。种种原因,叠加在一起,一开始确实一头雾水。不过拨开云雾,逐层慢慢分析,思路还是比较清晰的。

  • client(C)先和server(S)建立h2握手(settting帧),回复ack。后C继续发送HEAD帧。S端解析HEAD,启动goroutinechannel阻塞获取后续DATA数据,C继续发送DATA帧数据,S端解析DATA数据,并将body数据发送到channel中。刚启动的G获取到channel中的数据。就可以处理后续的rpc业务逻辑了。

大家可以添加我一起探讨

我是一个爱扣源码细节的dandyhuang,码字不易,点个小赞,只希望大家能更加明白。w:dandyhuang_

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容