etcd学习笔记(一): raft-example

主要参考资料

  1. 《etcd技术内幕》
  2. 极客时间《etcd实战课》
  3. blog https://gohalo.me/post/theme-database-etcd.html

主要目的

  1. 理顺一些etcd实现上的细节问题

1. 概述

etcd的核心是raft协议的实现。整体来说,etcd的raft lib只实现了一个整体框架和核心逻辑(append log、选主逻辑、snapshot、成员变更等);但该库没有实现WAL、SnapShot、存储、序列化、网络(消息传输和接收)等。

所以整体上etcd中模块、模块之间的通信比较多,甚至有一些同名的模块,导致整体上代码实现难度大,阅读难度大。

比较核心的类有:

  • raft.Node:raft node,raft协议的主要逻辑,其中 raftlog是存储在内存中的,持久化依赖于wal和snapshot
  • storge:日志应用后的数据存储
  • wal:write ahead log用于处理持久化
  • snapshot:快照
  • 网络传输
    node        raft.Node
    raftStorage *raft.MemoryStorage
    wal         *wal.WAL

    snapshotter      *snap.Snapshotter
    snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
    snapCount uint64
    transport *rafthttp.Transport

raft.Node

type Node interface {
    Tick() -> 触发一次Tick,会触发Node心跳或者选举
    Campaign(ctx context.Context) error -> 触发一次选举
    Propose(ctx context.Context, data []byte) error -> 进行一次提案
    ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error -> 成员变更提案
    Step(ctx context.Context, msg pb.Message) error -> 处理msg

    Ready() <-chan Ready
    Advance()

    ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
    TransferLeadership(ctx context.Context, lead, transferee uint64)
    ReadIndex(ctx context.Context, rctx []byte) error

    Status() Status
    ReportUnreachable(id uint64)
    ReportSnapshot(id uint64, status SnapshotStatus)
    Stop()
}

raft.Storage

type Storage interface {
    InitialState() (pb.HardState, pb.ConfState, error)
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    Term(i uint64) (uint64, error)

    LastIndex() (uint64, error)
    FirstIndex() (uint64, error)
    Snapshot() (pb.Snapshot, error)
}

2. ready / advance

对于这种 IO 网络密集型的应用,提高吞吐最好的手段就是批量操作,ETCD 与之相关的核心抽象就是 Ready 结构体。

type Ready struct {
    // The current volatile state of a Node.
    // SoftState will be nil if there is no update.
    // It is not required to consume or store SoftState.
    *SoftState 

    // The current state of a Node to be saved to stable storage BEFORE
    // Messages are sent.
    // HardState will be equal to empty state if there is no update.
    pb.HardState

    // ReadStates can be used for node to serve linearizable read requests locally
    // when its applied index is greater than the index in ReadState.
    // Note that the readState will be returned when raft receives msgReadIndex.
    // The returned is only valid for the request that requested to read.
    ReadStates []ReadState

    // Entries specifies entries to be saved to stable storage BEFORE
    // Messages are sent.
  // 通过raftLog.unstableEntries()读取的是raftLog.unstable.entries中的数据
    Entries []pb.Entry

    // Snapshot specifies the snapshot to be saved to stable storage.
    Snapshot pb.Snapshot

    // CommittedEntries specifies entries to be committed to a
    // store/state-machine. These have previously been committed to stable
    // store.
// 包括了所有已经持久化到日志但是还没有应用到状态机的数据
    CommittedEntries []pb.Entry

    // Messages specifies outbound messages to be sent AFTER Entries are
    // committed to stable storage.
    // If it contains a MsgSnap message, the application MUST report back to raft
    // when the snapshot has been received or has failed by calling ReportSnapshot.
 // 包含了应该发送给对端的数据,也就是直接读取的raft.msgs[]中缓存的数据
    Messages []pb.Message

    // MustSync indicates whether the HardState and Entries must be synchronously
    // written to disk or if an asynchronous write is permissible.
    MustSync bool
}
  • 什么时候可以读。ReadState 用来支持 Linearizable Read。
  • 需要持久化的状态。HardState、Entries 需要在正式发送数据之前持久化。
  • Snapshot 需要执行SnapShot的数据。
  • CommittedEntries 已经提交的数据,可以应用到状态机。
  • Messages 需要发送到其它机器的消息。需要在处理完持久化数据之后处理。

应用需要对 Ready 的处理包括:

  1. 将 HardState、Entries、Snapshot 持久化到 storage;
  2. 将 Messages 非阻塞的广播给其他 peers;
  3. 将 CommittedEntries (已经提交但是还没有应用的日志) 应用到状态机;
  4. 如果发现 CommittedEntries 中有成员变更类型的 entry,则调用 node 的 ApplyConfChange() 方法让 node 知道;
  5. 调用 Node.Advance() 告诉 raft node 这批状态更新处理完,状态已经演进了,可以给我下一批 Ready 让我处理了。

这里的一个问题是entries和messages的区别是什么?

