etcd学习笔记(四):恢复

运维一个etcd集群的一个基本要求是能够故障恢复。etcd有哪些机制支持故障恢复?如何进行故障恢复?

持久化

example

etcd的持久化依赖于快照和WAL,常见的文件格式如下:


image.png
image.png

其中wal文件的命名产生方式如下:

fpath := filepath.Join(w.dir, walName(w.seq()+1, w.enti+1))
其中:
enti是index of the last entry saved to the wal
seq()是最后一个wal文件的seq
func (w *WAL) seq() uint64 {
    t := w.tail()
    seq, _, err := parseWALName(filepath.Base(t.Name()))
    return seq
}

所以wal的seq是递增,index是存储的log的first index。
snapshot文件的命名产生方式如下:

    fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)

什么时候持久化

还是在ready数据结构的处理过程中进行持久化

                // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                // ensure that recovery after a snapshot restore is possible.
                if !raft.IsEmptySnap(rd.Snapshot) {
                    // gofail: var raftBeforeSaveSnap struct{}
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                    }
                    // gofail: var raftAfterSaveSnap struct{}
                }

                // gofail: var raftBeforeSave struct{}
                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                }

wal数据比较容易理解,那么快照数据是怎么产生的呢?
还在在apply函数中,会尝试trigger snapshot
这时候会调用raftStorage的create snapshot,然后此snapshot就可以出现在下次的ready结构中。

可以看出,snapshot里存储的是V2 store的内容。理论上来说,snapshot对于etcdv3来说,有用的信息仅仅是index和term。

func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
    clone := s.v2store.Clone()
    s.KV().Commit()

    s.GoAttach(func() {
        lg := s.Logger()

        d, err := clone.SaveNoCopy()
        snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
        if err = s.r.storage.SaveSnap(snap); err != nil {
            lg.Panic("failed to save snapshot", zap.Error(err))
        }
        if err = s.r.storage.Release(snap); err != nil {
            lg.Panic("failed to release wal", zap.Error(err))
        }

        // keep some in memory log entries for slow followers.
        compacti := uint64(1)
        if snapi > s.Cfg.SnapshotCatchUpEntries {
            compacti = snapi - s.Cfg.SnapshotCatchUpEntries
        }

        err = s.r.raftStorage.Compact(compacti)
    })
}

而对于V3来说,db文件本身就是快照。

snap 和 restore

snap和restore

在etcdctl/ctlv3/command文件夹下。

snapshot save

snapshot save命令会调用etcd-server的grpc接口,最终会调用到backend interface提供的snapshot接口。

    snap := ms.bg.Backend().Snapshot()
    pr, pw := io.Pipe()

    defer pr.Close()

    go func() {
        snap.WriteTo(pw)
        if err := snap.Close(); err != nil {
            ms.lg.Warn("failed to close snapshot", zap.Error(err))
        }
        pw.Close()
    }()
func (b *backend) Snapshot() Snapshot {
    b.batchTx.Commit()

    b.mu.RLock()
    defer b.mu.RUnlock()
    tx, err := b.db.Begin(false)
    if err != nil {
        b.lg.Fatal("failed to begin tx", zap.Error(err))
    }

    stopc, donec := make(chan struct{}), make(chan struct{})
    dbBytes := tx.Size()
    go func() {
        defer close(donec)
        // sendRateBytes is based on transferring snapshot data over a 1 gigabit/s connection
        // assuming a min tcp throughput of 100MB/s.
        var sendRateBytes int64 = 100 * 1024 * 1024
        warningTimeout := time.Duration(int64((float64(dbBytes) / float64(sendRateBytes)) * float64(time.Second)))
        if warningTimeout < minSnapshotWarningTimeout {
            warningTimeout = minSnapshotWarningTimeout
        }
        start := time.Now()
        ticker := time.NewTicker(warningTimeout)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
        

            case <-stopc:
            }
        }
    }()

    return &snapshot{tx, stopc, donec}
}

snapshot restore

构建membership

restore是etcdctl单独完成,不依赖于etcd-server,依赖参数:

- restoreCluster: InitialCluster,
- restoreClusterToken,
- restoreDataDir,
- restoreWalDir,
- restorePeerURLs,
- restoreName,
- skipHashCheck,

为什么需要restoreCluster、restoreClusterToken、restorePeerURLs、restoreName等参数?用来生成membership

ics, err = types.NewURLsMap(cfg.InitialCluster)
    srv := config.ServerConfig{
        Logger:              s.lg,
        Name:                cfg.Name,
        PeerURLs:            pURLs,
        InitialPeerURLsMap:  ics,
        InitialClusterToken: cfg.InitialClusterToken,
    }
    if err = srv.VerifyBootstrap(); err != nil {
        return err
    }
s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)

其中

func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
    c := NewCluster(lg)
    for name, urls := range urlsmap {
        m := NewMember(name, urls, token, nil)
        if _, ok := c.members[m.ID]; ok {
            return nil, fmt.Errorf("member exists with identical ID %v", m)
        }
        if uint64(m.ID) == raft.None {
            return nil, fmt.Errorf("cannot use %x as member id", raft.None)
        }
        c.members[m.ID] = m
    }
    c.genID()
    return c, nil
}

根据每个etcd-server的name和url,以及token,生成member,可以看出memeberID=peerURL+clusterToken的hash

