etcd-raft源码分析2-server间通信机制

在etcd的raft实现中,server之前的消息传递并不是简单的request-response模型,而是读写分离模型,即每两个server之间会建立两条链路,对于每一个server来说,一条链路专门用来发送数据,另一条链路专门用来接收数据。在代码实现中,通过streamWriter发送数据,通过streamReader接收数据。即通过streamReader接收数据接收到数据后会直接响应,在处理完数据后通过streamWriter将响应发送到对端。

对于每个server来说,不管是leader、candicate还是follower,都会维持一个peers数组,每个peer对应集群中的一个server,负责处理server之间的一些数据交互。

server间数据交互的框图如下:

server间数据交互.png

当server需要向其他server发送数据时,只需要找到其他server对应的peer,然后向peer的streamWriter的msgc通道发送数据即可,streamWriter会监听msgc通道的数据并发送到对端server;而streamReader会在一个goroutine中循环读取对端发送来的数据,一旦接收到数据,就发送到peer的p.propc或p.recvc通道,而peer会监听这两个通道的事件,写入到node的n.propc或n.recvc通道,node只需要监听这两个通道的数据并处理即可。这就是在etcd的raft实现中server间数据交互的流程。

对于每个server,都会创建一个raftNode,并且启动一个goroutine,执行raftNode的serveRaft方法,这个方法的代码如下:

func (rc *raftNode) serveRaft() {
url, err := url.Parse(rc.peers[rc.id-1])
if err != nil {
    log.Fatalf("raftexample: Failed parsing URL (%v)", err)
}

ln, err := newStoppableListener(url.Host, rc.httpstopc)
if err != nil {
    log.Fatalf("raftexample: Failed to listen rafthttp (%v)", err)
}

err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
select {
case <-rc.httpstopc:
default:
    log.Fatalf("raftexample: Failed to serve rafthttp (%v)", err)
}
close(rc.httpdonec)
}

这个方法主要是建立一个httpserver,监听其他server的连接,处理函数为rc.transport.Handler(),下面看下该处代码:

func (t *Transport) Handler() http.Handler {
pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
mux := http.NewServeMux()
mux.Handle(RaftPrefix, pipelineHandler)
mux.Handle(RaftStreamPrefix+"/", streamHandler)
mux.Handle(RaftSnapshotPrefix, snapHandler)
mux.Handle(ProbingPrefix, probing.NewHandler())
return mux
}

下面重点看下streamHandler,这个handler用于处理server之间的心跳、投票、附加日志等请求的发送,该handler的ServeHTTP代码为:

func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
    w.Header().Set("Allow", "GET")
    http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
    return
}

w.Header().Set("X-Server-Version", version.Version)
w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())

if err := checkClusterCompatibilityFromHeader(r.Header, h.cid); err != nil {
    http.Error(w, err.Error(), http.StatusPreconditionFailed)
    return
}

var t streamType
switch path.Dir(r.URL.Path) {
case streamTypeMsgAppV2.endpoint():
    t = streamTypeMsgAppV2
case streamTypeMessage.endpoint():
    t = streamTypeMessage
default:
    plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
    http.Error(w, "invalid path", http.StatusNotFound)
    return
}

fromStr := path.Base(r.URL.Path)
from, err := types.IDFromString(fromStr)
if err != nil {
    plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
    http.Error(w, "invalid from", http.StatusNotFound)
    return
}
if h.r.IsIDRemoved(uint64(from)) {
    plog.Warningf("rejected the stream from peer %s since it was removed", from)
    http.Error(w, "removed member", http.StatusGone)
    return
}
p := h.peerGetter.Get(from)
if p == nil {
    // This may happen in following cases:
    // 1. user starts a remote peer that belongs to a different cluster
    // with the same cluster ID.
    // 2. local etcd falls behind of the cluster, and cannot recognize
    // the members that joined after its current progress.
    if urls := r.Header.Get("X-PeerURLs"); urls != "" {
        h.tr.AddRemote(from, strings.Split(urls, ","))
    }
    plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
    http.Error(w, "error sender not found", http.StatusNotFound)
    return
}

wto := h.id.String()
if gto := r.Header.Get("X-Raft-To"); gto != wto {
    plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
    http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
    return
}

w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()

c := newCloseNotifier()
conn := &outgoingConn{
    t:       t,
    Writer:  w,
    Flusher: w.(http.Flusher),
    Closer:  c,
}
//一旦接收到对端的连接,则把该连接attach到自己encoder的writer中,这样自己encoder和对端decoder就能协同工作了,
// 对于每个节点,会主动去连接其他节点,连接成功后便通过自己的decoder循环读取该连接的数据,该节点通过该decoder读取其他节点发来的数据;
// 当某节点收到其他节点连接请求并连接成功后便把该连接attach到该节点的encoder,该节点通过该encoder向其他节点发送数据;
p.attachOutgoingConn(conn)
<-c.closeNotify()
}

