Lab2 Raft 学习笔记

1. Raft算法

image.png

Raft保证了在任何时候以下属性都成立:

  • Election Safety:在一个给定的term,至多只有一个leader被选出
  • Leader Append-Only:一个leader从不覆盖或删除log中的entry,它只会append新的entry
  • Log Matching:如果两个log包含了一个具有相同index和term的entry,那么这两个log在这个index之前的所有entry都相同
  • Leader Completeness:如果一个log entry在某个term被commit,那么这个entry将会出现在所有数字更大的term的leader的log中
  • State Machine Safety:如果一个server已经把某个给定index上的log entry应用到了它的状态机上,那么其他任何server都不会对同一个index应用一条不同的log entry

1.1 Raft基础

在任何时候,一个server都处在如下三种状态的其中一种:leader,follower或candidate。正常情况下,只有一个server是leader,其他所有server都是follower。follower是被动的,不主动发起任何请求,并且只是简单的响应leader和candidate发来的请求。leader处理所有的client请求。如果一个client联系到了一个follower,follower会将其redirect到leader。candidate状态用来选出一个新的leader。三种状态的转换关系如下图:


状态转换关系图

Raft把时间划分为任意长度的term,如下图所示。term用连续的整数编号,每个term从一个election开始,在election过程中,一个或多个candidate尝试成为leader。如果一个candidate赢了election,它就成为了这个term的leader。某些情况下election会产生split vote,这个term没有leader产生,那么很快就会进入一个新的term并开始新一轮的election。


时间被划分为多个term

Term是Raft中的逻辑时钟,帮助server发现过时的信息,例如过时的leader。每个server都保存一个current term数字,随时间单调递增。如果一个server的current term小于其他server,它就把自己的current term值更新为更大的值。如果一个candidate或leader发现自己的term已经过时,它会立即恢复到follower状态。如果一个server接收到了带有过时的term的请求,就直接抛弃这个请求。

Raft server通过RPC来通信,基本的RPC只有两类:RequestVote和AppendEntries。RequestVote在election过程中由candidate初始化并发起;AppendEntries由leader发起,用于复制log entries并提供heartbeat。

1.2 Leader Election

Raft使用heartbeat机制来触发leader election。server刚启动时处于follower状态,只要它能接收到有效的来自leader或candidate的RPC请求,就一直维持在这个状态。Leader周期性地发送heartbeat(不包含任何log entry的AppendEntries RPC请求)到所有的follower。如果一个follower在一段时间(election timeout)内都没有接收到heartbeat,那么它就会认为此时不具有一个可用的leader,并尝试开始一次election。

为了发起一次election,这个follower首先将自己的term++,并转换为candidate状态,给自己投一票,并并行地发起RequestVote RPC到所有其他server。一个candidate保持在当前状态,直到以下三件事中的其中一件发生:

  • 该candidate赢得了election
  • 另一个server将自己确立为leader
  • 一段时间过去,但仍然没有选出leader
  1. candidate如果获得了整个cluster中相同term内的超过半数的server的投票,则赢得election。每个server采用FCFS策略,在一个term内最多投票给一个candidate,投完票后,server会重置自己的election timeout。一旦一个candidate赢得了election,它就会成为leader,并发送heartbeat信息给所有其他server来确立自己的authority,并防止新的election发生。
  2. 当candidate在等待投票时,可能会接收到来自于其他server的AppendEntries RPC请求,表明该server已经成为了leader。如果这个leader的term(包含在RPC中)至少和当前candidate的一样大,那么candidate就认为这个leader是合法的,并且恢复到follower状态。如果这个leader的term小于candidate当前的term,那么candidate就会抛弃这个RPC,并继续保持在candidate状态。
  3. 如果许多follower都同时成为candidate,投票就有可能过于分散,导致没有candidate获得大多数投票,这一轮election没有server获胜。此时,每个candidate都会设置一个timeout,如果超时,则term++,并发起一轮新的election。Raft使用将timeout随机化(150ms - 300ms)的方法来保证split vote的情况很少发生。这种机制也用于处理split vote。每个candidate在election开始时启动随机化的election timeout。所以election timeout在两个地方被使用到,一是follower经过election timeout变为candidate,二是candidate在发起election后经过election timeout进入下一个term重新发起election。

