EtcdRaft源码分析(日志复制)

image.png

Leader

假定现在已经选出leader,开始要准备给其他人做日志同步了。

首先你要成为一个真正的Leader,需要做前期准备。

  • 从Candidate转变为Leader,不是只是换个名字而已
  • 转变完成后,就要给其他成员同步日志了

becomeLeader

func (r *raft) becomeLeader() {
   // TODO(xiangli) remove the panic when the raft implementation is stable
   if r.state == StateFollower {
      panic("invalid transition [follower -> leader]")
   }
   r.step = stepLeader
   r.reset(r.Term)
   r.tick = r.tickHeartbeat
   r.lead = r.id
   r.state = StateLeader
   // Followers enter replicate mode when they've been successfully probed
   // (perhaps after having received a snapshot as a result). The leader is
   // trivially in this state. Note that r.reset() has initialized this
   // progress with the last index already.
   r.prs[r.id].becomeReplicate()

   // Conservatively set the pendingConfIndex to the last index in the
   // log. There may or may not be a pending config change, but it's
   // safe to delay any future proposals until we commit all our
   // pending log entries, and scanning the entire tail of the log
   // could be expensive.
   r.pendingConfIndex = r.raftLog.lastIndex()

   emptyEnt := pb.Entry{Data: nil}
   if !r.appendEntry(emptyEnt) {
      // This won't happen because we just called reset() above.
      r.logger.Panic("empty entry was dropped")
   }
   // As a special case, don't count the initial empty entry towards the
   // uncommitted log quota. This is because we want to preserve the
   // behavior of allowing one entry larger than quota if the current
   // usage is zero.
   r.reduceUncommittedSize([]pb.Entry{emptyEnt})
   r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

  • 首先之前的身份不能是follower
  • 之后step处理会让stepLeader托管
  • 将自己设为ProgressStateReplicate,且Next=Match+1

reset(r.Term)

func (r *raft) reset(term uint64) {
   if r.Term != term {
      r.Term = term
      r.Vote = None
   }
   r.lead = None

   r.electionElapsed = 0
   r.heartbeatElapsed = 0
   r.resetRandomizedElectionTimeout()

   r.abortLeaderTransfer()

   r.votes = make(map[uint64]bool)
   r.forEachProgress(func(id uint64, pr *Progress) {
      *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
      if id == r.id {
         pr.Match = r.raftLog.lastIndex()
      }
   })

   r.pendingConfIndex = 0
   r.uncommittedSize = 0
   r.readOnly = newReadOnly(r.readOnly.option)
}

  • 设置任期为当前任期
  • 投票,lead,选举计时器,心跳计时器,随机选举超时时间,leader转移,投票机,pendingConfigIndex,未提交的entrySize,readOnly全部清零
    • pendingConfigIndex
    • readOnly
    • uncommittedSize
  • 重置本地保存的其他节点的进度
    • 这里需要注意的是,将对方的Next设为跟leader保持一致,是leader假定大家都跟我一致。r.raftLog.lastIndex() + 1
    • 每个节点的Progress的状态初始都为Probe

tickHeartbeat

r.heartbeatElapsed++
r.electionElapsed++

if r.electionElapsed >= r.electionTimeout {
   r.electionElapsed = 0
   if r.checkQuorum {
      r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
   }
   // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
   if r.state == StateLeader && r.leadTransferee != None {
      r.abortLeaderTransfer()
   }
}

if r.state != StateLeader {
   return
}

if r.heartbeatElapsed >= r.heartbeatTimeout {
   r.heartbeatElapsed = 0
   r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}

  • 如果成员接受Leader的同步请求的情况
  • 还记得么,Leader上任的时候大家都是Probe状态,现在转换成ProgressStateReplicate,同时他的Next当然是Match+1
*   ProgressStateSnapshot 见[EtcdRaft源码分析(快照复制)](https://www.jianshu.com/p/a48532b45792)
*   ProgressStateReplicate
    *   到这里说明对方已经接受了日志复制,那么在ins里面删除小于或等于这次index的部分。

bcastAppend

func (r *raft) maybeSendAppend (to uint64, sendIfEmpty bool) bool {
   pr := r.getProgress(to)
   if pr.IsPaused() {
      return false
   }
   m := pb.Message{}
   m.To = to

   term, errt := r.raftLog.term(pr.Next - 1)
   ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
   if len(ents) == 0 && !sendIfEmpty {
      return false
   }

   if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
      if !pr.RecentActive {
         r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
         return false
      }

      m.Type = pb.MsgSnap
      snapshot, err := r.raftLog.snapshot()
      if err != nil {
         if err == ErrSnapshotTemporarilyUnavailable {
            r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
            return false
         }
         panic(err) // TODO(bdarnell)
      }
      if IsEmptySnap(snapshot) {
         panic("need non-empty snapshot")
      }
      m.Snapshot = snapshot
      sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
      r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
         r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
      pr.becomeSnapshot(sindex)
      r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
   } else {
      m.Type = pb.MsgApp
      m.Index = pr.Next - 1
      m.LogTerm = term
      m.Entries = ents
      m.Commit = r.raftLog.committed
      if n := len(m.Entries); n != 0 {
         switch pr.State {
         // optimistically increase the next when in ProgressStateReplicate
         case ProgressStateReplicate:
            last := m.Entries[n-1].Index
            pr.optimisticUpdate(last)
            pr.ins.add(last)
         case ProgressStateProbe:
            pr.pause()
         default:
            r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
         }
      }
   }
   r.send(m)
   return true
}

  • 拿到对方的Progress,也就是进度。
  • 打包当前节点Next之后的entries
  • 打包当前节点Next-1的(任期,index),作为接收人校验用
  • 将自己committed的情况发给对方
  • 准备发MsgApp消息给对方
  • 遍历entries
    • 如果对方的状态是ProgressStateReplicate
      • 更新对方进度的Next为最新的last
      • 将last加到ins里面,注意这个ins是个类环形的队列。
  • Snapshot的情况
    • 如果Next-1的任期或之后的entries如果查不到,那肯定就在snapshot里面
    • 拿出当前节点存储的snapshot,有可能在unstable或storage里面
    • 将对方的Progress设为ProgressStateSnapshot,且设置PendingSnapshot为snapshot的index
    • 准备发MsgSnap消息给对方

Follower

case pb.MsgApp:
   r.electionElapsed = 0
   r.lead = m.From
   r.handleAppendEntries(m)

  • 首先Follower认为只有Leader才能发这种消息,所以只要收到就认他为Leader
  • 同时选举计时要清零
  • 真正处理的逻辑在handleAppendEntries里面

handleAppendEntries

func (r *raft) handleAppendEntries(m pb.Message) {
   if m.Index < r.raftLog.committed {
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
      return
   }

   if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
   } else {
      r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
   }
}

