zookeeper一致性算法系列 --- 选举
概念介绍
electionEpoch 选举周期, Notification中的字段
logicalclock 逻辑时钟, 每次进入选举流程都会自增1; 如果收到的选票中的electionEpoch比我的logicalclock大, 则令logicalclock = notification.electionEpoch, 并更新选票, 重新发送选票
peerEpoch leader周期
lastProcessedZxid 选举之前已经处理的zk事务id (zxid)
proposedLeader 推选的leader id
proposedEpoch 推选的leader 的peerEpoch
proposedZxid 推选的leader的事务id (zxid)
选举原则: 依次比较peerEpoch、zxid(数据最新的节点优先)、myid(myid大的优先)
选举
选主是集群对外提供服务之前的第一步, 一般有以下三种情况会触发选举:
- 集群初始化(包括集群中的节点重启, 重新加入集群)
- leader挂了, follower没有收到leader的心跳包
- leader未收到follower的心跳回复
/**
 * Runs the leader election loop for this peer until a leader is decided,
 * the peer leaves the LOOKING state, or {@code stop} is set.
 *
 * <p>On entry it registers a {@code LeaderElectionBean} with JMX (best effort),
 * bumps {@code logicalclock}, proposes itself as leader and sends out its vote.
 * It then repeatedly takes notifications from {@code recvqueue}:
 * <ul>
 *   <li>from LOOKING peers: compares election rounds and votes, possibly adopting
 *       the sender's vote, and checks via {@code termPredicate} whether the
 *       proposal can be finalized;</li>
 *   <li>from FOLLOWING/LEADING peers: collects their votes and checks via
 *       {@code ooePredicate} whether to join the already-established leader;</li>
 *   <li>from OBSERVING peers or senders outside the voting view: ignored
 *       (only logged).</li>
 * </ul>
 * When a decision is reached, this peer's state is set to LEADING if it is the
 * chosen leader and to {@code learningState()} otherwise.
 *
 * @return the final vote, or {@code null} if the loop exits without a decision
 *         (peer is no longer LOOKING, or {@code stop} was set)
 * @throws InterruptedException if interrupted while polling or re-queueing a
 *         notification on {@code recvqueue}
 */
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(
self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
// JMX registration is best effort: log and continue without the bean.
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
// Remember when this peer first started leader election (only set once).
if (self.start_fle == 0) {
self.start_fle = System.currentTimeMillis();
}
try {
// Votes received in the current election round, keyed by sender sid.
// Filled from LOOKING notifications and from FOLLOWING/LEADING
// notifications whose electionEpoch matches our logicalclock.
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// Votes from peers that are already FOLLOWING or LEADING, keyed by sender sid.
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
// Current poll timeout in ms; grows on every empty poll (see backoff below).
int notTimeout = finalizeWait;
synchronized(this){
// Start a new election round.
logicalclock++;
// Initially propose ourselves as leader.
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// Send out our initial vote.
sendNotifications();
/*
* Loop in which we exchange notifications until we find a leader
*/
// Keep exchanging notifications while this peer is LOOKING and the
// election has not been stopped.
// NOTE (original author): the send/receive worker threads keep running
// after this loop exits. That is what lets a restarted node get the
// current leader info straight back from existing followers/leader.
// Those threads are not shown here; confirm against the messaging layer.
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
/*
* Remove next notification from queue, waiting up to notTimeout ms
* (starts at finalizeWait, doubles on every timeout, capped at
* maxNotificationInterval)
*/
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
if(n == null){
// Nothing received in time: resend our vote if the connection
// manager reports delivery, otherwise (re)connect to all peers.
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
/*
* Exponential backoff
*/
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
// Only process notifications from senders in the voting view.
// Per the original author, this drops servers outside the cluster
// and OBSERVER nodes (presumably the voting view excludes observers;
// confirm against getVotingView()).
else if(self.getVotingView().containsKey(n.sid)) {
/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
switch (n.state) {
// The sender is itself still electing a leader.
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock) {
// Sender is in a newer round: adopt its round, drop votes
// gathered for the old round, and re-decide between the
// sender's vote and our own initial vote.
logicalclock = n.electionEpoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// Our proposal may have changed; send it out again.
sendNotifications();
} else if (n.electionEpoch < logicalclock) {
// Sender is in an older round: ignore this notification.
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
// Same round and the sender's vote beats ours: adopt it and
// send it out again. Per the original author the comparison
// order is peerEpoch, then zxid, then myid
// (totalOrderPredicate itself is not shown here).
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// Record the sender's vote for this round.
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Check whether the votes collected so far let us settle on our
// current proposal (termPredicate is not shown; presumably a
// quorum check).
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
// Verify if there is any change in the proposed leader
// Drain the queue, waiting up to finalizeWait ms for each item.
// If a notification beats our proposal, put it back on the
// queue and keep looking instead of finalizing.
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
// Lead if we are the proposed leader, otherwise learn.
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid,
logicalclock,
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
// Observers do not take part in voting.
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
// The sender has already finished its election. Per the original
// author, this typically happens when:
// 1. this node restarted and rejoined a cluster that already has a
//    leader, so no cluster-wide election is triggered and the
//    existing leader/followers answer this node's notifications;
// 2. during an election, some peers have already settled on a
//    leader while this node has not finished electing yet.
/*
* Consider all notifications from the same epoch
* together.
*/
if(n.electionEpoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch));
if(ooePredicate(recvset, outofelection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify
* a majority is following the same leader.
*/
outofelection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
if(ooePredicate(outofelection, outofelection, n)) {
synchronized(this){
// Adopt the established ensemble's election round.
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
n.state, n.sid);
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
// Loop exited without a decision: no longer LOOKING, or stop was set.
return null;
} finally {
// Always try to unregister the JMX bean registered on entry.
try {
if(self.jmxLeaderElectionBean != null){
MBeanRegistry.getInstance().unregister(
self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
}
}