ETCD《二》--Peer监听

Peer监听

Peer 监听通常监听在 2380 端口上,用于提供 Peer 之间的 https 服务

注册的一些服务端点:

  • /raft:raftHandler处理

  • /members:peerMembersHandler处理

  • /members/promote/:peerMemberPromoteHandler处理

  • /leases、/leases/internal:leaseHandler处理

  • /downgrade/enabled:downgradeEnabledHandler处理

  • /members/hashkv:hashKVHandler处理

  • /version:versionHandler处理

Raft Handler

Raft Handler有可以进一步划分为

  • /raft:pipelineHandler处理

  • /raft/stream/:streamHandler处理

  • /raft/snapshot:snapHandler处理

  • /raft/probing:httpHealth处理

raft handler在初始化的时候会创建两类 Transport,一种是pipeline transport, 另一种是 stream transport

func (t *Transport) Start() error {
    var err error
    t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)

    t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)

    return nil
}
  • stream transport:长连接不断开,Peer 之间相互推送的方式

  • pipeline transport:短链接,请求一次,响应一次;发送 snapshot 或者 stream transport 不可用时才会使用这个;snapshot 也使用这个主要考虑是 snapshot 较大 ,避免阻塞其他消息

func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
    var ok bool
    
    if isMsgSnap(m) {
        return p.pipeline.msgc, pipelineMsg
    }  else if writec, ok = p.writer.writec(); ok {
        return writec, streamMsg
    }
    return p.pipeline.msgc, pipelineMsg
}

Stream Handler

每个 Member 启动的时候都会相互添加 Peer ,每个 Peer 都会维护一个写通道、一个读通道

读协程

读协程的初始化

p.msgAppReader = &streamReader{
        lg:     t.Logger,
        peerID: peerID,
        typ:    "message",
        tr:     t,
        picker: picker,
        status: status,
        recvc:  p.recvc,
        propc:  p.propc,
        rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
    }

读通道启动的时候就会尝试 dial 对应的 Peer 地址

rc, err := cr.dial(t)

dial 实际上就是向 Peer 发送请求 GET /raft/stream/message/local_member_id,使用的就是 stream transport

uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())

resp, err := cr.tr.streamRt.RoundTrip(req)

Peer 端已经注册了 /raft/stream/ 开头的请求由 streamHandler 处理,这个处理器会先返回200状态码给发送方

    w.WriteHeader(http.StatusOK)
    w.(http.Flusher).Flush()

然后将当前连接发送给写通道,当前的 Writer 就是 ResponseWriter , Flusher 用于将写入到 Response 缓冲区的消息强制推送给对方

c := newCloseNotifier()
    conn := &outgoingConn{
        t:       t,
        Writer:  w,
        Flusher: w.(http.Flusher),
        Closer:  c,
        localID: h.tr.ID,
        peerID:  from,
    }
    select {
    case streamWWriter.connc <- conn:
        return true
    case <-cw.done:
        return false
    }

最后将连接保持不断开,连接两端的消息依靠 Flusher 来推送

<-c.closeNotify()

而发送方接收到对方推送过来的 200 状态码后,会不断尝试从 Response 中读取数据,读到数据后就进行反序列化,再将反序列化后的消息发送到 recv_channel 或者 prop_channel