在leader election阶段,一共有两个timeout:

  • election timeout:follower等待变为candidate的timeout,在150ms - 300ms之间均匀随机分布
  • heartbeat timeout:两条heartbeat AppendEntries RPC之间的interval

1.3 Log复制

一旦一个leader被选出,它就开始接收client的请求。每个client请求包含一条要由复制状态机执行的命令。leader首先把这条命令作为一条新的entry添加到自己的log中,然后并行发起AppendEntries RPC请求到所有其他server,来复制这条entry。当entry被安全地复制完成后,leader把这条entry应用到自己的state machine,并返回执行的结果给client。如果RPC出现错误,则leader无穷地循环重新发送这条AppendEntry RPC,直到所有follower最终都保存了所有的log entry。


log的组织方式

log的组织方式如上图所示。每条log entry保存一个状态机的命令,以及leader接收到这条命令时的term编号。term号用于检测log之间的inconsistency。每个log entry也有一个整数index来表明它在log中的位置。

leader决定何时应用log entry到状态机是安全的,这样的entry被称为是committed。Raft保证committed entries是持久的,并且最终会被所有可用的状态机所执行。一旦leader创建的entry被成功复制到了大多数server上,这个entry就会被commit。这也commit了leader的log中所有在此之前的entry,包括以前的leader创建的entry。leader持续追踪它所知道的已被commit的最大的index,并把这个index包含在未来所有AppendEntries RPC(包括heartbeat)中,来让其他server知晓。一旦一个follower发现一个log entry已经commit,它就把这条entry应用到自己本地的状态机上(以log的顺序)。

Raft保持以下两条属性的正确性,它们共同构成了Log Matching属性:

  • 如果不同log中的两条entry具有相同的index和term,那么它们保存同一条command
  • 如果不同log中的两条entry具有相同的index和term,那么先前的所有log entry都相同

第一条属性由这个事实证明:给定term和index,一个leader最多创建一条entry。第二条属性通过AppendEntries RPC所执行的consistency check保证。leader在发送AppendEntries RPC时,它会把log中新的entries之前的那个entry的index和term包含在内。如果follower没有在自己的log中找到具有相同index和term的entry,它就会拒绝接受新的entry。新的entries指的是AppendEntries RPC中带有的要复制到所有follower上的新的entries。

正常情况下,leader和follower的log会始终保持一致,所以AppendEntries RPC的consistency check永远不会失败。但如果leader或follower崩溃,log就会变得不一致,如下图所示。


log不一致的情况

在Raft中,leader通过强制follower复制leader的log来处理inconsistency,这表明follower的log中的冲突entry会被leader的log中的entry所覆盖。当leader和follower的log不一致时,leader首先会找出两者的log相一致的最后一个log entry,删除follower中在此之后的所有entry,然后把leader log中在此之后的所有entry发送给follower。所有这些操作都在对于AppendEntries RPC的consistency check的回应中发生。leader为每一个follower维护一个nextIndex变量,它记录了leader将会发送给这个follower的下一个log entry的index。当一个leader刚被选举出来时,它会把所有的nextIndex值都初始化为自己的log中最新的entry的index + 1。如果一个follower的log与leader的不一致,下一条AppendEntries RPC的consistency check就会失败,follower拒绝这条RPC请求。此时leader会把对应的nextIndex减1,并重试这条RPC请求,直到最终nextIndex到达leader和follower的log相一致的那个值,此时AppendEntries RPC成功,便消除了follower和leader的log中的所有不一致情况,并把leader发来的log entry复制到follower的log里。一旦AppendEntries成功,leader和follower的log就保持一致了。

1.4 安全

上述机制还不足以确保每个状态机都用同样的顺序执行同样的command。例如,一个follower在leader commit一些log entry的时候是不可用的,后来它又被选为leader,并用心的log entry覆盖了那些已经被commit的entry,从而导致不同的状态机执行了不同的command序列。

这一章在哪个server能够被选为leader这件事上添加了一些额外的限制,这些限制确保了某个给定term的leader包含了先前的term里所有已经commit的entry。

1.4.1 Election限制

Raft保证选举时,新的leader的log包含了所有之前的term里的所有已经commit的entry,因此log entry的传输是单向的从leader传输到follower,leader永远不会覆盖自己的log中已经存在的entry。

