ZooKeeper Leader and Follower Initialization

Preface

The previous article, on the ZooKeeper ZAB leader election source code, already analyzed the leader election process in detail. The next few articles analyze how the Leader and the Followers are initialized.

Initialization overview diagrams

Once the leader has been elected, the leader and the followers move into cluster formation and data synchronization, which involves the following phases.

1. Connection establishment

The leader starts the connection listener LearnerCnxAcceptorHandler on the first port configured for its address in zoo.cfg, to accept connection requests from followers.


connect_init.png

2. Determining the new epoch

The newly formed cluster needs a new epoch to mark that all of its members are now working in the same round.


new_epoch.png

3. Data synchronization

Once the cluster's new epoch has been settled, data recovery starts. After data recovery completes, the request-processing engines of the followers and the leader are started, and the cluster can begin serving clients.


data_sync.png

Now let's go through the source code in detail.

Leader

After several rounds of voting the Leader has been successfully elected, and the QuorumPeer of the node in the LEADING state enters the leading process.


while (running) {
                switch (getPeerState()) {
                   case LOOKING:  .....
                   case FOLLOWING: .....
                   case OBSERVING: ......
                   case LEADING:
                       LOG.info("LEADING");
                      try {
                        // create the Leader object
                        setLeader(makeLeader(logFactory));
                        // start leading the cluster as its leader
                        leader.lead();
                        setLeader(null);
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                        } finally {
                            if (leader != null) {
                                leader.shutdown("Forcing shutdown");
                                setLeader(null);
                            }
                            updateServerState();
                        }
                        break;
                }

               }

       }

QuorumPeer creates the Leader object

The Leader class extends LearnerMaster. Leader has too many fields to introduce all of them here; we will explain the relevant ones in detail as we encounter them later.

new Leader
// LeaderZooKeeperServer extends ZooKeeperServer; it represents the ZooKeeper server of a node in the Leader role
public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        this.proposalStats = new BufferStats();
        // get the addresses and ports the Leader listens on
        Set<InetSocketAddress> addresses;
        if (self.getQuorumListenOnAllIPs()) {
            addresses = self.getQuorumAddress().getWildcardAddresses();
        } else {
            addresses = self.getQuorumAddress().getAllAddresses();
        }

        addresses.stream()
          .map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
          .filter(Optional::isPresent)
          .map(Optional::get)
          .forEach(serverSockets::add);

        if (serverSockets.isEmpty()) {
            throw new IOException("Leader failed to initialize any of the following sockets: " + addresses);
        }

        this.zk = zk;
    }
new LeaderZooKeeperServer

LeaderZooKeeperServer's initialization ends up initializing ZooKeeperServer. The code below was already analyzed in the article on standalone ZooKeeper server startup, so it is not repeated here.

 public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
        serverStats = new ServerStats(this);
        this.txnLogFactory = txnLogFactory;
        this.txnLogFactory.setServerStats(this.serverStats);
        this.zkDb = zkDb;
        this.tickTime = tickTime;
        setMinSessionTimeout(minSessionTimeout);
        setMaxSessionTimeout(maxSessionTimeout);
        this.listenBacklog = clientPortListenBacklog;
        this.reconfigEnabled = reconfigEnabled;

        listener = new ZooKeeperServerListenerImpl(this);

        readResponseCache = new ResponseCache(Integer.getInteger(
            GET_DATA_RESPONSE_CACHE_SIZE,
            ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));

        getChildrenResponseCache = new ResponseCache(Integer.getInteger(
            GET_CHILDREN_RESPONSE_CACHE_SIZE,
            ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));

        this.initialConfig = initialConfig;

        this.requestPathMetricsCollector = new RequestPathMetricsCollector();

        this.initLargeRequestThrottlingSettings();

        LOG.info(
            "Created server with"
                + " tickTime {}"
                + " minSessionTimeout {}"
                + " maxSessionTimeout {}"
                + " clientPortListenBacklog {}"
                + " datadir {}"
                + " snapdir {}",
            tickTime,
            getMinSessionTimeout(),
            getMaxSessionTimeout(),
            getClientPortListenBacklog(),
            txnLogFactory.getDataDir(),
            txnLogFactory.getSnapDir());
    }

We will come back to LeaderZooKeeperServer when we analyze the leader's request-processing chain; for now we set it aside.

Leader.lead

Once the Leader has been created, it enters the leading phase through the lead method. Let's look at the source of lead(). The method is fairly long, so we go through it segment by segment, starting with the first one.

            // set the ZAB state to DISCOVERY
           self.setZabState(QuorumPeer.ZabState.DISCOVERY);
            self.tick.set(0);
           // ZooKeeperServer loads its local data; normally the data was already loaded during leader election, so loadData here just takes a local snapshot
            zk.loadData();
            
            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from
            // new followers.
           // create the leader-side acceptor manager thread, which spawns the LearnerCnxAcceptorHandler listeners that wait for follower connections
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();
            // lead blocks here until a quorum of followers has connected; then the new epoch is generated
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
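
It helps to keep in mind how the epoch and the zxid relate here: a zxid is a 64-bit number whose high 32 bits are the epoch and whose low 32 bits are a counter within that epoch. A minimal sketch of the idea (simplified helpers in the spirit of ZxidUtils, not the actual class):

// Simplified sketch of the zxid layout: epoch in the high 32 bits,
// per-epoch counter in the low 32 bits. Illustrative only.
public final class ZxidLayoutSketch {

    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >>> 32;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 0); // the first zxid of epoch 5
        System.out.println(Long.toHexString(zxid));   // 500000000
        System.out.println(getEpochFromZxid(zxid));   // 5
        System.out.println(getCounterFromZxid(zxid)); // 0
    }
}

This is why calls like ZxidUtils.makeZxid(epoch, 0) and ZxidUtils.getEpochFromZxid(zxid) keep showing up throughout the rest of the walkthrough.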

LearnerCnxAcceptor

During lead, the Leader creates a LearnerCnxAcceptor. Let's look at LearnerCnxAcceptor's run implementation.

 public void run() {
            if (!stop.get() && !serverSockets.isEmpty()) {
                ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
                CountDownLatch latch = new CountDownLatch(serverSockets.size());
                 // create one LearnerCnxAcceptorHandler per address the leader is bound to
                serverSockets.forEach(serverSocket ->
                        executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));

                try {
                    latch.await();
                } catch (InterruptedException ie) {
                    LOG.error("Interrupted while sleeping in LearnerCnxAcceptor.", ie);
                } finally {
                    closeSockets();
                    executor.shutdown();
                    try {
                        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                            LOG.error("not all the LearnerCnxAcceptorHandler terminated properly");
                        }
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while terminating LearnerCnxAcceptor.", ie);
                    }
                }
            }
        }

LearnerCnxAcceptorHandler

This is the leader-side thread that accepts connection requests from followers.
Let's look at LearnerCnxAcceptorHandler's run method.

public void run() {
                try {
                    Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress());

                    while (!stop.get()) {
                       // acceptConnections uses the leader's ServerSocket to accept follower connection requests
                        acceptConnections();
                    }
                } catch (Exception e) {
                    LOG.warn("Exception while accepting follower", e);
                    if (fail.compareAndSet(false, true)) {
                        handleException(getName(), e);
                        halt();
                    }
                } finally {
                    latch.countDown();
                }
            }
LearnerCnxAcceptorHandler.acceptConnections
  private void acceptConnections() throws IOException {
                Socket socket = null;
                boolean error = false;
                try {
                  // accept a follower's connection request
                    socket = serverSocket.accept();

                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    // set the socket timeout
                    socket.setSoTimeout(self.tickTime * self.initLimit);
                    socket.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
                    // create a LearnerHandler thread that represents this follower and handles the requests it sends
                    LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    error = true;
                    if (stop.get()) {
                        LOG.warn("Exception while shutting down acceptor.", e);
                    } else {
                        throw e;
                    }
                } catch (SaslException e) {
                    LOG.error("Exception while connecting to quorum learner", e);
                    error = true;
                } catch (Exception e) {
                    error = true;
                    throw e;
                } finally {
                    // Don't leak sockets on errors
                    if (error && socket != null && !socket.isClosed()) {
                        try {
                            socket.close();
                        } catch (IOException e) {
                            LOG.warn("Error closing socket: " + socket, e);
                        }
                    }
                }
            }
LearnerHandler is the leader-side representation of a follower

We will not go into its details yet; we come back to it below when we cover the Follower.

Follower

The sections above covered part of the leader initialization. Why stop there? Because from this point on the leader's behavior is much easier to understand together with what the follower does, so let's move on to the follower initialization.
Once the leader has been elected, the non-leader nodes set themselves to the follower role (observers are out of scope here) and enter the followLeader phase. Below is the FOLLOWING branch of the follower's QuorumPeer.


                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        updateServerState();
                    }
                    break;

Creating the Follower

Creating the follower first creates a FollowerZooKeeperServer. FollowerZooKeeperServer extends ZooKeeperServer and represents the ZooKeeper instance on a follower node, but it has its own request-processing chain, which I will explain in detail when we cover request processing.

       new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));

follower.followLeader

Once the Follower instance has been created, it interacts with the Leader through followLeader.

 void followLeader() throws InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
        LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);

        long connectionTime = 0;
        boolean completedSync = false;

        try {
            // change the ZAB state from ELECTION to DISCOVERY
            self.setZabState(QuorumPeer.ZabState.DISCOVERY);
           // find the Leader node based on the election result
            QuorumServer leaderServer = findLeader();
            try {
                 // establish a socket connection to the Leader node
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                connectionTime = System.currentTimeMillis();
                 // register with the Leader and obtain the new epoch; at this point the follower has become a legitimate follower of the ZooKeeper cluster
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
                if (self.isReconfigStateChange()) {
                    throw new Exception("learned about role change");
                }
                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch "
                              + ZxidUtils.zxidToString(newEpochZxid)
                              + " is less than our accepted epoch "
                              + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                long startTime = Time.currentElapsedTime();
                try {
                     // the follower records the leader's address and sid
                    self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
                  // the follower changes the ZAB state to SYNCHRONIZATION and enters data synchronization
                    self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
                   // synchronize data with the leader
                    syncWithLeader(newEpochZxid);
                // after syncing, the follower changes the ZAB state to BROADCAST; the final ZAB phase is reached and the node can serve clients
                    self.setZabState(QuorumPeer.ZabState.BROADCAST);
                    completedSync = true;
                } finally {
                    long syncTime = Time.currentElapsedTime() - startTime;
                    ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
                }
                if (self.getObserverMasterPort() > 0) {
                    LOG.info("Starting ObserverMaster");

                    om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
                    om.start();
                } else {
                    om = null;
                }
                // create a reusable packet to reduce gc impact todo: at this point follower is ready to serve
                QuorumPacket qp = new QuorumPacket();
                // the follower then enters the loop below, receiving messages from the leader and processing them
                while (this.isRunning()) {
                     // receive a message from the leader
                    readPacket(qp);
                   // process the message; message processing is analyzed later
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                closeSocket();

                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            if (om != null) {
                om.stop();
            }
            zk.unregisterJMX(this);

            if (connectionTime != 0) {
                long connectionDuration = System.currentTimeMillis() - connectionTime;
                LOG.info(
                    "Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
                    leaderAddr,
                    connectionDuration,
                    completedSync);
                messageTracker.dumpToLog(leaderAddr.toString());
            }
        }
    }
connectToLeader

The Follower connects to the Leader; let's look at the implementation directly.


 protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {

        this.leaderAddr = multiAddr;
        Set<InetSocketAddress> addresses;
        if (self.isMultiAddressReachabilityCheckEnabled()) {
            // even if none of the addresses are reachable, we want to try to establish connection
            // see ZOOKEEPER-3758
            addresses = multiAddr.getAllReachableAddressesOrAll();
        } else {
            addresses = multiAddr.getAllAddresses();
        }
        ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
        CountDownLatch latch = new CountDownLatch(addresses.size());
        AtomicReference<Socket> socket = new AtomicReference<>(null);
       // create a LeaderConnector for each leader address; LeaderConnector is the thread class that actually establishes the connection to the Leader
        addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);

        try {
            latch.await();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while trying to connect to Leader", e);
        } finally {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    LOG.error("not all the LeaderConnector terminated properly");
                }
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
            }
        }

        if (socket.get() == null) {
            throw new IOException("Failed connect to " + multiAddr);
        } else {
            sock = socket.get();
        }

        self.authLearner.authenticate(sock, hostname);
        // through LeaderConnector we have established the socket connection to the Leader
        // leaderIs wraps the input stream of the leader socket
        leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
        bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
        // leaderOs wraps the output stream of the leader socket
        leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
    }
LeaderConnector

LeaderConnector's job is to establish the follower's socket connection to the Leader. It is a thread class; once the socket to the Leader has been established its mission is complete and its thread exits. The implementation is straightforward, so it is not analyzed here; a rough sketch of the idea follows.
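
A minimal sketch of what such a connector does, with illustrative names (not the actual LeaderConnector inner class in Learner): each connector tries one leader address, publishes the first successful socket into the shared AtomicReference, and counts down the latch so connectToLeader can continue.

// Simplified sketch of the LeaderConnector idea; names are illustrative.
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class LeaderConnectorSketch implements Runnable {

    private final InetSocketAddress address;
    private final AtomicReference<Socket> socketRef;
    private final CountDownLatch latch;
    private final int connectTimeoutMs;

    LeaderConnectorSketch(InetSocketAddress address, AtomicReference<Socket> socketRef,
                          CountDownLatch latch, int connectTimeoutMs) {
        this.address = address;
        this.socketRef = socketRef;
        this.latch = latch;
        this.connectTimeoutMs = connectTimeoutMs;
    }

    @Override
    public void run() {
        try {
            Socket s = new Socket();
            s.connect(address, connectTimeoutMs);
            // only the first successful connection wins; later ones are closed
            if (!socketRef.compareAndSet(null, s)) {
                s.close();
            }
        } catch (Exception e) {
            // connecting to this address failed; another connector may still succeed
        } finally {
            latch.countDown();
        }
    }
}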

Follower.registerWithLeader

The Follower registers its own information with the Leader.

 protected long registerWithLeader(int pktType) throws IOException {
        /*
         * Send follower info, including last zxid and sid
         */
        // the follower reads the largest transaction id it has processed from its database
        long lastLoggedZxid = self.getLastLoggedZxid();
         // QuorumPacket is the serialization carrier for messages between the Leader and the Followers
        QuorumPacket qp = new QuorumPacket();
        qp.setType(pktType);
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

        /*
         * Add sid to payload
         */
        // LearnerInfo describes this follower
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
         // serialize the follower's information (sid, protocol version, quorum verifier version) into the ByteArrayOutputStream and put it into the QuorumPacket's data field
        boa.writeRecord(li, "LearnerInfo");
        qp.setData(bsid.toByteArray());
        // send the QuorumPacket to the leader over the socket
        writePacket(qp, true);
        // wait for the leader's reply; the leader returns the new epoch
        readPacket(qp);
        // extract the new epoch from the leader's reply
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte[] epochBytes = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                // write the new epoch to the local acceptedEpoch file, marking that this follower is part of the new round
                wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                  // if the new epoch equals acceptedEpoch, nothing extra needs to be done
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                // if the follower's acceptedEpoch is greater than the new epoch, this node is in a later round than the new cluster, which is an error
                throw new IOException("Leaders epoch, "
                                      + newEpoch
                                      + " is less than accepted epoch, "
                                      + self.getAcceptedEpoch());
            }
            // send the leader an ack for the new epoch (including our last zxid and our current epoch)
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
            if (newEpoch > self.getAcceptedEpoch()) {
                self.setAcceptedEpoch(newEpoch);
            }
            if (qp.getType() != Leader.NEWLEADER) {
                LOG.error("First packet should have been NEWLEADER");
                throw new IOException("First packet should have been NEWLEADER");
            }
            return qp.getZxid();
        }
    }
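
The LearnerInfo payload built above is what LearnerHandler parses on the leader side with its length checks (>= 8, >= 12, >= 20 bytes, as we will see shortly): the sid as a long, the protocol version as an int, and the quorum verifier version as a long. A small illustration of that 20-byte layout using a plain ByteBuffer (the real code serializes a LearnerInfo record through jute's BinaryOutputArchive; this is only a sketch):

// Illustration of the LearnerInfo payload layout:
// sid (8 bytes) + protocol version (4 bytes) + quorum verifier version (8 bytes) = 20 bytes.
import java.nio.ByteBuffer;

public class LearnerInfoLayoutDemo {

    public static void main(String[] args) {
        long sid = 2L;                 // this follower's server id
        int protocolVersion = 0x10000; // learner protocol version
        long qvVersion = 0L;           // quorum verifier (config) version

        ByteBuffer payload = ByteBuffer.allocate(20);
        payload.putLong(sid).putInt(protocolVersion).putLong(qvVersion);
        payload.flip();

        // this mirrors how the leader side reads the fields back
        System.out.println("sid=" + payload.getLong()
                + " version=0x" + Integer.toHexString(payload.getInt())
                + " configVersion=" + payload.getLong());
    }
}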

Let's pause here. Since the follower has now sent its message to the leader, let's see how the leader handles it.


As mentioned above, the leader creates a LearnerHandler thread to handle each follower's requests; let's now look at LearnerHandler's run method.

LearnerHandler.run

LearnerHandler handles all communication with a follower. The code is long, so I will explain it in segments, starting with the first one, which receives the epoch information sent by the follower.

 public void run() {
        try {
            // add this LearnerHandler, which represents the follower, to the learnerMaster's list of learner handlers
            learnerMaster.addLearnerHandler(this);
            tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();
           // initialize the input and output streams
            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            // read the first packet from the follower, which carries its epoch information
            ia.readRecord(qp, "packet");

            messageTracker.trackReceived(qp.getType());
            if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
                LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());

                return;
            }

            if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
                throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
            }
            // get the packet payload
            byte[] learnerInfoData = qp.getData();
            if (learnerInfoData != null) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    // read the follower's sid from the payload
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                     // read the protocolVersion
                    this.version = bbsid.getInt(); // protocolVersion
                }
                if (learnerInfoData.length >= 20) {
                     // read the configVersion
                    long configVersion = bbsid.getLong();
                    if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = learnerMaster.getAndDecrementFollowerCounter();
            }
           // look up the follower's configured address and port by sid
            String followerInfo = learnerMaster.getPeerInfo(this.sid);
            if (followerInfo.isEmpty()) {
                LOG.info(
                    "Follower sid: {} not in the current config {}",
                    this.sid,
                    Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
            } else {
                LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
            }

            if (qp.getType() == Leader.OBSERVERINFO) {
                learnerType = LearnerType.OBSERVER;
            }

            learnerMaster.registerLearnerHandlerBean(this, sock);
              // getEpochFromZxid extracts the epoch the follower sent
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
       long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
           // learnerMaster.getEpochToPropose: in lead() the leader's QuorumPeer waits inside getEpochToPropose until more than half of the participants have arrived before executing the rest of lead
          // each arriving follower (as long as it is in the same round) updates the participant count tracked by the leader; once the followers plus the leader form a majority, the leader's QuorumPeer thread leaves the wait and continues, and the new epoch is returned, marking that the cluster has entered a new round of work
            long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
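
Before moving on to the second segment, it is worth knowing roughly what getEpochToPropose does, since the method itself is not shown in this article: it collects the acceptedEpoch of every arriving participant, proposes max(acceptedEpoch) + 1 as the new epoch, and blocks until a quorum (counting the leader itself) has checked in. A simplified sketch of that pattern, with illustrative names rather than the actual ZooKeeper implementation:

// Sketch of the "collect acceptedEpochs, wait for a quorum, agree on a new epoch"
// pattern behind Leader.getEpochToPropose; illustrative only.
import java.util.HashSet;
import java.util.Set;

class EpochProposerSketch {

    private final Set<Long> connectingPeers = new HashSet<>();
    private long proposedEpoch = -1;
    private boolean waitingForNewEpoch = true;
    private final int quorumSize;  // e.g. (ensembleSize / 2) + 1
    private final long timeoutMs;

    EpochProposerSketch(int quorumSize, long timeoutMs) {
        this.quorumSize = quorumSize;
        this.timeoutMs = timeoutMs;
    }

    synchronized long getEpochToPropose(long sid, long lastAcceptedEpoch)
            throws InterruptedException {
        if (!waitingForNewEpoch) {
            return proposedEpoch;             // already decided for this round
        }
        // the proposed epoch is one larger than any accepted epoch seen so far
        if (lastAcceptedEpoch >= proposedEpoch) {
            proposedEpoch = lastAcceptedEpoch + 1;
        }
        connectingPeers.add(sid);
        if (connectingPeers.size() >= quorumSize) {
            waitingForNewEpoch = false;
            notifyAll();                      // wake up everyone waiting for the quorum
        } else {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (waitingForNewEpoch) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                wait(remaining);
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for the new epoch to be decided by a quorum");
            }
        }
        return proposedEpoch;
    }
}

Both the leader itself (in lead()) and every LearnerHandler call into this method, which is what makes lead() block until enough followers have connected.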

Second segment: the Leader sends the newly generated epoch to the follower.

           // build the leader's new zxid from the newly generated epoch
          long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                learnerMaster.waitForEpochAck(this.getSid(), ss);
            } else {
                byte[] ver = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                 // build the packet that sends the new leader zxid to the follower
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
               // send the packet over the socket
                oa.writeRecord(newEpochPacket, "packet");
                messageTracker.trackSent(Leader.LEADERINFO);
                bufferedOutput.flush();
               // read the follower's ack for the leader's newly generated epoch
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                messageTracker.trackReceived(ackEpochPacket.getType());
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
                    return;
                }

Now let's return to the Leader's lead method and see what happens after the main thread returns from getEpochToPropose.

            // obtain the newly generated epoch
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            
             // derive the new zxid from the epoch
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

            synchronized (this) {
                lastProposed = zk.getZxid();
            }
             // build the proposal packet announcing this node as the leader
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);

            if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
            }

            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            QuorumVerifier curQV = self.getQuorumVerifier();
            if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
               
                try {
                    LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
                    QuorumVerifier newQV = self.configFromString(curQV.toString());
                    newQV.setVersion(zk.getZxid());
                    self.setLastSeenQuorumVerifier(newQV, true);
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }

            newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
            if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }

           // wait for a quorum of participants to ack the new epoch
            waitForEpochAck(self.getId(), leaderStateSummary);

At this point the leader is waiting for a quorum of participants to ack the new epoch.

Let's go back to LearnerHandler and see what happens after it reads the follower's ack for the new epoch.

 ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
 ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
 // after receiving the follower's ack for the new epoch, call the leader's waitForEpochAck method
    learnerMaster.waitForEpochAck(this.getSid(), ss);
waitForEpochAck

Let's analyze waitForEpochAck. Its code is fairly simple: it checks whether a quorum of participants has acked the new epoch. If so, the new epoch takes effect across the whole cluster and becomes the agreed epoch for this round.

 public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
    // electingFollowers holds the ids of the participants that have already acked the new epoch
        synchronized (electingFollowers) {
            if (electionFinished) {
                return;
            }
            if (ss.getCurrentEpoch() != -1) {
                if (ss.isMoreRecentThan(leaderStateSummary)) {
                    throw new IOException("Follower is ahead of the leader, leader summary: "
                                          + leaderStateSummary.getCurrentEpoch()
                                          + " (current epoch), "
                                          + leaderStateSummary.getLastZxid()
                                          + " (last zxid)");
                }
                if (ss.getLastZxid() != -1 && isParticipant(id)) {
                    electingFollowers.add(id);
                }
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
             // if a quorum of participants has acked the new epoch, this round of election is officially complete
            if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
                electionFinished = true;
                electingFollowers.notifyAll();
            } else {
              // if a quorum has not yet acked the new epoch, the thread waits
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.getInitLimit() * self.getTickTime();
                while (!electionFinished && cur < end) {
                    electingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (!electionFinished) {
                    throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
                }
            }
        }
    }
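
The containsQuorum check above is, for the default majority quorum, simply "more than half of the voting members have acked". A minimal sketch of that rule in the spirit of QuorumMaj (simplified, not the actual class):

// Minimal majority-quorum check, QuorumMaj-style; illustrative only.
import java.util.Set;

class MajorityQuorumSketch {

    private final int votingMembers; // number of participants in the ensemble

    MajorityQuorumSketch(int votingMembers) {
        this.votingMembers = votingMembers;
    }

    boolean containsQuorum(Set<Long> ackSet) {
        // strictly more than half of the voting members must be in the ack set
        return ackSet.size() > votingMembers / 2;
    }
}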

Back to the follower side: after sending its ack for the new epoch, the follower enters the ZAB synchronization phase. Let's look at the follower's syncWithLeader implementation. This method is also quite long, so we analyze it in segments.


  protected void syncWithLeader(long newLeaderZxid) throws Exception {
          // pre-build the ACK packet that will be sent back to the leader later
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        QuorumVerifier newLeaderQV = null;

        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        boolean syncSnapshot = false;
       // read the data-sync instruction sent by the leader
        readPacket(qp);

Now back to LearnerHandler: once the new epoch has been accepted by a quorum of participants, LearnerHandler moves on to syncFollower.

LearnerHandler syncFollower

syncFollower decides how to bring the follower's data up to date, based on the follower's last zxid (peerLastZxid) and the zxids the leader itself has already processed.

boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
        /*
         * When leader election is completed, the leader will set its
         * lastProcessedZxid to be (epoch < 32). There will be no txn associated
         * with this zxid.
         *
         * The learner will set its lastProcessedZxid to the same value if
         * it get DIFF or SNAP from the learnerMaster. If the same learner come
         * back to sync with learnerMaster using this zxid, we will never find this
         * zxid in our history. In this case, we will ignore TRUNC logic and
         * always send DIFF if we have old enough history
         */
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        // Keep track of the latest zxid which already queued
        long currentZxid = peerLastZxid;
        boolean needSnap = true;
       // get the leader's ZK database
        ZKDatabase db = learnerMaster.getZKDatabase();
        boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
        ReentrantReadWriteLock lock = db.getLogLock();
        ReadLock rl = lock.readLock();
        try {
            rl.lock();
             // the largest committed transaction id currently kept in the leader's committedLog
            long maxCommittedLog = db.getmaxCommittedLog();
             // the smallest transaction id still kept in the leader's committedLog
            long minCommittedLog = db.getminCommittedLog();

            // the latest transaction id the leader has applied to its data tree and that has been acked
            // when maxCommittedLog > lastProcessedZxid,
           // the leader still has some transactions in its log that have not been acked by the other machines in the cluster
            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

            LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}"
                     + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                     + " peerLastZxid=0x{}",
                     getSid(),
                     Long.toHexString(maxCommittedLog),
                     Long.toHexString(minCommittedLog),
                     Long.toHexString(lastProcessedZxid),
                     Long.toHexString(peerLastZxid));

            if (db.getCommittedLog().isEmpty()) {
                /*
                 * It is possible that committedLog is empty. In that case
                 * setting these value to the latest txn in learnerMaster db
                 * will reduce the case that we need to handle
                 *
                 * Here is how each case handle by the if block below
                 * 1. lastProcessZxid == peerZxid -> Handle by (2)
                 * 2. lastProcessZxid < peerZxid -> Handle by (3)
                 * 3. lastProcessZxid > peerZxid -> Handle by (5)
                 */
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }

            /*
            * The English comments below list the data-sync cases that syncFollower has to handle
             * Here are the cases that we want to handle
             *
             * 1. Force sending snapshot (for testing purpose)
             // if the follower's zxid already equals the leader's, just send the follower an empty DIFF
             * 2. Peer and learnerMaster is already sync, send empty diff          
              // if the follower's zxid is greater than the leader's, send a TRUNC message telling the follower to drop the transactions associated with the extra zxids,
             // but if the zxid the follower sent is a newEpochZxid, the follower has not processed any transactions yet, so TRUNC must not be sent
             * 3. Follower has txn that we haven't seen. This may be old leader
             *    so we need to send TRUNC. However, if peer has newEpochZxid,
             *    we cannot send TRUNC since the follower has no txnlog
              // if the follower's zxid falls within the leader's committedLog range, we need to send a DIFF
             * 4. Follower is within committedLog range or already in-sync.
             *    We may need to send DIFF or TRUNC depending on follower's zxid
             *    We always send empty DIFF if follower is already in-sync
              // if the follower's zxid is smaller than minCommittedLog, the leader's committedLog plus the on-disk transaction log are used to restore the follower's data,
             // and if that fails, fall back to a snapshot sync
             * 5. Follower missed the committedLog. We will try to use on-disk
             *    txnlog + committedLog to sync with follower. If that fail,
             *    we will send snapshot
             */
         // below we walk through the five sync rules listed above

            if (forceSnapSync) {
                // Force learnerMaster to use snapshot to sync with follower
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                // Follower is already sync with us, send empty diff
                // the follower's zxid equals the leader's, so just send a DIFF instruction; no data will follow
                LOG.info(
                    "Sending DIFF zxid=0x{} for peer sid: {}",
                    Long.toHexString(peerLastZxid),
                    getSid());
                queueOpPacket(Leader.DIFF, peerLastZxid);
                needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
               // the follower's zxid is greater than the leader's largest committed zxid, so send a TRUNC command
              // telling the follower to delete the extra transactions stored in its database, keeping only those up to maxCommittedLog
                // Newer than committedLog, send trunc and done
                LOG.debug(
                    "Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}",
                    Long.toHexString(maxCommittedLog),
                    getSid());
                queueOpPacket(Leader.TRUNC, maxCommittedLog);
                currentZxid = maxCommittedLog;
                needOpPacket = false;
                needSnap = false;
            } else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
                // the follower's zxid lies between minCommittedLog and maxCommittedLog, so the follower is behind the leader and the transactions in (peerLastZxid, maxCommittedLog] must be sent to it
                // Follower is within commitLog range
                LOG.info("Using committedLog for peer sid: {}", getSid());
                Iterator<Proposal> itr = db.getCommittedLog().iterator();
                // queue the transactions the follower is missing into the pending-sync queue
                currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                // Use txnlog and committedLog to sync
                // the follower's zxid is smaller than minCommittedLog, so the follower is far behind the leader and the transactions in (peerLastZxid, maxCommittedLog] must be sent to it;
            // part of them, [minCommittedLog, maxCommittedLog], is cached in the leader's in-memory committedLog, and the rest, (peerLastZxid, minCommittedLog), lives in the on-disk transaction log, so data has to be recovered from both

                // Calculate sizeLimit that we allow to retrieve txnlog from disk
                long sizeLimit = db.calculateTxnLogSizeLimit();
                // This method can return empty iterator if the requested zxid
                // is older than on-disk txnlog
              // get an iterator over the on-disk transaction log based on peerLastZxid and sizeLimit
                Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
                if (txnLogItr.hasNext()) {
                    LOG.info("Use txnlog and committedLog for peer sid: {}", getSid());
                    // queue the proposals found in the txn log; currentZxid tracks the latest zxid queued so far
                    currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);

                    if (currentZxid < minCommittedLog) {
                        // if currentZxid is still smaller than minCommittedLog, fall back to sending a snapshot to the follower instead of a DIFF
                        LOG.info(
                            "Detected gap between end of txnlog: 0x{} and start of committedLog: 0x{}",
                            Long.toHexString(currentZxid),
                            Long.toHexString(minCommittedLog));
                        currentZxid = peerLastZxid;
                        // Clear out currently queued requests and revert
                        // to sending a snapshot.
                        queuedPackets.clear();
                        needOpPacket = true;
                    } else {
                        LOG.debug("Queueing committedLog 0x{}", Long.toHexString(currentZxid));
               
                        Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
         // queueCommittedProposals normally queues every transaction in the log with a zxid greater than peerLastZxid to be sent to the follower
         // there are some edge cases: if peerLastZxid cannot be matched against the transactions in the log, a TRUNC command has to be sent to the follower; if it can, a DIFF is sent
         // another case is when the log's zxids are greater than peerLastZxid but belong to a different epoch, in which case the follower needs a full snapshot sync instead
                        currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                        needSnap = false;
                    }
                }
                // closing the resources
                if (txnLogItr instanceof TxnLogProposalIterator) {
                    TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                    txnProposalItr.close();
                }
            } else {
                LOG.warn(
                    "Unhandled scenario for peer sid: {} maxCommittedLog=0x{}"
                        + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
                        + " peerLastZxid=0x{} txnLogSyncEnabled={}",
                    getSid(),
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(minCommittedLog),
                    Long.toHexString(lastProcessedZxid),
                    Long.toHexString(peerLastZxid),
                    txnLogSyncEnabled);
            }
            if (needSnap) {
                currentZxid = db.getDataTreeLastProcessedZxid();
            }

            LOG.debug("Start forwarding 0x{} for peer sid: {}", Long.toHexString(currentZxid), getSid());
            leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
        } finally {
            rl.unlock();
        }

        if (needOpPacket && !needSnap) {
           // fall back to sending a snapshot to the follower for full data recovery
            // This should never happen, but we should fall back to sending
            // snapshot just in case.
            LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot",  getSid());
            needSnap = true;
        }
         // return whether a snapshot-based sync is needed
        return needSnap;
    }
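
To condense the branching above, the decision syncFollower makes can be summarized roughly as follows (a simplified sketch that ignores forceSnapSync and the txnlog fall-back details; names are illustrative, not the actual ZooKeeper code):

// Rough summary of the syncFollower decision rules; illustrative only.
enum SyncMode { EMPTY_DIFF, TRUNC, DIFF, TXNLOG_OR_SNAP, SNAP }

class SyncDecisionSketch {

    static SyncMode decide(long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog,
                           boolean isPeerNewEpochZxid, boolean txnLogSyncEnabled) {
        if (lastProcessedZxid == peerLastZxid) {
            return SyncMode.EMPTY_DIFF;        // follower already in sync
        }
        if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return SyncMode.TRUNC;             // follower is ahead, roll it back
        }
        if (peerLastZxid >= minCommittedLog && peerLastZxid <= maxCommittedLog) {
            return SyncMode.DIFF;              // replay committedLog proposals
        }
        if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            return SyncMode.TXNLOG_OR_SNAP;    // try on-disk txnlog, fall back to snapshot
        }
        return SyncMode.SNAP;                  // anything else: full snapshot
    }
}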

Let's return to the main flow of LearnerHandler and see what happens after it has determined whether the follower's data needs to be restored from a snapshot.

 // syncFollower, analyzed above, returns whether the follower's data has to be restored from a snapshot
 // if needSnap is false, the leader has queued the transactions to replay (the DIFF path) into queuedPackets
 boolean needSnap = syncFollower(peerLastZxid, learnerMaster);

            // syncs between followers and the leader are exempt from throttling because it
            // is importatnt to keep the state of quorum servers up-to-date. The exempted syncs
            // are counted as concurrent syncs though
             // only observer syncs are subject to throttling; syncs for followers are exempt
            boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
            /* if we are not truncating or sending a diff just send a snapshot */
            if (needSnap) {
               // needSnap is true: sync via snapshot
               // syncThrottler is the sync throttler
                syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
                syncThrottler.beginSync(exemptFromThrottle);
                ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
                try {
                  // get the latest zxid the leader has processed
                    long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
                    // send the SNAP sync instruction to the follower
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    messageTracker.trackSent(Leader.SNAP);
                    bufferedOutput.flush();

                    LOG.info(
                        "Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                            + "send zxid of db as 0x{}, {} concurrent snapshot sync, "
                            + "snapshot sync was {} from throttle",
                        Long.toHexString(peerLastZxid),
                        Long.toHexString(leaderLastZxid),
                        Long.toHexString(zxidToSend),
                        syncThrottler.getSyncInProgress(),
                        exemptFromThrottle ? "exempt" : "not exempt");
                    // Dump data to peer
                   // the leader sends its local database to the follower
                    learnerMaster.getZKDatabase().serializeSnapshot(oa);
                    oa.writeString("BenWasHere", "signature");
                    // flush the data onto the network
                    bufferedOutput.flush();
                } finally {
                    ServerMetrics.getMetrics().SNAP_COUNT.add(1);
                }
            } else {
                syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
                syncThrottler.beginSync(exemptFromThrottle);
                ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
                ServerMetrics.getMetrics().DIFF_COUNT.add(1);
            }

            LOG.debug("Sending NEWLEADER message to {}", sid);
            // the version of this quorumVerifier will be set by leader.lead() in case
            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if

              // below, the Leader sends the NEWLEADER notification to the follower and waits for the follower's ack
            // we got here, so the version was set
            if (getVersion() < 0x10000) {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();

            // Start thread that blast packets in the queue to learner
           // send the packets staged in queuedPackets to the follower using a dedicated thread
            startSendingPackets();

             // read the ack sent back by the follower
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");

            messageTracker.trackReceived(qp.getType());
            if (qp.getType() != Leader.ACK) {
                LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
                return;
            }

            LOG.debug("Received NEWLEADER-ACK message from {}", sid);
            // wait until a quorum of participants has acked the Leader's NEWLEADER packet;
           // at this point the leader's QuorumPeer is also waiting for the followers to ack its NEWLEADER
            learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());

 syncLimitCheck.start();
            // sync ends when NEWLEADER-ACK is received
            syncThrottler.endSync();
            if (needSnap) {
                ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
            } else {
                ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
            }
            syncThrottler = null;

            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(learnerMaster.syncTimeout());

            /*
             * Wait until learnerMaster starts up
             */
            // once a quorum of participants has acknowledged the Leader's NEWLEADER status, the LearnerHandler waits for the Leader's request-processing engine to start up
            learnerMaster.waitForStartup();
          // once the leader's server has started, the code that follows handles the actual client requests; we will analyze it later
Follower.syncWithLeader

Now back to the Follower: let's see what happens next in the follower's syncWithLeader. This method is also very long, so we first analyze the data-synchronization part.

 protected void syncWithLeader(long newLeaderZxid) throws Exception {
        QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

        QuorumVerifier newLeaderQV = null;

        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        boolean syncSnapshot = false;
        // read the data-sync instruction sent by the Leader
        readPacket(qp);
        Deque<Long> packetsCommitted = new ArrayDeque<>();
        Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
        synchronized (zk) {
            if (qp.getType() == Leader.DIFF) {
               // received a DIFF sync instruction from the leader
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                self.setSyncMode(QuorumPeer.SyncMode.DIFF);
                snapshotNeeded = false;
            } else if (qp.getType() == Leader.SNAP) {
               // received a SNAP sync instruction from the leader

                self.setSyncMode(QuorumPeer.SyncMode.SNAP);
                LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
                // The leader is going to dump the database
                // db is clear as part of deserializeSnapshot()
                // the follower's local ZK database is deserialized directly from the snapshot stream sent by the leader
                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                // ZOOKEEPER-2819: overwrite config node content extracted
                // from leader snapshot with local config, to avoid potential
                // inconsistency of config node content during rolling restart.
                if (!self.isReconfigEnabled()) {
                    LOG.debug("Reset config node content from local config after deserialization of snapshot.");
                    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
                }
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got {}", signature);
                    throw new IOException("Missing signature");
                }
                // set the latest processed transaction id of the local database
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

                // immediately persist the latest snapshot when there is txn log gap
                syncSnapshot = true;
            } else if (qp.getType() == Leader.TRUNC) {
               // received a TRUNC instruction from the leader: delete the invalid transactions
                //we need to truncate the log to the lastzxid of the leader
                self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
                LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
                // truncate the log beyond the given zxid
                boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
                    ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

            } else {
                LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
                ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
            }
         // initialize the /zookeeper/config node in the local ZK database
         zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier()); 
         // create the LearnerSessionTracker; the role of the SessionTracker was covered in an earlier source-code article
         zk.createSessionTracker();

            long lastQueued = 0;

            // in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
            // we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
            // we need to make sure that we don't take the snapshot twice.
            boolean isPreZAB1_0 = true;
            //If we are not going to take the snapshot be sure the transactions are not applied in memory
            // but written out to the transaction log
            boolean writeToTxnLog = !snapshotNeeded;
            TxnLogEntry logEntry;
            // we are now going to start getting transactions to apply followed by an UPTODATE
           // this loop processes the packets the leader sends to restore the follower's data: PROPOSAL/COMMIT (and INFORM), plus NEWLEADER and UPTODATE
            outerLoop:
            while (self.isRunning()) {
                 // wait for the next packet from the leader; the loop ends once UPTODATE is received
                readPacket(qp);
                switch (qp.getType()) {
                case Leader.PROPOSAL:
                    PacketInFlight pif = new PacketInFlight();
                    logEntry = SerializeUtils.deserializeTxn(qp.getData());
                    pif.hdr = logEntry.getHeader();
                    pif.rec = logEntry.getTxn();
                    pif.digest = logEntry.getDigest();
                    if (pif.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn(
                            "Got zxid 0x{} expected 0x{}",
                            Long.toHexString(pif.hdr.getZxid()),
                            Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = pif.hdr.getZxid();

                    if (pif.hdr.getType() == OpCode.reconfig) {
                        SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
                        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
                        self.setLastSeenQuorumVerifier(qv, true);
                    }

                    packetsNotCommitted.add(pif);
                    break;
                // normally every PROPOSAL is followed by a COMMIT instruction
                case Leader.COMMIT:
                case Leader.COMMITANDACTIVATE:
                    pif = packetsNotCommitted.peekFirst();
                    if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
                        boolean majorChange = self.processReconfig(
                            qv,
                            ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
                            true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    }
                    if (!writeToTxnLog) {
                        if (pif.hdr.getZxid() != qp.getZxid()) {
                            LOG.warn(
                                "Committing 0x{}, but next proposal is 0x{}",
                                Long.toHexString(qp.getZxid()),
                                Long.toHexString(pif.hdr.getZxid()));
                        } else {
                          // apply the proposal to the local database
                            zk.processTxn(pif.hdr, pif.rec);
                            packetsNotCommitted.remove();
                        }
                    } else {
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.INFORM:
                case Leader.INFORMANDACTIVATE:
                    PacketInFlight packet = new PacketInFlight();

                    if (qp.getType() == Leader.INFORMANDACTIVATE) {
                        ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
                        long suggestedLeaderId = buffer.getLong();
                        byte[] remainingdata = new byte[buffer.remaining()];
                        buffer.get(remainingdata);
                        logEntry = SerializeUtils.deserializeTxn(remainingdata);
                        packet.hdr = logEntry.getHeader();
                        packet.rec = logEntry.getTxn();
                        packet.digest = logEntry.getDigest();
                        QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData()));
                        boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    } else {
                        logEntry = SerializeUtils.deserializeTxn(qp.getData());
                        packet.rec = logEntry.getTxn();
                        packet.hdr = logEntry.getHeader();
                        packet.digest = logEntry.getDigest();
                        // Log warning message if txn comes out-of-order
                        if (packet.hdr.getZxid() != lastQueued + 1) {
                            LOG.warn(
                                "Got zxid 0x{} expected 0x{}",
                                Long.toHexString(packet.hdr.getZxid()),
                                Long.toHexString(lastQueued + 1));
                        }
                        lastQueued = packet.hdr.getZxid();
                    }
                    if (!writeToTxnLog) {
                        // Apply to db directly if we haven't taken the snapshot
                        zk.processTxn(packet.hdr, packet.rec);
                    } else {
                        packetsNotCommitted.add(packet);
                        packetsCommitted.add(qp.getZxid());
                    }

                    break;
                case Leader.UPTODATE:
                  // received the UPTODATE message from the leader
                    LOG.info("Learner received UPTODATE message");
                    if (newLeaderQV != null) {
                        boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
                        if (majorChange) {
                            throw new Exception("changes proposed in reconfig");
                        }
                    }
                    if (isPreZAB1_0) {
                        zk.takeSnapshot(syncSnapshot);
                        self.setCurrentEpoch(newEpoch);
                    }
                   
                    self.setZooKeeperServer(zk);
                    self.adminServer.setZooKeeperServer(zk);
                   // break out of the loop; data sync is complete and the local ZK server is about to start
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
                    // means this is Zab 1.0
                    // received the NEWLEADER signal from the leader
                    LOG.info("Learner received NEWLEADER message");
                    if (qp.getData() != null && qp.getData().length > 1) {
                        try {
                            // obtain the cluster membership (QuorumVerifier) info from the data the leader sent
                            QuorumVerifier qv = self.configFromString(new String(qp.getData()));
                            self.setLastSeenQuorumVerifier(qv, true);
                            newLeaderQV = qv;
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    if (snapshotNeeded) {
                        zk.takeSnapshot(syncSnapshot);
                    }
                    // set the currentEpoch
                    self.setCurrentEpoch(newEpoch);
                    writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                    isPreZAB1_0 = false;
                    // send an ACK for the leader's NEWLEADER message
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }
        // send the ACK for the leader's UPTODATE packet
        ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
        writePacket(ack, true);
        sock.setSoTimeout(self.tickTime * self.syncLimit);
        self.setSyncMode(QuorumPeer.SyncMode.NONE);
        // start the follower's execution engine
        zk.startup();
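The zxid carried by that ACK packs the new epoch into the high 32 bits and a per-epoch counter into the low 32 bits, which is why the follower sends makeZxid(newEpoch, 0). A minimal sketch of that composition (an illustrative re-implementation of what ZxidUtils.makeZxid does, not the ZooKeeper class itself):

public class ZxidSketch {
    // compose a 64-bit zxid from a 32-bit epoch (high half) and a 32-bit counter (low half)
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long newEpoch = 5;
        long ackZxid = makeZxid(newEpoch, 0); // what the UPTODATE ACK above carries
        System.out.println(Long.toHexString(ackZxid)); // prints 500000000
    }
}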

Follower execution engine startup

After receiving the UPTODATE message from the leader, the follower starts its execution engine: it initializes the request processor chain and starts the sessionTracker

protected void setupRequestProcessors() {
        // main chain: FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        // SyncRequestProcessor persists the leader's proposals to the txn log and SendAckRequestProcessor ACKs them back to the leader
        syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
        syncProcessor.start();
    }
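Note that setupRequestProcessors() is not called directly from syncWithLeader: roughly speaking, the zk.startup() call shown earlier first creates and starts the session tracker and then wires up this processor chain, which is what "starting the execution engine" means here (the exact ordering of steps inside startup() may differ slightly between ZooKeeper versions).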
Follower handling requests from the leader

The snippet below is the part of the followLeader method that handles requests coming from the leader; I will analyze it in the article on cluster request processing

  while (this.isRunning()) {
                    // read the next packet sent by the leader
                    readPacket(qp);
                    // process the packet received from the leader
                    processPacket(qp);
                }
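Although the detailed walkthrough is left for that article, a heavily simplified, self-contained sketch may help show the kind of dispatch processPacket performs on each packet type. Everything below (class name, constant values, println placeholders) is illustrative and is not the real Follower/FollowerZooKeeperServer code:

// Simplified stand-in; not the actual Follower.processPacket.
public class FollowerDispatchSketch {
    // stand-in packet type constants (values are illustrative)
    static final int PING = 5, PROPOSAL = 2, COMMIT = 4;

    // conceptual dispatch: PING keeps sessions alive, PROPOSAL is logged and ACKed,
    // COMMIT applies a previously logged proposal to the in-memory database
    static void processPacket(int type, long zxid) {
        switch (type) {
        case PING:
            System.out.println("reply with the list of touched sessions");
            break;
        case PROPOSAL:
            System.out.println("log txn 0x" + Long.toHexString(zxid) + " to disk and ACK the leader");
            break;
        case COMMIT:
            System.out.println("apply txn 0x" + Long.toHexString(zxid) + " to the in-memory database");
            break;
        default:
            System.out.println("unexpected packet type " + type);
        }
    }

    public static void main(String[] args) {
        processPacket(PROPOSAL, 0x500000001L);
        processPacket(COMMIT, 0x500000001L);
    }
}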
Leader execution engine startup

Once the leader has received ACKs for its NEWLEADER packet from a quorum of followers, it starts the zk execution engine: it creates the session tracker and initializes the request processor chain. Below is the initialization of the leader's request processor chain; I will cover these processors in detail in the next article on request handling

 protected void setupRequestProcessors() {
        // chain: LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
        commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
        proposalProcessor.initialize();
        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
        prepRequestProcessor.start();
        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
        // manager thread that periodically cleans up container-type znodes
        setupContainerManager();
    }
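One detail worth comparing against the follower chain above: the leader constructs its CommitProcessor with the matchSyncs argument set to false, while the follower passes true. If I read that flag correctly, a follower has to hold a sync request until the matching commit comes back from the leader, whereas the leader can answer sync requests locally.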
Leader QuorumPeer

After the leader's execution engine has started, the leader's main thread enters a loop that periodically checks whether every follower is still in sync with it (a simplified sketch follows)
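A simplified, self-contained sketch of what that periodic check conceptually looks like. The real loop lives in Leader.lead() and works on LearnerHandler objects and a quorum tracker; the class, parameters, and helpers below are stand-ins that only illustrate the idea:

import java.util.List;
import java.util.function.BooleanSupplier;

// Conceptual sketch only: wake up every half tick, count learners that are still
// in sync, step down if the quorum is lost, otherwise ping everyone.
public class LeaderSyncCheckSketch {
    static void run(List<BooleanSupplier> learnerSynced, Runnable pingAll,
                    int tickTimeMs, int quorumSize) throws InterruptedException {
        while (true) {
            Thread.sleep(tickTimeMs / 2);
            long synced = learnerSynced.stream().filter(BooleanSupplier::getAsBoolean).count();
            if (synced + 1 < quorumSize) { // +1: the leader counts itself
                System.out.println("lost quorum, giving up leadership");
                return;
            }
            pingAll.run(); // keep followers' sessions and ack deadlines fresh
        }
    }
}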

LearnerHandler request processing

Once the Leader has started up, the corresponding LearnerHandler enters its request-processing loop; I will analyze this process in the next article

 while (true) {
                qp = new QuorumPacket();
                ia.readRecord(qp, "packet");
                messageTracker.trackReceived(qp.getType());

                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                }
                tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();

                packetsReceived.incrementAndGet();

                ByteBuffer bb;
                long sessionId;
                int cxid;
                int type;

                switch (qp.getType()) {
                case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        LOG.debug("Received ACK from Observer {}", this.sid);
                    }
                    syncLimitCheck.updateAck(qp.getZxid());
                    learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;
                case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        learnerMaster.touch(sess, to);
                    }
                    break;
                case Leader.REVALIDATE:
                    ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                    learnerMaster.revalidateSession(qp, this);
                    break;
                case Leader.REQUEST:
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if (type == OpCode.sync) {
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    learnerMaster.submitLearnerRequest(si);
                    requestsReceived.incrementAndGet();
                    break;
                default:
                    LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                    break;
                }
            }
        } catch (IOException e) {
            if (sock != null && !sock.isClosed()) {
                LOG.error("Unexpected exception causing shutdown while sock still open", e);
                //close the socket to make sure the
                //other side can see it being close
                try {
                    sock.close();
                } catch (IOException ie) {
                    // do nothing
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected exception in LearnerHandler.", e);
        } catch (SyncThrottleException e) {
            LOG.error("too many concurrent sync.", e);
            syncThrottler = null;
        } catch (Exception e) {
            LOG.error("Unexpected exception in LearnerHandler.", e);
            throw e;
        } finally {
            if (syncThrottler != null) {
                syncThrottler.endSync();
                syncThrottler = null;
            }
            String remoteAddr = getRemoteAddress();
            LOG.warn("******* GOODBYE {} ********", remoteAddr);
            messageTracker.dumpToLog(remoteAddr);
            shutdown();
        }
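As a side note on the Leader.REQUEST case above: the packet body is read back as [sessionId: long][cxid: int][type: int][remaining bytes = request payload]. A minimal sketch of how such a body could be assembled on the sending side (purely illustrative, not the Learner's actual serialization code):

import java.nio.ByteBuffer;

public class RequestBodySketch {
    // pack sessionId, cxid, type and the request payload in the same order
    // that the LearnerHandler REQUEST branch above unpacks them
    static byte[] pack(long sessionId, int cxid, int type, byte[] payload) {
        ByteBuffer bb = ByteBuffer.allocate(8 + 4 + 4 + payload.length);
        bb.putLong(sessionId).putInt(cxid).putInt(type).put(payload);
        return bb.array();
    }

    public static void main(String[] args) {
        byte[] body = pack(0x1234L, 1, 1 /* e.g. OpCode.create */, new byte[] {1, 2, 3});
        ByteBuffer bb = ByteBuffer.wrap(body);
        System.out.println("sessionId=" + bb.getLong() + " cxid=" + bb.getInt() + " type=" + bb.getInt());
    }
}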

This concludes the analysis of the initialization of the zk Leader and Follower