switch resp.StatusCode {
    case http.StatusOK:
        return resp.Body, nil

for {
        m, err := dec.decode()
        recvc := cr.recvc
        if m.Type == raftpb.MsgProp {
            recvc = cr.propc
        }

        select {
        case recvc <- m:

而这两个 channel 都有专门的协程监听,而这里的 Process 就是通过 step 方法将消息流入到 raft 流程里

go func() {
        for {
            select {
            case mm := <-p.recvc:
                if err := r.Process(ctx, mm); err != nil {
                    
                }
            case <-p.stopc:
                return
            }
        }
    }()

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
    
    return s.r.Step(ctx, m)
}

写协程

写协程的初始化

w := &streamWriter{
        lg: lg,

        localID: local,
        peerID:  id,

        status: status,
        fs:     fs,
        r:      r,
        msgc:   make(chan raftpb.Message, 4096),
        connc:  make(chan *outgoingConn),
        stopc:  make(chan struct{}),
        done:   make(chan struct{}),
    }

写通道在启动的时候会监听在conn_channel 上,而读通道在在启动的时候会将连接发送到这个通道,这里写通道就能正常收到这个连接,并完成一些初始化功能,比如将 working 设置为true,意味着 stream transport 是正常工作的,那么在 Peer 之间相互发送消息的时候就会优先选择 stream transport,而不会退化到 pipeline transport;同时再初始化 msg_channel,意味着可以开始接收消息了

case conn := <-cw.connc:
            cw.working = true
            heartbeatc, msgc = tickc.C, cw.msgc

在 Peer 之间相互发送消息的时候,首先会挑选一个写入通道,正如前面所说,发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport,其它消息或者 stream transport 正常初始化时都会使用 stream 的写入通道

func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
    var ok bool
    
    if isMsgSnap(m) {
        return p.pipeline.msgc, pipelineMsg
    }  else if writec, ok = p.writer.writec(); ok {
        return writec, streamMsg
    }
    return p.pipeline.msgc, pipelineMsg
}

发送时直接向消息写入到 pick 到的写通道里

writec, name := p.pick(m)
    select {
    case writec <- m:

而写协程同时也监听着 msg_channel ,对于需要发送给 Peer 的消息会先编码,再选择合适的时机推送给 Peer ,如没有更多消息 或者 消息累计到了 缓冲区的一半,就会通过 Flusher 将 Response 缓冲区的消息强制推送给对方

case m := <-msgc:
            err := enc.encode(&m)
            if err == nil {
                unflushed += m.Size()

                if len(msgc) == 0 || batched > 4096 / 2 {
                    flusher.Flush()
                    sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
                    unflushed = 0
                    batched = 0
                } else {
                    batched++
                }

                continue
            }

Pipeline Handler

相比较之下,pipeline handler 的工作原理就简单很多了;按照 pick 的规则,发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport

此时需要发送给 Peer 的消息,就会 pick 到这个 pipeline 的写通道,而 pipeline 在启动时会监听在这个写通道上,收到消息后就构建 HTTP 请求发送给 Peer

func (p *pipeline) handle() {
    defer p.wg.Done()

    for {
        select {
        case m := <-p.msgc:
            start := time.Now()
            err := p.post(pbutil.MustMarshal(&m))

这里构建的是 HTTP POST请求,请求的就是 POST /raft路径,使用的就是 pipeline transport

req, err := http.NewRequest(http.MethodPost, "/raft", body)

req.Header.Set("Content-Type", "application/protobuf")

resp, err := p.tr.pipelineRt.RoundTrip(req)

正如前面所说,POST /raft路径的请求会交给 pipelineHandler处理,pipelineHandler 的处理也很简单,直接通过 Process 调用 step 方法将消息发给 raft 流程处理

func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if err := h.r.Process(context.TODO(), m); err != nil {

小结

  • 每个 Member 都会 维护到其它 Peers 的两个连接,一种是pipeline transport, 另一种是 stream transport;发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport;否则 raft 相关的消息都会通过 stream transport 发送

  • 每个 Member 到每个 Peer 都会维护两个协程:

    • 一个读协程,会不断从 stream transport 中读取数据,然后发送到 recv_channel 或者 prop_channel,而这两个通道都会通过 step 方法将消息发送给 raft 流程处理

    • 一个写协程,需要发送给 Peer 的消息,首先会 pick 一个写通道,pick 的原则就是 发送 snapshot 或者 stream transport 不可用时才会使用 pipeline transport,否则都会 pick 到 stream transport;然后将消息写入到这个写通道,而这个写通道对应的就是 Peer 连接中的 Response,负责将消息推送给 Peer

    • Peer 的读协程就负责从 Response 中读取数据,再发送给自己的 raft 流程

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容