主要参考资料
- 《etcd技术内幕》
- 极客时间《etcd实战课》
- blog https://gohalo.me/post/theme-database-etcd.html
主要目的
- 理顺一些etcd实现上的细节问题
1. 概述
etcd的核心是raft协议的实现。整体来说,etcd的raft lib只实现了一个整体框架和核心逻辑(append log、选主逻辑、snapshot、成员变更等);但该库没有实现WAL、SnapShot、存储、序列化、网络(消息传输和接收)等。
所以整体上etcd中模块、模块之间的通信比较多,甚至有一些同名的模块,导致整体上代码实现难度大,阅读难度大。
比较核心的类有:
- raft.Node:raft node,raft协议的主要逻辑,其中 raftlog是存储在内存中的,持久化依赖于wal和snapshot
- storge:日志应用后的数据存储
- wal:write ahead log用于处理持久化
- snapshot:快照
- 网络传输
node raft.Node
raftStorage *raft.MemoryStorage
wal *wal.WAL
snapshotter *snap.Snapshotter
snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready
snapCount uint64
transport *rafthttp.Transport
raft.Node
type Node interface {
Tick() -> 触发一次Tick,会触发Node心跳或者选举
Campaign(ctx context.Context) error -> 触发一次选举
Propose(ctx context.Context, data []byte) error -> 进行一次提案
ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error -> 成员变更提案
Step(ctx context.Context, msg pb.Message) error -> 处理msg
Ready() <-chan Ready
Advance()
ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
TransferLeadership(ctx context.Context, lead, transferee uint64)
ReadIndex(ctx context.Context, rctx []byte) error
Status() Status
ReportUnreachable(id uint64)
ReportSnapshot(id uint64, status SnapshotStatus)
Stop()
}
raft.Storage
type Storage interface {
InitialState() (pb.HardState, pb.ConfState, error)
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
Term(i uint64) (uint64, error)
LastIndex() (uint64, error)
FirstIndex() (uint64, error)
Snapshot() (pb.Snapshot, error)
}
2. ready / advance
对于这种 IO 网络密集型的应用,提高吞吐最好的手段就是批量操作,ETCD 与之相关的核心抽象就是 Ready 结构体。
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
// 通过raftLog.unstableEntries()读取的是raftLog.unstable.entries中的数据
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
// 包括了所有已经持久化到日志但是还没有应用到状态机的数据
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
// 包含了应该发送给对端的数据,也就是直接读取的raft.msgs[]中缓存的数据
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
- 什么时候可以读。ReadState 用来支持 Linearizable Read。
- 需要持久化的状态。HardState、Entries 需要在正式发送数据之前持久化。
- Snapshot 需要执行SnapShot的数据。
- CommittedEntries 已经提交的数据,可以应用到状态机。
- Messages 需要发送到其它机器的消息。需要在处理完持久化数据之后处理。
应用需要对 Ready 的处理包括:
- 将 HardState、Entries、Snapshot 持久化到 storage;
- 将 Messages 非阻塞的广播给其他 peers;
- 将 CommittedEntries (已经提交但是还没有应用的日志) 应用到状态机;
- 如果发现 CommittedEntries 中有成员变更类型的 entry,则调用 node 的 ApplyConfChange() 方法让 node 知道;
- 调用 Node.Advance() 告诉 raft node 这批状态更新处理完,状态已经演进了,可以给我下一批 Ready 让我处理了。
这里的一个问题是entries和messages的区别是什么?
entry是日志条目,messages是需要发送到peer的msg。当接受到消息的时候,可能会同时产生entry和msg。
在raft-example中,ready的处理:
// store raft entries to wal, then publish over commit channel
case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
if !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot(applyDoneC)
rc.node.Advance()
可以看出来,对entries的处理主要是save到wal,然后append到raftStorage;对msg的处理是通过transport发送出去。
为什么会调用rc.raftStorage.Append(rd.Entries)
,这又牵扯到raftLog的数据结构,raftLog的持久化是由上层来做的,raft.Node做不了,因为持久化、快照的代码模块与raft核心模块是分离的。
对于刚刚接收到的 Entry记录首先都会被存储在unstable中 。然后按照Raft协议将unstable中缓存的这些 Entry 记录交给上层模块进行处理,上层模块会将这些 Entry 记录发送到集群其他节点或进行保存( 写入 Storage
中)。之后,上层模块会调用 Advance()方法通知底层的 etcd-raft模块将 unstable 中对应的 Entry
记录删除(因为己经保存到了 Storage 中。正因为 unstable 中保存的 Entry 记录并未进行持久化,
可能会因节点故障而意外丢失,所以被称为unstable。
另一个需要注意的点是rc.wal.Save(rd.HardState, rd.Entries)
,每次ready的处理都需要保存一下当时的rd.HardState。
如何生成ready
什么时候应该处理ready?
- 比较容易理解的地方是有待处理的msgs、unstableEntries、committedEntries
- softState和hardState发生变化?????
func (rn *RawNode) HasReady() bool {
r := rn.raft
if !r.softState().equal(rn.prevSoftSt) {
return true
}
if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
return true
}
if r.raftLog.hasPendingSnapshot() {
return true
}
if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
return true
}
if len(r.readStates) != 0 {
return true
}
return false
}
ready处理完之后调用advance更新相关的索引位置,具体可以看代码,没有特别的地方。
3. 如何处理成员变更
参考文档
关键函数是raft.applyConfChange
启动时,会调用apply
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}
用户主动请求变更的时候
node.ProposeConfChange
-> node.Step(Prop msg)
-> 向propc中写msg
node.Run中:
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
调用r.step,对于stepFollow会转发msg给leader,对于leader来说,alreadyPending来保证同一时刻仅有一个成员变更请求,然后将成员变更的entries写到unstable中,同时调用r.bcastAppend(),生成send到peer的MsgApp-msg ,等待上层处理。
for i := range m.Entries {
e := &m.Entries[i]
var cc pb.ConfChangeI
if e.Type == pb.EntryConfChange {
var ccc pb.ConfChange
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
} else if e.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err := ccc.Unmarshal(e.Data); err != nil {
panic(err)
}
cc = ccc
}
if cc != nil {
alreadyPending := r.pendingConfIndex > r.raftLog.applied
alreadyJoint := len(r.prs.Config.Voters[1]) > 0
wantsLeaveJoint := len(cc.AsV2().Changes) == 0
var refused string
if alreadyPending {
refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
} else if alreadyJoint && !wantsLeaveJoint {
refused = "must transition out of joint config first"
} else if !alreadyJoint && wantsLeaveJoint {
refused = "not in joint state; refusing empty conf change"
}
if refused != "" {
r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
}
}
}
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
整个待处理的entries和msg包装成ready struct,交给上层处理,等到对应的entries被committed,则可以处理,主要的操作有两个:调用raftNode的ApplyConfChange,然后transport也去除对应的peer:
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
if len(ents[i].Data) == 0 {
// ignore empty messages
break
}
s := string(ents[i].Data)
data = append(data, s)
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
cc.Unmarshal(ents[i].Data)
rc.confState = *rc.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
if len(cc.Context) > 0 {
rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
}
case raftpb.ConfChangeRemoveNode:
if cc.NodeID == uint64(rc.id) {
log.Println("I've been removed from the cluster! Shutting down.")
return nil, false
}
rc.transport.RemovePeer(types.ID(cc.NodeID))
}
}
}
可以看到主要最终调用的函数是raft.applyConfChange
func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
var cs pb.ConfState
select {
case n.confc <- cc.AsV2():
case <-n.done:
}
select {
case cs = <-n.confstatec:
case <-n.done:
}
return &cs
}
------------------------
case cc := <-n.confc:
_, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
// If the node was removed, block incoming proposals. Note that we
// only do this if the node was in the config before. Nodes may be
// a member of the group without knowing this (when they're catching
// up on the log and don't have the latest config) and we don't want
// to block the proposal channel in that case.
//
// NB: propc is reset when the leader changes, which, if we learn
// about it, sort of implies that we got readded, maybe? This isn't
// very sound and likely has bugs.
if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
var found bool
outer:
for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
for _, id := range sl {
if id == r.id {
found = true
break outer
}
}
}
if !found {
propc = nil
}
}
select {
case n.confstatec <- cs:
case <-n.done:
}
raft.applyConfChange
对于一次最多一个成员变更的情况下,主要做的操作是生成新的peer,设置其next和matched字段。
3. 上层是如何把多个模块组合协作起来的
启动
有两种启动方式,start或者Restart。
oldwal := wal.Exist(rc.waldir)
if oldwal || rc.join {
rc.node = raft.RestartNode(c)
} else {
rc.node = raft.StartNode(c, rpeers)
}
根据是否存在wal文件来判断restart还是start,start会多调用bootstrap来处理成员变更信息。
// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
n := newNode(rn)
go n.run()
return &n
}
restart会复用wal和快照信息,那么如何使用快照信息来恢复etcd-node?
if !fileutil.Exist(rc.snapdir) {
if err := os.Mkdir(rc.snapdir, 0750); err != nil {
log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
}
}
rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL()
// signal replay has finished
rc.snapshotterReady <- rc.snapshotter
snapshotter对象生成之后,会发送channel,通知snapshot模块根据snap恢复storage;这里需要考虑的问题是存在多个snap的情况下,选择最新的未损坏snap来使用。这里主要是根据snap的名字来判断fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)
snapshot, err := s.loadSnapshot()
if err != nil {
log.Panic(err)
}
if snapshot != nil {
log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index)
if err := s.recoverFromSnapshot(snapshot.Data); err != nil {
log.Panic(err)
}
}
然后wal模块replayWAL:
WAL 记录类型目前支持 5 种,分别是文件元数据记录、日志条目记录、状态信息记录、CRC 记录、快照记录:
- 文件元数据记录包含节点 ID、集群 ID 信息,它在 WAL 文件创建的时候写入;
- 日志条目记录包含 Raft 日志信息,如 put 提案内容;
- 状态信息记录,包含集群的任期号、节点投票信息等,一个日志文件中会有多条,以最后的记录为准;
- CRC 记录包含上一个 WAL 文件的最后的 CRC(循环冗余校验码)信息, 在创建、切割 WAL 文件时,作为第一条记录写入到新的 WAL 文件, 用于校验数据文件的完整性、准确性等;
- 快照记录包含快照的任期号、日志索引信息,用于检查快照文件的准确性。
func (rc *raftNode) replayWAL() *wal.WAL {
log.Printf("replaying WAL of member %d", rc.id)
snapshot := rc.loadSnapshot()
w := rc.openWAL(snapshot)
_, st, ents, err := w.ReadAll()
if err != nil {
log.Fatalf("raftexample: failed to read WAL (%v)", err)
}
rc.raftStorage = raft.NewMemoryStorage()
if snapshot != nil {
rc.raftStorage.ApplySnapshot(*snapshot)
}
rc.raftStorage.SetHardState(st)
// append to storage so raft starts at the right place in log
rc.raftStorage.Append(ents)
return w
}
首先是获取根据wal中的快照记录和state记录,找出所有小于state.commit的快照,然后 loads the newest snapshot available that is in walSnaps。
// filter out any snaps that are newer than the committed hardstate
n := 0
for _, s := range snaps {
if s.Index <= state.Commit {
snaps[n] = s
n++
}
}
snaps = snaps[:n:n]
return snaps, nil
然后根据快照数据,找出比snap.Index的所有wal entries,然后将所有的ents append到raftLog中。