如果一个candidate想要被成功选出,它就必须与cluster中的大多数server通信,这就表明每个已经commit的entry必须至少存在于其中一个server。如果candidate的log至少和大多数server中的任意一台一样up-to-date,那么它就包含了所有committed entries。RequestVote RPC实现了这一限制:RPC包含了candidate的log信息,如果voter发现自己的log比candidate的log更加up-to-date,就会否决这次投票。

判断up-to-date的方式:比较log中的最后一个entry的term和index。如果两个entry的term不同,term号更大的更加up-to-date;如果两个entry的term相同,log更长的更加up-to-date。

1.4.2 提交先前的term的entry

leader无法立即得出结论:一旦一个之前的term的entry已经被存储在大多数server上,则这个entry是committed。下图就展示了这样一种情况,一个旧的log entry已经存储在了大多数server上,但是仍然有可能被将来的一个leader覆盖。


已保存在大多数server上的log entry被覆盖

为了消除上图所示的这种情况,Raft从不通过计算replica数量的方法来commit之前的term的log entry。之后leader当前term的log entry才通过计算replica数量的方法来commit。一个某个当前term的entry以这种方式被commit,所有之前的entry都被间接地commit了,这是由Log Matching属性保证的。

1.5 Follower和candidate故障

如果一个follower和candidate故障了,未来发送给它的RequestVote和AppendEntries RPC都会fail。Raft通过无限次重传来解决这一问题。如果故障的server重启,则RPC会成功完成。如果一个server在完成了RPC但还没有回应之前发生故障,它重启之后就会再次受到相同的RPC请求。由于Raft RPC是幂等的,收到相同的请求不会造成任何错误。例如,一个follower接收到了一个AppendEntries请求,请求中包含了已经保存在它本地的log entry,follower就会直接忽略这些entry。

1.6 时序和可用性

Raft保证安全性不依赖于时序,系统不会因为某些事件发生得早于或晚于预期而产生错误的结果。然而,可用性(系统及时回应client的能力)是依赖于时序的。

Leader选举是Raft中时序非常重要的一部分。Raft能够选出并维持一个稳定的leader,只要系统满足以下时序条件:

broadcastTime << electionTimeout << MTBF

broadcastTime是一个server并行发送RPC到cluster中的所有其他server并接收到响应的平均时间;electionTimeout是1.2节中介绍的election timeout;MTBF是单个server两次故障之间的平均时间。broadcastTime应比electionTime小一个数量级,从而保证leader可以可靠地发送heartbeat给follower并防止它们发起election。由于electionTimeout的选取是随机的,split vote的现象发生的可能性很低。electionTimeout应该比MTBF小几个数量级,从而保证系统能够稳定运行。当leader故障时,系统大约在一个electionTimeout时间内是不可用的,这个时间应在整个时间段内只占很小一部分。

broadcastTime的范围一般为0.5ms到20ms,因此electionTimeout的范围一般是10ms到500ms。典型的MTBF时间是几个月或更长,因此很容易满足要求。

2. 其他学习资料

3. 踩坑记录

  • Raft的一个大前提是如果接收到一个RPC中的term高于server自己当前的term,则更新自己的term,并且切换到follower状态,这一条规则对于任何情况下都适用,包括AppendEntries的发送者、AppendEntries的接受者、RequestVote的发送者以及RequestVote的接受者。即使是处在candidate状态的server,接收到这样的RPC也要立即恢复到follower状态,这就是两个RPC请求的reply中都有一个term域的原因。
  • 对于多个goroutine并发读写的所有变量,在读和写时都要加锁,不然会出现各种奇怪的data race问题。可以添加一些atomic方法来实现原子的读写,从而避免代码中出现大量的Lock(), Unlock()
  • 运行2A的test时,有时会出现如下图所示的FAIL。从log的时间可以看出,server 0在开始term 4的election之后一直阻塞在发送RequestVote的步骤上,直到6秒后才发现发送到server 2的RPC fail了,而这段时间本已经足够server 0进入下一个term,开始新一轮election。这一错误复现概率不高,一般10次中出现1次。经过检查代码发现原因在于没有另起一个单独的goroutine来执行切换到candidate的操作,而是直接在electionTimeoutMonitor中执行switchToCandidate,如下:


    发送RequestVote阻塞
