ETCD《三》--TICK发起选主

发送方开始Tick

每个Member的初始状态是 StateFollower

每个Member都会初始化一个发送心跳的随机频率,心跳每100ms产生一次,默认累积到至少10次后才会真的发出心跳请求给peer

至少10次是因为Member会设置一个[10, 20)之间的随机次数,以此来尽量避免每个Member在同一时间发出心跳请求,加快选举

func (r *raft) resetRandomizedElectionTimeout() {
    r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}

每隔100ms的tick 会对 electionElapsed 进行累计,累计达到随机数之后才会发送真的心跳请求

func (r *raft) tickElection() {
    r.electionElapsed++

    if r.electionElapsed >= r.randomizedElectionTimeout {
        r.electionElapsed = 0
        if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
            // 
        }
    }
}

心跳消息,没有指定 term

pb.Message{From: r.id, Type: pb.MsgHup}

Member状态变更:

  • 状态改为StatePreCandidate

  • term 没有增加

  • 发送到peer的消息:携带的是自己的日志的最后term和index,指定的term是自己term+1

pb.Message{To: id, Term: r.term+1, Type: MsgPreVote, Index: last.index, LogTerm: last.term, Context: nil}

这里的发送消息不是由当前协程负责了,这里只是将其追加到msgs []pb.Message中;

实际的发送是由raft_node协程负责的,这个协程会循环监听这些信号,即是否有soft_state、hard_state、snapshot、msgs等

  • soft_state:包括 lead_id 和 状态 state

  • hard_state:包括 term 、vote_id 、 commit_index

func (rn *RawNode) HasReady() bool {
    // TODO(nvanbenschoten): order these cases in terms of cost and frequency.
    r := rn.raft
    if softSt := r.softState(); !softSt.equal(rn.prevSoftSt) {
        return true
    }
    if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
        return true
    }
    if r.raftLog.hasNextUnstableSnapshot() {
        return true
    }
    if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 {
        return true
    }
    if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(rn.applyUnstableEntries()) {
        return true
    }
    if len(r.readStates) != 0 {
        return true
    }
    return false
}

只有要任何一个信号产生,如上述产生了 状态变化 和 msgs ,就会构建一个 Ready 数据发送到 ready_channel

for {
        if advancec == nil && n.rn.HasReady() {
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }

        select {
        
        case readyc <- rd:
            n.rn.acceptReady(rd)
            if !n.rn.asyncStorageWrites {
                advancec = n.advancec
            } else {
                rd = Ready{}
            }
            readyc = nil
        case <-advancec:
            n.rn.Advance(rd)
            rd = Ready{}
            advancec = nil
        
        }
    }

而 server_node 也同时监听着 ready_channel,因此这个协程

