最近在看RocketMQ 的raft实现,名字叫Dledger。找了一篇源码分析的博客发现其中很多细节都解释的不是很清晰。Dledger 选主过程
首先我们要知道RocketMQ Dledger有哪些特性,在github官网上我们可以看到Dledger实现了很多原论文In search of an understandable consensus algorithm中没有的的特性。因为我暂时没有找到官方的设计文档,代码中的注释也暂时不是很完整,因此理解这些特性对于理解源代码有很重要的意义。其中最重要的是Pre-vote 机制。
本文需要读者对Raft算法的原论文有一定的熟悉程度。也可以看一下这个博客和源码。Dledger 选主过程
Pre-vote机制是在原作者的258页博士论文Ongaro Phd
第九章中提过的,其目标是解决在网络分区发生时,处于少数分区的节点不会一直增加Term, 更具体的应用可以在原论文中搜索得到。
Pre-vote的思想简单来说就是,在Candidate increase term之前,要先在不增加term的情况下看自己是否满足 比majority数量的node要 more up-to-date。如果一个Candidate知道自己一定不可能被选为Leader,那么就会自觉的increase term的值。这是Pre-vote。
而DLedger的实现里还有一个很大和原论文不同的地方。Dledger 会产生更多的Candidate节点。举例来说,当Leader得知有更大term的voting process正在进行,或者具有更大term的leader已经被选举出来之后,Leader并不会退化为follower,而是退化为 "尚未进行pre-vote的Candidate“。 这个体现了Dledger 不同的node有不同的leader preference(使用者可能希望leader 大多数情况都在某一个node上)。而这样的设计可以使以前的Leader能够在因为某些暂时意外原因stepped down之后有机会尽快恢复继续当leader。
流程概述
每个node都会启动一个StateMaintainer() 的线程,这个线程就每隔10ms去检查一下当前node的角色,然后看要不要做什么动作。其中最核心的办法就是maintainAsCandidate() 方法。
maintainAsCandidate() 方法可以分为三个部分,首先第一个是判断要不要进行投票,以及如果要投票的话要不要增加term的值。我稍微改写了一部分代码,让逻辑看起来更清楚, 看源码的话和下面的代码会有点不同(不过功能是一样的)
private void maintainAsCandidate() throws Exception {
// check if timer has expired or need to act immedately; if not, just exit
synchronized (memberState) {
if (needIncreaseTermImmediately) {
memberState.nextTerm();
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
needIncreaseTermImmediately = false;
return;
}
}
if (System.currentTimeMillis() < nextTimeToRequestVote) {
return;
}
synchronized (memberState) {
...
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT ) {
term = memberState.nextTerm();
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
}
ledgerEndIndex = memberState.getLedgerEndIndex();
ledgerEndTerm = memberState.getLedgerEndTerm();
...
}
接下来是voter的反应,最主要看handleVote, 这里我们把发起投票的称为voting candidate,接受投票的voter, 虽然实际上一个voter 身份也可能是一个Candidate。
public CompletableFuture<VoteResponse> handleVote(VoteRequest request, boolean self) {
//hold the lock to get the latest term, leaderId, ledgerEndIndex
synchronized (memberState) {
// Pre condition check
if (request.getTerm() < memberState.currTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_VOTE_TERM));
} else if (request.getTerm() == memberState.currTerm()) {
if (memberState.currVoteFor() == null) {
//let it go
} else if (memberState.currVoteFor().equals(request.getLeaderId())) {
//repeat just let it go
} else {
if (memberState.getLeaderId() != null) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_HAS_LEADER));
} else {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_ALREADY_VOTED));
}
}
} else {
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_TERM_NOT_READY));
}
// More up-to-date check
if (request.getLedgerEndTerm() < memberState.getLedgerEndTerm()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_EXPIRED_LEDGER_TERM));
} else if (request.getLedgerEndTerm() == memberState.getLedgerEndTerm() && request.getLedgerEndIndex() < memberState.getLedgerEndIndex()) {
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.REJECT_SMALL_LEDGER_END_INDEX));
}
// TODO this should be candidateId, the only place to set CurrVoteFor
memberState.setCurrVoteFor(request.getLeaderId());
return CompletableFuture.completedFuture(new VoteResponse(request).term(memberState.currTerm()).voteResult(VoteResponse.RESULT.ACCEPT));
}
}
- 如果voting candidate term 比 voter 的还小,那么直接拒绝
- 如果voting candidate term 比 voter 的大,按照DLedger的实现,voter 会立刻更新自己的term,把自己变成candidate,并且等待一个计时器的时间,并把自己变成candidate。为什么要这么实现,一开始可能比较难理解,现在先记住就好了:DLedger要求只有voter 和 voting candidate 的term相同时,才可以投赞成票
- 如果voting candidate term 和 voter 的相等,voter 就会开始判断了,
- 如果voter 还有leader,则告诉voting candidate
我拒绝你并且我现在还有leader。
- 如果voter 已经投过别人了,则告诉voting candidate我拒绝你并且我投过别人了。
- 其他情况下,就进入到more up-to-date 的 check (这个就不细讲了,raft论文里有为什么还要check more up-to-date)
如果最后所有check都通过,就投出宝贵的赞成票啦。
在我第一遍看的时候,最难理解的就是,为什么当voting candidate 的 term 比voter的大的时候,voter 的行为是:1. 变成candidate(或者保持candidate) 2. 把needIncreaseImmediately设置为true。
因为raft原论文说的是,voter此时应该 :1. 退化到follower 状态 2. 更新term到和 voting candidate 一样的值。
脑海里走了一遍流程后,我发现这两个其实本质上流程是类似的,只有细微的差别
原raft流程
- voter 收到 voting candidate 的比自己大的term时,把自己的term 更新,然后退化为follower,开始一个计时器。如果计时器时间内voter 没有收到 heartbeat,则变为candidate,立马发出投票请求
Dledger 流程
voter 收到 voting candidate 比自己大的term时,没有在handleVote里更新term的值,而是设置needIncreaseImmediately为true,并且转换为candidate 身份。注意这里voter 返回给voting candidate 是拒绝,但是是REJECT_TERM_NOT_READY。这个返回很有用,马上会提到
voter 的下一次状态循环会进入到maintainAsCandidate()函数,然后因为needIncreaseImmediately为true,所以把term更新,同时重置计时器,假设计时器是 1000ms,然后立马退出maintainAsCandidate()。这里也就是说虽然voter 变成了candidate, 但是并没有立刻发出投票(这一点很重要,因为此时voter 的currVoted 还是null,使得接下来给之前的voting candidate 投赞成票可能)
而voting candidate 收到voter的REJECT_TERM_NOT_READY之后,会去检查,接受我的加上返回REJECT_TERM_NOT_READY的有没有超过半数。如果有的话就不等计时器,立马再发起一次投票。这样做的原因是,voting candidate 知道所有返回REJECT_TERM_NOT_READY 的voter 都是立刻马上就会把term更新到和自己一样,并且他们还不会马上给自己投票,所以自己要赶在voter 的term和voting candidate一样之后,但是还没有给voter 自己投票之前,去拿到accept票。
// voting candidate react, notice this time it didn't reset the timer
else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
}
对比raft 原论文实现,dledger的voter 在收到比自己大的term时,虽然没有立即返回同意,但是在时间上,因为voter 会立刻更新term, voting candidate 也会立刻重新发送投票请求,所以其实可以认为两种实质上相同。
另外至于为什么原raft 论文说voter 应该退化成为follower, 而dledger实现是变成candidate。我们可以证明这两种是相同的。
- 假设新的leader被选举出来后,voter 能收到心跳包,那么
根据handleHeartbeat() 的实现,DLedger 的voter 会在收到一个和自己term 相同的leader 发来的心跳包之后退化为follower
else if (request.getTerm() == memberState.currTerm()) {
// This means it is Candidate?
if (memberState.getLeaderId() == null) {
changeRoleToFollower(request.getTerm(), request.getLeaderId());
return CompletableFuture.completedFuture(new HeartBeatResponse());
}
...
}
...
- 假设新的leader被选举出来后,voter 不能收到心跳包。那么原raft的实现follower会在timeout之后变成candidate再立刻发送投票请求。而对于Dledger的实现,刚变成candidate的voter 也会在等待一个计时器时间之后才发起投票请求。所以其实本质上两种实现是相同的。
接下来我们看个例子可能会更清楚一点。
一个全新的cluster启动
我们假设有4个 node, node 0, 1, 2 从follower转变为candidate的timeout 时间由小到大排序, 优先级由高到低。node 2 和 node 3 优先级可认为相同。
1.刚启动时,根据代码里的定义
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | null | 0 | candidate | WAIT_TO_REVOTE |
node1 | null | 0 | candidate | WAIT_TO_REVOTE |
node2 | null | 0 | candidate | WAIT_TO_REVOTE |
node3 | null | 0 | candidate | WAIT_TO_REVOTE |
- 所有node同时往外发出vote请求,在投自己一票之后,收到其他nodes回复之前,处于这个状态
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 0 | candidate | WAIT_TO_REVOTE |
node1 | node1 | 0 | candidate | WAIT_TO_REVOTE |
node2 | node2 | 0 | candidate | WAIT_TO_REVOTE |
node3 | node3 | 0 | candidate | WAIT_TO_REVOTE |
以node1 情况为例,当node0 收到node 1 的请求时,开始判断
- candidate 的term 和node 1 的term相同 于是判断currVotedFor
- node 0 的currVotedFor和既不是null,也不是node1,因此会回复一个REJECT_ALREADY_VOTED。
其他情况同理,因此没有leader被选出来
而各个node在统计回复的时候会发现属于默认情况,于是执行以下语句
else {
// TODO so this is the most important branch ?
// TODO when will this be executed, wait till next round
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
}
状态如下
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 0 | candidate | WAIT_TO_VOTE_NEXT |
node1 | node1 | 0 | candidate | WAIT_TO_VOTE_NEXT |
node2 | node2 | 0 | candidate | WAIT_TO_VOTE_NEXT |
node3 | node3 | 0 | candidate | WAIT_TO_VOTE_NEXT |
这里注意,不同node的nextTimeToRequestVote就开始变得不同了
- 因为node 0是最高优先级,首先进入下一个投票阶段。由于其处于WAIT_TO_VOTE_NEXT阶段,因此需要
调用memberState.nextTerm()
。 这个方法非常关键,因为这个方法除了更新term外(注意不一定是+1,细节之后讲),还会把currVotedFor设置成为null。对于node0,调用了memberState.nextTerm()
之后处于如下状态
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | null | 1 | candidate | WAIT_TO_VOTE_NEXT |
node1 | node1 | 0 | candidate | WAIT_TO_VOTE_NEXT |
node2 | node2 | 0 | candidate | WAIT_TO_VOTE_NEXT |
node3 | node3 | 0 | candidate | WAIT_TO_VOTE_NEXT |
然后node0 会向自己和其他node发送vote请求, (因此node0的 currVotedFor会又变成node0)
- node 1,2,3在收到请求之后,会发现请求中包含的term大于自身的term,因此返回REJECT_TERM_NOT_READY, 并在返回前执行以下两行代码
changeRoleToCandidate(request.getTerm());
needIncreaseTermImmediately = true;
这两句话也很重要,其中changeRoleToCandidate(request.getTerm()) 除了把角色变更为Candidate之外(变更前可能处于follower,可能处于leader,更有可能已经处于candidate状态了)还会检查是否需要更新knownMaxTermInGroup 的值,这个值是当前节点所遇见过的最大的term值。
我们之前提到的memberState.nextTerm() 方法就和这个有关
public synchronized long nextTerm() {
... // some checking
if (knownMaxTermInGroup > currTerm) {
currTerm = knownMaxTermInGroup;
} else {
++currTerm;
}
currVoteFor = null;
persistTerm();
return currTerm;
}
可见,如果在调用nextTerm时, knownMaxTermInGroup比currTerm大,则仅仅将currTerm设置为knownMaxTermInGroup。而只有当两者相同时,currTerm才会在原有基础上加1
这里很重要,因为其表达了两种不同的状态转移。
回到我们的情况,node 1, 2, 3 由于设置了, 在下一个紧接着的maintainAsCandidate() 函数调用中不会立即返回,而是会接着执行下去。
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;
}
根据maintainAsCandidate()的逻辑,node 1, 2 ,3 也会立刻调用一次nextTerm(),刚刚调用nextTerm() 之后的状态如下
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
node1 | null | 1 | candidate | WAIT_TO_VOTE_NEXT |
node2 | null | 1 | candidate | WAIT_TO_VOTE_NEXT |
node3 | null | 1 | candidate | WAIT_TO_VOTE_NEXT |
不过也是由于needIncreaseTermImmediately为true,node 1,2,3 并不会立即马上发起投票,而是执行以下代码
if (needIncreaseTermImmediately) {
nextTimeToRequestVote = getNextTimeToRequestVote();
needIncreaseTermImmediately = false;
return;
}
也就是说node 1,2,3 不会立刻发送vote 请求,因此currVotedFor在一段时间内都会保持为null,一段时间多长是由nextTimeToRequestVote决定的。之后我们会谈到这个时间是怎么来的。这段时间就是node 0 能成为leader的原因。
- 在node 1,2,3 回复给node 0 REJECT_TERM_NOT_READY 之后, node 0 非常有可能会执行到这一段代码
else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
}
notReadyTermNum 就是返回的response中REJECT_TERM_NOT_READY的数目。这段代码里最重要的不是写了的部分,而是没写的部分。
注意到这段代码里没有调用nextTimeToRequestVote = getNextTimeToRequestVote();
也就是没有调用计时器,这意味着再立即执行一个vote。不过这次并没有增加term的值,term的值增加的条件是
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
long prevTerm = memberState.currTerm();
term = memberState.nextTerm();
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
}
- 当node 0 的第二次vote 请求到达 node 1,2,3 时,状态如下
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 1 | candidate | REVOTE_IMMEDIATELY |
node1 | null | 1 | candidate | WAIT_TO_VOTE_NEXT |
node2 | null | 1 | candidate | WAIT_TO_VOTE_NEXT |
node3 | null | 1 | candidate | WAIT_TO_VOTE_NEXT |
- 由于node 1,2,3 发现currTerm和request中的term相同,并且也没有投过其他的,于是就非常快乐地同意了这个请求。 同意之后状态如下
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 1 | candidate | REVOTE_IMMEDIATELY |
node1 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
node2 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
node3 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
- 而node0 收到超过半数的同意,自然也就成为leader,不过此时node 1 ,2, 3 还是 candidate 状态,什么时候变为follower状态呢?
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 1 | leader | REVOTE_IMMEDIATELY |
node1 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
node2 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
node3 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
- 当node 0成为leader后会发送heartBeat,而node 123 在收到heartBeat的时候会做一些检查,检查通过后就把自己变成follower。
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node0 | node0 | 1 | leader | REVOTE_IMMEDIATELY |
node1 | node0 | 1 | follower | WAIT_TO_REVOTE |
node2 | node0 | 1 | follower | WAIT_TO_REVOTE |
node3 | node0 | 1 | follower | WAIT_TO_REVOTE |
8.1但是假设在7过后,node 0 直接宕机了,丢失所有通讯,虽然node 1 ,2, 3 都同意了,但是由于没有收到心跳包回复,并不会退化为follower状态。于是node1 成功进入第一次投票,但是它会发现自己投票失败了,并且集群里没找到有leader的follower。假设 node 2, 3 同样如此,于是
node 2, 3, 也都给自己投了一票,发现有效validNums 通过了,收到了有效回复,但是因为赞同票没有过半,直接进入WAIT_TO_VOTE_NEXT 状态
9.2 如果此时leader直接挂掉,第一个成为candi的会立刻发送投票请求,但是会收到错误,发现集群中仍有leader,于是重置计时器(注意并没有增加term),这种情况会一直持续到所有follower都自动超时变成candidate才结束, 而由于假设node 3 最后才超时, 所以node 3 会直接timeout而成为candidate。 由于这种timeout掉成为candidate的并没有重置计时器,因此会立刻发起一次vote 请求。由于此时node 1, 2 可能还在处于WAIT_TO_REVOTE的计时器当中,因此node 3 发出vote 请求。由于此次请求没有任何node还在followr状态,因此node 3 不会继续处于 WAIT_TO_REVOTE 状态,而是认为此次投票失败,进入WAIT_TO_VOTE_NEXT 状态。
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node1 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
node2 | node0 | 1 | candidate | WAIT_TO_REVOTE |
node3 | node0 | 1 | candidate | WAIT_TO_REVOTE |
虽然node 3 是WAIT_TO_VOTE_NEXT 状态了,但是毕竟还是要再等一个计时器的时间才能更新term的值。 再此期间 node 1, 2 均有可能进入WAIT_TO_VOTE_NEXT 状态(当然也有可能一个基本计时器时间阻止了这种事情的发生。),不过一定有一个最先处于WAIT_TO_VOTE_NEXT 的node 更新term的值,同时向还没有更新term的节点发出投票请求, 我们假设是node 1。
node | currVotedFor | currTerm | role | lastParseResult |
---|---|---|---|---|
node1 | node0 | 2 | candidate | WAIT_TO_REVOTE |
node2 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
node3 | node0 | 1 | candidate | WAIT_TO_VOTE_NEXT |
而收到的这样的投票请求的节点,无论是处于WAIT_TO_VOTE_NEXT 状态还是处于WAIT_TO_REVOTE 状态,都还没有更新term的值, 因此他们会返回TERM_NOT_READY 然后设置needIncreaseTermImmediately为True。
之后的事和前面分析的就很差不多了。
我们现在需要想的是,如果这种情况下接收node把needIncreaseTermImmediately设置为true之后,到底处于什么状态,是WAIT_TO_VOTE_NEXT 还是 WAIT_TO_REVOTE 状态呢 还是都有可能呢?
这个其实很重要,因为如果处于WAIT_TO_VOTE_NEXT 状态想表达的意思是大家term都相同的情况下我得把term + 1 打破僵局。然而现在已经有人比我先打破这个僵局了,我也因为那个人的vote request 把term 更新到和他一样了,那么也没有必要再去增加term了。
举个例子,假设node1 在收到node 2, 3, NOT_READY_TERM 之后宕机了。本应该立刻向2,3 再次发送的vote 请求没能发出。node 2 顺利渡过因为needIncreaseTermImmediately 的一段计时器,带着WAIT_TO_VOTE_NEXT 状态再次进入maintainAsCandidate(),这次会把term + 1, 这样的话又会有额外的一轮投票。我们有没有可能在设置needIncreaseTermImmediately的时候把candidate的状态设置为WAIT_TO_REVOTE 呢(也就是默认初识状态)
pre-vote 体现在哪里
代码里没有明确的命名某个方法为preVote相关,但是我们的revote机制看起来有点相关。接下来我们深入探讨一下到底有没有revote相关的机制。
一些思考
首先最有趣的一点在于,我们其实可以发现,任何能够投出Accept票的节点都处于candidate 状态。首先我们知道,所有的代码里只有一行是可以改变某一个node的term的,也就是
memberState.nextTerm() 方法,而这个方法在maintainAsCandidate() 中,也就是说,一个node如果想要升高自己的term,必须处于Candidate 的WAIT_TO_VOTE_NEXT 状态或者 needIncreaseTermImmediately 为true。
其次,在原论文中,每个appendEntries RPC 也会携带leader的term以及一些其他的控制信息(比如maxCommitIndex)。如果某个follower发现leader的term比自己当前的term大,也会更新自己的term。但是似乎在Dledger的实现里,control plane和data plane分的很开,所有和term相关的状态转移都在DLedgerLeaderElector中实现了(不过不是很确定,我还没看appendEntries,只是发现nextTerm只在maintainAsCandidate()处被使用过)
只有follower和leader才有leaderId, candidate没有leaderId的。因此我们可以近似认为,处于WAIT_TO_REVOTE 但是还没有发起过任何 投票请求的candidate才是真正能投票的人,任何想要投出有效票数的node都必须先成为candidate节点, 并且还没有投过票才算。如果已经给自己投过一票了,那么就算是真正的成为该term下的竞争者了。