发送方开始Tick
每个Member的初始状态是 StateFollower
每个Member都会初始化一个发送心跳的随机频率,心跳每100ms产生一次,默认累积到至少10次后才会真的发出心跳请求给peer
至少10次是因为Member会设置一个[10, 20)之间的随机次数,以此来尽量避免每个Member在同一时间发出心跳请求,加快选举
func (r *raft) resetRandomizedElectionTimeout() {
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
每隔100ms的tick 会对 electionElapsed 进行累计,累计达到随机数之后才会发送真的心跳请求
func (r *raft) tickElection() {
r.electionElapsed++
if r.electionElapsed >= r.randomizedElectionTimeout {
r.electionElapsed = 0
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
//
}
}
}
心跳消息,没有指定 term
pb.Message{From: r.id, Type: pb.MsgHup}
Member状态变更:
状态改为
StatePreCandidate
term 没有增加
发送到peer的消息:携带的是自己的日志的最后term和index,指定的term是自己term+1
pb.Message{To: id, Term: r.term+1, Type: MsgPreVote, Index: last.index, LogTerm: last.term, Context: nil}
这里的发送消息不是由当前协程负责了,这里只是将其追加到msgs []pb.Message
中;
实际的发送是由raft_node协程负责的,这个协程会循环监听这些信号,即是否有soft_state、hard_state、snapshot、msgs等
soft_state:包括 lead_id 和 状态 state
hard_state:包括 term 、vote_id 、 commit_index
func (rn *RawNode) HasReady() bool {
// TODO(nvanbenschoten): order these cases in terms of cost and frequency.
r := rn.raft
if softSt := r.softState(); !softSt.equal(rn.prevSoftSt) {
return true
}
if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
return true
}
if r.raftLog.hasNextUnstableSnapshot() {
return true
}
if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 {
return true
}
if r.raftLog.hasNextUnstableEnts() || r.raftLog.hasNextCommittedEnts(rn.applyUnstableEntries()) {
return true
}
if len(r.readStates) != 0 {
return true
}
return false
}
只有要任何一个信号产生,如上述产生了 状态变化 和 msgs ,就会构建一个 Ready 数据发送到 ready_channel
for {
if advancec == nil && n.rn.HasReady() {
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
select {
case readyc <- rd:
n.rn.acceptReady(rd)
if !n.rn.asyncStorageWrites {
advancec = n.advancec
} else {
rd = Ready{}
}
readyc = nil
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
advancec = nil
}
}
而 server_node 也同时监听着 ready_channel,因此这个协程
for {
select {
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():
r.transport.Send(msgs)
对于 msgs 会写入到当前Member和Peer之间的 transport 中,这个 transport 是stream 类型的
- 连接不断开,采用推送的方式,短时间内的消息会进行累计后批量推送(无更多消息 或者 累计达到2k次)
Peer之间的Stream连接会维护两个协程,分别负责写入和读取,上述实际写入的是 msg_channel
Peer连接中的写协程会监听这个 msg_channel,对通道中的消息进行编码后缓存,达到一定数量后通过http 的 response writer 推送给Peer
case m := <-msgc:
err := enc.encode(&m)
if err == nil {
unflushed += m.Size()
if len(msgc) == 0 || batched > 4096 / 2 {
flusher.Flush()
sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0
batched = 0
} else {
batched++
}
continue
}
Peer接受Tick
Peer之间的Stream连接中的读协程会一直尝试从 http 的 response 中读取数据
for {
rc, err := cr.dial(t)
err = cr.decodeLoop(rc, t)
读取到消息后再将消息发送到 recv_channel
for {
m, err := dec.decode()
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select {
case recvc <- m:
而 recv_channel 有另外一个协程监听着,收到消息后直接交给 raft 流程处理
go func() {
for {
select {
case mm := <-p.recvc:
if err := r.Process(ctx, mm); err != nil {
}
case <-p.stopc:
return
}
}
}()
Peer的Raft流程
对于非 MsgProp 类型的消息,只需要将这个消息发送到 raft_node 的 recv_channel
if m.Type != pb.MsgProp {
select {
case n.recvc <- m:
return nil
case <-ctx.Done():
return ctx.Err()
case <-n.done:
return ErrStopped
}
}
前面提到的 raft_node 不仅会监听 Ready 信号,还会监听在 recv_channel
for {
if advancec == nil && n.rn.HasReady() {
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
select {
case m := <-n.recvc:
r.Step(m)
case readyc <- rd:
n.rn.acceptReady(rd)
if !n.rn.asyncStorageWrites {
advancec = n.advancec
} else {
rd = Ready{}
}
readyc = nil
}
}
这里 recv_channel 上收到数据后,也是调用 step方法,这个 step 方法 和发送心跳 tick 的是同一个
对于 MsgPreVote 类型的消息
- 检查能否投票, 如果当前节点还没给任何Member投票 并且 当且没有 lead 就可以;或者是 MsgPreVote 类型的消息,并且 对方的term比自己的大也可以,之前在发送消息 MsgPreVote 的时候,指定的 term是 +1 了的,因此也比自己的 term 大
canVote := r.Vote == m.From ||
// ...we haven't voted and we don't think there's a leader yet in this term...
(r.Vote == None && r.lead == None) ||
// ...or this is a PreVote for a future term...
(m.Type == pb.MsgPreVote && m.Term > r.Term)
- 对比 term 和 index:对方的 日志term更大;或者日志 term相同,index不低于自己就认为是最新的了;日志term在发送消息的时候是没有 +1 的,因此是一样的
candLastID := entryID{term: m.LogTerm, index: m.Index}
func (l *raftLog) isUpToDate(their entryID) bool {
our := l.lastEntryID()
return their.term > our.term || their.term == our.term && their.index >= our.index
}
同时满足这两个条件,就正常响应给对方Peer,使用的是对方 +1 后的 term
pb.Message{To: m.From, Term: m.Term, Type: pb.MsgPreVoteResp}
否则响应的是拒绝给对方,使用的是自己的 term:
pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true}
和 tick 消息一样也是通过 send 方法发送
不同的是 MsgPreVoteResp 这个类型的消息会追加到 msgsAfterAppend []pb.Message
这个数组中
这个数组中的消息也会被 Ready 信号检测到,与 msgs 数组一样,构建一个 Ready 数据发送到 ready_channel
与 tick 消息一样,采用相同的方式将消息回复给发送方
发送方接受MsgPreVoteResp
发送方发出 tick 到收到 Peer的 MsgPreVoteResp 消息,然后通过相同的流程读取Peer Stream的消息
再次将这个消息发送到 raft_node 的 recv_channel,开始发送方的 raft 流程
对于 MsgPreVoteResp 类型的消息,会进入到 发送 tick 之前状态转换中的 step 方法中
func (r *raft) becomePreCandidate() {
r.step = stepCandidate
r.trk.ResetVotes()
r.tick = r.tickElection
r.lead = None
r.state = StatePreCandidate
r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
}
这个 step 方法会对 MsgPreVoteResp 类型的消息进行处理
func stepCandidate(r *raft, m pb.Message) error {
switch m.Type {
case pb.MsgPreVoteResp:
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// pb.MsgPreVoteResp contains future term of pre-candidate
// m.Term > r.Term; reuse r.Term
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}
- 首先记录结果,即当前节点收到来自 id 的结果 reject 还是 接收
func (p *ProgressTracker) RecordVote(id uint64, v bool) {
_, ok := p.Votes[id]
if !ok {
p.Votes[id] = v
}
}
- 对结果进行统计,一方面统计当前同意 和 拒绝的数量
for id, pr := range p.Progress {
if pr.IsLearner {
continue
}
v, voted := p.Votes[id]
if !voted {
continue
}
if v {
granted++
} else {
rejected++
}
}
- 对结果进行统计,另一方面统计新旧配置中的结果,初始时只有新配置c[0] ,没有旧配置 c[1];如果新旧配置的统计结果一致,则直接返回这个结果,可能是VoteWon、VoteLost 、VotePending之一;如果存在一个配置统计到VoteLost ,那就直接 VoteLost ;如果两份配置票不一致,那就返回 VotePending
func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult {
r1 := c[0].VoteResult(votes)
r2 := c[1].VoteResult(votes)
if r1 == r2 {
// If they agree, return the agreed state.
return r1
}
if r1 == VoteLost || r2 == VoteLost {
// If either config has lost, loss is the only possible outcome.
return VoteLost
}
// One side won, the other one is pending, so the whole outcome is.
return VotePending
}
- 每份配置的统计都按照超过半数原则,赞成票超过半数返回 VoteWon;赞成票 + 没投的超过半数返回 VotePending,说明算上没投的仍有可能超过半数从而 VoteWon;赞成票 +没投的不足半数了,直接返回VoteLost,因为即使没投的都投给你,也不可能超过半数了。
func (c MajorityConfig) VoteResult(votes map[uint64]bool) VoteResult {
if len(c) == 0 {
// By convention, the elections on an empty config win. This comes in
// handy with joint quorums because it'll make a half-populated joint
// quorum behave like a majority quorum.
return VoteWon
}
var votedCnt int //vote counts for yes.
var missing int
for id := range c {
v, ok := votes[id]
if !ok {
missing++
continue
}
if v {
votedCnt++
}
}
q := len(c)/2 + 1
if votedCnt >= q {
return VoteWon
}
if votedCnt+missing >= q {
return VotePending
}
return VoteLost
}
- 对投票结果进行处理,如果处于 VotePending状态,什么都不处理,等待更多Member 通过 tick 参与到投票中来;如果是 VoteLost状态,那么直接转换为 Follower;如果是 VoteWon状态,对于 StatePreCandidate 这个状态,还需要额外的处理;其它状态则直接成为 Leader
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
r.becomeFollower(r.Term, None)
}
VoteLost状态
如果部分Member的初始 term 或 index 比较落后,那么这部分Member在 tick 给其它Peer时会得到拒绝票,进而处于 VoteLost 状态
如果当前Member没有赢得投票,重新回到 Follower 状态
修改状态是 StateFollower
保持 term 不变
然后重置 tick 的随机值,重置投票结果,重新开始 累计 tick 次数,等待达到随机值后,重新发出 tick 请求到Peer
重复上述相同的状态变化和流程
VoteWon状态
由于Member当前处于 StatePreCandidate 这个状态,因此会先进入 Candidate 状态
修改状态是 StateCandidate
term自增
vote_id 设置为自己,即投自己一票
重置 tick 的随机值,重置投票结果,重新开始 累计 tick 次数,等待达到随机值后,重新发出 tick 请求到Peer
重新构建消息发送给Peer,开启正式的投票,term已经自增了
pb.Message{To: id, Term: term, Type: MsgVote, Index: last.index, LogTerm: last.term, Context: nil}
由于 term、状态等发生了改变,soft_state 和 hard_state 都变化了,也存在新的消息,raft_node 再次检测到 Ready 信号
然后再通过相同的Stream连接发送到Peer
考虑几种情况:
假设所有Member的初始 term 和 index都是一样的
所有Member的 tick 时间近乎一致的情况下,因为 PreVote 消息不会自增 term,但是发出去的消息的 term 会 +1;因此所有Member都会 VoteWon;然后所有Member都进入 Candidate 状态;这时候所有Member的 term 都会自增,再次相同;
部分Member的 状态转换更快,那它的 term 会高于那些较慢的Member;因为新的 MsgVote 消息到来时,比较慢的Peer可能还没更新自己的状态,仍然还处于 StatePreCandidate 这个状态;对于这部分比较慢的Member,在收到新的 MsgVote 消息后,会重新更新自己的状态为 Folloer,并将自身的 term 更新为 MsgVote 消息里的 term,也就是自增后的 term;
如果是case2,即有部分节点比较慢,他在处理 MsgVote 时,由于自己没有给任何人投票,因此会投赞成票给那些比较快的Peer;
但是如果是case1,即所有节点的进度近乎一致,所有节点都是 StateCandidate,并且都已经投了一票给自己,这时候就无法再给其它Peer投赞成票,都投出了拒绝票;所有成员都会收到半数以上的拒绝票,因此就会重新都将自己回退到 StateFollower 状态;表示这一轮选举失败了,重置随机值后,开始下一轮选举。
PreVote的作用
PreVote即预投票功能,是个可选功能,但是默认是启用的
他和Vote消息最大的区别是term不会自增,即LogTerm 是不变的,但是发出去的 term也是 +1的
意味为下一轮选举探路,即向Peer探测下一轮选举是否会投票给我?
case m.Term > r.Term:
switch {
case m.Type == pb.MsgPreVote:
case m.Type == pb.MsgPreVoteResp && !m.Reject:
default:
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
他带来的最大好处是,不会引起现有集群的稳定性,可以看到对于收到 term 更大的消息,对于MsgPreVote 类型的消息什么操作都没有;而对于 MsgVote 类型的消息,则会直接让整个集群的成员都变成 Folloer ,采用这个更大的 term ,开始重新选主,对集群的稳定性有一定影响。
因为etcd集群中的term是只增不减的,一旦出现更大的 term ,那么其它Member都必须向这个 term 靠齐,不管这个 term 是否带有更新的日志;而MsgPreVote 只会使用更大的 term进行试探,没有实际性的增加 term。