zookeeper单机版服务器启动

单机版服务器的启动流程图如下:


image.png

上面的过程可以分为预启动和初始化

启动过程详解

预启动
image.png

1. 统一由QuorumPeerMain作为启动类
无论单机或集群,在zkServer.cmd和zkServer.sh中都配置了QuorumPeerMain作为启动入口类。

2. 解析配置文件zoo.cfg
zoo.cfg配置运行时的基本参数,如tickTime、dataDir、clientPort等参数。

3. 创建并启动历史文件清理器DatadirCleanupManager
从3.4.0版本开始,zk增加了对事务日志和快照数据文件的定时清理。

4. 判断当前是集群模式还是单机模式
根据步骤2解析出的集群服务器地址列表来判断当前是集群模式还是单机模式,如果是单机模式,就委托给ZookeeperServerMain进行启动处理。
5. 再次进行配置文件zoo.cfg的解析
6. 创建服务器实例ZookeeperServer
zk服务器首先会进行服务器实例的创建,然后对服务器实例进行初始化,包括连接器,内存数据库和请求处理器等组件的初始化。

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]); // 解析配置文件
        }

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start(); // 打开清理线程

        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args); // 单机模式启动
        }
    }
    public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            ...
            main.initializeAndRun(args);
            ...
}
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]); 
        } else {
            config.parse(args); //解析配置文件
        }

        runFromConfig(config); //创建实例ZookeeperServer 完成初始化工作
    }

初始化

image.png
/**
     * Run from a ServerConfig.
     * @param config ServerConfig to use.
     * @throws IOException
     */
    public void runFromConfig(ServerConfig config) throws IOException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, we will just call
            // run() in this thread.
            // create a file logger url from the command line args
            final ZooKeeperServer zkServer = new ZooKeeperServer(); // 会创建统计器
            // Registers shutdown handler which will be used to know the
            // server error or shutdown state changes.
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(
                    new ZooKeeperServerShutdownHandler(shutdownLatch));

            txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                    config.dataDir)); // 创建数据管理器
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(config.tickTime); // 设置服务器tickTime
            zkServer.setMinSessionTimeout(config.minSessionTimeout);// 设置会话超时时间
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
            cnxnFactory = ServerCnxnFactory.createFactory(); // 根据配置系统属性zookeper.serverCnxnFactory来指定使用Zookeeper自己实现的NIO还是使用Netty框架作为Zookeeper服务端网络连接工厂, 默认NIOServerCnxnFactory
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());// 初始化主线程,打开select 并bing端口,打开NIO的Accept通知
            cnxnFactory.startup(zkServer); 
            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            shutdownLatch.await();
            shutdown();

            cnxnFactory.join();
            if (zkServer.canShutdown()) {
                zkServer.shutdown(true);
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }

1. 创建服务器统计器ServerStats
ServerStats是zk服务器运行时的统计器

 public ZooKeeperServer() {
        serverStats = new ServerStats(this);
        listener = new ZooKeeperServerListenerImpl(this);
    }

2. 创建zk数据管理器FileTxnSnapLog
FileTxnSnapLog是zk上层服务器和底层数据存储的对接层,提供了一系列操作数据文件的接口,如事务日志文件和快照数据文件,Zookeeper根据zoo.cfg文件中解析出的快照数据目录dataDir和事务日志目录dataLogDir来创建FileTxnSnapLog。
3. 设置服务器tickTime和会话超时时间限制。

4.创建ServerCnxnFactory。
通过配置系统属性zookeper.serverCnxnFactory来指定使用Zookeeper自己实现的NIO还是使用Netty框架作为Zookeeper服务端网络连接工厂。

5.5. 初始化ServerCnxnFactory。

 @Override
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
        configureSaslLogin();

        thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
        thread.setDaemon(true);
        maxClientCnxns = maxcc;
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port " + addr);
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        ss.register(selector, SelectionKey.OP_ACCEPT);
    }

6. 启动ServerCnxnFactory主线程

@Override
    public void startup(ZooKeeperServer zks) throws IOException,
            InterruptedException {
        start(); // 启动IO线程
        setZooKeeperServer(zks);
        zks.startdata(); // 从log和snapshot恢复database和session,并重新生成一个最新的snapshot文件
        zks.startup();  // 启动sessionTracker线程,初始化IO请求的处理链。并启动每个process线程
    }

7. 恢复本地数据。
启动时,需要从本地快照数据文件和事务日志文件进行数据恢复。

 public void startdata() 
    throws IOException, InterruptedException {
        //check to see if zkDb is not null
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }  
        if (!zkDb.isInitialized()) {
            loadData();
        }
    }

8. 创建并启动会话管理器。

public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker(); //Zookeeper会创建会话管理器SessionTracker进行会话管理。
        setupRequestProcessors();// 启动processor线程, 

        registerJMX(); // 注册jms

        setState(State.RUNNING);
        notifyAll();
    }

