Etcd Raft (二) 消息类型整理

Overview

Etcd中的Raft使用了protobuf进行选举轮次间的消息通讯,其中包含的消息类型定义于raft.proto中:

// For description of different message types, see:
// https://pkg.go.dev/go.etcd.io/etcd/raft/v3#hdr-MessageType
enum MessageType {
    MsgHup             = 0;
    MsgBeat            = 1;
    MsgProp            = 2;
    MsgApp             = 3;
    MsgAppResp         = 4;
    MsgVote            = 5;
    MsgVoteResp        = 6;
    MsgSnap            = 7;
    MsgHeartbeat       = 8;
    MsgHeartbeatResp   = 9;
    MsgUnreachable     = 10;
    MsgSnapStatus      = 11;
    MsgCheckQuorum     = 12;
    MsgTransferLeader  = 13;
    MsgTimeoutNow      = 14;
    MsgReadIndex       = 15;
    MsgReadIndexResp   = 16;
    MsgPreVote         = 17;
    MsgPreVoteResp     = 18;
}

其核心为Message类,记录当前的任期(Term),信息的发送和接收方(From->To),以及需要传递的Context、Commit、Entry等字段。

type Message struct {
    Type MessageType  
    To   uint64      
    From uint64      
    Term uint64      
    // logTerm is generally used for appending Raft logs to followers. For example,
    // (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
    // index=101, and the term of entry at index 100 is 5.
    // (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
    // entries from its leader as it already has an entry with term 5 at index 100.
    LogTerm    uint64   
    Index      uint64   
    Entries    []Entry  
    Commit     uint64   
    Snapshot   Snapshot 
    Reject     bool     
    RejectHint uint64   
    Context    []byte   
}

(1) MsgHup

MsgHup用于开启选举。假如Follower和Candidate没有收到心跳,则开启一轮新的选举。实现于raft.gotickElection中:

// tickElection is run by followers and candidates after r.electionTimeout.
func (r *raft) tickElection() {
    r.electionElapsed++

    // 自己可以被promote & election timeout 超时了,规定时间没有听到心跳发起选举
    // 发送MsgHup
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
            r.logger.Debugf("error occurred during election: %v", err)
        }
    }
}

在核心方法Step中,接收到MsgHup则开启选举。在这里如果preVote为true,则需要进行第一次选举,否则执行常规选举:

  • campaignPreElection :第一次选举
  • campaignElection : 常规的基于时间的选举
  • campaginTransfer : 表示转移Leader
switch m.Type {
case pb.MsgHup:
    if r.preVote {
        r.hup(campaignPreElection)
    } else {
        r.hup(campaignElection)
    }
...
}

其中hup方法本质就是调用campagin,将raft实例转化为Candidate状态,并发送voteMsg给其他node:

func (r *raft) hup(t CampaignType) {
    // 主要核对是否是合法的Follower  
    if r.state == StateLeader || !r.promotable(){
        return
    }
    ...
    }
    // 核对完成,开始选举
    r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
    r.campaign(t)
}

func (r *raft) campaign(t CampaignType) {
    ...
    var term uint64
    var voteMsg pb.MessageType
    // 更改状态
    if t == campaignPreElection {
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        // PreVote RPCs are sent for the next term before we've incremented r.Term.
        term = r.Term + 1
    } else {
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    // 投票,超过半数返回quorum的VoteWon,产生Leader
    if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
        // 假如只有一个node还赢得了选举,改变到正常选举状态
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    // 获取所有id并排序
    var ids []uint64
    {
        idMap := r.prs.Voters.IDs()
        ids = make([]uint64, 0, len(idMap))
        for id := range idMap {
            ids = append(ids, id)
        }
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
    }
    for _, id := range ids {
        // 跳过自己 
        if id == r.id {
            continue
        }
        r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
            r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
        // 假如要转让Leader,记录上下文
        var ctx []byte
        if t == campaignTransfer {
            ctx = []byte(t)
        }
        // 以voteMsg的形式发送给其他follower当前的信息  
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

(2) MsgBeat & MsgHeartbeat & MsgHeartbeatResp

  1. MsgBeat 是一个内部类型,在tickHeartbeat(tickElection外的另一个时钟方法)中触发Leader向Follower发送心跳信号MsgHeartbeat
func (r *raft) tickHeartbeat() {
    r.heartbeatElapsed++
    r.electionElapsed++

    // electionTimeout : default 10次
    // 超过则需要重新计时
    if r.electionElapsed >= r.electionTimeout {
        r.electionElapsed = 0
        if r.checkQuorum {
            if err := r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}); err != nil {
                r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
            }
        }
        // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
        if r.state == StateLeader && r.leadTransferee != None {
            r.abortLeaderTransfer()
        }
    }

    // Only leader sends heartbeat
    if r.state != StateLeader {
        return
    }
    
    if r.heartbeatElapsed >= r.heartbeatTimeout {
        r.heartbeatElapsed = 0
        if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
            r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
        }
    }
}

