Zookeeper(七)-服务端集群模式-启动流程-2

接上一节继续

10. Follower接收同步数据(Learner.syncWithLeader)

protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
    QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
    QuorumPacket qp = new QuorumPacket();
    long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    boolean snapshotNeeded = true;
    // 读取数据包
    readPacket(qp);
    // 已经提交的packets
    LinkedList<Long> packetsCommitted = new LinkedList<Long>();
    // 未提交的packets
    LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
    synchronized (zk) {
        if (qp.getType() == Leader.DIFF) {
            LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
            snapshotNeeded = false;
        } else if (qp.getType() == Leader.SNAP) {
            LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
            // The leader is going to dump the database clear our own database and read
            // 清空当前内存数据
            zk.getZKDatabase().clear();
            // 反序列化从leader传递过来的snapshotlog
            zk.getZKDatabase().deserializeSnapshot(leaderIs);
            String signature = leaderIs.readString("signature");
            if (!signature.equals("BenWasHere")) {
                LOG.error("Missing signature. Got " + signature);
                throw new IOException("Missing signature");                   
            }
            // 设置lastZxid
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        } else if (qp.getType() == Leader.TRUNC) {
            // 当前server zxid > leader zxid的情况,清除leader zxid之后的log
            //we need to truncate the log to the lastzxid of the leader
            LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid()));
            boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
            if (!truncated) {
                // not able to truncate the log
                LOG.error("Not able to truncate the log " + Long.toHexString(qp.getZxid()));
                System.exit(13);
            }
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        } else {
            LOG.error("Got unexpected packet from leader "
                    + qp.getType() + " exiting ... " );
            System.exit(13);

        }
        zk.createSessionTracker();
        
        long lastQueued = 0;
        boolean isPreZAB1_0 = true;
        boolean writeToTxnLog = !snapshotNeeded;
        outerLoop:
        while (self.isRunning()) {
            // 循环读取
            readPacket(qp);
            switch(qp.getType()) {
            case Leader.PROPOSAL:
                // 包装消息头-Txnheader 消息体-Recode
                PacketInFlight pif = new PacketInFlight();
                pif.hdr = new TxnHeader();
                // 反序列化消息体
                pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                if (pif.hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x" + Long.toHexString(pif.hdr.getZxid())
                        + " expected 0x" + Long.toHexString(lastQueued + 1));
                }
                lastQueued = pif.hdr.getZxid();
                // PacketInFlight放入未提交链表
                packetsNotCommitted.add(pif);
                break;
            case Leader.COMMIT:
                if (!writeToTxnLog) {
                    pif = packetsNotCommitted.peekFirst();
                    // 不相等说明不连续
                    if (pif.hdr.getZxid() != qp.getZxid()) {
                        LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                    } else {
                        // 更新DataTree
                        zk.processTxn(pif.hdr, pif.rec);
                        // 从notCommitted中删除
                        packetsNotCommitted.remove();
                    }
                } else {
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            ......
            case Leader.UPTODATE:
                // 1.0版本之前
                if (isPreZAB1_0) {
                    zk.takeSnapshot();
                    self.setCurrentEpoch(newEpoch);
                }
                self.cnxnFactory.setZooKeeperServer(zk);                
                break outerLoop;
            case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                // 创建updatingEpoch文件,说明正在进行newLeader数据同步,如果已经存在该文件则创建文件失败抛出异常
                File updating = new File(self.getTxnFactory().getSnapDir(), QuorumPeer.UPDATING_EPOCH_FILENAME);
                if (!updating.exists() && !updating.createNewFile()) {
                    throw new IOException("Failed to create " + updating.toString());
                }
                // Diff时不进行snapshot
                if (snapshotNeeded) {
                    // 已经同步结束,生成snapshot文件
                    zk.takeSnapshot();
                }
                // 更新Epoch文件
                self.setCurrentEpoch(newEpoch);
                if (!updating.delete()) {
                    throw new IOException("Failed to delete " + updating.toString());
                }
                writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                isPreZAB1_0 = false;
                // 发送ACK
                writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                break;
            }
        }
    }
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    // 发送ack
    writePacket(ack, true);
    ......
}
  • readPacket(qp)读取leader发送的:QuorumPacket type:SNAP/TRUNC/DIFF;
  • readPacket(qp)循环读取leader发送数据,先是同步数据,然后是NEWLEADER;
  • DIFF
    1. case Leader.PROPOSAL反序列化出TxnHeader/Recode,包装成PacketInFlight放入链表packetsNotCommitted;
    2. case Leader.COMMIT当前QuorumPacket zxid放入链表packetsCommitted;
  • SNAP
    1.清空内存中ZKDatabase;
    2.反序列化从leader传递过来的snapshotlog;
    3.签名验证;
  • TRUNC
    1.先清空内存中ZKDatabase;
    2.删除txnlog中大于leader zxid的部分;
    3.重新loadDataBase;
  • case Leader.NEWLEADER数据同步之后leader发送NEWLEADER
    1.创建updateEpoch文件,并在设置当前epoch后将其删除。 QuorumPeer.loadDataBase()使用此文件检测在拍摄快照之后设置当前epoch之前服务器宕机的情况;
    2.不是DIFF同步时,同步结束之后生成snapshot文件;
    3.更新epoch,更新成功之后删除updateEpoch文件;
    4.发送leader针对NEWLEADER的ACK;