func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    if l.matchTerm(index, logTerm) {
        lastnewi = index + uint64(len(ents))
        ci := l.findConflict(ents)
        switch {
        case ci == 0:
        case ci <= l.committed:
            l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
        default:
            offset := index + 1
            l.append(ents[ci-offset:]...)
        }
        l.commitTo(min(committed, lastnewi))
        return lastnewi, true
    }
    return 0, false
}

  • 如果比Follower已经committed还要小,他会把自己committed的情况发回给Leader,没关系,将自己committed的情况发回给Leader
  • 在mayAppend的时候会去比较leader发来的index的(任期,index)是否一致。如果不一致给Leader报告你给的index位置的entry任期跟我对不上。有可能我根本都没有,有可能是完全不一样的东西。你的同步请求我拒绝,并附上我现在的最后一位。RejectHint: r.raftLog.lastIndex(),然后冲突点就是发来的index。
*   报告中的最后一位的作用,待分析
    
    
*   Raft中只要某个位置的(任期,index)一致,那么index之前都是一致的。
  • 如果能对上,说明插入位置前一位我们都一致,这样可以放心往后append了。
*   首先我们算出append之后新的最后一位,lastnewi
*   findConflict
    *   当然了,最好的情况是正好能接上,也就不存在冲突的可能性,无脑往后append新的entry就好了
    *   还有的情况是,follower本地存储的entry比leader想象的还要多,还要复杂。那怎么办,当然是从前往后找到第一个冲突点,然后之后的全部不要,跟leader保持一致。
*   然后跟Leader要求的committed保持一致
*   然后给Leader报告说,你要求的我都执行完了,附上我现在最新的last位置
  • 另外还有一种情况是,Leader的探测请求,Follower

Leader

下面我们剖析下Leader在收到成员的同步响应之后的处理。

case pb.MsgAppResp:
   pr.RecentActive = true

   if m.Reject {
      r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
         r.id, m.RejectHint, m.From, m.Index)
      if pr.maybeDecrTo(m.Index, m.RejectHint) {
         r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
         if pr.State == ProgressStateReplicate {
            pr.becomeProbe()
         }
         r.sendAppend(m.From)
      }
   } else {
      oldPaused := pr.IsPaused()
      if pr.maybeUpdate(m.Index) {
         switch {
         case pr.State == ProgressStateProbe:
            pr.becomeReplicate()
         case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
            r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
            // Transition back to replicating state via probing state
            // (which takes the snapshot into account). If we didn't
            // move to replicating state, that would only happen with
            // the next round of appends (but there may not be a next
            // round for a while, exposing an inconsistent RaftStatus).
            pr.becomeProbe()
            pr.becomeReplicate()
         case pr.State == ProgressStateReplicate:
            pr.ins.freeTo(m.Index)
         }

         if r.maybeCommit() {
            r.bcastAppend()
         } else if oldPaused {
            // If we were paused before, this node may be missing the
            // latest commit index, so send it.
            r.sendAppend(m.From)
         }
         // We've updated flow control information above, which may
         // allow us to send multiple (size-limited) in-flight messages
         // at once (such as when transitioning from probe to
         // replicate, or when freeTo() covers multiple messages). If
         // we have more entries to send, send as many messages as we
         // can (without sending empty messages for the commit index)
         for r.maybeSendAppend(m.From, false) {
         }
         // Transfer leadership is in progress.
         if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
            r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
            r.sendTimeoutNow(m.From)
         }
      }
   }