entry是日志条目,messages是需要发送到peer的msg。当接受到消息的时候,可能会同时产生entry和msg。

在raft-example中,ready的处理:

        // store raft entries to wal, then publish over commit channel
        case rd := <-rc.node.Ready():
            rc.wal.Save(rd.HardState, rd.Entries)
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            rc.raftStorage.Append(rd.Entries)
            rc.transport.Send(rd.Messages)
            applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
            if !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot(applyDoneC)
            rc.node.Advance()

可以看出来,对entries的处理主要是save到wal,然后append到raftStorage;对msg的处理是通过transport发送出去。

为什么会调用rc.raftStorage.Append(rd.Entries),这又牵扯到raftLog的数据结构,raftLog的持久化是由上层来做的,raft.Node做不了,因为持久化、快照的代码模块与raft核心模块是分离的。

image.png

对于刚刚接收到的 Entry记录首先都会被存储在unstable中 。然后按照Raft协议将unstable中缓存的这些 Entry 记录交给上层模块进行处理,上层模块会将这些 Entry 记录发送到集群其他节点或进行保存( 写入 Storage
中)
。之后,上层模块会调用 Advance()方法通知底层的 etcd-raft模块将 unstable 中对应的 Entry
记录删除(因为己经保存到了 Storage 中。正因为 unstable 中保存的 Entry 记录并未进行持久化,
可能会因节点故障而意外丢失,所以被称为unstable。

另一个需要注意的点是rc.wal.Save(rd.HardState, rd.Entries),每次ready的处理都需要保存一下当时的rd.HardState。

如何生成ready

什么时候应该处理ready?

  1. 比较容易理解的地方是有待处理的msgs、unstableEntries、committedEntries
  2. softState和hardState发生变化?????
func (rn *RawNode) HasReady() bool {
    r := rn.raft
    if !r.softState().equal(rn.prevSoftSt) {
        return true
    }
    if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
        return true
    }
    if r.raftLog.hasPendingSnapshot() {
        return true
    }
    if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
        return true
    }
    if len(r.readStates) != 0 {
        return true
    }
    return false
}

ready处理完之后调用advance更新相关的索引位置,具体可以看代码,没有特别的地方。

3. 如何处理成员变更

参考文档

  1. https://zhuanlan.zhihu.com/p/27908888
  2. https://segmentfault.com/a/1190000022796386

关键函数是raft.applyConfChange

启动时,会调用apply

    for _, peer := range peers {
        rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
    }

用户主动请求变更的时候

node.ProposeConfChange
-> node.Step(Prop msg)
  -> 向propc中写msg

node.Run中:

        case pm := <-propc:
            m := pm.m
            m.From = r.id
            err := r.Step(m)
            if pm.result != nil {
                pm.result <- err
                close(pm.result)
            }

调用r.step,对于stepFollow会转发msg给leader,对于leader来说,alreadyPending来保证同一时刻仅有一个成员变更请求,然后将成员变更的entries写到unstable中,同时调用r.bcastAppend(),生成send到peer的MsgApp-msg ,等待上层处理。

        for i := range m.Entries {
            e := &m.Entries[i]
            var cc pb.ConfChangeI
            if e.Type == pb.EntryConfChange {
                var ccc pb.ConfChange
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            } else if e.Type == pb.EntryConfChangeV2 {
                var ccc pb.ConfChangeV2
                if err := ccc.Unmarshal(e.Data); err != nil {
                    panic(err)
                }
                cc = ccc
            }
            if cc != nil {
                alreadyPending := r.pendingConfIndex > r.raftLog.applied
                alreadyJoint := len(r.prs.Config.Voters[1]) > 0
                wantsLeaveJoint := len(cc.AsV2().Changes) == 0

                var refused string
                if alreadyPending {
                    refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
                } else if alreadyJoint && !wantsLeaveJoint {
                    refused = "must transition out of joint config first"
                } else if !alreadyJoint && wantsLeaveJoint {
                    refused = "not in joint state; refusing empty conf change"
                }

                if refused != "" {
                    r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
                    m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
                } else {
                    r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
                }
            }
        }

        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        r.bcastAppend()

整个待处理的entries和msg包装成ready struct,交给上层处理,等到对应的entries被committed,则可以处理,主要的操作有两个:调用raftNode的ApplyConfChange,然后transport也去除对应的peer:

    for i := range ents {
        switch ents[i].Type {
        case raftpb.EntryNormal:
            if len(ents[i].Data) == 0 {
                // ignore empty messages
                break
            }
            s := string(ents[i].Data)
            data = append(data, s)
        case raftpb.EntryConfChange:
            var cc raftpb.ConfChange
            cc.Unmarshal(ents[i].Data)
            rc.confState = *rc.node.ApplyConfChange(cc)
            switch cc.Type {
            case raftpb.ConfChangeAddNode:
                if len(cc.Context) > 0 {
                    rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
                }
            case raftpb.ConfChangeRemoveNode:
                if cc.NodeID == uint64(rc.id) {
                    log.Println("I've been removed from the cluster! Shutting down.")
                    return nil, false
                }
                rc.transport.RemovePeer(types.ID(cc.NodeID))
            }
        }
    }

