ZooKeeper服务端启动源码 集群

集群和单机版启动类都是QuorumPeerMain,进入initializeAndRun方法

启动

  1. 解析配置文件zoo.cfg
  2. 创建并启动历史文件清理器DatadirCleanupManager
  3. 根据集群模式还是单机模式的启动
if (args.length == 1 && config.servers.size() > 0) {
  // 集群
  runFromConfig(config);
} else {
  ZooKeeperServerMain.main(args);
}

集群模式会进入if块

初始化

运行runFromConfig方法,在runFromConfig方法内部可以看到,其核心实例是QuorumPeer,而不再是单机模式的ZooKeeperServer实例,QuorumPeer实例可以看作是集群的一个节点,集群中的所有的QuorumPeer实例协作完成集群的选举、投票。

  1. 创建并配置ServerCnxnFactory,和单机版一致。

    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
    

    cnxnFactory会赋值给quorumPeerquorumPeer.setCnxnFactory(cnxnFactory);

  2. 实例化quorumPeer并设值

    quorumPeer = getQuorumPeer();
    // 设置集群所有的peer,集群机器之间互相通信
    quorumPeer.setQuorumPeers(config.getServers());
    ...
    

    这个就是根据配置中server.id解析出来的,如

    server.1=localhost:2888:3888
    server.2=localhost:2887:3887
    server.3=localhost:2886:3886
    
  3. 创建持久化文件管理器FileTxnSnapLog,并给quorumPeer赋值

    quorumPeer.setTxnFactory(new FileTxnSnapLog(
            new File(config.getDataLogDir()),
            new File(config.getDataDir())));
    
  4. 创建内存数据库,并赋值给quorumPeer

    quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
    
  5. 初始化并启动quorumPeer

    quorumPeer.initialize();
    quorumPeer.start();
    quorumPeer.join();
    

    QuorumPeer#start方法

    //QuorumPeer#start
    public synchronized void start() {
      loadDataBase();
      cnxnFactory.start();        
      startLeaderElection();
      super.start();
    }
    

    启动quorumPeer步骤有

    • 加载内存数据库
    • 启动cnxnFactory,客户端连接的IO线程
    • 集群选举
    • 选举线程启动
    1. 集群版加载内存数据库会去分析当前的Epoch
    private long acceptedEpoch = -1;
    private long currentEpoch = -1;
    
    1. 启动cnxnFactory后,这时候客户端IO线程是没法工作的,因为在创建客户端连接的时候需要zkServer变量,处理调用链

      protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk) {
        return new NIOServerCnxn(zkServer, sock, sk, this);
      }
      

      需要等集群选举完成、数据同步完成后,为其赋值,才能开启工作

    所以先主要分析集群选举和选举线程启动

集群选举

集群选举需要当前peer与其他机器在选举端口上建立连接,然后发送投票进行选举,选举端口在配置文件中配置

server.id - This is the host:port[:port] that the server with the given id will use for the quorum protocol.

