etcd Study Notes 2 (Draft)

etcd Initialization Flow

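At startup, etcd first calls startEtcdOrProxyV2. This function initializes the config and parses the supplied options, then checks whether the Dir in the config is empty. If it is, a data dir is derived from the Name in the config (see the default layout below). On later restarts, etcd checks the type of the existing data dir. There are currently three types: member, proxy and empty, meaning the directory belongs to a cluster member, belongs to a proxy, or is empty. Depending on the type, etcd takes a different branch and calls either startEtcd or startProxy.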

                                                                             +---->startEtcd ---> configurePeerListeners ---> configureClientListeners ----> etcdserver.NewServer
                                                                             |
startEtcdOrProxyV2 ---> newConfig ---> cfg.parse ---> identify data dir ---> |
                                                                             |
                                                                             +---->startProxy

The etcd data dir looks like this:

(ENV) [root@ceph-2 etcd]# ls
10.255.101.74.etcd  10.255.101.74.proxy.etcd  etcd.conf  etcd-proxy.conf
(ENV) [root@ceph-2 etcd]# tree -h
.
├── [  20]  10.255.101.74.etcd
│   └── [  29]  member
│       ├── [ 246]  snap
│       │   ├── [366K]  0000000000000002-0000000000d5a021.snap
│       │   ├── [366K]  0000000000000002-0000000000d726c2.snap
│       │   ├── [366K]  0000000000000002-0000000000d8ad63.snap
│       │   ├── [366K]  0000000000000002-0000000000da3404.snap
│       │   ├── [362K]  0000000000000002-0000000000dbbaa5.snap
│       │   └── [ 20K]  db
│       └── [ 244]  wal
│           ├── [ 61M]  000000000000001e-0000000000c0cf3d.wal
│           ├── [ 61M]  000000000000001f-0000000000c775f1.wal
│           ├── [ 61M]  0000000000000020-0000000000ce234b.wal
│           ├── [ 61M]  0000000000000021-0000000000d4ce84.wal
│           ├── [ 61M]  0000000000000022-0000000000db76f5.wal
│           └── [ 61M]  0.tmp
├── [  19]  10.255.101.74.proxy.etcd
│   └── [  21]  proxy
│       └── [  70]  cluster
├── [3.6K]  etcd.conf
└── [ 558]  etcd-proxy.conf

6 directories, 15 files

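When the etcd server starts, it creates the store. The data dir, WAL dir and snap dir are created if they do not exist, and snap/db is used as the backend path. If db already exists, the Backend is built from it. Once the Backend is built, a goroutine is started to run backend.run().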

// file: mvcc/backend/backend.go
type Backend interface {
    // ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
    ReadTx() ReadTx
    BatchTx() BatchTx
    // ConcurrentReadTx returns a non-blocking read transaction.
    ConcurrentReadTx() ReadTx

    Snapshot() Snapshot
    Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
    // Size returns the current size of the backend physically allocated.
    // The backend can hold DB space that is not utilized at the moment,
    // since it can conduct pre-allocation or spare unused space for recycling.
    // Use SizeInUse() instead for the actual DB size.
    Size() int64
    // SizeInUse returns the current size of the backend logically in use.
    // Since the backend can manage free space in a non-byte unit such as
    // number of pages, the returned value can be not exactly accurate in bytes.
    SizeInUse() int64
    // OpenReadTxN returns the number of currently open read transactions in the backend.
    OpenReadTxN() int64
    Defrag() error
    ForceCommit()
    Close() error
}

Next, a new Transport is created; its peer round tripper is built as follows:

// etcdserver/server.go
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())

WAL

If the WAL directory exists, all WAL files are opened and the snapshot entries in them are validated. The WAL is decoded by a decoder with the following structure:

// wal/decoder.go
type decoder struct {
    mu  sync.Mutex
    brs []*bufio.Reader

    // lastValidOff file offset following the last valid decoded record
    lastValidOff int64
    crc          hash.Hash32
}

Here brs holds one Reader per WAL file. Each WAL file is processed in turn:

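  1. Read the 8-byte length field at the start of each record frame as a little-endian integer. For example, the first 8 bytes of the WAL file below are 04 00 00 00 00 00 00 84. Because the value is little-endian, 0x84 is the most significant byte. The low 56 bits hold the record length, here 4. The top bit of the high byte (0x84 = 10000100) is set, which signals that padding follows, and the low 3 bits of that byte give the padding length, here 4, so record plus padding is 8 bytes. A single WAL entry may be at most 10 MB, and each WAL segment file is preallocated to 64 MB by default (64 * 1000 * 1000 bytes, which is why tree -h reports 61M for each .wal file above).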
    0000000 04 00 00 00 00 00 00 84 08 04 10 00 00 00 00 00
    0000010 20 00 00 00 00 00 00 00 08 01 10 bf ae e5 db 08
    0000020 1a 16 08 e9 e4 bc b2 8f ba fc 88 82 01 10 c4 cf
    
    var (
        // SegmentSizeBytes is the preallocated size of each wal segment file.
        // The actual size might be larger than this. In general, the default
        // value should be used, but this is defined as an exported variable
        // so that tests can set a different segment size.
        SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
    )
    
  2. Read the record bytes plus the padding bytes.
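  3. Unmarshal them into a Record (structure shown below), which holds a type, a CRC and the data. During validation the decoder feeds the record's data into its running CRC (the crc field of decoder) and compares the result with the Crc stored in the Record; a mismatch means the data is corrupted.
  4. Collect all records of type snapshot whose Index does not exceed the Commit of the HardState.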
// wal/walpb/record.pb.go
type Record struct {
    Type             int64  `protobuf:"varint,1,opt,name=type" json:"type"`
    Crc              uint32 `protobuf:"varint,2,opt,name=crc" json:"crc"`
    Data             []byte `protobuf:"bytes,3,opt,name=data" json:"data,omitempty"`
    XXX_unrecognized []byte `json:"-"`
}

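As shown below, there are five Record types. The Data of a metadata, entry, state or snapshot record holds a serialized Metadata, Entry, HardState or walpb.Snapshot respectively (structures below), while a crc record only carries its Crc: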

// wal/wal.go
const (
    metadataType int64 = iota + 1
    entryType
    stateType
    crcType
    snapshotType
)
// raft/raftpb/raft.pb.go
type HardState struct {
    Term             uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
    Vote             uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
    Commit           uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
    XXX_unrecognized []byte `json:"-"`
}
// raft/raftpb/raft.pb.go
type Entry struct {
    Term             uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
    Index            uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
    Type             EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
    Data             []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
    XXX_unrecognized []byte    `json:"-"`
}
// wal/walpb/record.pb.go
type Snapshot struct {
    Index            uint64 `protobuf:"varint,1,opt,name=index" json:"index"`
    Term             uint64 `protobuf:"varint,2,opt,name=term" json:"term"`
    XXX_unrecognized []byte `json:"-"`
}
// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
    NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
    ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
    XXX_unrecognized []byte `json:"-"`
}
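Step 4 of the WAL decoding above (keeping only snapshot entries that the HardState has already committed) boils down to an in-place filter. A minimal sketch, assuming simplified stand-ins `walSnapshot` and `hardState` for walpb.Snapshot and raftpb.HardState; `filterValidSnapshots` is a hypothetical name. Presumably the point is that a snapshot beyond the committed index is not backed by committed state and so cannot safely be used on restart.

```go
package main

import "fmt"

// walSnapshot and hardState are simplified walpb.Snapshot / raftpb.HardState.
type walSnapshot struct{ Index, Term uint64 }

type hardState struct{ Term, Vote, Commit uint64 }

// filterValidSnapshots (hypothetical) keeps, in place, the snapshot entries
// whose Index is not beyond the committed index recorded in st.
func filterValidSnapshots(snaps []walSnapshot, st hardState) []walSnapshot {
	n := 0
	for _, s := range snaps {
		if s.Index <= st.Commit {
			snaps[n] = s
			n++
		}
	}
	return snaps[:n]
}

func main() {
	snaps := []walSnapshot{{Index: 5, Term: 2}, {Index: 10, Term: 2}, {Index: 15, Term: 3}}
	fmt.Println(filterValidSnapshots(snaps, hardState{Term: 3, Commit: 10})) // [{5 2} {10 2}]
}
```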
A WAL record frame is laid out as follows:

+----------------------------------------------------------------------+
|  +-------------------------------+---------------------------------+ |
|  |  record length (low 56 bits)  |  pad (low 3 bits of high byte)  | |
|  +-------------------------------+---------------------------------+ |
|  |                      data + padding bytes                       | |
|  +-----------------------------------------------------------------+ |
|                                  ...                                 |
+----------------------------------------------------------------------+

Snapshot

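Here the snap directory contains only files ending in .snap plus the db file. Each .snap file is named %016x-%016x.snap, i.e. term-index.snap, whereas files in the wal directory are named %016x-%016x.wal, i.e. seq-index.wal.
Each .snap file corresponds to the snappb.Snapshot structure: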

// etcdserver/api/snap/snappb/snap.pb.go
type Snapshot struct {
    Crc              uint32 `protobuf:"varint,1,opt,name=crc" json:"crc"`
    Data             []byte `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
    XXX_unrecognized []byte `json:"-"`
}
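Both naming schemes can be produced and parsed with fmt's %016x verb. A sketch with hypothetical helper names (`snapName`, `parseSnapName`, `walName`, `parseWALName`); the suffix check is my own addition. With the listing above, 0000000000000002-0000000000dbbaa5.snap parses to term 2 and index 0xdbbaa5, and 0000000000000022-0000000000db76f5.wal to sequence 0x22 and index 0xdb76f5.

```go
package main

import (
	"fmt"
	"strings"
)

// snapName builds a term-index.snap file name.
func snapName(term, index uint64) string {
	return fmt.Sprintf("%016x-%016x.snap", term, index)
}

// walName builds a seq-index.wal file name.
func walName(seq, index uint64) string {
	return fmt.Sprintf("%016x-%016x.wal", seq, index)
}

// parseSnapName extracts term and index from a .snap file name.
func parseSnapName(name string) (term, index uint64, err error) {
	if !strings.HasSuffix(name, ".snap") {
		return 0, 0, fmt.Errorf("%q is not a .snap file", name)
	}
	_, err = fmt.Sscanf(name, "%016x-%016x.snap", &term, &index)
	return term, index, err
}

// parseWALName extracts the sequence number and index from a .wal file name.
func parseWALName(name string) (seq, index uint64, err error) {
	if !strings.HasSuffix(name, ".wal") {
		return 0, 0, fmt.Errorf("%q is not a .wal file", name)
	}
	_, err = fmt.Sscanf(name, "%016x-%016x.wal", &seq, &index)
	return seq, index, err
}

func main() {
	term, index, err := parseSnapName("0000000000000002-0000000000dbbaa5.snap")
	fmt.Println(term, index, err)        // 2 14400165 <nil>
	fmt.Println(walName(0x22, 0xdb76f5)) // 0000000000000022-0000000000db76f5.wal
}
```

Zero-padded fixed-width hex also means that sorting the file names lexically sorts them by term/sequence and then index.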

The Data of a snappb.Snapshot in turn holds a raftpb.Snapshot:

// raft/raftpb/raft.pb.go
type Snapshot struct {
    Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
    Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
    XXX_unrecognized []byte           `json:"-"`
}
// raft/raftpb/raft.pb.go
type SnapshotMetadata struct {
    ConfState        ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
    Index            uint64    `protobuf:"varint,2,opt,name=index" json:"index"`
    Term             uint64    `protobuf:"varint,3,opt,name=term" json:"term"`
    XXX_unrecognized []byte    `json:"-"`
}

Starting / Restarting a Node

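Whether to start a new node or restart an existing one is decided by whether the WAL directory exists and whether this is a new cluster. The restart path is described below as an example.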

  1. Read the metadata, the raftpb.HardState and all raftpb.Entry records from the WAL.
// etcdserver/etcdserverpb/etcdserver.pb.go
type Metadata struct {
    NodeID           uint64 `protobuf:"varint,1,opt,name=NodeID" json:"NodeID"`
    ClusterID        uint64 `protobuf:"varint,2,opt,name=ClusterID" json:"ClusterID"`
    XXX_unrecognized []byte `json:"-"`
}
  2. Create the RaftCluster object; the NodeID and ClusterID in the metadata become RaftCluster's localID and cid respectively.
// file: etcdserver/api/membership/cluster.go
// RaftCluster is a list of Members that belong to the same raft cluster
type RaftCluster struct {
   lg *zap.Logger

   localID types.ID
   cid     types.ID
   token   string

   v2store v2store.Store
   be      backend.Backend

   sync.Mutex // guards the fields below
   version    *semver.Version
   members    map[types.ID]*Member
   // removed contains the ids of removed members in the cluster.
   // removed id cannot be reused.
   removed map[types.ID]bool

   downgradeInfo *DowngradeInfo
}
  3. Create a MemoryStorage:
    • apply the snapshot
    • set the HardState read in step 1
    • append the entries read in step 1 to the MemoryStorage
// file: raft/storage.go
// MemoryStorage implements the Storage interface backed by an
// in-memory array.
type MemoryStorage struct {
    // Protects access to all fields. Most methods of MemoryStorage are
    // run on the raft goroutine, but Append() is run on an application
    // goroutine.
    sync.Mutex

    hardState pb.HardState
    snapshot  pb.Snapshot
    // ents[i] has raft log position i+snapshot.Metadata.Index
    ents []pb.Entry
}
[Figure: MemoryStorage]
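The index bookkeeping behind the `ents[i]` comment can be sketched with a toy storage. Assumption: like MemoryStorage, the hypothetical `toyStorage` keeps a dummy entry at ents[0] standing for the snapshot position, so FirstIndex is the snapshot index + 1 and LastIndex grows with the appended entries. The real Append also truncates overlapping entries, which is left out here.

```go
package main

import "fmt"

type entry struct{ Term, Index uint64 }

// toyStorage (hypothetical) mirrors MemoryStorage's index layout: ents[0] is
// a dummy entry at the snapshot position, so ents[i].Index == snapIndex + i.
type toyStorage struct{ ents []entry }

func newToyStorage() *toyStorage { return &toyStorage{ents: make([]entry, 1)} }

// applySnapshot discards the log and keeps only the dummy entry for the snapshot.
func (s *toyStorage) applySnapshot(term, index uint64) {
	s.ents = []entry{{Term: term, Index: index}}
}

func (s *toyStorage) firstIndex() uint64 { return s.ents[0].Index + 1 }

func (s *toyStorage) lastIndex() uint64 { return s.ents[0].Index + uint64(len(s.ents)) - 1 }

// appendContiguous is a simplified Append that only accepts entries starting
// right after lastIndex (the real Append also truncates overlapping entries).
func (s *toyStorage) appendContiguous(ents []entry) error {
	if len(ents) == 0 {
		return nil
	}
	if want := s.lastIndex() + 1; ents[0].Index != want {
		return fmt.Errorf("expected first index %d, got %d", want, ents[0].Index)
	}
	s.ents = append(s.ents, ents...)
	return nil
}

func main() {
	s := newToyStorage()
	s.applySnapshot(2, 100)
	var ents []entry
	for i := uint64(101); i <= 110; i++ {
		ents = append(ents, entry{Term: 2, Index: i})
	}
	if err := s.appendContiguous(ents); err != nil {
		panic(err)
	}
	fmt.Println(s.firstIndex(), s.lastIndex()) // 101 110
}
```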

  4. Restart the Node according to raft.Config.
It is generally recommended to set ElectionTick = 10 * HeartbeatTick to avoid unnecessary leader changes.
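In etcd the two tick values are usually derived from the millisecond flags. A sketch, assuming (as I understand etcd's server setup) that one tick equals the heartbeat interval, so HeartbeatTick is 1 and ElectionTick is the election timeout divided by the heartbeat interval; `tickConfig` and `ticksFromMs` are hypothetical names. With the usual defaults of 100 ms and 1000 ms this yields exactly the recommended 10:1 ratio. The validation mirrors raft's rule that ElectionTick must be greater than HeartbeatTick.

```go
package main

import (
	"errors"
	"fmt"
)

// tickConfig holds the two raft.Config fields discussed here (toy struct).
type tickConfig struct {
	ElectionTick  int
	HeartbeatTick int
}

// ticksFromMs (hypothetical) assumes one tick per heartbeat interval.
func ticksFromMs(heartbeatMs, electionMs uint) tickConfig {
	return tickConfig{
		ElectionTick:  int(electionMs / heartbeatMs),
		HeartbeatTick: 1,
	}
}

// validate enforces that an election timeout spans more than one heartbeat.
func (c tickConfig) validate() error {
	if c.HeartbeatTick <= 0 {
		return errors.New("heartbeat tick must be greater than 0")
	}
	if c.ElectionTick <= c.HeartbeatTick {
		return errors.New("election tick must be greater than heartbeat tick")
	}
	return nil
}

func main() {
	c := ticksFromMs(100, 1000)
	fmt.Println(c.ElectionTick, c.HeartbeatTick, c.validate()) // 10 1 <nil>
}
```

With a 10:1 ratio a follower tolerates several lost heartbeats before starting an election, which is what keeps leadership stable on a slightly lossy network.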

// file: raft/rawnode.go
// RawNode is a thread-unsafe Node.
// The methods of this struct correspond to the methods of Node and are described
// more fully there.
type RawNode struct {
    raft       *raft
    prevSoftSt *SoftState
    prevHardSt pb.HardState
}
  • Create the raft instance from raft.Config (see raft/raft.go):

    1. Validate the Config.

    2. Create the raftLog, as shown below. Note that the raftLog's committed and applied are both initialized to firstIndex - 1, while log.unstable.offset is set to lastIndex + 1.

      [Figure: raftLog]

        // raft/log.go (excerpt)
        log := &raftLog{
            storage:         storage,
            logger:          logger,
            maxNextEntsSize: maxNextEntsSize,
        }
        firstIndex, err := storage.FirstIndex()
        if err != nil {
            panic(err) // TODO(bdarnell)
        }
        lastIndex, err := storage.LastIndex()
        if err != nil {
            panic(err) // TODO(bdarnell)
        }
        log.unstable.offset = lastIndex + 1
        log.unstable.logger = logger
        // Initialize our committed and applied pointers to the time of the last compaction.
        log.committed = firstIndex - 1
        log.applied = firstIndex - 1
      
    3. Get the HardState and ConfState from the MemoryStorage. As mentioned earlier, the MemoryStorage has already applied the snapshot and stored the HardState recorded in the WAL.

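    4. Build the raft struct. By default each message is at most 1 MB (maxSizePerMsg), and at most 4096 / 8 = 512 messages may be in flight (maxInflightMsgs).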

     const (
         // The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
         // Assuming the RTT is around 10ms, 1MB max size is large enough.
         maxSizePerMsg = 1 * 1024 * 1024
         // Never overflow the rafthttp buffer, which is 4096.
         // TODO: a better const?
         maxInflightMsgs = 4096 / 8
     )
    
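    5. Using the HardState obtained in step 3, set raft's Vote and Term and the raftLog's committed index.
    6. Initialize the node as a follower: assign its step and tick functions, reset it for the current Term, set Lead to None and set State to StateFollower.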
      func (r *raft) becomeFollower(term uint64, lead uint64) {
          r.step = stepFollower
          r.reset(term)
          r.tick = r.tickElection
          r.lead = lead
          r.state = StateFollower
          r.logger.Infof("%x became follower at term %d", r.id, r.Term)
      }
    
  • Build the RawNode from the raft instance, saving raft's SoftState and HardState into the RawNode's prevSoftSt and prevHardSt.

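Taken together, the raftLog initialization, the HardState restore and becomeFollower shown above determine the state a restarted raft starts from. The sketch below models only that arithmetic with a hypothetical `restore` function and toy types; the range check on Commit corresponds to a panic in raft, which is turned into an error here. For example, with a snapshot at index 100 and entries 101..110 in storage, committed and applied start at 100 and unstable.offset at 111, and a HardState with Commit 108 then moves committed to 108.

```go
package main

import "fmt"

type hardState struct{ Term, Vote, Commit uint64 }

// restored (toy) collects the raft/raftLog fields discussed above.
type restored struct {
	Term, Vote         uint64
	Committed, Applied uint64
	UnstableOffset     uint64
	Lead               uint64
	State              string
}

const none uint64 = 0 // raft's "no leader" ID

// restore (hypothetical) replays the restart arithmetic: newLog, then the
// HardState, then becomeFollower(Term, None).
func restore(firstIndex, lastIndex uint64, hs hardState) (restored, error) {
	r := restored{
		Committed:      firstIndex - 1, // last compaction point
		Applied:        firstIndex - 1,
		UnstableOffset: lastIndex + 1, // everything up to lastIndex is already stable
	}
	if hs.Commit < r.Committed || hs.Commit > lastIndex {
		return r, fmt.Errorf("commit %d is out of range [%d, %d]", hs.Commit, r.Committed, lastIndex)
	}
	r.Committed = hs.Commit
	r.Term, r.Vote = hs.Term, hs.Vote
	r.Lead, r.State = none, "StateFollower" // same term, so the vote is kept
	return r, nil
}

func main() {
	r, err := restore(101, 110, hardState{Term: 2, Vote: 1, Commit: 108})
	fmt.Printf("%+v %v\n", r, err)
}
```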
  5. Create the raft node; node is the canonical implementation of the Node interface.
// file: raft/node.go
// Node represents a node in a raft cluster.
type Node interface {
    // Tick, Campaign, Propose, Step, Ready, Advance, Stop, ... (methods omitted here)
}
// node is the canonical implementation of the Node interface
type node struct {
    propc      chan msgWithResult
    recvc      chan pb.Message
    confc      chan pb.ConfChangeV2
    confstatec chan pb.ConfState
    readyc     chan Ready
    advancec   chan struct{}
    tickc      chan struct{}
    done       chan struct{}
    stop       chan struct{}
    status     chan chan Status

    rn *RawNode
}
  6. Start a goroutine that runs node.run() (see raft/node.go).
// file: raft/raft.go
// StateType represents the role of a node in a cluster.
type StateType uint64

var stmap = [...]string{
    "StateFollower",
    "StateCandidate",
    "StateLeader",
    "StatePreCandidate",
}
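The shape of node.run() can be sketched as a single goroutine that owns the state and receives every request over channels. This is a toy that assumes only a tick channel, a proposal channel and a stop channel (`toyNode` and its methods are hypothetical). The real loop also serves recvc, confc, readyc, advancec and status, and the real Tick() sends on a buffered tickc without blocking, dropping the tick if the buffer is full; unbuffered channels are used here so the counts are deterministic.

```go
package main

import "fmt"

// toyNode (hypothetical) keeps three of node's channels. Only run() touches
// ticks and entries, so no mutex is needed.
type toyNode struct {
	tickc chan struct{}
	propc chan string
	stop  chan struct{}
	done  chan struct{}

	ticks   int
	entries []string
}

func newToyNode() *toyNode {
	return &toyNode{
		tickc: make(chan struct{}), // unbuffered: each Tick waits for run()
		propc: make(chan string),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// run is the single event loop, started with `go n.run()`.
func (n *toyNode) run() {
	defer close(n.done)
	for {
		select {
		case <-n.tickc:
			n.ticks++ // the real node advances election/heartbeat timers here
		case p := <-n.propc:
			n.entries = append(n.entries, p) // the real node steps a proposal message
		case <-n.stop:
			return
		}
	}
}

func (n *toyNode) Tick()            { n.tickc <- struct{}{} }
func (n *toyNode) Propose(p string) { n.propc <- p }

// Stop ends run() and waits until it has exited.
func (n *toyNode) Stop() {
	close(n.stop)
	<-n.done
}

func main() {
	n := newToyNode()
	go n.run()
	n.Tick()
	n.Propose("put foo bar")
	n.Stop()
	fmt.Println(n.ticks, n.entries) // 1 [put foo bar]
}
```

Funnelling everything through one goroutine is what lets the raft state machine itself stay lock-free, as the "RawNode is a thread-unsafe Node" comment suggests.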

Transport


Peer

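A remote raft node is represented by a peer, and the local raft node sends messages to the remote node through that peer. Each peer has two underlying mechanisms for sending messages: stream and pipeline.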

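etcd mainly uses stream and pipeline message channels. A stream channel keeps a long-lived HTTP connection and carries small, frequent messages, whereas a pipeline channel closes its connection as soon as the transfer finishes and carries large, infrequent messages such as snapshot data.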
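The choice between the two channels can be sketched as a small decision function, modelled on my reading of peer.pick in rafthttp; the types and the `pick` function below are simplified, hypothetical stand-ins. Snapshots always use the pipeline so a large transfer never blocks a stream, MsgApp prefers its dedicated stream, other messages use the general stream, and the pipeline is the fallback when no stream is connected.

```go
package main

import "fmt"

// msgType is a toy stand-in for raftpb.MessageType.
type msgType int

const (
	msgApp       msgType = iota // log replication
	msgHeartbeat                // small, frequent
	msgSnap                     // full snapshot, possibly huge
)

// streams records which stream writers are currently connected (toy).
type streams struct {
	appV2   bool // stream dedicated to MsgApp
	message bool // general-purpose message stream
}

// pick (hypothetical) returns the channel a message would be sent on.
func pick(m msgType, s streams) string {
	switch {
	case m == msgSnap:
		return "pipeline" // never block a stream with a large snapshot
	case m == msgApp && s.appV2:
		return "stream (msgappv2)"
	case s.message:
		return "stream (message)"
	default:
		return "pipeline" // no stream connected yet: fall back
	}
}

func main() {
	up := streams{appV2: true, message: true}
	fmt.Println(pick(msgSnap, up), "|", pick(msgApp, up), "|", pick(msgHeartbeat, streams{}))
}
```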


Handler

/raft  --> pipelineHandler
/raft/stream/ --> streamHandler
/raft/snapshot --> snapshotHandler
/raft/probing --> httpHealth
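The route table above maps onto a standard net/http ServeMux. In the sketch below each real handler is replaced by a hypothetical `named` handler that only echoes its name. Note that /raft/stream/ is registered with a trailing slash, so it matches every stream type and peer ID beneath it, while the other three paths match exactly.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// named (hypothetical) stands in for a real handler and just echoes its name.
func named(name string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, name)
	})
}

// newPeerMux wires the four routes listed above onto one ServeMux.
func newPeerMux() *http.ServeMux {
	mux := http.NewServeMux()
	mux.Handle("/raft", named("pipeline"))
	mux.Handle("/raft/stream/", named("stream"))
	mux.Handle("/raft/snapshot", named("snapshot"))
	mux.Handle("/raft/probing", named("probing"))
	return mux
}

// route reports which handler serves path.
func route(mux *http.ServeMux, path string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodPost, path, nil))
	return rec.Body.String()
}

func main() {
	mux := newPeerMux()
	for _, p := range []string{"/raft", "/raft/stream/message/8211f1d0f64f3269", "/raft/snapshot", "/raft/probing"} {
		fmt.Println(p, "->", route(mux, p))
	}
}
```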

Message encoder/decoder

The Message encoder/decoder wraps an io.Writer/io.Reader to encode and decode Messages respectively.

+----------------------------------------------------------------------+
|  +-----------------------------------------------------------------+ |
|  |               message size (8 bytes, big-endian)                | |
|  +-----------------------------------------------------------------+ |
|  |                              data                               | |
|  +-----------------------------------------------------------------+ |
|                                  ...                                 |
+----------------------------------------------------------------------+

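Encoding writes the 8-byte message size first, followed by the serialized data.
Decoding does the reverse: it first reads the 8-byte message size and returns an error immediately if it exceeds 512 MB; otherwise it reads that many bytes and unmarshals them into a Message. A different limit can also be passed explicitly; for example, snapshot messages may be up to 1 TB. See etcdserver/api/rafthttp/msg_codec.go for details.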

// messageEncoder is a encoder that can encode all kinds of messages.
// It MUST be used with a paired messageDecoder.
type messageEncoder struct {
    w io.Writer
}

func (enc *messageEncoder) encode(m *raftpb.Message) error {
    if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
        return err
    }
    _, err := enc.w.Write(pbutil.MustMarshal(m))
    return err
}

// messageDecoder is a decoder that can decode all kinds of messages.
type messageDecoder struct {
    r io.Reader
}

var (
    readBytesLimit     uint64 = 512 * 1024 * 1024 // 512 MB
    ErrExceedSizeLimit        = errors.New("rafthttp: error limit exceeded")
)

func (dec *messageDecoder) decode() (raftpb.Message, error) {
    return dec.decodeLimit(readBytesLimit)
}

func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) {
    var m raftpb.Message
    var l uint64
    if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
        return m, err
    }
    if l > numBytes {
        return m, ErrExceedSizeLimit
    }
    buf := make([]byte, int(l))
    if _, err := io.ReadFull(dec.r, buf); err != nil {
        return m, err
    }
    return m, m.Unmarshal(buf)
}

This article is based on etcd 3.5.0-pre.

