Overview
Etcd中的Raft使用了protobuf进行节点间的消息通讯,其中包含的消息类型定义于raft.proto
中:
// MessageType enumerates the kinds of messages dispatched on m.Type by raft.
// For description of different message types, see:
// https://pkg.go.dev/go.etcd.io/etcd/raft/v3#hdr-MessageType
enum MessageType {
MsgHup = 0; // local message that starts an election
MsgBeat = 1; // local message that makes the leader broadcast heartbeats
MsgProp = 2; // proposal to append entries to the log
MsgApp = 3; // carries log entries to append
MsgAppResp = 4; // reply to MsgApp
MsgVote = 5; // vote request sent when campaigning
MsgVoteResp = 6;
MsgSnap = 7;
MsgHeartbeat = 8; // heartbeat sent by the leader
MsgHeartbeatResp = 9; // reply to MsgHeartbeat
MsgUnreachable = 10;
MsgSnapStatus = 11;
MsgCheckQuorum = 12; // local message stepped by the leader's tick when checkQuorum is set
MsgTransferLeader = 13;
MsgTimeoutNow = 14;
MsgReadIndex = 15;
MsgReadIndexResp = 16;
MsgPreVote = 17; // pre-vote request sent when campaigning with campaignPreElection
MsgPreVoteResp = 18;
}
其核心为Message
结构体,记录当前的任期(Term),信息的发送和接收方(From->To),以及需要传递的Context、Commit、Entries等字段。
// Message is the unit exchanged between raft nodes (and stepped locally for
// internal message types). Which fields are meaningful depends on Type.
type Message struct {
Type MessageType
To uint64 // ID of the receiving node
From uint64 // ID of the sending node
Term uint64
// logTerm is generally used for appending Raft logs to followers. For example,
// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
// index=101, and the term of entry at index 100 is 5.
// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
// entries from its leader as it already has an entry with term 5 at index 100.
LogTerm uint64
Index uint64
Entries []Entry
Commit uint64
Snapshot Snapshot
Reject bool // set on a MsgAppResp that rejects the append
RejectHint uint64 // index hint sent along with a rejecting MsgAppResp
Context []byte
}
(1) MsgHup
MsgHup
用于开启选举。假如Follower和Candidate没有收到心跳,则开启一轮新的选举。实现于raft.go
的tickElection
中:
// tickElection is run by followers and candidates after r.electionTimeout.
// It advances the election timer by one tick and, once the timeout has passed
// on a promotable node, steps a local MsgHup to start an election.
func (r *raft) tickElection() {
r.electionElapsed++
// This node is promotable and the election timeout has elapsed without the
// timer being reset, so start an election by stepping a local MsgHup.
if r.promotable() && r.pastElectionTimeout() {
// Restart the timer before campaigning.
r.electionElapsed = 0
// A Step failure is only logged at debug level; it is not propagated.
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgHup}); err != nil {
r.logger.Debugf("error occurred during election: %v", err)
}
}
}
在核心方法Step
中,接收到MsgHup
则开启选举。在这里如果preVote
为true,则需要先进行一轮预选举(PreVote),否则直接执行常规选举:
- campaignPreElection :预选举(选举的第一阶段)
- campaignElection : 常规的基于超时的选举
- campaignTransfer : 表示转移Leader
switch m.Type {
case pb.MsgHup:
if r.preVote {
r.hup(campaignPreElection)
} else {
r.hup(campaignElection)
}
...
}
其中hup
方法本质就是调用campaign
,将raft实例转化为Candidate状态,并发送voteMsg给其他node:
// hup starts a campaign of type t unless this node must not campaign.
// This is an abridged excerpt: the `...` below stands for elided checks, so
// the braces shown here do not balance.
func (r *raft) hup(t CampaignType) {
// Guard: a node that is already leader, or that is not promotable, does
// not start a campaign.
if r.state == StateLeader || !r.promotable(){
return
}
...
}
// All checks passed: log and start the campaign.
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign(t)
}
// campaign moves this node into the (pre-)candidate state for campaign type t
// and sends a vote request of the matching type to every other voter.
// (Abridged excerpt: the leading `...` stands for elided code.)
func (r *raft) campaign(t CampaignType) {
...
var term uint64
var voteMsg pb.MessageType
// Switch state and pick the request type and the term to send.
if t == campaignPreElection {
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
// Record our own vote; if that alone already yields quorum.VoteWon there is
// nobody else to ask.
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
// Won with only our own vote (the article notes this is the single-node
// case): a pre-election advances to the real election, otherwise we
// become leader directly.
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
// Collect all voter IDs and sort them in ascending order.
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
for _, id := range ids {
// Skip ourselves; our own vote was recorded above.
if id == r.id {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
// For a leadership transfer, carry the campaign type in the message context.
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
// Send the vote request with our last log index and term to the peer.
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
(2) MsgBeat & MsgHeartbeat & MsgHeartbeatResp
-
MsgBeat
是一个内部类型,在tickHeartbeat
(tickElection
外的另一个时钟方法)中触发Leader向Follower发送心跳信号MsgHeartbeat
// tickHeartbeat advances both the heartbeat and the election timers by one
// tick. On election timeout it optionally steps MsgCheckQuorum and aborts a
// pending leadership transfer; on heartbeat timeout a leader steps MsgBeat.
func (r *raft) tickHeartbeat() {
r.heartbeatElapsed++
r.electionElapsed++
// Once electionTimeout ticks have elapsed, restart the election timer.
// NOTE(review): the article states electionTimeout defaults to 10 ticks;
// that default is not visible here, so confirm it.
if r.electionElapsed >= r.electionTimeout {
r.electionElapsed = 0
if r.checkQuorum {
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}); err != nil {
r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
}
}
// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
if r.state == StateLeader && r.leadTransferee != None {
r.abortLeaderTransfer()
}
}
// Only leader sends heartbeat
if r.state != StateLeader {
return
}
// Heartbeat timeout reached: restart the timer and step a local MsgBeat.
if r.heartbeatElapsed >= r.heartbeatTimeout {
r.heartbeatElapsed = 0
if err := r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}); err != nil {
r.logger.Debugf("error occurred during checking sending heartbeat: %v", err)
}
}
}
Leader接收到MsgBeat
后开始发送心跳:
switch m.Type {
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
...
}
// bcastHeartbeat broadcasts a heartbeat to the peers. The heartbeat carries
// the context returned by r.readOnly.lastPendingRequestCtx(), or a nil
// context when that is empty.
func (r *raft) bcastHeartbeat() {
lastCtx := r.readOnly.lastPendingRequestCtx()
if len(lastCtx) == 0 {
r.bcastHeartbeatWithCtx(nil)
} else {
r.bcastHeartbeatWithCtx([]byte(lastCtx))
}
}
// bcastHeartbeatWithCtx calls sendHeartbeat with ctx for every peer visited
// by r.prs.Visit, skipping this node itself.
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
// Do not send a heartbeat to ourselves.
if id == r.id {
return
}
r.sendHeartbeat(id, ctx)
})
}
-
MsgHeartbeat
在sendHeartbeat中
发送来自Leader的心跳。
// sendHeartbeat sends a MsgHeartbeat carrying ctx to the peer with ID to.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach commit = min(peer's Match, our committed index); presumably so the
// peer is never asked to commit beyond what it is known to have matched.
commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}
r.send(m)
}
在核心函数Step
中,当收到的msg任期大于当前任期时,在以下情况中需要变为Follower:
- MsgApp : 日志信息
- MsgHeartbeat :心跳信息
- MsgSnap :快照信息
case m.Term > r.Term:
...
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
r.becomeFollower(m.Term, m.From)
} else {
r.becomeFollower(m.Term, None)
}
}
当 MsgHeartbeat
到达stepCandidate时,其任期必然与Candidate的任期相同(任期更高的情况已在Step中提前处理),说明该任期已经存在Leader,Candidate变回Follower;若是Follower则重新计时并记录Leader。
stepCandidate:
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
stepFollower:
case pb.MsgHeartbeat:
r.electionElapsed = 0
r.lead = m.From
r.handleHeartbeat(m)
-
MsgHeartbeatResp
是MsgHeartbeat
的响应,在handleHeartbeat
中由Follower(Candidate已经变成了Follower)记录当前commit,随后发送响应。
// handleHeartbeat processes a heartbeat m: it passes m.Commit to
// r.raftLog.commitTo and replies to the sender with a MsgHeartbeatResp that
// echoes m.Context.
func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
Leader得知是哪个Follower回应了自己的心跳,假如该Follower的match index小于Leader日志的last index(注意不是commit index),则调用sendAppend
补充日志。
StepLeader:
// Leader-side handling of a heartbeat response from peer m.From
// (pr is that peer's progress; excerpt from stepLeader).
case pb.MsgHeartbeatResp:
pr.RecentActive = true
pr.ProbeSent = false
// If the peer is replicating and its in-flight window is full, free the
// first slot.
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
// Send an append whenever the peer's Match is behind the leader's last log
// index (note: lastIndex, not the committed index).
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
// Nothing more to do unless read-only mode is ReadOnlySafe and the
// response carries a context.
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
// Read-only handling: record this ack and continue only once the acks
// amount to quorum.VoteWon.
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
// Advance the read-only queue and send the responses that have a recipient.
rss := r.readOnly.advance(m)
for _, rs := range rss {
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
r.send(resp)
}
}
(3) MsgProp
MsgProp
提出向log entry追加日志的提议,这些提议将被重定向给Leader,视作Leader的本地消息,因此在send中我们不能将发送者的Term追加到待发送的提议中。MsgReadIndex
也是同理。
send:
...
if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
m.Term = r.Term
}
Follower接收到MsgProp
后,发送提议给Leader。假如Candidate收到MsgProp
,说明Follower发给Leader的信息被发给了自己,当前没有Leader,则放弃该提议。
stepFollower:
case pb.MsgProp:
...
m.To = r.lead
r.send(m)
stepCandidate:
case pb.MsgProp:
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
Leader处理MsgProp
时,首先调用appendEntry
将条目追加到自己的Log中,随后调用bcastAppend
将这些条目发送给其他节点。
stepLeader:
case pb.MsgProp:
...
if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
(4) MsgApp & MsgAppResp
MsgApp
包含要复制的日志entry和当前的Term,在maybeSendAppend
中发送该消息。假如无法正确获取Term和Entry,可能当前日志条目已经被snapshot了,因此发送snapshot。否则就发送追加这段entry的消息。
// maybeSendAppend sends an append (or, when term/entries cannot be read, a
// snapshot) to peer `to`. It returns false without sending when there are no
// entries to send and sendIfEmpty is false, and true after sending.
// (Abridged excerpt: `...` stands for elided code, including the declarations
// of m and pr.)
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
...
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if len(ents) == 0 && !sendIfEmpty {
return false
}
if errt != nil || erre != nil {
// send snapshot if we failed to get term or entries
...
} else {
// The append is anchored at the entry just before pr.Next.
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in StateReplicate
case tracker.StateReplicate:
last := m.Entries[n-1].Index
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
// While probing, Next is not advanced optimistically; only record that
// a probe has been sent.
case tracker.StateProbe:
pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
r.send(m)
return true
}
Follower和Candidate收到MsgApp
后,需要追加日志。在此有可能发生日志冲突(Follower和Leader日志不一致),需要根据hintIndex查找当前冲突的index。
在常规情况下,Leader的日志长度要长于Follower的长度,因此Follower的日志可以看作是Leader的前缀。此处的日志冲突主要是由于网络分区或系统过载,导致Leader可能挂掉或是消息不同步引起的,主要表现在日志从冲突点到尾端不相同。假如从Follower的最后一个index开始逐条往前探测并重新尝试追加日志,总耗时约为“冲突部分日志长度×网络往返延迟”,冲突日志很长时探测会非常缓慢,可能需要好几个小时。
例如下面的例子中,Follower拒绝后返回的冲突位置为Index 6、Term 2。Leader在index 2~6处的Term(3或5)都大于Follower返回的Term(2),在这些位置探测必然失败;而Leader在index 1处的Term(1)不大于2,因此可以直接跳到index 1进行探测,从而有效减少探测次数:
// idx 1 2 3 4 5 6 7 8 9
// -----------------
// term (Leader) 1 3 3 3 5 5 5 5 5
// term (Follow) 1 1 1 1 2 2
Follower和Candidate需要处理自己的追加日志,如果追加成功就响应,否则处理日志冲突,并将Reject设为true:
// handleAppendEntries processes a MsgApp m and always answers the sender with
// a MsgAppResp: an acknowledgement when the append is stale or succeeds, or a
// rejection carrying a hint (RejectHint/LogTerm) when it does not match.
func (r *raft) handleAppendEntries(m pb.Message) {
// m.Index is below our committed index: reply right away with our
// committed index.
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}
// Try to append; on success acknowledge with the last index returned by maybeAppend.
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
// Compute a hint for where the conflict may be. Start from
// min(m.Index, lastIndex); per the article's note, lastIndex-1 would also
// work but the minimum is faster.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
hintTerm, err := r.raftLog.term(hintIndex)
if err != nil {
panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
}
// Reject, returning the hint index and its term to the sender.
r.send(pb.Message{
To: m.From,
Type: pb.MsgAppResp,
Index: m.Index,
Reject: true,
RejectHint: hintIndex,
LogTerm: hintTerm,
})
}
}
Leader收到来自Follower和Candidate的MsgAppResp响应后,假如此前有日志冲突(m.Reject为true),则通过消息中传来的RejectHint确定nextIndex重试;若没有冲突则更新matchIndex和nextIndex,进行后续的日志操作。
stepLeader:
// Leader-side handling of an append response from peer m.From
// (pr is that peer's progress; abridged excerpt from stepLeader).
switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true
if m.Reject { // log conflict
// Start from the peer's hint; when it also reported a term, refine the
// probe index against our own log.
nextProbeIdx := m.RejectHint
if m.LogTerm > 0 {
// Conflict point according to the leader's log.
nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
// Lower the Next index used for the retry.
// Per the article's note (MaybeDecrTo itself is not shown here): in
// StateReplicate the retry uses last match index + 1, otherwise the
// minimum of m.Index and nextProbeIdx. Confirm against MaybeDecrTo.
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
// A replicating peer falls back to the probe state.
if pr.State == tracker.StateReplicate {
pr.BecomeProbe()
}
r.sendAppend(m.From)
}
} else {
// Update Match and Next; state-related updates and the follow-up
// sendAppend are elided here.
...
}
}
(5) MsgVote & MsgPreVote
MsgVote
逻辑比较简单,在MsgHup
开启选举后,在campaign
中转变为Candidate发起投票。PreVote算法中,Candidate首先要确认自己能赢得集群中大多数节点的投票,这样才会把自己的term增加,然后发起真正的投票。这种算法解决了网络分区节点在重新加入时,会中断集群的问题。
// campaign (abridged excerpt): chooses the candidate state, the vote message
// type and the term to send, depending on campaign type t.
func (r *raft) campaign(t CampaignType) {
...
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
// Pre-election: become pre-candidate and ask with MsgPreVote.
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term = r.Term + 1
} else {
// Real election: become candidate and ask with MsgVote at r.Term.
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
...
}
满足以下条件可以在Step
中发送投票响应:
- 之前已经给该Candidate投过票(重复投票)
- 或者:还没有投过票,且当前没有Leader
- 或者:该请求是针对更高Term的PreVote
- 并且(必须同时满足):Candidate的日志至少和自己一样新
Step:
// Handling of vote and pre-vote requests (abridged excerpt from Step).
case pb.MsgVote, pb.MsgPreVote:
// 1. We already voted for this sender, so this is a repeated vote.
canVote := r.Vote == m.From ||
// 2. We have not voted yet and know of no leader.
(r.Vote == None && r.lead == None) ||
// 3. This is a pre-vote request for a future term.
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// Additionally require that the candidate's log is up to date; the article
// quotes the check as:
// term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
// Grant: reply with the matching response type, using the request's term.
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
r.Vote = m.From
}
} ...
(6) MsgSnap & MsgSnapStatus
MsgSnap
处理逻辑和MsgHeartbeat
、MsgApp
一致,区别在于上文提到的maybeSendAppend
中:假如无法获取Term或Entries而不能append,就改为发送snapshot(install snapshot)。