9. 初始化Zookeeper的请求处理链
Zookeeper请求处理方式为责任链模式的实现。会有多个请求处理器依次处理一个客户端请求,在服务器启动时,会将这些请求处理器串联成一个请求处理链。

10.注册JMX服务。
Zookeeper会将服务器运行时的一些信息以JMX的方式暴露给外部。

单机版配置解析

  public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            main.initializeAndRun(args);
}
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        } else {
            config.parse(args);
        }

        runFromConfig(config);
    }

单机版服务器启动时,接收启动参数,分为两种情况,

  • 只有一个参数: 表示为一个配置文件地址
  • 有2~4个参数: 分别表示端口、dataDir、tickTime、maxClientCnxns
/**
 * Server configuration storage.
 *
 * We use this instead of Properties as it's typed.
 *
 */
public class ServerConfig {
    ////
    //// If you update the configuration parameters be sure
    //// to update the "conf" 4letter word
    ////
    protected InetSocketAddress clientPortAddress; // 暴露给zk client的端口
    protected String dataDir;  // 快照日志目录
    protected String dataLogDir; // 事务日志目录
    protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME; 
    protected int maxClientCnxns;
    /** defaults to -1 if not set explicitly */
    protected int minSessionTimeout = -1; 用于限制客户端会话超时检测的最短时间
    /** defaults to -1 if not set explicitly */
    protected int maxSessionTimeout = -1;
}

如果配置没有显式设置minSessionTimeout 或者 maxSessionTimeout
那么最终,ZooKeeperServer获取最短,最长的sessionTimeout时,如下

 public int getMinSessionTimeout() {
        return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
    }

    public int getMaxSessionTimeout() {
        return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
    }

