运维一个etcd集群的一个基本要求是能够故障恢复。etcd有哪些机制支持故障恢复?如何进行故障恢复?
持久化
example
etcd的持久化依赖于快照和WAL,常见的文件格式如下:
其中wal文件的命名产生方式如下:
// Path of the next WAL file: its name is built from the last file's sequence
// number plus one and w.enti plus one.
fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
其中:
enti是index of the last entry saved to the wal
seq()是最后一个wal文件的seq
// seq returns the sequence number parsed from the base name of the file
// returned by w.tail().
// NOTE(review): in this excerpt the err returned by parseWALName is assigned
// but never checked, and t is used without a nil check — confirm against the
// full source before relying on this snippet.
func (w *WAL) seq() uint64 {
t := w.tail()
seq, _, err := parseWALName(filepath.Base(t.Name()))
return seq
}
所以wal文件名中的seq是递增的,而文件名中的index是该文件所存储的第一条log entry的index(即上一个文件最后一条entry的index加1)。
snapshot文件的命名产生方式如下:
// Snapshot file name: the snapshot term and index, each as 16 zero-padded
// lowercase hex digits and separated by '-', followed by snapSuffix.
fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)
什么时候持久化
还是在ready数据结构的处理过程中进行持久化
// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
// ensure that recovery after a snapshot restore is possible.
if !raft.IsEmptySnap(rd.Snapshot) {
// gofail: var raftBeforeSaveSnap struct{}
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
}
// gofail: var raftAfterSaveSnap struct{}
}
// gofail: var raftBeforeSave struct{}
// Persist the hard state together with the new entries carried by this Ready;
// a failure here is logged with Fatal.
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
}
wal数据比较容易理解,那么快照数据是怎么产生的呢?
还是在apply函数中,会尝试触发(trigger)snapshot
这时候会调用raftStorage的create snapshot,然后此snapshot就可以出现在下次的ready结构中。
可以看出,snapshot里存储的是V2 store的内容。理论上来说,snapshot对于etcdv3来说,有用的信息仅仅是index和term。
// snapshot clones the v2 store and commits the KV backend, then hands a
// function to s.GoAttach that serializes the clone, creates a raft snapshot at
// index snapi with confState, saves it, calls storage.Release for it, and
// finally compacts the raft storage.
// NOTE(review): in this excerpt the err values from SaveNoCopy, CreateSnapshot
// and Compact are assigned but never checked — confirm against the full source.
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
// Clone the v2 store before committing so the serialized data comes from the clone.
clone := s.v2store.Clone()
s.KV().Commit()
s.GoAttach(func() {
lg := s.Logger()
// d is the serialized v2 store clone; it becomes the snapshot's data payload.
d, err := clone.SaveNoCopy()
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
// Failing to save or release the snapshot is logged with Panic.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
// keep some in memory log entries for slow followers.
// compacti stays at 1 unless snapi exceeds SnapshotCatchUpEntries, in which
// case compaction stops SnapshotCatchUpEntries entries short of snapi.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}
err = s.r.raftStorage.Compact(compacti)
})
}
而对于V3来说,db文件本身就是快照。
snap 和 restore
snap和restore
在etcdctl/ctlv3/command文件夹下。
snapshot save
snapshot save命令会调用etcd-server的grpc接口,最终会调用到backend interface提供的snapshot接口。
// Take a backend snapshot and stream it through an in-memory pipe.
snap := ms.bg.Backend().Snapshot()
pr, pw := io.Pipe()
// The read end is closed when the enclosing function returns.
defer pr.Close()
// Producer goroutine: write the snapshot into the pipe, close the snapshot
// (only a warning is logged on failure), then close the write end.
// NOTE(review): the result of WriteTo is discarded in this excerpt.
go func() {
snap.WriteTo(pw)
if err := snap.Close(); err != nil {
ms.lg.Warn("failed to close snapshot", zap.Error(err))
}
pw.Close()
}()
// Snapshot commits the pending batch transaction, opens a transaction on the
// underlying db via Begin(false) (presumably read-only — verify against the db
// API) while holding b.mu for reading, and returns it wrapped in a snapshot.
//
// A watchdog goroutine is started alongside the snapshot. It logs a warning
// every warningTimeout for as long as the snapshot is in use, and exits — and
// closes donec — once stopc is signalled. Failing to begin the transaction is
// logged with Fatal.
func (b *backend) Snapshot() Snapshot {
	b.batchTx.Commit()

	b.mu.RLock()
	defer b.mu.RUnlock()
	tx, err := b.db.Begin(false)
	if err != nil {
		b.lg.Fatal("failed to begin tx", zap.Error(err))
	}

	stopc, donec := make(chan struct{}), make(chan struct{})
	dbBytes := tx.Size()
	go func() {
		defer close(donec)
		// sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
		// assuming a min tcp throughput of 100MB/s.
		var sendRateBytes int64 = 100 * 1024 * 1024
		// Expected transfer time for dbBytes at sendRateBytes, floored at
		// minSnapshotWarningTimeout.
		warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
		if warningTimeout < minSnapshotWarningTimeout {
			warningTimeout = minSnapshotWarningTimeout
		}
		start := time.Now()
		ticker := time.NewTicker(warningTimeout)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// The snapshot has been open longer than expected; report how long.
				b.lg.Warn(
					"snapshotting taking too long to transfer",
					zap.Duration("taking", time.Since(start)),
				)
			case <-stopc:
				// The snapshot was released: stop watching so that donec gets
				// closed and this goroutine does not leak.
				return
			}
		}
	}()

	return &snapshot{tx, stopc, donec}
}
snapshot restore
构建membership
restore是etcdctl单独完成,不依赖于etcd-server,依赖参数:
- restoreCluster: InitialCluster,
- restoreClusterToken,
- restoreDataDir,
- restoreWalDir,
- restorePeerURLs,
- restoreName,
- skipHashCheck,
为什么需要restoreCluster、restoreClusterToken、restorePeerURLs、restoreName等参数?用来生成membership
// Parse the initial-cluster setting into a URLs map.
// NOTE(review): err from NewURLsMap is not checked before the next statement
// in this excerpt — confirm against the full source.
ics, err = types.NewURLsMap(cfg.InitialCluster)
// Build a server config from the restore parameters so that VerifyBootstrap
// can validate them.
srv := config.ServerConfig{
Logger: s.lg,
Name: cfg.Name,
PeerURLs: pURLs,
InitialPeerURLsMap: ics,
InitialClusterToken: cfg.InitialClusterToken,
}
if err = srv.VerifyBootstrap(); err != nil {
return err
}
// Construct the cluster membership from the cluster token and the URLs map.
s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)
其中
// NewClusterFromURLsMap builds a RaftCluster containing one member per entry
// of urlsmap, each created from its name, its peer URLs and token, and then
// derives the cluster ID from the resulting members.
//
// It returns an error if two entries produce the same member ID or if a
// member ID equals raft.None.
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
	cluster := NewCluster(lg)
	for memberName, peerURLs := range urlsmap {
		member := NewMember(memberName, peerURLs, token, nil)
		_, duplicate := cluster.members[member.ID]
		switch {
		case duplicate:
			return nil, fmt.Errorf("member exists with identical ID %v", member)
		case uint64(member.ID) == raft.None:
			return nil, fmt.Errorf("cannot use %x as member id", raft.None)
		}
		cluster.members[member.ID] = member
	}
	cluster.genID()
	return cluster, nil
}
根据每个etcd-server的name和url,以及token,生成member。可以看出memberID是对(排序后的peerURLs+clusterToken)做sha1后取前8个字节得到的,name并不参与hash;这里传入的now为nil,所以也不包含时间戳。
// NewMember creates a Member whose ID is computed from peerURLs, clusterName
// and, when now is non-nil, the given time. The member is built through
// newMember with false as the last argument.
func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
	return newMember(name, peerURLs, computeMemberId(peerURLs, clusterName, now), false)
}
// computeMemberId derives a member ID from the first 8 bytes (big-endian) of
// the SHA-1 digest of: the sorted peer URLs concatenated without separator,
// followed by clusterName and, when now is non-nil, its Unix timestamp in
// decimal.
func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
	urls := peerURLs.StringSlice()
	sort.Strings(urls)

	seed := []byte(strings.Join(urls, ""))
	seed = append(seed, clusterName...)
	if now != nil {
		seed = append(seed, fmt.Sprintf("%d", now.Unix())...)
	}

	digest := sha1.Sum(seed)
	return types.ID(binary.BigEndian.Uint64(digest[:8]))
}
clusterID是memberIDs的hash
// genID sets c.cid to the first 8 bytes (big-endian) of the SHA-1 digest of
// all member IDs, each encoded as 8 big-endian bytes in the order returned by
// c.MemberIDs().
func (c *RaftCluster) genID() {
	ids := c.MemberIDs()
	packed := make([]byte, 8*len(ids))
	for i := range ids {
		offset := 8 * i
		binary.BigEndian.PutUint64(packed[offset:], uint64(ids[i]))
	}
	digest := sha1.Sum(packed)
	c.cid = types.ID(binary.BigEndian.Uint64(digest[:8]))
}
恢复数据
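如果没有传入data-dir和wal-dir,就使用默认目录:data-dir默认为"<name>.etcd",wal-dir默认为"<data-dir>/member/wal",snap目录固定为"<data-dir>/member/snap"。同时要求data-dir不存在或为空,显式指定的wal-dir不能已经存在。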
// Resolve the output data directory, defaulting to "<name>.etcd".
dataDir := cfg.OutputDataDir
if dataDir == "" {
dataDir = cfg.Name + ".etcd"
}
// Refuse to restore into a data directory that exists and is not empty.
if fileutil.Exist(dataDir) && !fileutil.DirEmpty(dataDir) {
return fmt.Errorf("data-dir %q not empty or could not be read", dataDir)
}
// Resolve the WAL directory: default to <dataDir>/member/wal; an explicitly
// given WAL directory must not exist yet.
walDir := cfg.OutputWALDir
if walDir == "" {
walDir = filepath.Join(dataDir, "member", "wal")
} else if fileutil.Exist(walDir) {
return fmt.Errorf("wal-dir %q exists", walDir)
}
// Record the resolved settings on the manager for the later restore steps.
s.name = cfg.Name
s.srcDbPath = cfg.SnapshotPath
s.walDir = walDir
s.snapDir = filepath.Join(dataDir, "member", "snap")
s.skipHashCheck = cfg.SkipHashCheck
然后恢复DB数据,这里可以看出,在恢复DB数据之后,会移除DB中关于member的相关bucket数据
// saveDB copies and verifies the snapshot database, then opens the copied
// database as a backend and trims the membership information stored in it.
// The backend is closed before returning. Any error from either step is
// returned unchanged.
func (s *v3Manager) saveDB() error {
	if err := s.copyAndVerifyDB(); err != nil {
		return err
	}

	be := backend.NewDefaultBackend(s.outDbPath())
	defer be.Close()

	return schema.NewMembershipBackend(s.lg, be).TrimMembershipFromBackend()
}
然后恢复snap和wal文件,这里有几个点:
- 会检查wal目录下,确保不存在任何的.wal文件
- 会将新的member关系存储到bolt-db
- 生成成员变更entry,提交到wal
- 生成term=1、vote=peers[0].ID、commit=成员变更entry数量的hard state,并与这些entry一起写入wal
- 根据V2 store恢复snapshot,并将snapshot 记录写入wal
- 快照存储metadata中的confstate,用于恢复peer关系
// saveWALAndSnap creates a fresh WAL and snapshot for the restored member.
//
// It re-adds every cluster member to a new v2 store and to the restored
// backend, creates the WAL with this node's ID and the cluster ID as metadata,
// writes one ConfChangeAddNode entry per member together with a hard state,
// saves a raft snapshot holding the serialized v2 store, and finally records
// that snapshot in the WAL.
//
// It returns the hard state that was written, plus the error (if any) from the
// final WAL snapshot record; earlier failures return a nil hard state.
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
if err := fileutil.CreateDirAll(s.lg, s.walDir); err != nil {
return nil, err
}
// add members again to persist them to the store we create.
st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
s.cl.SetStore(st)
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
s.cl.SetBackend(schema.NewMembershipBackend(s.lg, be))
for _, m := range s.cl.Members() {
s.cl.AddMember(m, true)
}
// WAL metadata identifies this node and the cluster it belongs to.
// NOTE(review): m is dereferenced without a nil check — presumably
// MemberByName always finds s.name at this point; verify against the caller.
m := s.cl.MemberByName(s.name)
md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
metadata, merr := md.Marshal()
if merr != nil {
return nil, merr
}
w, walerr := wal.Create(s.lg, s.walDir, metadata)
if walerr != nil {
return nil, walerr
}
defer w.Close()
// Build one raft peer per member ID, carrying the JSON-encoded member as context.
peers := make([]raft.Peer, len(s.cl.MemberIDs()))
for i, id := range s.cl.MemberIDs() {
ctx, err := json.Marshal((*s.cl).Member(id))
if err != nil {
return nil, err
}
peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
}
// Turn every peer into a ConfChangeAddNode entry at term 1, with indexes 1..len(peers).
ents := make([]raftpb.Entry, len(peers))
nodeIDs := make([]uint64, len(peers))
for i, p := range peers {
nodeIDs[i] = p.ID
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: p.ID,
Context: p.Context,
}
d, err := cc.Marshal()
if err != nil {
return nil, err
}
ents[i] = raftpb.Entry{
Type: raftpb.EntryConfChange,
Term: 1,
Index: uint64(i + 1),
Data: d,
}
}
// The hard state marks all of these entries as committed at term 1 and
// records a vote for the first peer.
// NOTE(review): peers[0] panics when there are no members — presumably ruled
// out by earlier validation; verify.
commit, term := uint64(len(ents)), uint64(1)
hardState := raftpb.HardState{
Term: term,
Vote: peers[0].ID,
Commit: commit,
}
if err := w.Save(hardState, ents); err != nil {
return nil, err
}
// Serialize the v2 store, which now holds the re-added members, as the snapshot payload.
b, berr := st.Save()
if berr != nil {
return nil, berr
}
// The snapshot sits at the last committed index and term and lists every
// node ID as a voter.
confState := raftpb.ConfState{
Voters: nodeIDs,
}
raftSnap := raftpb.Snapshot{
Data: b,
Metadata: raftpb.SnapshotMetadata{
Index: commit,
Term: term,
ConfState: confState,
},
}
sn := snap.New(s.lg, s.snapDir)
if err := sn.SaveSnap(raftSnap); err != nil {
return nil, err
}
// Record the snapshot's index, term and conf state in the WAL as the final step.
snapshot := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}
return &hardState, w.SaveSnapshot(snapshot)
}
最后根据commit和term恢复consistent index。
restart node
如果有wal,则根据snapshot中的confstate恢复peer关系;否则根据启动参数生成peer关系。