etcd学习笔记(三): Propose

我们已经在raft-example看过了对Propose的简单处理了,但是真正的etcd对Propose的处理更加复杂。主要是有如下几个点:

  1. consistent index。用于处理boltdb和raftlog之间的幂等性。
  2. 同步返回。由于raft的log复制是异步的,如何做到同步返回结果。

当blotdb用作状态机的时候,wal和blotdb作为两个不同的实体,很有可能存在不一致的情况。所以etcd在blotdb中存储一条记录consistent-index,来代表已经apply到blot-db上成功的log index,这样当根据wal恢复blot-db的时候,就可以判断log index是不是已经被apply过。

处理过程

在etcd-server中,一条propose的处理过程:
首先是为每一条请求注册一个唯一requestID,然后register并等待requestID处理完成

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {

    ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+maxGapBetweenApplyAndCommitIndex {
        return nil, ErrTooManyRequests
    }

    r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

    id := r.ID
    if id == 0 {
        id = r.Header.ID
    }
    ch := s.w.Register(id)

    cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
    defer cancel()

    start := time.Now()
    err = s.r.Propose(cctx, data)


    select {
    case x := <-ch:
        return x.(*applyResult), nil
    case <-cctx.Done():
        proposalsFailed.Inc()
        s.w.Trigger(id, nil) // GC wait
        return nil, s.parseProposeCtxErr(cctx.Err(), start)
    case <-s.done:
        return nil, ErrStopped
    }
}

请求转发到raft-node处理

根据之前的知识,raft-node经过一番处理之后,交给上层的ready结构来处理,首先msg转发到leader,然后leader调用processMsg,这里主要的操作是copy log到follower。


                ap := apply{
                    entries:  rd.CommittedEntries,
                    snapshot: rd.Snapshot,
                    notifyc:  notifyc,
                }

                updateCommittedIndex(&ap, rh)

                select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

                if islead {
                    r.transport.Send(r.processMessages(rd.Messages))
                }

                // Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
                // ensure that recovery after a snapshot restore is possible.
                if !raft.IsEmptySnap(rd.Snapshot) {
                    if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
                    }
                }

                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
                }
                if !raft.IsEmptyHardState(rd.HardState) {
                    proposalsCommitted.Set(float64(rd.HardState.Commit))
                }

                if !raft.IsEmptySnap(rd.Snapshot) {
                    // Force WAL to fsync its hard state before Release() releases
                    // old data from the WAL. Otherwise could get an error like:
                    // panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
                    // See https://github.com/etcd-io/etcd/issues/10219 for more details.
                    if err := r.storage.Sync(); err != nil {
                        r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
                    }

                    // etcdserver now claim the snapshot has been persisted onto the disk
                    notifyc <- struct{}{}

                    // gofail: var raftBeforeApplySnap struct{}
                    r.raftStorage.ApplySnapshot(rd.Snapshot)
                    r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
                    // gofail: var raftAfterApplySnap struct{}

                    if err := r.storage.Release(rd.Snapshot); err != nil {
                        r.lg.Fatal("failed to release Raft wal", zap.Error(err))
                    }
                    // gofail: var raftAfterWALRelease struct{}
                }

                r.raftStorage.Append(rd.Entries)

                if !islead {
                    msgs := r.processMessages(rd.Messages)
                    notifyc <- struct{}{}
                    r.transport.Send(msgs)
                } else {
                    // leader already processed 'MsgSnap' and signaled
                    notifyc <- struct{}{}
                }

                r.Advance()

最终调用的是apply

        case ap := <-s.r.apply():
            f := func(context.Context) { s.applyAll(&ep, &ap) }
            sched.Schedule(f)


func (s *EtcdServer) apply(
    es []raftpb.Entry,
    confState *raftpb.ConfState,
) (appliedt uint64, appliedi uint64, shouldStop bool) {
    s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
    for i := range es {
        e := es[i]

        switch e.Type {
        case raftpb.EntryNormal:
            s.applyEntryNormal(&e)
            s.setAppliedIndex(e.Index)
            s.setTerm(e.Term)

        case raftpb.EntryConfChange:
            // We need to apply all WAL entries on top of v2store
            // and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
            shouldApplyV3 := membership.ApplyV2storeOnly

            // set the consistent index of current executing entry
            if e.Index > s.consistIndex.ConsistentIndex() {
                s.consistIndex.SetConsistentIndex(e.Index, e.Term)
                shouldApplyV3 = membership.ApplyBoth
            }

            var cc raftpb.ConfChange
            pbutil.MustUnmarshal(&cc, e.Data)
            removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
            s.setAppliedIndex(e.Index)
            s.setTerm(e.Term)
            shouldStop = shouldStop || removedSelf
            s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})

        }
        appliedi, appliedt = e.Index, e.Term
    }
    return appliedt, appliedi, shouldStop
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容