当监听到其他server的连接建立请求并建立连接成功后,其核心处理逻辑是这一行代码:

p.attachOutgoingConn(conn)

下面看下其函数实现:

func (p *peer) attachOutgoingConn(conn *outgoingConn) {
var ok bool
switch conn.t {
case streamTypeMsgAppV2:
    ok = p.msgAppV2Writer.attach(conn)
case streamTypeMessage:
    ok = p.writer.attach(conn)
default:
    plog.Panicf("unhandled stream type %s", conn.t)
}
if !ok {
    conn.Close()
}
}

其中调用了streamWriter的attach方法,如下:

func (cw *streamWriter) attach(conn *outgoingConn) bool {
select {
case cw.connc <- conn:
    return true
case <-cw.done:
    return false
}
}

最终将该连接写入到cw.connc通道,下面看下streamWriter监听该通道的goroutine:

case conn := <-cw.connc:
        cw.mu.Lock()
        closed := cw.closeUnlocked()
        t = conn.t
        switch conn.t {
        case streamTypeMsgAppV2:
            enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
        case streamTypeMessage:
            enc = &messageEncoder{w: conn.Writer}
        default:
            plog.Panicf("unhandled stream type %s", conn.t)
        }
        flusher = conn.Flusher
        unflushed = 0
        cw.status.activate()
        cw.closer = conn.Closer
        cw.working = true
        cw.mu.Unlock()

        if closed {
            plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
        }
        plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
        heartbeatc, msgc = tickc.C, cw.msgc

当监听到cw.connc通道有数据时,获取该数据,即与其他某个server的连接,然后获取conn.Writer封装成一个encoder,用来将要发送的数据发送出去。

上面说了server的连接监听,下面看下server与其他server的连接建立。
在startRaft这个goroutine中,有如下代码段:

rc.transport = &rafthttp.Transport{
    ID:          types.ID(rc.id),
    ClusterID:   0x1000,
    Raft:        rc,
    ServerStats: ss,
    LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
    ErrorC:      make(chan error),
}

rc.transport.Start()
for i := range rc.peers {
    if i+1 != rc.id {
        rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
    }
}

在rc.transport.AddPeer方法中调用了startPeer方法,里面创建了streamReader,并开启了一个goroutine:

func (cr *streamReader) run() {
t := cr.typ
plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
for {
    //与对端建立连接
    rc, err := cr.dial(t)
    if err != nil {
        if err != errUnsupportedStreamType {
            cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
        }
    } else {
        cr.status.activate()
        plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
        //循环读取对端发过来的数据并处理
        err := cr.decodeLoop(rc, t)
        plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
        switch {
        // all data is read out
        case err == io.EOF:
        // connection is closed by the remote
        case transport.IsClosedConnError(err):
        default:
            cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
        }
    }
    select {
    // Wait 100ms to create a new stream, so it doesn't bring too much
    // overhead when retry.
    case <-time.After(100 * time.Millisecond):
    case <-cr.stopc:
        plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
        close(cr.done)
        return
    }
}
}

通过rc, err := cr.dial(t)与对端建立连接,在err := cr.decodeLoop(rc, t)中循环读取该连接的数据:

func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
var dec decoder
cr.mu.Lock()
switch t {
case streamTypeMsgAppV2:
    dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
case streamTypeMessage:
    dec = &messageDecoder{r: rc}
default:
    plog.Panicf("unhandled stream type %s", t)
}
select {
case <-cr.stopc:
    cr.mu.Unlock()
    if err := rc.Close(); err != nil {
        return err
    }
    return io.EOF
default:
    cr.closer = rc
}
cr.mu.Unlock()
for {
    m, err := dec.decode()
    if err != nil {
        cr.mu.Lock()
        cr.close()
        cr.mu.Unlock()
        return err
    }
    receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

    cr.mu.Lock()
    paused := cr.paused
    cr.mu.Unlock()

    if paused {
        continue
    }

    if isLinkHeartbeatMessage(&m) {
        // raft is not interested in link layer
        // heartbeat message, so we should ignore
        // it.
        continue
    }

    recvc := cr.recvc
    if m.Type == raftpb.MsgProp {
        recvc = cr.propc
    }
    select {
    case recvc <- m:
    default:
        if cr.status.isActive() {
            plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
        }
        plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
        recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
    }
}
}

这里创建了decoder,并在一个for循环中循环执行m, err := dec.decode(),读取对端发送过来的数据,写入cr.recvc或cr.propc通道。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容