可以看到主要最终调用的函数是raft.applyConfChange

func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
    var cs pb.ConfState
    select {
    case n.confc <- cc.AsV2():
    case <-n.done:
    }
    select {
    case cs = <-n.confstatec:
    case <-n.done:
    }
    return &cs
}
------------------------


    case cc := <-n.confc:
            _, okBefore := r.prs.Progress[r.id]
            cs := r.applyConfChange(cc)
            // If the node was removed, block incoming proposals. Note that we
            // only do this if the node was in the config before. Nodes may be
            // a member of the group without knowing this (when they're catching
            // up on the log and don't have the latest config) and we don't want
            // to block the proposal channel in that case.
            //
            // NB: propc is reset when the leader changes, which, if we learn
            // about it, sort of implies that we got readded, maybe? This isn't
            // very sound and likely has bugs.
            if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
                var found bool
            outer:
                for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
                    for _, id := range sl {
                        if id == r.id {
                            found = true
                            break outer
                        }
                    }
                }
                if !found {
                    propc = nil
                }
            }
            select {
            case n.confstatec <- cs:
            case <-n.done:
            }

raft.applyConfChange

对于一次最多一个成员变更的情况下,主要做的操作是生成新的peer,设置其next和matched字段。

3. 上层是如何把多个模块组合协作起来的

启动

有两种启动方式,start或者Restart。

oldwal := wal.Exist(rc.waldir)
    if oldwal || rc.join {
        rc.node = raft.RestartNode(c)
    } else {
        rc.node = raft.StartNode(c, rpeers)
    }

根据是否存在wal文件来判断restart还是start,start会多调用bootstrap来处理成员变更信息。

// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
    rn, err := NewRawNode(c)
    if err != nil {
        panic(err)
    }
    n := newNode(rn)
    go n.run()
    return &n
}

restart会复用wal和快照信息,那么如何使用快照信息来恢复etcd-node?

    if !fileutil.Exist(rc.snapdir) {
        if err := os.Mkdir(rc.snapdir, 0750); err != nil {
            log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
        }
    }
    rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)

    oldwal := wal.Exist(rc.waldir)
    rc.wal = rc.replayWAL()

    // signal replay has finished
    rc.snapshotterReady <- rc.snapshotter

snapshotter对象生成之后,会发送channel,通知snapshot模块根据snap恢复storage;这里需要考虑的问题是存在多个snap的情况下,选择最新的未损坏snap来使用。这里主要是根据snap的名字来判断fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)

    snapshot, err := s.loadSnapshot()
    if err != nil {
        log.Panic(err)
    }
    if snapshot != nil {
        log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
        if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
            log.Panic(err)
        }
    }

然后wal模块replayWAL:

WAL 记录类型目前支持 5 种,分别是文件元数据记录、日志条目记录、状态信息记录、CRC 记录、快照记录:

  • 文件元数据记录包含节点 ID、集群 ID 信息,它在 WAL 文件创建的时候写入;
  • 日志条目记录包含 Raft 日志信息,如 put 提案内容;
  • 状态信息记录,包含集群的任期号、节点投票信息等,一个日志文件中会有多条,以最后的记录为准;
  • CRC 记录包含上一个 WAL 文件的最后的 CRC(循环冗余校验码)信息, 在创建、切割 WAL 文件时,作为第一条记录写入到新的 WAL 文件, 用于校验数据文件的完整性、准确性等;
  • 快照记录包含快照的任期号、日志索引信息,用于检查快照文件的准确性。
func (rc *raftNode) replayWAL() *wal.WAL {
    log.Printf("replaying WAL of member %d", rc.id)
    snapshot := rc.loadSnapshot()
    w := rc.openWAL(snapshot)
    _, st, ents, err := w.ReadAll()
    if err != nil {
        log.Fatalf("raftexample: failed to read WAL (%v)", err)
    }
    rc.raftStorage = raft.NewMemoryStorage()
    if snapshot != nil {
        rc.raftStorage.ApplySnapshot(*snapshot)
    }
    rc.raftStorage.SetHardState(st)

    // append to storage so raft starts at the right place in log
    rc.raftStorage.Append(ents)

    return w
}

首先是获取根据wal中的快照记录和state记录,找出所有小于state.commit的快照,然后 loads the newest snapshot available that is in walSnaps。


    // filter out any snaps that are newer than the committed hardstate
    n := 0
    for _, s := range snaps {
        if s.Index <= state.Commit {
            snaps[n] = s
            n++
        }
    }
    snaps = snaps[:n:n]
    return snaps, nil

然后根据快照数据,找出比snap.Index的所有wal entries,然后将所有的ents append到raftLog中。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容