Leader接收到MsgBeat后开始发送心跳:

switch m.Type {
case pb.MsgBeat:
    r.bcastHeartbeat()
    return nil
...
}

// bcastHeartbeat 发送RPC,包含之前的上下文
func (r *raft) bcastHeartbeat() {
    lastCtx := r.readOnly.lastPendingRequestCtx()
    if len(lastCtx) == 0 {
        r.bcastHeartbeatWithCtx(nil)
    } else {
        r.bcastHeartbeatWithCtx([]byte(lastCtx))
    }
}

func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
    r.prs.Visit(func(id uint64, _ *tracker.Progress) {
        if id == r.id {
            return
        }
        r.sendHeartbeat(id, ctx)
    })
}

  1. MsgHeartbeatsendHeartBeat中发送来自Leader的心跳。
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    // 发送当前最小匹配的commit : min(to.matched, r.committed)
    commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    }

    r.send(m)
}

在核心函数Step中,当收到的msg任期大于当前任期时,在以下情况中需要变为Follower:

  • msgApp : 日志信息
  • msgHeartbeart :心跳信息
  • msgSnap :快照信息
case m.Term > r.Term:
    ...
    if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From)
            } else {
                r.becomeFollower(m.Term, None)
            }
}

MsgHeartbeat 的任期高于Candidate的任期,Candidate变回Follower;若是Follower则重新计时。

stepCandidate:
    case pb.MsgHeartbeat:
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)

stepFollower:
    case pb.MsgHeartbeat:
        r.electionElapsed = 0
        r.lead = m.From
        r.handleHeartbeat(m)
  1. MsgHeartbeatRespMsgHeartbeat的响应,在handleHeartbeat中由Follower(Candidate已经变成了Follower)记录当前commit,随后发送响应。
func (r *raft) handleHeartbeat(m pb.Message) {
    r.raftLog.commitTo(m.Commit)
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Leader得知是哪个Follower回应了自己的心跳,假如Leader的commitId大于Follower的match index,则发送sendAppend补充日志。

StepLeader:
case pb.MsgHeartbeatResp:
        pr.RecentActive = true
        pr.ProbeSent = false

        // 假如Follower尚未接受的MsgApp已经满了,空出一个slot
        if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
            pr.Inflights.FreeFirstOne()
        }

        // 只有当 leader last committed index 大于 follower Match index, 让Follower追加日志
        if pr.Match < r.raftLog.lastIndex() {
            r.sendAppend(m.From)
        }

        if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
            return nil
        }

        // 处理只读相关
        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
            return nil
        }

        rss := r.readOnly.advance(m)
        for _, rs := range rss {
            if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
                r.send(resp)
            }
        }

(3) MsgProp

MsgProp提出向log entry追加日志的提议,这些提议将被重定向给Leader,视作Leader的本地消息,因此在send中我们不能将发送者的Term追加到待发送的提议中。MsgReadIndex也是同理。

send:
    ...
    if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
        m.Term = r.Term
    }

Follower接收到MsgProp后,发送提议给Leader。假如Candidate收到MsgProp,说明Follower发给Leader的信息被发给了自己,当前没有Leader,则放弃该提议。

stepFollower:
case pb.MsgProp:
    ...
    m.To = r.lead
    r.send(m)

stepCandidate:
case pb.MsgProp:
    r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
    return ErrProposalDropped

Leader处理MsgProp时,首先调用appendEntry将条目追加到自己的Log中,随后调用bcastAppend将这些条目发送给其他节点。

stepLeader:
case pb.MsgProp:
    ...
    if !r.appendEntry(m.Entries...) {
        return ErrProposalDropped
    }
    r.bcastAppend()
    return nil

(4) MsgApp & MsgAppResp

MsgApp包含要复制的日志entry和当前的Term,在maybeSendAppend中发送该消息。假如无法正确获取Term和Entry,可能当前日志条目已经被snapshot了,因此发送snapshot。否则就发送追加这段entry的消息。

func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
    ...
    term, errt := r.raftLog.term(pr.Next - 1)
    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    if len(ents) == 0 && !sendIfEmpty {
        return false
    }

    if errt != nil || erre != nil { 
       // send snapshot if we failed to get term or entries
        ...
    } else {
        m.Type = pb.MsgApp
        m.Index = pr.Next - 1
        m.LogTerm = term
        m.Entries = ents
        m.Commit = r.raftLog.committed
        if n := len(m.Entries); n != 0 {
            switch pr.State {
            // optimistically increase the next when in StateReplicate
            case tracker.StateReplicate:
                last := m.Entries[n-1].Index
                pr.OptimisticUpdate(last)
                pr.Inflights.Add(last)
            // last index 未知,因此无法更新
            case tracker.StateProbe:
                pr.ProbeSent = true
            default:
                r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
            }
        }
    }
    r.send(m)
    return true
}