11. Leader处理NEWLEADER-ACK(LearnerHandler.run)

public void run() {
    ......
    qp = new QuorumPacket();
    // 读取ack
    ia.readRecord(qp, "packet");
    if(qp.getType() != Leader.ACK){
        LOG.error("Next packet was supposed to be an ACK");
        return;
    }
    LOG.info("Received NEWLEADER-ACK message from " + getSid());
    // 等待过半的follower的ack
    leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
    syncLimitCheck.start();
    // now that the ack has been processed expect the syncLimit
    sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
    // 等待leader启动完成继续往下运行
    synchronized(leader.zk){
        while(!leader.zk.isRunning() && !this.isInterrupted()){
            leader.zk.wait(20);
        }
    }
    // leader启动后,发送UPTODATE
    queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
    ......
}
  • ia.readRecord(qp, "packet")读取NEWLEADER的ACK;
  • leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType())等待收到过半的服务器的NEWLEADER-ACK之后继续执行;
  • leader.zk.wait(20)2. Leader.lead中调用startZkServer(),zkServer启动之后state置为RUNNING,否则阻塞等待;
  • queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null))发送UPTODATE,告知follower leader已经启动完成;

12. Follower处理UPTODATE(Learner.syncWithLeader)

protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
    ......
    case Leader.UPTODATE:
        // 1.0版本之前
        if (isPreZAB1_0) {
            zk.takeSnapshot();
            self.setCurrentEpoch(newEpoch);
        }
        self.cnxnFactory.setZooKeeperServer(zk);                
        break outerLoop;
    ......
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    // 发送ack
    writePacket(ack, true);
        sock.setSoTimeout(self.tickTime * self.syncLimit);
    // 启动ZooKeeper
    zk.startup();
    self.updateElectionVote(newEpoch);
    // 再以snapshot的方式同步数据时,开始同步数据到follower 收到uptodate数据包之间有可能leader也有数据变更,这时需要将这部分数据提交。
    // 经过以上的步骤之后,follower 服务启动完成。作为follower 它一方面需要处理来自客户端的请求,
    // 一方面也需要处理来自leader 的心跳数据、proposal、commit请求
    if (zk instanceof FollowerZooKeeperServer) {
        FollowerZooKeeperServer fzk = (FollowerZooKeeperServer)zk;
        for(PacketInFlight p: packetsNotCommitted) {
            // 写入日志文件
            fzk.logRequest(p.hdr, p.rec);
        }
        for(Long zxid: packetsCommitted) {
            fzk.commit(zxid);
        }
    } 
    ......
}
  • case Leader.UPTODATE
    1.LearnerZooKeeperServer设置ServerCnxnFactory,默认NIOServerCnxnFactory,可以开始接收客户端请求;
    2.循环读取leader发送数据结束;
  • writePacket(ack, true)发送leader针对数据同步的ACK;
  • zk.startup()启动ZooKeeperServer;
  • fzk.logRequest(p.hdr, p.rec)
  • fzk.commit(zxid)构造Request入队SyncRequestProcessor.queuedRequests,写入日志文件(参考Zookeeper(五)-服务端单机模式-请求处理);
  • fzk.commit(zxid)Request放入链表CommitProcessor.committedRequests(后续分析请求流程时再详细分析);
    至此follower端启动流程结束

12. Leader处理ACK(LearnerHandler.run)

public void run() {
    ......
    // 同步完毕,等待接收follower或observer请求
    while (true) {
        qp = new QuorumPacket();
        // 读取ack
        ia.readRecord(qp, "packet");
        switch (qp.getType()) {
        case Leader.ACK:
            syncLimitCheck.updateAck(qp.getZxid());
            leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
            break;
    ......
}
  • ia.readRecord(qp, "packet")读取ACK(该while循环还会处理follower或observer的请他请求,这里只分析启动过程的ACK);
  • leader.processAck此时outstandingProposals为空,直接返回;
    至此leader端启动流程结束
    ---------over----------
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容