raft 系列解读(2) 之 测试用例
基于mit的6.824课程,github代码地址:https://github.com/zhuanxuhit/distributed-system
case1:TestInitialElection
测试中3个server,然后启动,验证在同一个任期(term)内是否只有一个leader,并且在2 * RaftElectionTimeout
后,由于心跳的存在,不会发生重选。
在代码实现中,主要有以下几点:
- 实现
AppendEnties
和RequestVote
两个rpc部分功能 - 实现
Make
新建Raft
我们来看下其中主要的关键点:
程序整体组织上是在Make
中启动了一个goroutine,是一个无限循环,根据不同的状态进行不同的处理,结构如下:
follower
先讲第一个状态follower
的处理
所有的server重启后第一个状态都是follower
,如果在election timeout
时间内,既没有收到leader的heartbeat
,也没有收到RequestVote
请求,那么开启选举过程,此时状态将转换为candidate
,代码如下:
rf.resetElectionTimeout()
// 等待心跳,如果心跳未到,但是选举超时了,则开始新一轮选举
select {
case <-rf.heartbeatChan:
case <-time.After(rf.randomizedElectionTimeout):
// 开始重新选举
log.Println("election timeout:", rf.randomizedElectionTimeout)
if rf.status != STATUS_FOLLOWER {
// panic
log.Fatal("status not right when in follower and after randomizedElectionTimeout:", rf.randomizedElectionTimeout)
}
rf.convertToCandidate()
}
candidate
接着开始第二个状态candidate
的处理:
- 第一步,新增本地任期和投票
- 第二步,重置 election timer 并开始广播
- 第三步等待结果
- 1)他自己赢得了选举;
- 2)收到AppendEntries得知另外一个服务器确立他为Leader,转变为follower
- 一个周期时间过去但是没有任何人赢得选举,开始新的选举
结构大致如下:
leader
如果此时赢得了选举,则进入第3个状态leader
的处理:目前leader只实现了一个功能,周期性的发送心跳,功能非常简单,此处不再贴代码了。
rpc
剩下就是两个rpc的发送和接收处理了,其中需要特别注意的点如下:
- 所有rpc处理中:如果收到的请求或者响应中,包含的term大于当前的currentTerm,设置currentTerm=term,然后变为follower
- 所有rpc处理中:判断任期是否小于currentTerm,小于的都丢弃
在完成第一个测试的过程中:AppendEnties只需要处理心跳请求即可。
最后给出代码的地址:https://github.com/zhuanxuhit/distributed-system,tag是:lab3-raft-case1
case2:TestReElection
有3个server,选举出来一个leader后,模拟leader故障,重新选举出一个leader,然后再模拟older leader故障恢复重新加入,此时也只会有一个leader,再模拟3个2个都故障了,那理论上就不会有leader出现了,此时再逐个加入故障的server,都只会有一个leader
直接运行测试
go test -v -run ReElection
- leader故障,新的leader选出来
- 老的leader加入,不影响只有一个leader
- 两个server故障,不会有新的leader
- 恢复一个server,出现leader
- 再次恢复一个server,出现leader
先看第1个,出现的调试信息:
2016/10/10 18:44:46 follower: 0 election timeout: 1.287113937s
2016/10/10 18:44:46 now I begin to candidate,index: 0
2016/10/10 18:44:47 follower: 2 election timeout: 1.54916732s
2016/10/10 18:44:47 now I begin to candidate,index: 2
可以看到0开始选举后,不知道为什么2没有投票,去看代码,发现问题是:
- 当发现远端term大于本地term后,直接转换为follower,并更新当前的currentTerm和voteFor
修改后即可通过测试,接着马上又出现另一个问题:
2016/10/10 18:54:50 candidate: 0 'slog is not at least as up-to-date as receiver’s log
但是我们现在做的是没有日志的,查看代码发现问题是:
- (args.LastLogIndex < rf.commitIndex || args.LastLogTerm < currentTerm),因为currentTerm增加了,但是LastLogTerm是0,所以要考虑
rf.commitIndex == 0
表示还没有日志,则没必要检查
修改完后,再次运行case,这次是两个server故障,不会有新的leader出问题了,选举不出来,接着查原因:
在处理投票的时候,往heartbeatChan
写的时候阻塞了,rf.heartbeatChan = make(chan bool, 1)
是有一个缓冲的channel,那为什么会阻塞呢,我们看下有几个地方会写,几个地方会去读
有两个地方会去写:
- AppendEnties中收到心跳会去写,当去写的时候,说明是已经有leader了,自己会转变为follower
- RequestVote中收到投票也会去写
读的地方也有两个
- 在状态follower中,去读
heartbeatChan
,如果选举超时内没收到心跳,则开始candidate - 在candidate状态,去读去读
heartbeatChan
,表示已经有新的leader产生了
于是就发现了问题:
- 在实现leader任务的时候,没有一个点去触发退出心跳
- 选举失败,应该等待超时,然后重新开始新一轮选举,而不是马上开始新一轮选举,这样子造成彼此都不成功
修改代码后,通过case2
case3:TestBasicAgree
这个case开始要做提交了,实现Start()
函数了,这个case主要测试是:有5个server,没提交前检查没有提交的log,然后提交后,测试该log是否已经被每个server都存储了。
在实现start中,其做的步骤是:
// 客户端的一次日志请求操作触发
// 1)Leader将该请求记录到自己的日志之中;
// 2)Leader将请求的日志以并发的形式,发送AppendEntries RCPs给所有的服务器;
// 3)Leader等待获取多数服务器的成功回应之后(如果总共5台,那么只要收到另外两台回应),
// 将该请求的命令应用到状态机(也就是提交),更新自己的commitIndex 和 lastApplied值;
// 4)Leader在与Follower的下一个AppendEntries RPCs通讯中,
// 就会使用更新后的commitIndex,Follower使用该值更新自己的commitIndex;
// 5)Follower发现自己的 commitIndex > lastApplied
// 则将日志commitIndex的条目应用到自己的状态机(这里就是Follower提交条目的时机)
实现的关键点:在Start函数中,一旦判断出当前server是leader,马上开启一个goroutine,开始异步进行agree工作,然后立即返回,代码如下:
此处第4步和第5步需要在另外的地方完成,一个是heartbeat中,另一个是follower在处理AppendEntries过程中
还有就是在成为leader的时候,需要初始化nextIndex,matchIndex
而在发送heartbeat中,判断log的最大index ≥ nextIndex,如果大于,需要发送从nextIndex开始的log,在发送完后需要判断成功与否,成功则更新
nextIndex,matchIndex
,失败则减少nextIndex
,并重试还有最重要的一点:为了通过测试,记住要在日志提交后,发送消息ApplyMsg
给applymsg
,这样才能通过测试
好了到此为止,写的代码刚好通过第三个测试,继续下一关的!
case4:TestFailAgree
测试的内容是:有3个server,其中一个follower故障,发的命令只有2个能收到,当恢复故障后,发的命令都能收到
出现的问题:由于每个command真正提交都是通过goroutine来执行的,因此每个goroutine之间并发执行,怎么保证前一个agree了,下一个才能agree成功呢?
现在出现的问题是:
map[3:103 5:104 1:101 2:102],乱序,即4还没有提交了,5就提交成功了
现在的问题是:谁也不服谁,当follower恢复后,大家都竞选,但是没有一个成功,查明原因后发现是因为没有处理一个概念:
>如果候选人的日志至少和大多数的服务器节点一样新
这个一样新通过:比较两份日志中最后一条日志条目的索引值和任期号定义谁的日志比较新。如果两份日志最后的条目的任期号不同,那么任期号大的日志更加新。如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。
进行到这,发现已经很难调试了,代码太乱,逻辑混乱,于是准备开始重构
现有代码的问题:
- 临界区的混乱,到底哪里加锁,哪里不加
- 各个goroutine之间交互的混乱
- 代码功能组织的问题
重构的代码最重要的一点是:抽象出了状态机,在里面去更新
case5:FailNoAgree
测试内容是:5个server,3个follow故障,此时提交的命令将不会Committed,然后恢复3个follower,此时发送第3个命令,会忘记第2个没有确认的命令,此时第3个命令的index应该还是2
现在出现的问题是:
follow的日志没更新,但是leader的nextIndex确更新了!
2016/10/13 10:44:20 leader is 4
2016/10/13 10:44:22 server:0,currentTerm:3,role:candidate
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:1,currentTerm:3,role:candidate
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:2,currentTerm:3,role:candidate
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:3,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:4,currentTerm:2,role:leader
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[2 2 2 3 3]
matchIndex is:[1 1 1 2 0]
2016/10/13 10:44:22 恢复3个server
2016/10/13 10:44:25 LeaderId: 4 has big term: 5 than follower: 3 currentTerm: 4
2016/10/13 10:44:25 server 3 len(rf.log) 3 args.PrevLogIndex 1
2016/10/13 10:44:26 重新选举后leader is 4
2016/10/13 10:44:26 server:0,currentTerm:5,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:1,currentTerm:5,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:2,currentTerm:5,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:3,currentTerm:5,role:follower
commitIndex:2,lastApplied:2
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:4,currentTerm:5,role:leader
commitIndex:2,lastApplied:2
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[3 3 3 3 3]
matchIndex is:[2 2 2 2 0]
看重新选举后,leader4:matchIndex is:[2 2 2 2 0],但是其他的follower确没有收到新的日志,怎么回事呢?看代码什么情况下回去更新matchIndex呢?
问题在于发送心跳的时候返回了reply=true了,确没有去检查日志是否是最新的
此处记住appendEntries如果返回true,则一定表示是日志一样新了!
true if follower contained entry matching prevLogIndex and prevLogTerm
case6:ConcurrentStarts
这个case测试的是:
同时发送5个命令,然后测试5个命令能够被顺序的提交
测试中的修改是:
将红色框中的内容移动到了锁里面,为了防止并发访问的时候,index得到相同。
case7:Rejoin
测试重新加入直接通过了,之前的代码就能实现
测试内容是:3个server,leader故障,然后向故障的leader发送命令,同时向新选举出来的leader发送命令,大致如下图,最后能统一
case8:Backup
类似case7:不同在于此处有5个server,然后命令更多,测试也是网络分区后出现多leader,然后恢复网络后,再重新同步数据
不用修改,直接通过
case9:Count
case9主要是性能测试,测试rpc的次数不能太多
case10-12:Persist1-3
持久化的逻辑一直没有加上,此处加上的
先看需要持久化哪些数据,然后持久化的时机是什么时候?
需要持久化哪些日志?
e.Encode(rf.currentTerm) // 当前任期
e.Encode(rf.log) // 收到的日志
e.Encode(rf.votedFor) // 投票的
e.Encode(rf.commitIndex) // 已经确认的一致性日志,之后的日志表示还没有确认是否可以同步,一旦确认的日志都不会改变了
既然这几个需要同步,那就是发生改变的时候把数据持久化下来就可以了
需要调用persist()
函数的地方有:
- leader向各个follower发送完日志,确认提交的时候
- follower处理AppendEnties有新日志或者commiIndex更新的时候
case13:Figure8
测试主要测试的是下面的这张图:
描述的问题是:为什么领导人无法通过老的日志的任期号来判断其提交状态。
- (a) S1 是领导者,部分的复制了索引位置 2 的日志条目
- (b) S1 崩溃了,然后 S5 在任期 3 里通过 S3、S4 和自己的选票赢得选举,然后从客户端接收了一条不一样的日志条目放在了索引2 处
- (c) S5 又崩溃了;S1 重新启动,选举成功,开始复制日志。在这时,来自任期 2 的那条日志已经被复制到了集群中的大多数机器上,但是还没有被提交
- (d) S1 又崩溃了,S5 可以重新被选举成功(通过来自 S2,S3 和 S4 的选票),然后覆盖了他们在索引 2 处的日志。但是,在崩溃之前,如果 S1 在自己的任期里复制了日志条目到大多数机器上
- (e) 然后这个条目就会被提交(S5 就不可能选举成功)。 在这个时候,之前的所有日志就会被正常提交处理
Raft采用计算副本数的方式,使得永远不会提交前前 面纪元的日志条目,
现在出现的问题是commit了不同的值?
即在没有达成一致的情况下就就行了提交!
Test: Figure 8 ...
2016/10/13 20:38:35 server:0,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 1752890841475247006 1}]
nextIndex is:[1 1 1 1 1]
matchIndex is:[0 0 0 0 0]
2016/10/13 20:38:35 server:2,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 1752890841475247006 1}]
nextIndex is:[1 1 1 1 1]
matchIndex is:[0 0 0 0 0]
2016/10/13 20:38:35 server:4,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 1752890841475247006 1}]
nextIndex is:[1 1 1 1 1]
matchIndex is:[0 0 0 0 0]
2016/10/13 20:38:35 apply error: commit index=2 server=1 4541014630978635374 != server=3 8558661384468427932
到这就得加上之前忘记的一个策略
如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N
主要是指:leader只会提交本纪元的日志
case14:UnreliableAgree
模拟网络不可靠,在不可靠的情况下cfg.setunreliable(false)
,则有概率还是丢弃请求,在这种情况下测试协议最后还能达成一致
case15:Figure8Unreliable
通过设置cfg.setlongreordering(true)
,在labrpc中会直接睡眠一段时间,模拟这次情况下协议还是达成一致
ms := 200 + rand.Intn(1 + rand.Intn(2000))
time.Sleep(time.Duration(ms) * time.Millisecond)
2016/10/14 14:51:11 server:4,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:3,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:2,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:1,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:0,currentTerm:31,role:leader
commitIndex:3,lastApplied:3
nextIndex is:[186 53 58 51 62]
matchIndex is:[185 0 0 0 0]
2016/10/14 16:09:45 check log type: raft.AppendEntiesArgs value: {6 1 1 1 1 [{1 4411 2} {2 9540 3} {4 3863 4} {6 2769 5}]}
2016/10/14 16:09:45 error log indexserver:0,currentTerm:6,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 606 1} {1 4411 2} {4 3863 4} {6 2769 5}]
nextIndex is:[84 0 0 3 2]
matchIndex is:[83 1 1 2 1]
错误日志,由于没有很好的传递日志,代码bug
case16-17:TestReliableChurn,UnreliableChurn
测试通过
下一篇的计划是结合代码再次看下关键实现