for {
            select {
            case <-r.ticker.C:
                r.tick()
            case rd := <-r.Ready():
                    r.transport.Send(msgs)

对于 msgs 会写入到当前Member和Peer之间的 transport 中,这个 transport 是stream 类型的

  • 连接不断开,采用推送的方式,短时间内的消息会进行累计后批量推送(无更多消息 或者 累计达到2k次)

Peer之间的Stream连接会维护两个协程,分别负责写入和读取,上述实际写入的是 msg_channel

Peer连接中的写协程会监听这个 msg_channel,对通道中的消息进行编码后缓存,达到一定数量后通过http 的 response writer 推送给Peer

case m := <-msgc:
            err := enc.encode(&m)
            if err == nil {
                unflushed += m.Size()

                if len(msgc) == 0 || batched > 4096 / 2 {
                    flusher.Flush()
                    sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
                    unflushed = 0
                    batched = 0
                } else {
                    batched++
                }

                continue
            }

Peer接受Tick

Peer之间的Stream连接中的读协程会一直尝试从 http 的 response 中读取数据

for {
        rc, err := cr.dial(t)
        
        err = cr.decodeLoop(rc, t)

读取到消息后再将消息发送到 recv_channel

for {
        m, err := dec.decode()
        
        recvc := cr.recvc
        if m.Type == raftpb.MsgProp {
            recvc = cr.propc
        }

        select {
        case recvc <- m:

而 recv_channel 有另外一个协程监听着,收到消息后直接交给 raft 流程处理

go func() {
        for {
            select {
            case mm := <-p.recvc:
                if err := r.Process(ctx, mm); err != nil {
                    
                }
            case <-p.stopc:
                return
            }
        }
    }()

Peer的Raft流程

对于非 MsgProp 类型的消息,只需要将这个消息发送到 raft_node 的 recv_channel

if m.Type != pb.MsgProp {
        select {
        case n.recvc <- m:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        case <-n.done:
            return ErrStopped
        }
    }

前面提到的 raft_node 不仅会监听 Ready 信号,还会监听在 recv_channel

for {
        if advancec == nil && n.rn.HasReady() {
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }

        select {
    case m := <-n.recvc:
        r.Step(m)
        
        case readyc <- rd:
            n.rn.acceptReady(rd)
            if !n.rn.asyncStorageWrites {
                advancec = n.advancec
            } else {
                rd = Ready{}
            }
            readyc = nil

        }
    }

这里 recv_channel 上收到数据后,也是调用 step方法,这个 step 方法 和发送心跳 tick 的是同一个

对于 MsgPreVote 类型的消息

  • 检查能否投票, 如果当前节点还没给任何Member投票 并且 当且没有 lead 就可以;或者是 MsgPreVote 类型的消息,并且 对方的term比自己的大也可以,之前在发送消息 MsgPreVote 的时候,指定的 term是 +1 了的,因此也比自己的 term 大

canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)

  • 对比 term 和 index:对方的 日志term更大;或者日志 term相同,index不低于自己就认为是最新的了;日志term在发送消息的时候是没有 +1 的,因此是一样的

candLastID := entryID{term: m.LogTerm, index: m.Index}
func (l *raftLog) isUpToDate(their entryID) bool {
our := l.lastEntryID()
return their.term > our.term || their.term == our.term && their.index >= our.index
}

同时满足这两个条件,就正常响应给对方Peer,使用的是对方 +1 后的 term

pb.Message{To: m.From, Term: m.Term, Type: pb.MsgPreVoteResp}

否则响应的是拒绝给对方,使用的是自己的 term:

pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}

和 tick 消息一样也是通过 send 方法发送

不同的是 MsgPreVoteResp 这个类型的消息会追加到 msgsAfterAppend []pb.Message 这个数组中

这个数组中的消息也会被 Ready 信号检测到,与 msgs 数组一样,构建一个 Ready 数据发送到 ready_channel

与 tick 消息一样,采用相同的方式将消息回复给发送方

发送方接受MsgPreVoteResp

发送方发出 tick 到收到 Peer的 MsgPreVoteResp 消息,然后通过相同的流程读取Peer Stream的消息

再次将这个消息发送到 raft_node 的 recv_channel,开始发送方的 raft 流程

对于 MsgPreVoteResp 类型的消息,会进入到 发送 tick 之前状态转换中的 step 方法中

func (r *raft) becomePreCandidate() {
    
    r.step = stepCandidate
    r.trk.ResetVotes()
    r.tick = r.tickElection
    r.lead = None
    r.state = StatePreCandidate
    r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}

这个 step 方法会对 MsgPreVoteResp 类型的消息进行处理

func stepCandidate(r *raft, m pb.Message) error {

    switch m.Type {
    
    case pb.MsgPreVoteResp:
        gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
        r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
        switch res {
        case quorum.VoteWon:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                r.becomeLeader()
                r.bcastAppend()
            }
        case quorum.VoteLost:
            // pb.MsgPreVoteResp contains future term of pre-candidate
            // m.Term > r.Term; reuse r.Term
            r.becomeFollower(r.Term, None)
        }
    case pb.MsgTimeoutNow:
        r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
    }
    return nil
}
  • 首先记录结果,即当前节点收到来自 id 的结果 reject 还是 接收

func (p *ProgressTracker) RecordVote(id uint64, v bool) {
_, ok := p.Votes[id]
if !ok {
p.Votes[id] = v
}
}

  • 对结果进行统计,一方面统计当前同意 和 拒绝的数量

for id, pr := range p.Progress {
if pr.IsLearner {
continue
}
v, voted := p.Votes[id]
if !voted {
continue
}
if v {
granted++
} else {
rejected++
}
}

  • 对结果进行统计,另一方面统计新旧配置中的结果,初始时只有新配置c[0] ,没有旧配置 c[1];如果新旧配置的统计结果一致,则直接返回这个结果,可能是VoteWon、VoteLost 、VotePending之一;如果存在一个配置统计到VoteLost ,那就直接 VoteLost ;如果两份配置票不一致,那就返回 VotePending

func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult {
r1 := c[0].VoteResult(votes)
r2 := c[1].VoteResult(votes)

if r1 == r2 {
    // If they agree, return the agreed state.
    return r1
}
if r1 == VoteLost || r2 == VoteLost {
    // If either config has lost, loss is the only possible outcome.
    return VoteLost
}
// One side won, the other one is pending, so the whole outcome is.
return VotePending

}

  • 每份配置的统计都按照超过半数原则,赞成票超过半数返回 VoteWon;赞成票 + 没投的超过半数返回 VotePending,说明算上没投的仍有可能超过半数从而 VoteWon;赞成票 +没投的不足半数了,直接返回VoteLost,因为即使没投的都投给你,也不可能超过半数了。

func (c MajorityConfig) VoteResult(votes map[uint64]bool) VoteResult {
if len(c) == 0 {
// By convention, the elections on an empty config win. This comes in
// handy with joint quorums because it'll make a half-populated joint
// quorum behave like a majority quorum.
return VoteWon
}

var votedCnt int //vote counts for yes.
var missing int
for id := range c {
    v, ok := votes[id]
    if !ok {
        missing++
        continue
    }
    if v {
        votedCnt++
    }
}

q := len(c)/2 + 1
if votedCnt >= q {
    return VoteWon
}
if votedCnt+missing >= q {
    return VotePending
}
return VoteLost

}

  • 对投票结果进行处理,如果处于 VotePending状态,什么都不处理,等待更多Member 通过 tick 参与到投票中来;如果是 VoteLost状态,那么直接转换为 Follower;如果是 VoteWon状态,对于 StatePreCandidate 这个状态,还需要额外的处理;其它状态则直接成为 Leader
switch res {
        case quorum.VoteWon:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                r.becomeLeader()
                r.bcastAppend()
            }
        case quorum.VoteLost:
            r.becomeFollower(r.Term, None)
        }

VoteLost状态

如果部分Member的初始 term 或 index 比较落后,那么这部分Member在 tick 给其它Peer时会得到拒绝票,进而处于 VoteLost 状态

如果当前Member没有赢得投票,重新回到 Follower 状态

  • 修改状态是 StateFollower

  • 保持 term 不变

然后重置 tick 的随机值,重置投票结果,重新开始 累计 tick 次数,等待达到随机值后,重新发出 tick 请求到Peer

重复上述相同的状态变化和流程

VoteWon状态

由于Member当前处于 StatePreCandidate 这个状态,因此会先进入 Candidate 状态

  • 修改状态是 StateCandidate

  • term自增

  • vote_id 设置为自己,即投自己一票

  • 重置 tick 的随机值,重置投票结果,重新开始 累计 tick 次数,等待达到随机值后,重新发出 tick 请求到Peer

重新构建消息发送给Peer,开启正式的投票,term已经自增了

pb.Message{To: id, Term: term, Type: MsgVote, Index: last.index, LogTerm: last.term, Context: nil}

由于 term、状态等发生了改变,soft_state 和 hard_state 都变化了,也存在新的消息,raft_node 再次检测到 Ready 信号

然后再通过相同的Stream连接发送到Peer

考虑几种情况:

假设所有Member的初始 term 和 index都是一样的

  • 所有Member的 tick 时间近乎一致的情况下,因为 PreVote 消息不会自增 term,但是发出去的消息的 term 会 +1;因此所有Member都会 VoteWon;然后所有Member都进入 Candidate 状态;这时候所有Member的 term 都会自增,再次相同;

  • 部分Member的 状态转换更快,那它的 term 会高于那些较慢的Member;因为新的 MsgVote 消息到来时,比较慢的Peer可能还没更新自己的状态,仍然还处于 StatePreCandidate 这个状态;对于这部分比较慢的Member,在收到新的 MsgVote 消息后,会重新更新自己的状态为 Folloer,并将自身的 term 更新为 MsgVote 消息里的 term,也就是自增后的 term;

如果是case2,即有部分节点比较慢,他在处理 MsgVote 时,由于自己没有给任何人投票,因此会投赞成票给那些比较快的Peer;

但是如果是case1,即所有节点的进度近乎一致,所有节点都是 StateCandidate,并且都已经投了一票给自己,这时候就无法再给其它Peer投赞成票,都投出了拒绝票;所有成员都会收到半数以上的拒绝票,因此就会重新都将自己回退到 StateFollower 状态;表示这一轮选举失败了,重置随机值后,开始下一轮选举。

PreVote的作用

PreVote即预投票功能,是个可选功能,但是默认是启用的

他和Vote消息最大的区别是term不会自增,即LogTerm 是不变的,但是发出去的 term也是 +1的

意味为下一轮选举探路,即向Peer探测下一轮选举是否会投票给我?

case m.Term > r.Term:
        
        switch {
        case m.Type == pb.MsgPreVote:
            
        case m.Type == pb.MsgPreVoteResp && !m.Reject:
            
        default:
            r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
                r.id, r.Term, m.Type, m.From, m.Term)
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                r.becomeFollower(m.Term, m.From)
            } else {
                r.becomeFollower(m.Term, None)
            }
        }

他带来的最大好处是,不会引起现有集群的稳定性,可以看到对于收到 term 更大的消息,对于MsgPreVote 类型的消息什么操作都没有;而对于 MsgVote 类型的消息,则会直接让整个集群的成员都变成 Folloer ,采用这个更大的 term ,开始重新选主,对集群的稳定性有一定影响。

因为etcd集群中的term是只增不减的,一旦出现更大的 term ,那么其它Member都必须向这个 term 靠齐,不管这个 term 是否带有更新的日志;而MsgPreVote 只会使用更大的 term进行试探,没有实际性的增加 term。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容