In the previous article we analyzed the leader election that happens when zk starts up; this article analyzes the synchronization process that follows.
The leader's synchronization process
The synchronization entry point is QuorumPeer's run method:
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
- makeLeader instantiates a Leader, and setLeader assigns the instance to the leader member variable.
- leader.lead starts the synchronization process.
- updateServerState in the finally block sets the current state back to LOOKING, so the node starts looking for a leader again. For the leader this typically happens when it loses its connections to the other nodes in the ensemble; for the other nodes it typically happens when the connection to the leader's synchronization port is broken or cannot be established.
Next, let's look at Leader's lead method:
void lead() throws IOException, InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized (this) {
lastProposed = zk.getZxid();
}
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
}
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
// This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
// specified by the user; the lack of version in a config file is interpreted as version=0).
// As soon as a config is established we would like to increase its version so that it
// takes precedence over other initial configs that were not established (such as a config
// of a server trying to join the ensemble, which may be a partial view of the system, not the full config).
// We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
// there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
// not when sending/receiving NEWLEADER. In other words, we can't change curQV here since it's the committed quorum verifier,
// and there's still no agreement on the new version that we'd like to use. Instead, we use
// lastSeenQuorumVerifier which is being sent with NEWLEADER message
// so it's a good way to let followers know about the new version. (The original reason for sending
// lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
// that it finds before starting to propose operations. Here we're reusing the same code path for
// reaching consensus on the new version number.)
// It is important that this is done before the leader executes waitForEpochAck,
// so before LearnerHandlers return from their waitForEpochAck
// hence before they construct the NEWLEADER message containing
// the last-seen-quorumverifier of the leader, which we change below
try {
LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV, true);
} catch (Exception e) {
throw new IOException(e);
}
}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
try {
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ newLeaderProposal.ackSetsToString()
+ " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for (LearnerHandler f : getLearners()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
followerSet.add(f.getSid());
}
}
boolean initTicksShouldBeIncreased = true;
for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
initTicksShouldBeIncreased = false;
break;
}
}
if (initTicksShouldBeIncreased) {
LOG.warn("Enough followers present. Perhaps the initTicks need to be increased.");
}
return;
}
startZkServer();
/**
* WARNING: do not use this for anything other than QA testing
* on a real cluster. Specifically to enable verification that quorum
* can handle the lower 32bit roll-over issue identified in
* ZOOKEEPER-1277. Without this option it would take a very long
* time (on order of a month say) to see the 4 billion writes
* necessary to cause the roll-over to occur.
*
* This field allows you to override the zxid of the server. Typically
* you'll want to set it to something like 0xfffffff0 and then
* start the quorum, run some operations and see the re-election.
*/
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.setZooKeeperServer(zk);
}
self.setZabState(QuorumPeer.ZabState.BROADCAST);
self.adminServer.setZooKeeperServer(zk);
// Everything is a go, simply start counting the ticks
// WARNING: I couldn't find any wait statement on a synchronized
// block that would be notified by this notifyAll() call, so
// I commented it out
//synchronized (this) {
// notifyAll();
//}
// We ping twice a tick, so we only update the tick every other
// iteration
boolean tickSkip = true;
// If not null then shutdown this leader
String shutdownMessage = null;
while (true) {
synchronized (this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!tickSkip) {
self.tick.incrementAndGet();
}
// We use an instance of SyncedLearnerTracker to
// track synced learners to make sure we still have a
// quorum of current (and potentially next pending) view.
SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
syncedAckSet.addAck(self.getId());
for (LearnerHandler f : getLearners()) {
if (f.synced()) {
syncedAckSet.addAck(f.getSid());
}
}
// check leader running status
if (!this.isRunning()) {
// set shutdown flag
shutdownMessage = "Unexpected internal error";
break;
}
if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
// Lost quorum of last committed and/or last proposed
// config, set shutdown flag
shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
+ syncedAckSet.ackSetsToString()
+ " ]";
break;
}
tickSkip = !tickSkip;
}
for (LearnerHandler f : getLearners()) {
f.ping();
}
}
if (shutdownMessage != null) {
shutdown(shutdownMessage);
// leader goes in looking state
}
} finally {
zk.unregisterJMX(this);
}
}
The main steps are as follows:
- self.setZabState(QuorumPeer.ZabState.DISCOVERY) sets the current ZAB (ZooKeeper Atomic Broadcast) state to DISCOVERY.
- cnxAcceptor.start() starts listening on the synchronization port for learner connections.
- long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()) waits until more than half of the nodes have connected to the synchronization port and determines the proposed value of the new epoch.
- zk.setZxid(ZxidUtils.makeZxid(epoch, 0)) derives the new zxid from the newly negotiated epoch (the sketch after this list shows how the epoch and zxid relate).
- waitForEpochAck(self.getId(), leaderStateSummary) waits for a quorum of nodes to acknowledge the new epoch proposal; if a follower's state turns out to be newer than the leader's, an exception is thrown and a new election begins.
- self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION) sets the current ZAB state to SYNCHRONIZATION.
- Data synchronization with the followers and observers begins. The leader tells each server which data it is missing or which extra data it has; the server then writes the missing data or truncates the extra data. After this step the follower's or observer's data is consistent with the leader's.
- waitForNewLeaderAck(self.getId(), zk.getZxid()) waits for a quorum of nodes to finish synchronizing.
- startZkServer() starts the ZooKeeper server, which begins accepting requests.
- If the leader should also serve client requests (the zookeeper.leaderServes property is not set to "no"), self.setZooKeeperServer(zk) is called; disabling it can reduce the load on the leader.
- self.setZabState(QuorumPeer.ZabState.BROADCAST) sets the current ZAB state to BROADCAST.
- The loop that follows checks whether the leader is still in contact with a quorum of followers; if not, it shuts down and a new election starts (see the quorum-check sketch below).
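To make the epoch and zxid steps above concrete, here is a minimal, self-contained sketch (the class and method names are made up for illustration). The epoch the leader proposes is one more than the highest accepted epoch reported by the connecting quorum, and the zxid packs that epoch into its high 32 bits with a per-epoch counter in the low 32 bits, which is what ZxidUtils.makeZxid produces.

```java
import java.util.Collection;
import java.util.List;

// Illustrative sketch only; not ZooKeeper's actual ZxidUtils/getEpochToPropose code.
public final class ZxidSketch {

    // The leader proposes one more than the highest acceptedEpoch seen from a quorum.
    static long proposeEpoch(Collection<Long> acceptedEpochs) {
        return acceptedEpochs.stream().mapToLong(Long::longValue).max().orElse(0L) + 1;
    }

    // Epoch in the high 32 bits, per-epoch counter in the low 32 bits.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long epoch = proposeEpoch(List.of(3L, 4L, 4L));  // -> 5
        long zxid = makeZxid(epoch, 0);                  // what zk.setZxid(ZxidUtils.makeZxid(epoch, 0)) sets
        System.out.println(Long.toHexString(zxid));      // 500000000
        System.out.println(epochOf(zxid));               // 5
        System.out.println(counterOf(zxid));             // 0
    }
}
```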
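The tick loop at the end of lead is essentially the quorum test sketched below: the leader counts itself plus every learner whose LearnerHandler still reports synced(), and gives up leadership once that set no longer forms a majority of the voting members. This is a simplified stand-in for what SyncedLearnerTracker.hasAllQuorums checks (the real code must also satisfy a pending reconfiguration's verifier, if any).

```java
import java.util.Set;

// Illustrative majority check; QuorumMaj.containsQuorum is the real implementation.
final class QuorumCheckSketch {

    static boolean hasQuorum(Set<Long> syncedSids, Set<Long> votingMembers) {
        long syncedVoters = syncedSids.stream().filter(votingMembers::contains).count();
        return syncedVoters > votingMembers.size() / 2;  // strict majority
    }

    public static void main(String[] args) {
        Set<Long> voters = Set.of(1L, 2L, 3L, 4L, 5L);
        System.out.println(hasQuorum(Set.of(1L, 2L, 3L), voters)); // true: 3 of 5 are synced
        System.out.println(hasQuorum(Set.of(1L, 2L), voters));     // false: the leader steps down and rejoins the election
    }
}
```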
The follower's synchronization process
The synchronization entry point is QuorumPeer's run method:
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
- makeFollower instantiates a Follower, and setFollower assigns the instance to the follower member variable.
- follower.followLeader starts the synchronization process.
- updateServerState in the finally block sets the current state back to LOOKING, so the node starts looking for a leader again. For the leader this typically happens when it loses its connections to the other nodes in the ensemble; for the other nodes it typically happens when the connection to the leader's synchronization port is broken or cannot be established.
Next, let's look at Follower's followLeader method:
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
long connectionTime = 0;
boolean completedSync = false;
try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime = System.currentTimeMillis();
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
}
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch "
+ ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch "
+ ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
long startTime = Time.currentElapsedTime();
try {
self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newEpochZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
} finally {
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
}
if (self.getObserverMasterPort() > 0) {
LOG.info("Starting ObserverMaster");
om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
om.start();
} else {
om = null;
}
// create a reusable packet to reduce gc impact
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
closeSocket();
// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
if (om != null) {
om.stop();
}
zk.unregisterJMX(this);
if (connectionTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectionTime;
LOG.info(
"Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
leaderAddr,
connectionDuration,
completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}
- self.setZabState(QuorumPeer.ZabState.DISCOVERY) sets the current ZAB (ZooKeeper Atomic Broadcast) state to DISCOVERY.
- connectToLeader(leaderServer.addr, leaderServer.hostname) connects to the leader's synchronization port.
- registerWithLeader(Leader.FOLLOWERINFO) sends the follower's accepted epoch to the leader and waits for the leader's reply, which only arrives after a quorum of nodes have submitted their epochs and the new epoch has been acknowledged by a quorum. If the follower's own accepted epoch is greater than the confirmed new epoch, an exception is thrown and a new election begins.
- self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION) sets the current ZAB state to SYNCHRONIZATION.
- syncWithLeader(newEpochZxid) synchronizes data with the leader, writing any missing data and truncating any extra data (see the sketch after this list).
- self.setZabState(QuorumPeer.ZabState.BROADCAST): synchronization is complete, so the current ZAB state is set to BROADCAST.
- If this follower is configured to act as a master for observers (observerMasterPort > 0), an ObserverMaster is created to listen on that port and handle observer requests.
- The loop that follows reads request packets from the leader and replays them. If the connection is broken, the method returns and a new election begins.
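What syncWithLeader receives is decided on the leader side, in LearnerHandler: the leader compares the learner's last zxid with its own history and picks a sync mode. Below is a simplified, illustrative sketch of that decision with hypothetical names; the real logic also consults the on-disk transaction log and a snapshot-size threshold before choosing between a DIFF and a SNAP.

```java
// DIFF: send only the proposals the learner is missing (empty if already in sync).
// TRUNC: tell the learner to drop transactions that never got committed by a quorum.
// SNAP: the learner is too far behind, ship a full snapshot instead.
enum SyncMode { DIFF, TRUNC, SNAP }

final class SyncDecisionSketch {

    static SyncMode choose(long learnerLastZxid, long leaderLastZxid, long oldestZxidInLeaderLog) {
        if (learnerLastZxid == leaderLastZxid) {
            return SyncMode.DIFF;                           // already in sync: empty DIFF
        } else if (learnerLastZxid > leaderLastZxid) {
            return SyncMode.TRUNC;                          // learner has extra, uncommitted history
        } else if (learnerLastZxid >= oldestZxidInLeaderLog) {
            return SyncMode.DIFF;                           // the gap is still covered by the leader's log
        } else {
            return SyncMode.SNAP;                           // too far behind: full snapshot
        }
    }

    public static void main(String[] args) {
        System.out.println(choose(0x500000003L, 0x500000007L, 0x500000001L)); // DIFF
        System.out.println(choose(0x500000009L, 0x500000007L, 0x500000001L)); // TRUNC
        System.out.println(choose(0x400000002L, 0x500000007L, 0x500000001L)); // SNAP
    }
}
```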
The observer's synchronization process
The synchronization entry point is QuorumPeer's run method:
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
// Add delay jitter before we switch to LOOKING
// state to reduce the load of ObserverMaster
if (isRunning()) {
Observer.waitForObserverElectionDelay();
}
}
break;
- makeObserver instantiates an Observer, and setObserver assigns the instance to the observer member variable.
- observer.observeLeader starts the synchronization process.
- updateServerState in the finally block sets the current state back to LOOKING, so the node starts looking for a leader again. For the leader this typically happens when it loses its connections to the other nodes in the ensemble; for the other nodes it typically happens when the connection to the leader's synchronization port is broken or cannot be established.
Next, let's take a look at Observer's observeLeader method:
void observeLeader() throws Exception {
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
long connectTime = 0;
boolean completedSync = false;
try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer master = findLearnerMaster();
try {
connectToLeader(master.addr, master.hostname);
connectTime = System.currentTimeMillis();
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
}
self.setLeaderAddressAndId(master.addr, master.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newLeaderZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
QuorumPacket qp = new QuorumPacket();
while (this.isRunning() && nextLearnerMaster.get() == null) {
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when observing the leader", e);
closeSocket();
// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
currentLearnerMaster = null;
zk.unregisterJMX(this);
if (connectTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectTime;
LOG.info(
"Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
leaderAddr,
connectionDuration,
completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}
- self.setZabState(QuorumPeer.ZabState.DISCOVERY) sets the current ZAB (ZooKeeper Atomic Broadcast) state to DISCOVERY.
- QuorumServer master = findLearnerMaster() picks either the leader or a follower to act as this observer's master; an observer does not have to connect to the leader directly, which helps reduce the leader's load.
- connectToLeader(master.addr, master.hostname) connects to the master's synchronization port.
- long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO) registers with the master and obtains the confirmed epoch.
- self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION) sets the current ZAB state to SYNCHRONIZATION.
- syncWithLeader(newLeaderZxid) synchronizes data with the master, writing any missing data and truncating any extra data.
- self.setZabState(QuorumPeer.ZabState.BROADCAST): synchronization is complete, so the current ZAB state is set to BROADCAST.
- The loop that follows reads request packets from the master and replays them. If the connection is broken, the method returns and a new election begins (the sketch below contrasts the follower and observer packet flows).
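One last note on the replay loops: the packets an observer replays differ from a follower's. A follower takes part in the two-phase PROPOSAL/ACK/COMMIT exchange, while an observer is only INFORMed of transactions that are already committed (Observer.processPacket simply ignores PROPOSAL and COMMIT). The sketch below is purely illustrative; the numeric constants are hypothetical stand-ins for the packet types defined in Leader.

```java
// Illustrative contrast between the follower and observer replay loops.
final class LearnerFlowSketch {

    // Hypothetical stand-ins for the packet-type constants in Leader.
    static final int PROPOSAL = 2;  // leader -> follower: log this transaction and ACK it
    static final int COMMIT = 4;    // leader -> follower: a quorum has ACKed, apply it
    static final int INFORM = 8;    // master -> observer: apply this already-committed transaction

    static String handle(boolean isObserver, int packetType) {
        if (isObserver) {
            return packetType == INFORM ? "apply committed txn" : "ignored by the observer";
        }
        switch (packetType) {
            case PROPOSAL: return "append txn to the log and send ACK";
            case COMMIT:   return "apply the previously logged txn";
            default:       return "other";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(false, PROPOSAL)); // follower: append txn to the log and send ACK
        System.out.println(handle(false, COMMIT));   // follower: apply the previously logged txn
        System.out.println(handle(true, INFORM));    // observer: apply committed txn
    }
}
```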