当启动时传入一个参数,也即使配置文件地址时:

 public void parse(String path) throws ConfigException {//只有一个参数:表示为一个配置文件地址
        QuorumPeerConfig config = new QuorumPeerConfig();
        config.parse(path);

        // let qpconfig parse the file and then pull the stuff we are
        // interested in
        readFrom(config);
    }
  /**
     * Parse a ZooKeeper configuration file
     * @param path the patch of the configuration file
     * @throws ConfigException error processing configuration
     */
    public void parse(String path) throws ConfigException {
        File configFile = new File(path);

        LOG.info("Reading configuration from: " + configFile);

        try {
            if (!configFile.exists()) {
                throw new IllegalArgumentException(configFile.toString()
                        + " file is missing");
            }

            Properties cfg = new Properties();
            FileInputStream in = new FileInputStream(configFile);
            try {
                cfg.load(in);
            } finally {
                in.close();
            }

            parseProperties(cfg);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + path, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + path, e);
        }
    }
 public void parseProperties(Properties zkProp)
    throws IOException, ConfigException {
        int clientPort = 0;
        String clientPortAddress = null;
        for (Entry<Object, Object> entry : zkProp.entrySet()) {
            String key = entry.getKey().toString().trim();
            String value = entry.getValue().toString().trim();
            if (key.equals("dataDir")) {
                dataDir = value;
            } else if (key.equals("dataLogDir")) {
                dataLogDir = value;
            } else if (key.equals("clientPort")) {
                clientPort = Integer.parseInt(value);
            } else if (key.equals("clientPortAddress")) {
                clientPortAddress = value.trim();
            } else if (key.equals("tickTime")) {
                tickTime = Integer.parseInt(value);
            }
  public void readFrom(QuorumPeerConfig config) {
      clientPortAddress = config.getClientPortAddress();
      dataDir = config.getDataDir();
      dataLogDir = config.getDataLogDir();
      tickTime = config.getTickTime();
      maxClientCnxns = config.getMaxClientCnxns();
      minSessionTimeout = config.getMinSessionTimeout();
      maxSessionTimeout = config.getMaxSessionTimeout();
    }

如果是2-4个参数:分别表示端口、dataDir、tickTime、maxClientCnxns

 public void parse(String[] args) {
        if (args.length < 2 || args.length > 4) {//这种parse接收参数的个数为2-4,分别表示端口、dataDir、tickTime、maxClientCnxns;
            throw new IllegalArgumentException("Invalid args:"
                    + Arrays.toString(args));
        }

        clientPortAddress = new InetSocketAddress(Integer.parseInt(args[0]));
        dataDir = args[1];
        dataLogDir = dataDir;
        if (args.length == 3) {
            tickTime = Integer.parseInt(args[2]);
        }
        if (args.length == 4) {
            maxClientCnxns = Integer.parseInt(args[3]);
        }
    }

服务器异常报警

状态监听器

ZooKeeperServerListener以及实现类ZooKeeperServerListenerImpl:

    void notifyStopping(String threadName, int errorCode);//用于在线程发生严重问题时,通知server
    private final ZooKeeperServer zkServer;

    ZooKeeperServerListenerImpl(ZooKeeperServer zkServer) {
        this.zkServer = zkServer;
    }

    @Override
    public void notifyStopping(String threadName, int exitCode) {
        LOG.info("Thread {} exits, error code {}", threadName, exitCode);
        zkServer.setState(State.ERROR);//ZooKeeperServer设置对应状态即可
    }

这里只是调用 zkServer.setState(State.ERROR),具体监听到ERROR时该怎么干,是在zkserver中完成的事情,这里只监听和通知,

  /**
 * Represents critical thread. When there is an uncaught exception thrown by the
 * thread this will exit the system.
 */
public class ZooKeeperCriticalThread extends ZooKeeperThread {
    private static final Logger LOG = LoggerFactory
            .getLogger(ZooKeeperCriticalThread.class);
    private final ZooKeeperServerListener listener;

    public ZooKeeperCriticalThread(String threadName,
            ZooKeeperServerListener listener) {
        super(threadName);//父类是个thread
        this.listener = listener;//设置监听器实例
    }

    /**
     * This will be used by the uncaught exception handler and make the system
     * exit.
     * 
     * @param threadName
     *            - thread name
     * @param e
     *            - exception object
     */
    @Override
    protected void handleException(String threadName, Throwable e) {//处理异常
        LOG.error("Severe unrecoverable error, from thread : {}", threadName, e);
        listener.notifyStopping(threadName, ExitCode.UNEXPECTED_ERROR);//观察者模式,调用监听器的对应逻辑
    }
}
例子说明

1.异常触发
SessionTrackerImpl#run

        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }

2.调用ZooKeeperCriticalThread#handleException

  @Override
    protected void handleException(String threadName, Throwable e) {
        LOG.error("Severe unrecoverable error, from thread : {}", threadName, e);
        listener.notifyStopping(threadName, ExitCode.UNEXPECTED_ERROR); // 调用监听器的处理逻辑
    }
  1. 监听器通知stop:ZooKeeperServerListener#notifyStopping
  @Override
    public void notifyStopping(String threadName, int exitCode) {
        LOG.info("Thread {} exits, error code {}", threadName, exitCode);
        zkServer.setState(State.ERROR); // 设置ZookeeperServer对应的状态
    }

4.zkServer设置状态:ZooKeeperServer#setState

protected void setState(State state) {
        this.state = state;
        // Notify server state changes to the registered shutdown handler, if any.
        if (zkShutdownHandler != null) {
            zkShutdownHandler.handle(state); //注册过handler,就进行处理
        } else {
            LOG.debug("ZKShutdownHandler is not registered, so ZooKeeper server "
                    + "won't take any action on ERROR or SHUTDOWN server state changes");
        }
    }

5.zkShutdownHandler处理state:ZooKeeperServerShutdownHandler#handle

    /**
     * This will be invoked when the server transition to a new server state.
     *
     * @param state new server state
     */
    void handle(State state) {
        if (state == State.ERROR || state == State.SHUTDOWN) {
            shutdownLatch.countDown();
        }
    }

6.计时器到0,停止server
这一部分代码在zkServer外层,在ZooKeeperServerMain#runFromConfig中

  final ZooKeeperServer zkServer = new ZooKeeperServer();
            // Registers shutdown handler which will be used to know the
            // server error or shutdown state changes.
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(
                    new ZooKeeperServerShutdownHandler(shutdownLatch));

            txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                    config.dataDir));
            txnLog.setServerStats(zkServer.serverStats());
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(config.tickTime);
            zkServer.setMinSessionTimeout(config.minSessionTimeout);
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);
            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            shutdownLatch.await(); // 当countdown之后,这边返回,优雅关闭
            shutdown();

            cnxnFactory.join();
            if (zkServer.canShutdown()) {
                zkServer.shutdown(true);
            }
        }

也就是启动时,zkServer有一个计时器为1,当遇到严重异常时,计时器-1变为0,就调用ZooKeeperServerMain#shutdown

小结

遇到严重异常时,server的关闭流程调用链总结
遇到严重异常 -> ZooKeeperCriticalThread#handleException -> ZooKeeperServerListener#notifyStopping -> ZooKeeperServer#setState -> ZooKeeperServerShutdownHandler#handle -> 计数器为0 -> ZooKeeperServerMain#shutdown

ZooKeeperCriticalThread,ZooKeeperServerListener,ZooKeeperServerShutdownHandler三个类的作用总结

  • ZooKeeperCriticalThread:表明了哪些入口的异常可以算作是严重的异常,能够让server关闭的,并且handleException方法完成对ZooKeeperServerListener的调用
  • ZooKeeperServerListener:表示接收到异常时,通知zk状态变更为ERROR
  • ZooKeeperServerShutdownHandler:用于处理zk为ERROR和SHUTDOWN状态时,计数器-1
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容