agree

  • 如果成员接受Leader的同步请求的情况
  • 还记得么,Leader上任的时候大家都是Probe状态,现在转换成ProgressStateReplicate,同时他的Next当然是Match+1
*   ProgressStateSnapshot 见EtcdRaft源码分析(快照复制)
*   ProgressStateReplicate 待分析
    *   到这里说明对方已经接受了日志复制,那么在ins里面删除小于或等于这次index的部分。

maybeUpdate

func (pr *Progress) maybeUpdate(n uint64) bool {
   var updated bool
   if pr.Match < n {
      pr.Match = n
      updated = true
      pr.resume()
   }
   if pr.Next < n+1 {
      pr.Next = n + 1
   }
   return updated
}

  • maybeUpdate,从上面分析就知道,没有拒绝就说明,大家在某种程度是一致的,对方发来的index就表示leader发给他的数据同步到哪里了。首先第一件事情,就是记录下来对方同步的进度。

maybeCommit

func (r *raft) maybeCommit() bool {
   // Preserving matchBuf across calls is an optimization
   // used to avoid allocating a new slice on each call.
   if cap(r.matchBuf) < len(r.prs) {
      r.matchBuf = make(uint64Slice, len(r.prs))
   }
   mis := r.matchBuf[:len(r.prs)]
   idx := 0
   for _, p := range r.prs {
      mis[idx] = p.Match
      idx++
   }
   sort.Sort(mis)
   mci := mis[len(mis)-r.quorum()]
   return r.raftLog.maybeCommit(mci, r.Term)
}

  • maybeCommit, 这里会统计各个成员的进度,如果超过一半的人的同步进度Match已经超过了Leader的committed位置,这个时候Leader才可以安心去commit本地entry了。
  • 最后将commit的变更再次发给成员去同步

reject

如果被对方拒绝

if m.Reject {
   r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
      r.id, m.RejectHint, m.From, m.Index)
   if pr.maybeDecrTo(m.Index, m.RejectHint) {
      r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
      if pr.State == ProgressStateReplicate {
         pr.becomeProbe()
      }
      r.sendAppend(m.From)
   }
}

maybeDecrTo

func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
   if pr.State == ProgressStateReplicate {
      // the rejection must be stale if the progress has matched and "rejected"
      // is smaller than "match".
      if rejected <= pr.Match {
         return false
      }
      // directly decrease next to match + 1
      pr.Next = pr.Match + 1
      return true
   }

   // the rejection must be stale if "rejected" does not match next - 1
   if pr.Next-1 != rejected {
      return false
   }

   if pr.Next = min(rejected, last+1); pr.Next < 1 {
      pr.Next = 1
   }
   pr.resume()
   return true
}

  • 如果对方进度的状态是ProgressStateReplicate,如果冲突点居然比Match要小,感觉不可思议,直接忽略。
    • 否则的话,直接跳到Match+1的地方作为进度的Next,相当于Match之后的全部丢掉,准备重新开始同步。简单直接粗暴。
  • 一般来说pr.Next-1是应该等于rejected的,想想看rejected是插入位置的前一位,专门用来校验用的,而pr.Next-1不也是插入位置的前一位么?所以如果不相等,感觉不可思议,直接忽略。
  • 将对方进度的Next回退到rejectted,其实就相当于Next回退一位,为什么这么做,其实就是在探测啦,回退一位,发给Follower看看是不是还是冲突,不行,回来,再回退一位,如此往复。总会找到相同的时候
  • 如果maybeDecrTo能够成功回退,但还不确定回退的位置,对方能接受,这个时候如果对方是ProgressStateReplicate状态,那么先转为ProgressStateProbe。
  • 好了,该回退的也回退了,将最新的entries按回退的位置再发给对方看看。
  • 可以看到ProgressStateReplicate会直接回退到Match+1, 去试试看,如果还被拒绝,那么会转成ProgressStateProbe,而ProgressStateProbe只会每次回退一位,去试试看。

总结

到这里,整个流程还可以往下继续在Leader和Follower之间来回往复,但是,大体的逻辑就是这样,可以说算法非常精妙。希望你能看懂我在说什么。

作者:Pillar_Zhong
链接:https://www.jianshu.com/p/6496e6944737
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,193评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,306评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,130评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,110评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,118评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,085评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,007评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,844评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,283评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,508评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,395评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,985评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,630评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,797评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,653评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,553评论 2 352

推荐阅读更多精彩内容