func NewMember(name string, peerURLs types.URLs, clusterName string, now *time.Time) *Member {
    memberId := computeMemberId(peerURLs, clusterName, now)
    return newMember(name, peerURLs, memberId, false)
}
func computeMemberId(peerURLs types.URLs, clusterName string, now *time.Time) types.ID {
    peerURLstrs := peerURLs.StringSlice()
    sort.Strings(peerURLstrs)
    joinedPeerUrls := strings.Join(peerURLstrs, "")
    b := []byte(joinedPeerUrls)

    b = append(b, []byte(clusterName)...)
    if now != nil {
        b = append(b, []byte(fmt.Sprintf("%d", now.Unix()))...)
    }

    hash := sha1.Sum(b)
    return types.ID(binary.BigEndian.Uint64(hash[:8]))
}

clusterID是memberIDs的hash

func (c *RaftCluster) genID() {
    mIDs := c.MemberIDs()
    b := make([]byte, 8*len(mIDs))
    for i, id := range mIDs {
        binary.BigEndian.PutUint64(b[8*i:], uint64(id))
    }
    hash := sha1.Sum(b)
    c.cid = types.ID(binary.BigEndian.Uint64(hash[:8]))
}

恢复数据

如果没有传入wal和snap的恢复目录的话,就使用etcd的默认目录

dataDir := cfg.OutputDataDir
    if dataDir == "" {
        dataDir = cfg.Name + ".etcd"
    }
    if fileutil.Exist(dataDir) && !fileutil.DirEmpty(dataDir) {
        return fmt.Errorf("data-dir %q not empty or could not be read", dataDir)
    }

    walDir := cfg.OutputWALDir
    if walDir == "" {
        walDir = filepath.Join(dataDir, "member", "wal")
    } else if fileutil.Exist(walDir) {
        return fmt.Errorf("wal-dir %q exists", walDir)
    }

    s.name = cfg.Name
    s.srcDbPath = cfg.SnapshotPath
    s.walDir = walDir
    s.snapDir = filepath.Join(dataDir, "member", "snap")
    s.skipHashCheck = cfg.SkipHashCheck

然后恢复DB数据,这里可以看出,在恢复DB数据之后,会移除DB中关于member的相关bucket数据

func (s *v3Manager) saveDB() error {
    err := s.copyAndVerifyDB()
    if err != nil {
        return err
    }

    be := backend.NewDefaultBackend(s.outDbPath())
    defer be.Close()

    err = schema.NewMembershipBackend(s.lg, be).TrimMembershipFromBackend()
    if err != nil {
        return err
    }

    return nil
}

然后恢复snap和wal文件,这里有几个点:

  1. 会检查wal目录下,确保不存在任何的.wal文件
  2. 会将新的member关系存储到bolt-db
  3. 生成成员变更entry,提交到wal
  4. term=1,vote=peers[0].ID的hard state
  5. 根据V2 store恢复snapshot,并将snapshot 记录写入wal
  6. 快照存储metadata中的confstate,用于恢复peer关系
func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
    if err := fileutil.CreateDirAll(s.lg, s.walDir); err != nil {
        return nil, err
    }

    // add members again to persist them to the store we create.
    st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
    s.cl.SetStore(st)
    be := backend.NewDefaultBackend(s.outDbPath())
    defer be.Close()
    s.cl.SetBackend(schema.NewMembershipBackend(s.lg, be))
    for _, m := range s.cl.Members() {
        s.cl.AddMember(m, true)
    }

    m := s.cl.MemberByName(s.name)
    md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
    metadata, merr := md.Marshal()
    if merr != nil {
        return nil, merr
    }
    w, walerr := wal.Create(s.lg, s.walDir, metadata)
    if walerr != nil {
        return nil, walerr
    }
    defer w.Close()

    peers := make([]raft.Peer, len(s.cl.MemberIDs()))
    for i, id := range s.cl.MemberIDs() {
        ctx, err := json.Marshal((*s.cl).Member(id))
        if err != nil {
            return nil, err
        }
        peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
    }

    ents := make([]raftpb.Entry, len(peers))
    nodeIDs := make([]uint64, len(peers))
    for i, p := range peers {
        nodeIDs[i] = p.ID
        cc := raftpb.ConfChange{
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  p.ID,
            Context: p.Context,
        }
        d, err := cc.Marshal()
        if err != nil {
            return nil, err
        }
        ents[i] = raftpb.Entry{
            Type:  raftpb.EntryConfChange,
            Term:  1,
            Index: uint64(i + 1),
            Data:  d,
        }
    }

    commit, term := uint64(len(ents)), uint64(1)
    hardState := raftpb.HardState{
        Term:   term,
        Vote:   peers[0].ID,
        Commit: commit,
    }
    if err := w.Save(hardState, ents); err != nil {
        return nil, err
    }

    b, berr := st.Save()
    if berr != nil {
        return nil, berr
    }
    confState := raftpb.ConfState{
        Voters: nodeIDs,
    }
    raftSnap := raftpb.Snapshot{
        Data: b,
        Metadata: raftpb.SnapshotMetadata{
            Index:     commit,
            Term:      term,
            ConfState: confState,
        },
    }
    sn := snap.New(s.lg, s.snapDir)
    if err := sn.SaveSnap(raftSnap); err != nil {
        return nil, err
    }
    snapshot := walpb.Snapshot{Index: commit, Term: term, ConfState: &confState}
    return &hardState, w.SaveSnapshot(snapshot)
}

最后根据commit和term恢复consistent index。

restart node

如果有wal则根据snapshot中的confstate恢复peer关系,否则的话根据启动参数生成peer关系哦

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容