Follower和Candidate收到MsgApp的处理后,需要追加日志。在此有可能发生日志冲突(Follower和Leader日志不一致),需要根据hintIndex查找当前冲突的index。
在常规情况下,Leader的日志长度要长于Follower的长度,因此Follower的日志可以看作是Leader的前缀。此处的日志冲突主要是由于网络分区或系统过载,导致Leader可能挂掉或是消息不同步引起的,主要表现在日志从冲突点到尾端不相同。假如从最后一个Follower的最后一个index往前重新尝试追加日志,指针探测会经历冲突部分日志长度×网络来回延迟的计算量,从而导致计算非常缓慢,可能需要好几个小时的探测。
例如下面的例子要从Index为6,Term为2的地方往前探测,而Follower的冲突Term均小于Leader在index为2的Term(3)。所以从Leader的Index为1的Term(1)开始探测的话,Term(1)肯定小于Follower最后的冲突Term(2),从而有效减少探测次数:

            //   idx             1 2 3 4 5 6 7 8 9
            //                   -----------------
            //   term (Leader)   1 3 3 3 5 5 5 5 5
            //   term (Follow)   1 1 1 1 2 2

Follower和Candidate需要处理自己的追加日志,如果追加成功就响应,否则处理日志冲突,并将Reject设为true:

func (r *raft) handleAppendEntries(m pb.Message) {
    // 当前committed已经超过了msg的日志,直接响应
    if m.Index < r.raftLog.committed {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }

    // 假如成功Append后响应
    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
            r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
        // 查找可能发生冲突的索引
        // 也可以使用lastIndex-1,使用index和lastIndex的最小值更快
        hintIndex := min(m.Index, r.raftLog.lastIndex())
        hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
        hintTerm, err := r.raftLog.term(hintIndex)
        if err != nil {
            panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
        }
        r.send(pb.Message{
            To:         m.From,
            Type:       pb.MsgAppResp,
            Index:      m.Index,
            Reject:     true,
            RejectHint: hintIndex,
            LogTerm:    hintTerm,
        })
    }
}

Leader收到来自Follower和Candidate的请求后,假如此前有日志冲突(m.Reject为true),则通过消息中传来的RejectHint确定nextIndex重试;若没有冲突则更新marchIndex和nextIndex,进行后续的日志操作。

stepLeader:
switch m.Type {
    case pb.MsgAppResp:
        pr.RecentActive = true

        if m.Reject {  // 日志冲突
            nextProbeIdx := m.RejectHint
            if m.LogTerm > 0 {
                // 冲突点
                nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
            }
            // 改变进行重试的Next Index
            // 假如是StateReplicate,下一次用 last match index + 1 进行重试
            // 否则使用M.Index和nextProbeIdx的最小值进行重试
            if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
                r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
                // 变为探测状态
                if pr.State == tracker.StateReplicate {
                    pr.BecomeProbe()
                }
                r.sendAppend(m.From)
            }
        } else {  
            // 更新Match和Next Index;状态相关更新和后续的sendAppend
            ...
            }
        }

(5) MsgVote & MsgPreVote

MsgVote逻辑比较简单,在MsgHup开启选举后,在campaign中转变为Candidate发起投票。PreVote算法中,Candidate首先要确认自己能赢得集群中大多数节点的投票,这样才会把自己的term增加,然后发起真正的投票。这种算法解决了网络分区节点在重新加入时,会中断集群的问题。

func (r *raft) campaign(t CampaignType) {
    ...
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection {
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        // PreVote RPCs are sent for the next term before we've incremented r.Term.
        term = r.Term + 1
    } else {
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    ...
}

满足以下条件可以在Step中发送投票响应:

  1. 投的票之前已经投过了,重复投
  2. 当前没有Leader且没有投票过
  3. 该请求是PreVote
  4. Candidate是最新的
Step:
case pb.MsgVote, pb.MsgPreVote:
        // 1. 已经投过的票重新投
        canVote := r.Vote == m.From ||
            // 2. 还没投票,且当前Term没有Leader
            (r.Vote == None && r.lead == None) ||
            // 3. 这是一个给未来Term的preVote请求
            (m.Type == pb.MsgPreVote && m.Term > r.Term)
        // 确定当前Candidate是最新的
        // term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
            // 发送投票响应
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            if m.Type == pb.MsgVote {
                // Only record real votes.
                r.electionElapsed = 0
                r.Vote = m.From
            }
        } ...

(6) MsgSnap & MsgSnapResp

MsgSnap处理逻辑和MsgHeartbeatMsgApp一致,除了在上文提到的maybeAppend中,假如不append,就install snapshot。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容