func (rf *Raft) electionTimeoutMonitor() {
    m := rf.timeoutManager
    for !rf.killed() {
        // DPrintf("Checking electionTimeout on server %d...\\n", rf.me)
        // electionTimeout is valid both in follower and in candidate status
        if m.CheckElectionTimeout() && !rf.isLeader() {
            DPrintf("[%d] ElectionTimeout fired\\n", rf.me)
            // 这样写是错误的
            // rf.switchToCandidate()
            // 正确的写法
                go rf.switchToCandidate()
        } else {
            time.Sleep(ElectionTimeoutCheckInterval)
        }
    }
}

  • 运行Test 2A时,有时会出现一个term内有多个leader的情况,查看log得知,server 0在一个term内vote给了两个不同的server,如下图所示。这一问题出现的原因在于RequestVote RPC handler中的锁的粒度不正确。判断是否已经vote和授予vote这两个操作应看做一个atomic的整体。


    一个term内有多个leader
  • 运行2B的test时,concurrentStart()总是fail,从log上可以看出,三个server都成功apply了所有command,但是test结果显示有command重复,而有其他command丢失。这是由于Start()中的锁粒度不正确,导致两个command不同的并发的Start()返回了同一个index,必须保证获取index的操作和append log的操作整体是原子的。


    concurrentStart fail
  • RPC handler有时会收到args == nil的请求,如果不加任何判断,这种请求会导致整个test fail。因此在执行handler之前,要先对args做一下是否为nil的判断。如果是nil,直接返回,不进行任何操作

  • 在2B的基础上,实现state的持久化,并运行2C的test时会出现多个FAIL,尤其是TestFigure8Unreliable2C。通过查看log可以发现,FAIL的根本原因在于2C中的几个test模拟了非常恶劣的网络情况,例如RPC会严重超时、网络被partition导致server之间互相无法通信等等,这就使得很难选出leader。此外,客户端还同时向leader发送了大量command,导致被选出的leader的log非常长。等到网络状况恢复后,leader要向follower发送要append的log,但是follower可能只有几条log,但是leader有几百条log,因此follower在接收到leader发来的AppendEntries请求后会返回false,而leader判断出这个false是由于log inconsistency产生的,就把对应的nextIndex - 1。这个方法原理上是正确的,但是问题在于leader和follower的log相差了几百条,因此需要几百次AppendEntries RPC才能最终达到一致,太费时间,所以test fail了。这个问题的解决方案是在AppendEntries RPC的reply中添加两个字段conflictIndexconflictTerm,分别表示follower与leader产生分歧的entry index和term。具体的计算方法见学习资料第二条中的MIT助教的总结。leader在收到一个由于log consistency而失败的AppendEntries reply后,就根据reply中的这两个字段来调整对应的NextIndex值,从而使得leader和follower快速地在log上达成一致。

  • 在检查是否已经有committed entry可以apply的方法中,偶尔会出现rf.lastApplied >= len(rf.log)的情况,导致rf.log[rf.lastApplied]出现index out of range的问题。这个问题的根本原因在于同一时间可能有多个这个方法在并行地运行,而方法中的加锁粒度不正确。如下面代码段所示,假设一个AppendEntries RPC调用了checkCommittedEntriesToApply()方法,运行在goroutine A中。在循环条件中,使用了原子的加载方法来获取rf.commitIndexrf.lastApplied,假设rf.commitIndex = 100, rf.lastApplied = 99,循环条件判断成立,准备开始进入循环体。此时,程序切换到了由另一个AppendEntries RPC触发的,执行在goroutine B中的checkCommittedEntriesToApply()方法。这个方法也得到了这两个值100和99,并执行了一轮循环体。循环体中把rf.lastApplied++,因此循环结束后rf.lastApplied == 100,此时再切换回到A的goroutine。A把rf.lastApplied++,因此rf.lastApplied == 101,这就导致rf.log[rf.lastApplied].Command出现index out of range的error了。

// wrong code
// function to check that whether there is committed entries to apply
func (rf *Raft) checkCommittedEntriesToApply() {
  // use atomic load methods to get rf.commitIndex and rf.lastApplied
    for !rf.killed() && rf.loadCommitIndex() > rf.loadLastApplied() {
    // goroutine switching happens here
        rf.mu.Lock()
        rf.lastApplied++
        msg := ApplyMsg{
            CommandValid: true,
            Command:      rf.log[rf.lastApplied].Command,
            CommandIndex: rf.lastApplied,
        }
        DPrintf("[%d] Committed entry %d is applied, command is %v. Current commitIndex is %d\\n",
            rf.me, rf.lastApplied, msg.Command, rf.commitIndex)
        rf.mu.Unlock()
        rf.applyCh <- msg
    }
}

