Source Code Analysis of ZooKeeper Cluster-Mode Startup: The Synchronization Phase

The previous article analyzed the leader election phase of ZooKeeper's startup; this one analyzes the synchronization phase that follows.

The leader's synchronization process

The synchronization entry point is QuorumPeer's run method:

case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        updateServerState();
    }
    break;
  1. makeLeader instantiates a Leader, and setLeader stores the instance in the leader member variable.
  2. leader.lead() starts the synchronization process.
  3. updateServerState in the finally block sets the current state back to LOOKING, so the node starts looking for a leader again. For the leader this usually happens when it loses its connections to the rest of the ensemble; for the other nodes it usually happens when the connection to the leader's synchronization port drops or cannot be established.
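The "loses its connections to the rest of the ensemble" condition above ultimately reduces to a majority test over the voting members. Below is a minimal sketch of that test, assuming a simple majority quorum; the real checks are QuorumVerifier.containsQuorum and SyncedLearnerTracker.hasAllQuorums, which appear in the leader's tick loop later in this article.

```java
import java.util.Set;

public class QuorumSketch {

    // Majority test: do the acknowledging servers include more than half
    // of the voting members? (Simplified sketch of containsQuorum; only
    // acks from actual voting members count.)
    static boolean containsQuorum(Set<Long> ackers, Set<Long> votingMembers) {
        long acks = ackers.stream().filter(votingMembers::contains).count();
        return acks > votingMembers.size() / 2;
    }

    public static void main(String[] args) {
        Set<Long> voters = Set.of(1L, 2L, 3L, 4L, 5L);
        System.out.println(containsQuorum(Set.of(1L, 2L, 3L), voters)); // true: 3 of 5
        System.out.println(containsQuorum(Set.of(1L, 2L), voters));     // false: 2 of 5
    }
}
```

Note that observers never appear in the voting-member set, which is why losing observers alone never sends a leader back to LOOKING.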

Next, look at Leader's lead method:

void lead() throws IOException, InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;

    zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);
        zk.loadData();

        leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

        // Start thread that waits for connection requests from
        // new followers.
        cnxAcceptor = new LearnerCnxAcceptor();
        cnxAcceptor.start();

        long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

        zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

        synchronized (this) {
            lastProposed = zk.getZxid();
        }

        newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

        if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
            LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
        }

        QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
        QuorumVerifier curQV = self.getQuorumVerifier();
        if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
            // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
            // specified by the user; the lack of version in a config file is interpreted as version=0).
            // As soon as a config is established we would like to increase its version so that it
            // takes presedence over other initial configs that were not established (such as a config
            // of a server trying to join the ensemble, which may be a partial view of the system, not the full config).
            // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
            // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
            // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier,
            // and there's still no agreement on the new version that we'd like to use. Instead, we use
            // lastSeenQuorumVerifier which is being sent with NEWLEADER message
            // so its a good way to let followers know about the new version. (The original reason for sending
            // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
            // that it finds before starting to propose operations. Here we're reusing the same code path for
            // reaching consensus on the new version number.)

            // It is important that this is done before the leader executes waitForEpochAck,
            // so before LearnerHandlers return from their waitForEpochAck
            // hence before they construct the NEWLEADER message containing
            // the last-seen-quorumverifier of the leader, which we change below
            try {
                LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
                QuorumVerifier newQV = self.configFromString(curQV.toString());
                newQV.setVersion(zk.getZxid());
                self.setLastSeenQuorumVerifier(newQV, true);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }

        newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
            newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        // We have to get at least a majority of servers in sync with
        // us. We do this by waiting for the NEWLEADER packet to get
        // acknowledged

        waitForEpochAck(self.getId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);
        self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());
        self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);

        try {
            waitForNewLeaderAck(self.getId(), zk.getZxid());
        } catch (InterruptedException e) {
            shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                     + newLeaderProposal.ackSetsToString()
                     + " ]");
            HashSet<Long> followerSet = new HashSet<Long>();

            for (LearnerHandler f : getLearners()) {
                if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
                    followerSet.add(f.getSid());
                }
            }
            boolean initTicksShouldBeIncreased = true;
            for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
                if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                    initTicksShouldBeIncreased = false;
                    break;
                }
            }
            if (initTicksShouldBeIncreased) {
                LOG.warn("Enough followers present. Perhaps the initTicks need to be increased.");
            }
            return;
        }

        startZkServer();

        /**
         * WARNING: do not use this for anything other than QA testing
         * on a real cluster. Specifically to enable verification that quorum
         * can handle the lower 32bit roll-over issue identified in
         * ZOOKEEPER-1277. Without this option it would take a very long
         * time (on order of a month say) to see the 4 billion writes
         * necessary to cause the roll-over to occur.
         *
         * This field allows you to override the zxid of the server. Typically
         * you'll want to set it to something like 0xfffffff0 and then
         * start the quorum, run some operations and see the re-election.
         */
        String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
        if (initialZxid != null) {
            long zxid = Long.parseLong(initialZxid);
            zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
        }

        if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
            self.setZooKeeperServer(zk);
        }

        self.setZabState(QuorumPeer.ZabState.BROADCAST);
        self.adminServer.setZooKeeperServer(zk);

        // Everything is a go, simply start counting the ticks
        // WARNING: I couldn't find any wait statement on a synchronized
        // block that would be notified by this notifyAll() call, so
        // I commented it out
        //synchronized (this) {
        //    notifyAll();
        //}
        // We ping twice a tick, so we only update the tick every other
        // iteration
        boolean tickSkip = true;
        // If not null then shutdown this leader
        String shutdownMessage = null;

        while (true) {
            synchronized (this) {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.tickTime / 2;
                while (cur < end) {
                    wait(end - cur);
                    cur = Time.currentElapsedTime();
                }

                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }

                // We use an instance of SyncedLearnerTracker to
                // track synced learners to make sure we still have a
                // quorum of current (and potentially next pending) view.
                SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                if (self.getLastSeenQuorumVerifier() != null
                    && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                    syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                }

                syncedAckSet.addAck(self.getId());

                for (LearnerHandler f : getLearners()) {
                    if (f.synced()) {
                        syncedAckSet.addAck(f.getSid());
                    }
                }

                // check leader running status
                if (!this.isRunning()) {
                    // set shutdown flag
                    shutdownMessage = "Unexpected internal error";
                    break;
                }

                if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                    // Lost quorum of last committed and/or last proposed
                    // config, set shutdown flag
                    shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                                      + syncedAckSet.ackSetsToString()
                                      + " ]";
                    break;
                }
                tickSkip = !tickSkip;
            }
            for (LearnerHandler f : getLearners()) {
                f.ping();
            }
        }
        if (shutdownMessage != null) {
            shutdown(shutdownMessage);
            // leader goes in looking state
        }
    } finally {
        zk.unregisterJMX(this);
    }
}

The main steps are as follows:

  1. self.setZabState(QuorumPeer.ZabState.DISCOVERY) sets the current ZAB (ZooKeeper Atomic Broadcast) state to DISCOVERY.
  2. cnxAcceptor.start() starts listening on the synchronization port.
  3. long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch()) waits until more than half of the voting nodes have connected to the synchronization port, then settles on the proposed value of the new epoch.
  4. zk.setZxid(ZxidUtils.makeZxid(epoch, 0)) derives the new zxid from the newly negotiated epoch.
  5. waitForEpochAck(self.getId(), leaderStateSummary) waits for a quorum to acknowledge the new epoch proposal; if any follower's state is newer than the leader's, an exception is thrown and a new election begins.
  6. self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION) sets the ZAB state to SYNCHRONIZATION.
  7. Data synchronization with the followers and observers begins. The leader tells each server which data it is missing and which it has in excess; the server truncates the excess or writes the missing data. After this step the followers' and observers' data is consistent with the leader's.
  8. waitForNewLeaderAck(self.getId(), zk.getZxid()) waits for a quorum to finish synchronizing.
  9. startZkServer() starts the ZooKeeper server, which begins accepting requests.
  10. If the zookeeper.leaderServes system property is not "no", self.setZooKeeperServer(zk) is called so the leader also serves client requests directly; setting it to "no" reduces the load on the leader.
  11. self.setZabState(QuorumPeer.ZabState.BROADCAST) sets the ZAB state to BROADCAST.
  12. The loop that follows checks whether the leader has lost contact with more than half of the followers; if so, it returns and a new election begins.
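Step 4's zxid layout is worth spelling out: the high 32 bits carry the epoch and the low 32 bits carry a per-epoch counter, which starts at 0 for a new leader. This is also why lead() logs a warning when the low 32 bits are non-zero, and why the ZOOKEEPER-1277 test hook masks with 0xffffffff00000000L. A simplified sketch of the arithmetic (assumed to mirror ZxidUtils' bit layout, not the actual class):

```java
public class ZxidSketch {

    // epoch in the high 32 bits, counter in the low 32 bits
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >>> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0); // what a new leader sets after agreeing on epoch 5
        System.out.println(Long.toHexString(zxid));   // 500000000
        System.out.println(getEpochFromZxid(zxid));   // 5
    }
}
```

Because the counter lives in the low 32 bits, roughly 4 billion writes in one epoch overflow it into the epoch field, which is the roll-over issue the testing-only initialZxid property exists to reproduce.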

The follower's synchronization process

The synchronization entry point is QuorumPeer's run method:

try {
    LOG.info("FOLLOWING");
    setFollower(makeFollower(logFactory));
    follower.followLeader();
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
} finally {
    follower.shutdown();
    setFollower(null);
    updateServerState();
}
break;
  1. makeFollower instantiates a Follower, and setFollower stores the instance in the follower member variable.
  2. follower.followLeader() starts the synchronization process.
  3. updateServerState in the finally block sets the current state back to LOOKING, so the node starts looking for a leader again. For the leader this usually happens when it loses its connections to the rest of the ensemble; for the other nodes it usually happens when the connection to the leader's synchronization port drops or cannot be established.

Next, look at Follower's followLeader method:

void followLeader() throws InterruptedException {
    self.end_fle = Time.currentElapsedTime();
    long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);
    ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
    self.start_fle = 0;
    self.end_fle = 0;
    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);

    long connectionTime = 0;
    boolean completedSync = false;

    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        QuorumServer leaderServer = findLeader();
        try {
            connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime = System.currentTimeMillis();
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }
            //check to see if the leader zxid is lower than ours
            //this should never happen but is just a safety check
            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch "
                          + ZxidUtils.zxidToString(newEpochZxid)
                          + " is less than our accepted epoch "
                          + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            long startTime = Time.currentElapsedTime();
            try {
                self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
                self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                syncWithLeader(newEpochZxid);
                self.setZabState(QuorumPeer.ZabState.BROADCAST);
                completedSync = true;
            } finally {
                long syncTime = Time.currentElapsedTime() - startTime;
                ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
            }
            if (self.getObserverMasterPort() > 0) {
                LOG.info("Starting ObserverMaster");

                om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();
            } else {
                om = null;
            }
            // create a reusable packet to reduce gc impact
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning()) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            closeSocket();

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        if (om != null) {
            om.stop();
        }
        zk.unregisterJMX(this);

        if (connectionTime != 0) {
            long connectionDuration = System.currentTimeMillis() - connectionTime;
            LOG.info(
                "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
                leaderAddr,
                connectionDuration,
                completedSync);
            messageTracker.dumpToLog(leaderAddr.toString());
        }
    }
}
  1. self.setZabState(QuorumPeer.ZabState.DISCOVERY) sets the current ZAB state to DISCOVERY.
  2. connectToLeader(leaderServer.addr, leaderServer.hostname) connects to the leader's synchronization port.
  3. registerWithLeader(Leader.FOLLOWERINFO) sends this follower's epoch proposal to the leader, waits while a quorum of nodes send theirs, and then waits for a quorum to acknowledge the new epoch. If this follower's epoch is greater than the finally agreed epoch, an exception is thrown and a new election begins.
  4. self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION) sets the ZAB state to SYNCHRONIZATION.
  5. syncWithLeader(newEpochZxid) synchronizes data with the leader, writing missing data and truncating excess data.
  6. self.setZabState(QuorumPeer.ZabState.BROADCAST) marks synchronization complete and sets the ZAB state to BROADCAST.
  7. If this node is configured to serve as a master for observers, it starts an ObserverMaster that listens on its synchronization port and handles observers' requests.
  8. The loop that follows reads request packets from the leader and replays them. If the connection drops, the method returns and a new election begins.
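The "write missing data / truncate excess data" decision in step 5 is actually made on the leader side, per learner, by comparing the learner's last zxid against the leader's in-memory committed-log window. DIFF, TRUNC, and SNAP are real packet types, but chooseSync below and its exact boundary conditions are a simplified assumption; the real decision lives in LearnerHandler.

```java
public class SyncChoiceSketch {

    enum SyncMode { DIFF, TRUNC, SNAP }

    // Simplified model: the leader keeps an in-memory committed log
    // covering the zxid window [minCommittedLog, maxCommittedLog].
    static SyncMode chooseSync(long learnerZxid, long minCommittedLog, long maxCommittedLog) {
        if (learnerZxid > maxCommittedLog) {
            // learner has transactions the leader never committed: truncate them
            return SyncMode.TRUNC;
        } else if (learnerZxid >= minCommittedLog) {
            // learner is behind but within the log window: send the missing proposals
            return SyncMode.DIFF;
        } else {
            // learner is too far behind: send a full snapshot
            return SyncMode.SNAP;
        }
    }

    public static void main(String[] args) {
        // learner at epoch 5, counter 7; leader's log covers counters 3..9
        System.out.println(chooseSync(0x500000007L, 0x500000003L, 0x500000009L)); // DIFF
    }
}
```

Whichever mode is chosen, the exchange ends with the follower's data matching the leader's, after which the NEWLEADER ack in the leader's lead() can be sent.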

The observer's synchronization process

The synchronization entry point is QuorumPeer's run method:

case OBSERVING:
    try {
        LOG.info("OBSERVING");
        setObserver(makeObserver(logFactory));
        observer.observeLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        observer.shutdown();
        setObserver(null);
        updateServerState();

        // Add delay jitter before we switch to LOOKING
        // state to reduce the load of ObserverMaster
        if (isRunning()) {
            Observer.waitForObserverElectionDelay();
        }
    }
    break;
  1. makeObserver instantiates an Observer, and setObserver stores the instance in the observer member variable.
  2. observer.observeLeader() starts the synchronization process.
  3. updateServerState in the finally block sets the current state back to LOOKING, so the node starts looking for a leader again. For the leader this usually happens when it loses its connections to the rest of the ensemble; for the other nodes it usually happens when the connection to the leader's synchronization port drops or cannot be established.
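The delay jitter in the finally block above (Observer.waitForObserverElectionDelay) exists so that when an ObserverMaster disappears, its observers do not all re-enter election and reconnect at the same instant. A sketch of the idea, where jitterMs is a hypothetical helper rather than ZooKeeper's actual API:

```java
import java.util.concurrent.ThreadLocalRandom;

public class JitterSketch {

    // Pick a random delay in [0, maxDelayMs) to sleep before falling back to
    // LOOKING, spreading reconnections to a restarted ObserverMaster over time.
    static long jitterMs(long maxDelayMs) {
        if (maxDelayMs <= 0) {
            return 0; // jitter disabled
        }
        return ThreadLocalRandom.current().nextLong(maxDelayMs);
    }

    public static void main(String[] args) {
        System.out.println("sleeping " + jitterMs(200) + "ms before LOOKING");
    }
}
```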

Next, look at Observer's observeLeader method:

void observeLeader() throws Exception {
    zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
    long connectTime = 0;
    boolean completedSync = false;
    try {
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        QuorumServer master = findLearnerMaster();
        try {
            connectToLeader(master.addr, master.hostname);
            connectTime = System.currentTimeMillis();
            long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
            if (self.isReconfigStateChange()) {
                throw new Exception("learned about role change");
            }

            self.setLeaderAddressAndId(master.addr, master.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
            syncWithLeader(newLeaderZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync = true;
            QuorumPacket qp = new QuorumPacket();
            while (this.isRunning() && nextLearnerMaster.get() == null) {
                readPacket(qp);
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when observing the leader", e);
            closeSocket();

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        currentLearnerMaster = null;
        zk.unregisterJMX(this);
        if (connectTime != 0) {
            long connectionDuration = System.currentTimeMillis() - connectTime;

            LOG.info(
                "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
                leaderAddr,
                connectionDuration,
                completedSync);
            messageTracker.dumpToLog(leaderAddr.toString());
        }
    }
}
  1. self.setZabState(QuorumPeer.ZabState.DISCOVERY) sets the current ZAB state to DISCOVERY.
  2. QuorumServer master = findLearnerMaster() picks either the leader or a follower as this observer's master; an observer does not have to connect to the leader directly, which reduces the leader's load.
  3. connectToLeader(master.addr, master.hostname) connects to the master's synchronization port.
  4. long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO) asks the master for the agreed-upon epoch.
  5. self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION) sets the ZAB state to SYNCHRONIZATION.
  6. syncWithLeader(newLeaderZxid) synchronizes data with the master, writing missing data and truncating excess data.
  7. self.setZabState(QuorumPeer.ZabState.BROADCAST) marks synchronization complete and sets the ZAB state to BROADCAST.
  8. The loop that follows reads request packets from the master and replays them. If the connection drops, the method returns and a new election begins.
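The replay loop in step 8 differs from a follower's: an observer never votes, so instead of PROPOSAL/COMMIT pairs its master sends it INFORM packets that carry already-committed transactions, applied directly. The toy dispatcher below illustrates the distinction; the packet-type codes and observerAction are illustrative stand-ins (the real handling is in Observer's packet processing and the real constants in Leader.java).

```java
public class ReplaySketch {

    // Illustrative packet-type codes, not guaranteed to match Leader.java
    static final int PROPOSAL = 2, COMMIT = 4, PING = 5, INFORM = 8;

    // What an observer does with each packet type (simplified stand-in
    // for the observer's processPacket logic)
    static String observerAction(int packetType) {
        switch (packetType) {
            case INFORM:   return "apply committed txn";
            case PING:     return "answer ping";
            case PROPOSAL:
            case COMMIT:   return "unexpected: observers don't vote";
            default:       return "ignore";
        }
    }

    public static void main(String[] args) {
        System.out.println(observerAction(INFORM)); // apply committed txn
    }
}
```

Skipping the vote is exactly what lets observers scale out reads without slowing down the write quorum.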