其中,第一个端口用于指定Follower服务器与Leader进行运行时通信和数据同步时所使用的端口,第二个端口则专门用于进行Leader选举过程中的投票通信,在初始化时``quorumPeer`为其赋值。

  1. 初始化投票
    QuorumPeer#startLeaderElection方法初始化投票

    • 创建当前投票,优先给自己投票
      currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());

    • 创建选举算法,默认electionType=3,也就是FastLeaderElection

      // QuorumPeer#createElectionAlgorithm
      case 3:
        qcm = createCnxnManager();
         // 监听连接
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null) {
          listener.start();
          le = new FastLeaderElection(this, qcm);
        }
      

      创建Leader选举所需的网络IO层QuorumCnxManager,同时启动对Leader选举端口的监听,等待集群中其他服务器创建连接。

调用start方法启动线程,进入run方法

  1. 注册JMX服务

    jmxQuorumBean = new QuorumBean(this);
    MBeanRegistry.getInstance().register(jmxQuorumBean, null);
    ...
    
  2. 检测当前服务器状态,并根据当前状态做处理

    switch (getPeerState()) {
      case LOOKING:
        ...
      case OBSERVING:
        ...
      case FOLLOWING:
        ...
      case LEADING:
         ...
    }
    

    集群启动状态当然是LOOKING

    private ServerState state = ServerState.LOOKING;
    

    LOOKING状态的机器需要去获取集群的Leader,如果当前没有Leader,则进入选举模式。

    setCurrentVote(makeLEStrategy().lookForLeader());
    
  3. Leader选举
    选举算法以默认的FastLeaderElection#lookForLeader为例,该方法开始新一轮Leader选举。每当QuorumPeer将其状态更改为LOOKING时,就会调用此方法,并向所有其他peers发送通知。具体选举算法单独分析。

  4. 完成选举后服务器状态为:OBSERVINGFOLLOWINGLEADING,对应角色分别是ObserverFollowerLeaderObserverFollower的区别在于Observer不参与任何投票。

角色交互

完成集群选举后,集群Leader和Followers之间需要进行数据同步,并在后续的消息处理中,Followers会将事物请求以Request的形式转发给Leader。

Follower

当节点中状态为FOLLOWING时,将设置当前角色为Follower,包括创建Follower并启动

setFollower(makeFollower(logFactory));
follower.followLeader();

Follower#followLeader方法

void followLeader() throws InterruptedException {
  ...
  QuorumServer leaderServer = findLeader();            
  try {
    connectToLeader(leaderServer.addr, leaderServer.hostname);
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

    // leader zxid比自己的zxid还要小,出错了
    long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
    if (newEpoch < self.getAcceptedEpoch()) {
      LOG.error("");
      throw new IOException("Error: Epoch of leader is lower");
    }
    syncWithLeader(newEpochZxid);                
    QuorumPacket qp = new QuorumPacket();
    while (this.isRunning()) {
      readPacket(qp);
      processPacket(qp);
    }
  }
  ...
}

步骤

  1. 找到当前leader,通过投票查找

    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
      if (s.id == current.getId()) {
        s.recreateSocketAddresses();
        leaderServer = s;
        break;
      }
    }
    
  2. 连接到leader,重试连接上一步找到的leader

    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
      sock.connect(addr, self.tickTime * self.syncLimit);
    }
    
  3. 向leader注册,
    这一步Follower向Leader同步投票的Epoch以及Follower的自己的最新事务id、Epoch,并接受Leader的Epoch。

  4. 同步数据
    上一步Leader收到Follower最新的zxid后,根据自己的zxid决定采用哪种方式同步数据。在Learner#syncWithLeader方法中,Leader通知Follower以何种方式进行同步

    readPacket(qp);
    if (qp.getType() == Leader.DIFF) {
     // 差异化同步
      snapshotNeeded = false;
    } else if (qp.getType() == Leader.SNAP) {
      // 全量同步
      zk.getZKDatabase().clear();
      zk.getZKDatabase().deserializeSnapshot(leaderIs);
      String signature = leaderIs.readString("signature");
      if (!signature.equals("BenWasHere")) {
        LOG.error("Missing signature. Got " + signature);
        throw new IOException("Missing signature");                   
      }
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    } else if (qp.getType() == Leader.TRUNC) {
      //截断日志
      boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
      if (!truncated) {
        System.exit(13);
      }
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    } else {
      System.exit(13);
    }
    

    Follower根据同步类型,处理本地日志文件及本地数据库

    • DIFF:差异化同步
    • SNAP:全量同步
    • TRUNC:截断日志

    然后Leader开始发送数据同步

    // 数据同步知道接收到UPTODATE类型的数据包结束
    outerLoop:
    while (self.isRunning()) {
      readPacket(qp);
      switch(qp.getType()) {
        // 投票
        case Leader.PROPOSAL:
          PacketInFlight pif = new PacketInFlight();
          pif.hdr = new TxnHeader();
          pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
          if (pif.hdr.getZxid() != lastQueued + 1) {
            
          }
          lastQueued = pif.hdr.getZxid();
          packetsNotCommitted.add(pif);
          break;
        // 提交
        case Leader.COMMIT:
          if (!writeToTxnLog) {
            pif = packetsNotCommitted.peekFirst();
            if (pif.hdr.getZxid() != qp.getZxid()) {
              LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
            } else {
              zk.processTxn(pif.hdr, pif.rec);
              packetsNotCommitted.remove();
            }
          } else {
            packetsCommitted.add(qp.getZxid());
          }
          break;
        // 只有observer才能得到这种类型的包。我们将此视为接收PROPOSAL和COMMIT。
        case Leader.INFORM:
          PacketInFlight packet = new PacketInFlight();
          packet.hdr = new TxnHeader();
          packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
          // Log warning message if txn comes out-of-order
          if (packet.hdr.getZxid() != lastQueued + 1) {
            
          }
          lastQueued = packet.hdr.getZxid();
          if (!writeToTxnLog) {
            // Apply to db directly if we haven't taken the snapshot
            zk.processTxn(packet.hdr, packet.rec);
          } else {
            packetsNotCommitted.add(packet);
            packetsCommitted.add(qp.getZxid());
          }
          break;
        // 同步完成
        case Leader.UPTODATE:
          if (isPreZAB1_0) {
            zk.takeSnapshot();
            self.setCurrentEpoch(newEpoch);
          }
          self.cnxnFactory.setZooKeeperServer(zk);                
          break outerLoop;
        case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
          File updating = new File(self.getTxnFactory().getSnapDir(),
                                   QuorumPeer.UPDATING_EPOCH_FILENAME);
          if (!updating.exists() && !updating.createNewFile()) {
            throw new IOException("Failed to create " + updating.toString());
          }
          if (snapshotNeeded) {
            zk.takeSnapshot();
          }
          self.setCurrentEpoch(newEpoch);
          if (!updating.delete()) {
            throw new IOException("Failed to delete " + updating.toString());
          }
          //需要将数据写入事务日志
          writeToTxnLog = true;
          isPreZAB1_0 = false;
          writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
          break;
      }
    }
    

    同步完成后

    发送响应

    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
    

    开始接收客户端请求,这个zk在不同角色的节点上是不同的角色,FollowerZooKeeperServerObserverZooKeeperServer

    zk.startup();
    

    还需要补充内存数据库中snapshot与log之间的差异

  5. 不断与Leader通信,同步数据

    while (this.isRunning()) {
      readPacket(qp);
      processPacket(qp);
    }
    

    Follower#processPacket方法检查在qp中接收的数据包,并根据其内容进行分发。

    protected void processPacket(QuorumPacket qp) throws IOException{
      switch (qp.getType()) {
         // 心跳
        case Leader.PING:            
          ping(qp);            
          break;
        // 事务投票
        case Leader.PROPOSAL:            
          TxnHeader hdr = new TxnHeader();
          Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
          if (hdr.getZxid() != lastQueued + 1) {
          }
          lastQueued = hdr.getZxid();
          fzk.logRequest(hdr, txn);
          break;
        // 提交事物
        case Leader.COMMIT:
          fzk.commit(qp.getZxid());
          break;
        case Leader.UPTODATE:
          LOG.error("Received an UPTODATE message after Follower started");
          break;
        case Leader.REVALIDATE:
          revalidate(qp);
          break;
        // 通知Learner服务器已经完成了Sync操作
        case Leader.SYNC:
          fzk.sync();
          break;
        default:
          LOG.error("Invalid packet type: {} received by Observer", qp.getType());
      }
    }
    

    Follower后续还需要不断与Leader通信,进行事务投票。

至此Follower开始对外提供服务。

Leader

Follower类似,

setLeader(makeLeader(logFactory));
leader.lead();

QuorumPeer#makeLeader方法,

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
  return new Leader(this, new LeaderZooKeeperServer(logFactory,
                    this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
}

Leader内部处理请求的是LeaderZooKeeperServer

Leader#lead的主要流程

  1. 加载内存数据库

    zk.loadData();
    
  2. 创建LearnerCnxAcceptor,启动等待来自新followers的连接请求的线程。

    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();
    

    Leader.LearnerCnxAcceptor#run方法中

    Socket s = ss.accept();
    // start with the initLimit, once the ack is processed
    // in LearnerHandler switch to the syncLimit
    s.setSoTimeout(self.tickTime * self.initLimit);
    s.setTcpNoDelay(nodelay);
    BufferedInputStream is = new BufferedInputStream(s.getInputStream());
    // 为每个Learner创建一条线程,处理投票、数据同步
    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
    fh.start();
    
  3. 等待Leaner响应Ack

    readyToStart = true;
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
    
    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
    
    synchronized(this){
      lastProposed = zk.getZxid();
    }
    
    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
    
    waitForEpochAck(self.getId(), leaderStateSummary);
    self.setCurrentEpoch(epoch);
    
    waitForNewLeaderAck(self.getId(), zk.getZxid());
    

    准备完毕,只需要等待过半数的Leaner的回复即可对外工作,在LeanerHandler中也会调用waitForEpochAckwaitForEpochAck唤醒Leader

  4. 对外提供服务

    startZkServer();
    
  5. 心跳,和Leaner保活

至此ZooKeeper集群模式启动完毕,整个集群开始对外提供服务。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容