// correct code
// function to check that whether there is committed entries to apply
func (rf *Raft) checkCommittedEntriesToApply() {
  rf.mu.Lock()
  defer rf.mu.Unlock()
    for !rf.killed() && rf.commitIndex > rf.lastApplied {
        rf.lastApplied++
        msg := ApplyMsg{
            CommandValid: true,
            Command:      rf.log[rf.lastApplied].Command,
            CommandIndex: rf.lastApplied,
        }
        DPrintf("[%d] Committed entry %d is applied, command is %v. Current commitIndex is %d\\n",
            rf.me, rf.lastApplied, msg.Command, rf.commitIndex)
        rf.applyCh <- msg
    }
}

  • 实际上,在每个AppendEntries RPC结束后调用checkCommittedEntriesToApply()并不是个好方法。这不仅造成了上面描述的多个goroutine的竞争问题,而且其实全局只需要一个这样的后台方法在运行就可以了。因此把这个方法改写成常驻后台的一个单独的goroutine更加简单。
  • 2C中的TestFigure8Unreliable2C和TestUnreliableChurn2C大约有10%的概率会fail,原因是failed to reach on agreement。查看test_test.go和config.go的源码可以看到,测试中要求最后的一条command应该能够在2秒内达成在所有server上都被commit。由于retry被设置为true,因此可以进行重试,若反复重试10秒后仍未能达成一致,则test失败。
  • 运行2C的test的过程中,有很低的概率(大约20次中出现一次)出现这样的的bug:在广播AppendEntries时,leader检测到有log entry要发送给某个follower,执行代码copy(newEntries, rf.log[nextIndex:logLength])时出现nextIndex == -1,导致程序崩溃。具体log如下:
2020/12/03 23:43:49 [4 - Term 20 - follower] Sending AppendEntries RPC to server 0...
2020/12/03 23:43:49 [4 - Term 20 - follower] Sending AppendEntries RPC to server 1...
2020/12/03 23:43:49 [4 - Term 20 - follower] New entries detected for server 2, nextIndex is -1, logLength is 15
panic: runtime error: slice bounds out of range [-1:]

根据log中的内容可以发现,server 4已经是follower状态了,但依然在执行leader的工作:广播AppendEntries给其他server。出现这个问题的原因在于广播AppendEntries是在一个循环中完成的,并且循环加锁的粒度太小,导致server 4的身份在两段加锁的代码之间发生了转换,但是循环本身并没有对server的身份做判断。而nextIndex这个变量是通过rf.loadNextIndex()这个原子方法获得的,这个方法会首先判断server是否为leader,如果不是leader则返回-1。为了解决这个问题在循环体中,要加上一条判断rf.identity == IndentityLeader是否成立。只有在server是leader时才允许发送AppendEntries RPC。

4. 可能的优化

  • 对于leader特有的数据结构nextIndex和matchIndex,单独设置一把锁,而不要使用rf.mu的server全局锁,提高并发粒度(对于test来说效果不明显,没有采用)
  • 对nextIndex和matchIndex分段加锁
  • 当leader和follower的entry不匹配时,快速回溯nextIndex,具体方法上面已经讨论过

5. 感想

  • 锁的粒度非常重要,很多bug的根本原因都是没有把若干个操作看做一个原子的整体,每个操作单独加锁,从而导致程序崩溃。在不考虑性能的情况下,保险的方法是使用粒度比较大的锁,例如一个RPC handler整体加锁
  • 两段加锁的代码之间有可能发生任何事情,例如server身份的切换等等,因此在进行循环等比较耗时的操作时,要在循环条件内加上对于rf.identity的判断,避免当server的身份转换后,仍然在执行之前遗留下的其他身份的工作,从而产生bug
  • 在每个lab初期实现主要功能时,最好用go test -race -run命令在测试过程中检测是否存在data race
  • 多用DPrintf()打log,所有log都有统一的前缀,表明是哪一个server、目前处在哪个term、当前server的身份,这样能够很大程度上帮助debug
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。