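The startup flow of the standalone server is shown in the flowchart below:
The process above can be divided into two phases: pre-startup and initialization.
Startup process in detail
Pre-startup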
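1. QuorumPeerMain is the common startup class
In both standalone and cluster mode, zkServer.cmd and zkServer.sh configure QuorumPeerMain as the entry class.
2. Parse the configuration file zoo.cfg
zoo.cfg holds the basic runtime parameters, such as tickTime, dataDir, and clientPort.
3. Create and start the history file cleaner DatadirCleanupManager
Since version 3.4.0, ZooKeeper can periodically purge old transaction log and snapshot files.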
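To make the cleaner's retention decision concrete, here is a minimal sketch that assumes snapshots are identified only by their zxid and that the `snapRetainCount` newest ones are kept. In ZooKeeper these settings come from `autopurge.snapRetainCount` and `autopurge.purgeInterval` (in hours, with 0 meaning the task is not scheduled). `PurgeSketch` and its methods are hypothetical; the real purge logic lives in `PurgeTxnLog` and also deals with the transaction log files.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper illustrating snapshot retention; not ZooKeeper's API.
class PurgeSketch {
    // A purge interval of 0 (or less) means the purge task is never scheduled.
    static boolean shouldSchedule(int purgeIntervalHours) {
        return purgeIntervalHours > 0;
    }

    // Returns the zxids of the snapshots that would be deleted,
    // keeping the retainCount newest ones.
    static List<Long> snapshotsToPurge(List<Long> snapshotZxids, int retainCount) {
        List<Long> sorted = new ArrayList<>(snapshotZxids);
        sorted.sort(Comparator.reverseOrder()); // newest (largest zxid) first
        if (sorted.size() <= retainCount) {
            return new ArrayList<>();
        }
        return new ArrayList<>(sorted.subList(retainCount, sorted.size()));
    }

    public static void main(String[] args) {
        List<Long> zxids = List.of(0x100L, 0x400L, 0x200L, 0x500L, 0x300L);
        System.out.println(snapshotsToPurge(zxids, 3)); // [512, 256]
        System.out.println(shouldSchedule(0));           // false
    }
}
```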
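4. Determine whether to run in cluster or standalone mode
The mode is decided from the list of cluster server addresses parsed in step 2. In standalone mode, startup is delegated to ZooKeeperServerMain.
5. Parse zoo.cfg again
ZooKeeperServerMain parses the configuration a second time, this time into a ServerConfig.
6. Create the server instance ZooKeeperServer
The ZooKeeper server first creates the server instance and then initializes it, which covers components such as the connection factory, the in-memory database, and the request processors.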
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]); // parse the configuration file
}
// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start(); // start the purge thread
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running"
+ " in standalone mode");
// there is only one server in the quorum -- run as standalone
ZooKeeperServerMain.main(args); // start in standalone mode
}
}
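The mode check in `initializeAndRun` above boils down to one condition. The sketch below isolates it under a simplifying assumption: `configuredServers` stands in for `config.servers.size()`, which is filled from the `server.N` entries of zoo.cfg. `ModeSketch` is a hypothetical name used only for illustration.

```java
// Hypothetical, simplified model of the check in QuorumPeerMain#initializeAndRun.
class ModeSketch {
    enum Mode { QUORUM, STANDALONE }

    // Quorum mode requires exactly one argument (the config file path)
    // and at least one server entry in that file; anything else is standalone.
    static Mode chooseMode(String[] args, int configuredServers) {
        if (args.length == 1 && configuredServers > 0) {
            return Mode.QUORUM;
        }
        return Mode.STANDALONE;
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(new String[] {"conf/zoo.cfg"}, 3));      // QUORUM
        System.out.println(chooseMode(new String[] {"conf/zoo.cfg"}, 0));      // STANDALONE
        System.out.println(chooseMode(new String[] {"2181", "/tmp/zk"}, 0));   // STANDALONE
    }
}
```

Note that passing port and dataDir on the command line always leads to standalone mode, because a quorum can only be described in a configuration file.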
public static void main(String[] args) {
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
...
main.initializeAndRun(args);
...
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
ServerConfig config = new ServerConfig();
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args); // parse the command-line arguments (port, dataDir, tickTime, maxClientCnxns)
}
runFromConfig(config); // create the ZooKeeperServer instance and initialize it
}
Initialization
/**
* Run from a ServerConfig.
* @param config ServerConfig to use.
* @throws IOException
*/
public void runFromConfig(ServerConfig config) throws IOException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// Note that this thread isn't going to be doing anything else,
// so rather than spawning another thread, we will just call
// run() in this thread.
// create a file logger url from the command line args
final ZooKeeperServer zkServer = new ZooKeeperServer(); // also creates the ServerStats collector
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir)); // create the data manager
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime); // set the server tickTime
zkServer.setMinSessionTimeout(config.minSessionTimeout); // set the session timeout bounds
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
cnxnFactory = ServerCnxnFactory.createFactory(); // the system property zookeeper.serverCnxnFactory selects ZooKeeper's own NIO implementation or Netty as the server connection factory; the default is NIOServerCnxnFactory
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns()); // create the main thread, open the selector, bind the port, and register for NIO accept events
cnxnFactory.startup(zkServer);
// Watch status of ZooKeeper server. It will do a graceful shutdown
// if the server is not running or hits an internal error.
shutdownLatch.await();
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
if (txnLog != null) {
txnLog.close();
}
}
}
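1. Create the server statistics collector ServerStats
ServerStats collects the ZooKeeper server's runtime statistics, such as the number of packets sent and received and request latency.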
public ZooKeeperServer() {
serverStats = new ServerStats(this);
listener = new ZooKeeperServerListenerImpl(this);
}
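As an illustration of the kind of bookkeeping a statistics collector does, here is a minimal latency tracker. It is a hypothetical, stripped-down class in the spirit of ServerStats, not its actual fields or API; it assumes latencies are reported in milliseconds once per completed request.

```java
// Hypothetical, stripped-down statistics collector in the spirit of ServerStats.
class StatsSketch {
    private long minLatency = Long.MAX_VALUE;
    private long maxLatency = 0;
    private long totalLatency = 0;
    private long count = 0;
    private long packetsReceived = 0;

    synchronized void incrementPacketsReceived() {
        packetsReceived++;
    }

    // Record the latency (in ms) of one completed request.
    synchronized void updateLatency(long latencyMs) {
        totalLatency += latencyMs;
        count++;
        minLatency = Math.min(minLatency, latencyMs);
        maxLatency = Math.max(maxLatency, latencyMs);
    }

    // Report 0 instead of Long.MAX_VALUE before any request completes.
    synchronized long getMinLatency() { return count == 0 ? 0 : minLatency; }
    synchronized long getMaxLatency() { return maxLatency; }
    synchronized long getAvgLatency() { return count == 0 ? 0 : totalLatency / count; }
    synchronized long getPacketsReceived() { return packetsReceived; }
}
```

Recording latencies of 2, 10 and 6 ms yields a minimum of 2, a maximum of 10 and an average of 6.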
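2. Create the ZooKeeper data manager FileTxnSnapLog
FileTxnSnapLog is the layer between the upper-level server and the underlying data storage. It provides a set of interfaces for operating on data files, namely transaction log files and snapshot files. ZooKeeper creates FileTxnSnapLog from the snapshot directory dataDir and the transaction log directory dataLogDir parsed from zoo.cfg.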
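The files FileTxnSnapLog manages follow a simple naming scheme: they sit in a `version-2` subdirectory and carry a zxid in hexadecimal, as in `snapshot.1a` or `log.1a`. The helpers below are a hypothetical sketch of that scheme (ZooKeeper's own helpers live in its `Util` class), useful for picturing how the newest snapshot is located.

```java
import java.io.File;
import java.util.List;

// Hypothetical helpers mirroring ZooKeeper's on-disk naming scheme:
// files live in a "version-2" subdirectory and carry the zxid in hex.
class FileNamingSketch {
    static File snapshotFile(File dataDir, long zxid) {
        return new File(new File(dataDir, "version-2"), "snapshot." + Long.toHexString(zxid));
    }

    static File logFile(File dataLogDir, long zxid) {
        return new File(new File(dataLogDir, "version-2"), "log." + Long.toHexString(zxid));
    }

    // Extract the zxid from a name such as "snapshot.1a" or "log.1a".
    static long zxidOf(String fileName) {
        return Long.parseLong(fileName.substring(fileName.indexOf('.') + 1), 16);
    }

    // Pick the snapshot with the largest zxid, or -1 if there is none.
    static long newestSnapshotZxid(List<String> fileNames) {
        long newest = -1;
        for (String name : fileNames) {
            if (name.startsWith("snapshot.")) {
                newest = Math.max(newest, zxidOf(name));
            }
        }
        return newest;
    }
}
```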
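3. Set the server tickTime and the session timeout limits.
4. Create the ServerCnxnFactory.
The system property zookeeper.serverCnxnFactory selects whether ZooKeeper uses its own NIO implementation or the Netty framework as the server-side network connection factory.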
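The selection itself is a system-property lookup with a default. The sketch below shows only that lookup; the real `ServerCnxnFactory.createFactory()` then instantiates the named class reflectively. `FactorySelectionSketch` is hypothetical, while the property name and the two factory class names are ZooKeeper's.

```java
// Hypothetical sketch of how the factory class name could be chosen from the
// zookeeper.serverCnxnFactory system property; only the lookup is shown.
class FactorySelectionSketch {
    static final String PROPERTY = "zookeeper.serverCnxnFactory";
    static final String DEFAULT_FACTORY = "org.apache.zookeeper.server.NIOServerCnxnFactory";

    // Falls back to the NIO factory when the property is not set.
    static String factoryClassName() {
        String name = System.getProperty(PROPERTY);
        return name == null ? DEFAULT_FACTORY : name;
    }
}
```

To switch to Netty, the server JVM would be started with `-Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory`.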
5. Initialize the ServerCnxnFactory.
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
thread.setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
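The core of `configure` is standard Java NIO. The following self-contained sketch repeats the same four steps (open, bind, switch to non-blocking, register for `OP_ACCEPT`) without ZooKeeper's thread and SASL setup. The class name is hypothetical, and IO errors are wrapped in `UncheckedIOException` for convenience.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Standalone sketch of the NIO setup done in NIOServerCnxnFactory#configure.
class NioAcceptSketch {
    private Selector selector;
    private ServerSocketChannel ss;

    NioAcceptSketch(InetSocketAddress addr) {
        try {
            selector = Selector.open();
            ss = ServerSocketChannel.open();
            ss.socket().setReuseAddress(true);
            ss.socket().bind(addr);
            ss.configureBlocking(false); // must be non-blocking before register()
            ss.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int boundPort() {
        return ss.socket().getLocalPort();
    }

    // True if the channel is registered with the selector for accept events.
    boolean isAccepting() {
        SelectionKey key = ss.keyFor(selector);
        return key != null && (key.interestOps() & SelectionKey.OP_ACCEPT) != 0;
    }

    void close() {
        try {
            ss.close();
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Binding to port 0 lets the operating system pick a free port, which is convenient for trying the sketch out.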
6. Start the ServerCnxnFactory main thread
@Override
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
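start(); // start the IO thread
setZooKeeperServer(zks);
zks.startdata(); // restore the database and sessions from the transaction log and snapshots, then write a fresh snapshot
zks.startup(); // start the sessionTracker thread, set up the request processor chain, and start each processor thread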
}
7. Restore local data.
At startup, the server restores its data from the local snapshot files and transaction log files.
public void startdata()
throws IOException, InterruptedException {
//check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
}
if (!zkDb.isInitialized()) {
loadData();
}
}
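The idea behind this recovery step is "load the newest snapshot, then replay every logged transaction that is newer than it". The sketch below models that under strong simplifying assumptions: the data tree is a plain `Map` of path to data that has already been loaded from the snapshot, each transaction just sets one path, and the log is ordered by zxid. `RecoverySketch` and `Txn` are hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of startup recovery: start from the newest snapshot,
// then replay only the transactions whose zxid is newer than that snapshot.
class RecoverySketch {
    // One committed transaction: a simplified "set path to data" operation.
    static final class Txn {
        final long zxid;
        final String path;
        final String data;

        Txn(long zxid, String path, String data) {
            this.zxid = zxid;
            this.path = path;
            this.data = data;
        }
    }

    // Replays txns newer than the snapshot onto `tree` (already loaded from
    // the snapshot). Assumes txnLog is ordered by zxid. Returns the last zxid.
    static long restore(Map<String, String> tree, long snapshotZxid, List<Txn> txnLog) {
        long lastZxid = snapshotZxid;
        for (Txn t : txnLog) {
            if (t.zxid <= snapshotZxid) {
                continue; // already reflected in the snapshot
            }
            tree.put(t.path, t.data);
            lastZxid = t.zxid;
        }
        return lastZxid;
    }
}
```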
8. Create and start the session manager.
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker(); // start the SessionTracker, which ZooKeeper uses to manage sessions
setupRequestProcessors(); // set up the request processors and start their threads
registerJMX(); // register the JMX beans
setState(State.RUNNING);
notifyAll();
}
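The session manager started here does not track each session's deadline individually. As I understand SessionTrackerImpl, it groups sessions into buckets whose expiration times are rounded up to a multiple of the expiration interval (tickTime), so the tracker only has to wake up once per tick. The sketch below is a hypothetical model of that bucketing, not the real class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of bucket-based session expiry: each session is placed
// in the bucket for its expiration time rounded up to the next multiple of
// the expiration interval (tickTime).
class SessionBucketSketch {
    private final long expirationInterval;
    private final Map<Long, Set<Long>> buckets = new HashMap<>();

    SessionBucketSketch(long tickTime) {
        this.expirationInterval = tickTime;
    }

    // Round up to the next interval boundary (one interval of grace).
    long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Record a session touched at `now` with the given timeout; returns its bucket.
    long touch(long sessionId, long now, long timeout) {
        long expireTime = roundToInterval(now + timeout);
        buckets.values().forEach(s -> s.remove(sessionId)); // leave the old bucket
        buckets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(sessionId);
        return expireTime;
    }

    // Sessions that expire exactly at the given bucket time.
    Set<Long> expiredAt(long bucketTime) {
        return buckets.getOrDefault(bucketTime, Set.of());
    }
}
```

With a tickTime of 2000 ms, a session touched at t = 1000 with a 4000 ms timeout lands in the 6000 bucket; touching it again at t = 3000 moves it to the 8000 bucket.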
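9. Initialize ZooKeeper's request processing chain
ZooKeeper processes requests using the chain-of-responsibility pattern: several request processors handle each client request in turn, and at startup the server links these processors into a request processing chain.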
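In standalone mode the chain is PrepRequestProcessor, then SyncRequestProcessor, then FinalRequestProcessor, and it is built back to front so each processor can be handed its successor. The synchronous sketch below shows only that shape. The `Stage` interface and `ChainSketch` class are hypothetical, and the real processors run on their own threads with request queues.

```java
import java.util.List;

// Hypothetical processor interface; the real one is RequestProcessor.
interface Stage {
    void processRequest(String request);
}

// Chain-of-responsibility sketch modeled on the standalone chain
// prep -> sync -> final. Every stage does its work, then passes the request on.
class ChainSketch {
    static Stage stage(String name, List<String> trace, Stage next) {
        return request -> {
            trace.add(name + ":" + request); // this stage's (simulated) work
            if (next != null) {
                next.processRequest(request); // hand the request down the chain
            }
        };
    }

    // Built back to front, so each stage already knows its successor.
    static Stage buildChain(List<String> trace) {
        Stage fin = stage("final", trace, null);
        Stage sync = stage("sync", trace, fin);
        return stage("prep", trace, sync);
    }
}
```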
10. Register the JMX service.
ZooKeeper exposes some of the server's runtime information to the outside via JMX.
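For readers new to JMX, here is a minimal standard-MBean registration using only the JDK. It is not ZooKeeper's MBeanRegistry: the `ServerInfo` bean and the ObjectName `org.example.zksketch:type=ServerInfo` are hypothetical. A standard MBean's management interface must be public and named after the implementation class plus `MBean`.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class JmxSketch {
    // Standard MBean convention: the interface is named <ImplClass>MBean and is public.
    public interface ServerInfoMBean {
        long getTickTime();
    }

    public static class ServerInfo implements ServerInfoMBean {
        private final long tickTime;

        public ServerInfo(long tickTime) {
            this.tickTime = tickTime;
        }

        @Override
        public long getTickTime() {
            return tickTime;
        }
    }

    static final String NAME = "org.example.zksketch:type=ServerInfo"; // hypothetical name

    // Registers a bean with the platform MBean server and returns its name.
    static ObjectName register(long tickTime) {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName(NAME);
            mbs.registerMBean(new ServerInfo(tickTime), name);
            return name;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    static void unregister(ObjectName name) {
        try {
            ManagementFactory.getPlatformMBeanServer().unregisterMBean(name);
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Once registered, the `TickTime` attribute is visible in tools such as JConsole.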
Standalone configuration parsing
public static void main(String[] args) {
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
main.initializeAndRun(args); // exception handling and the rest of main() omitted
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
ServerConfig config = new ServerConfig();
if (args.length == 1) {
config.parse(args[0]);
} else {
config.parse(args);
}
runFromConfig(config);
}
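When the standalone server starts, it accepts its startup arguments in one of two forms:
- One argument: the path to a configuration file
- Two to four arguments: the client port, dataDir, tickTime, and maxClientCnxns, in that order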
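The two argument forms can be summarized in one dispatch function. This is a hypothetical sketch, not ServerConfig itself. It assumes a default tickTime of 3000 ms, which I believe matches `ZooKeeperServer.DEFAULT_TICK_TIME`. It also applies tickTime whenever a third argument is present, which is what the positional meaning described above implies.

```java
import java.util.Arrays;

// Hypothetical sketch of how the standalone entry point interprets its
// arguments; field names follow ServerConfig but the class is illustrative.
class ArgsSketch {
    static final int DEFAULT_TICK_TIME = 3000; // assumed default, in ms

    String configPath; // set only in the one-argument form
    int clientPort;
    String dataDir;
    String dataLogDir;
    int tickTime = DEFAULT_TICK_TIME;
    int maxClientCnxns;

    static ArgsSketch from(String[] args) {
        ArgsSketch c = new ArgsSketch();
        if (args.length == 1) {
            c.configPath = args[0]; // would be parsed as a zoo.cfg file
            return c;
        }
        if (args.length < 2 || args.length > 4) {
            throw new IllegalArgumentException("Invalid args:" + Arrays.toString(args));
        }
        c.clientPort = Integer.parseInt(args[0]);
        c.dataDir = args[1];
        c.dataLogDir = c.dataDir; // transaction logs share the data directory
        if (args.length >= 3) {
            c.tickTime = Integer.parseInt(args[2]);
        }
        if (args.length == 4) {
            c.maxClientCnxns = Integer.parseInt(args[3]);
        }
        return c;
    }
}
```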
/**
* Server configuration storage.
*
* We use this instead of Properties as it's typed.
*
*/
public class ServerConfig {
////
//// If you update the configuration parameters be sure
//// to update the "conf" 4letter word
////
protected InetSocketAddress clientPortAddress; // address and port exposed to ZooKeeper clients
protected String dataDir; // snapshot directory
protected String dataLogDir; // transaction log directory
protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
protected int maxClientCnxns;
/** defaults to -1 if not set explicitly */
protected int minSessionTimeout = -1; // minimum session timeout allowed for clients
/** defaults to -1 if not set explicitly */
protected int maxSessionTimeout = -1;
}
If the configuration does not explicitly set minSessionTimeout or maxSessionTimeout,
ZooKeeperServer derives the minimum and maximum session timeouts from tickTime as follows:
public int getMinSessionTimeout() {
return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
}
public int getMaxSessionTimeout() {
return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
}
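A worked example helps: with tickTime = 2000 ms and neither bound configured, clients may negotiate timeouts between 4000 and 40000 ms. The sketch below reproduces the two defaults above and adds the clamping of a client's requested timeout into that range, which, to my understanding, is what the server does when it processes a connect request. `SessionTimeoutSketch` is hypothetical.

```java
// Sketch of how effective session-timeout bounds follow from tickTime, plus a
// simplified clamp of a client's requested timeout into [min, max].
class SessionTimeoutSketch {
    static int minSessionTimeout(int configuredMin, int tickTime) {
        return configuredMin == -1 ? tickTime * 2 : configuredMin;
    }

    static int maxSessionTimeout(int configuredMax, int tickTime) {
        return configuredMax == -1 ? tickTime * 20 : configuredMax;
    }

    // Clamp the client's requested timeout into the allowed range.
    static int negotiate(int requested, int configuredMin, int configuredMax, int tickTime) {
        int min = minSessionTimeout(configuredMin, tickTime);
        int max = maxSessionTimeout(configuredMax, tickTime);
        return Math.max(min, Math.min(max, requested));
    }

    public static void main(String[] args) {
        System.out.println(negotiate(1000, -1, -1, 2000));   // 4000
        System.out.println(negotiate(60000, -1, -1, 2000));  // 40000
        System.out.println(negotiate(10000, -1, -1, 2000));  // 10000
    }
}
```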
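When a single argument, the configuration file path, is passed at startup, ServerConfig#parse delegates the parsing to QuorumPeerConfig:
public void parse(String path) throws ConfigException { // single argument: the path to a configuration file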
QuorumPeerConfig config = new QuorumPeerConfig();
config.parse(path);
// let qpconfig parse the file and then pull the stuff we are
// interested in
readFrom(config);
}
QuorumPeerConfig#parse, which reads the file itself:
/**
* Parse a ZooKeeper configuration file
* @param path the path of the configuration file
* @throws ConfigException error processing configuration
*/
public void parse(String path) throws ConfigException {
File configFile = new File(path);
LOG.info("Reading configuration from: " + configFile);
try {
if (!configFile.exists()) {
throw new IllegalArgumentException(configFile.toString()
+ " file is missing");
}
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
cfg.load(in);
} finally {
in.close();
}
parseProperties(cfg);
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
}
}
public void parseProperties(Properties zkProp)
throws IOException, ConfigException {
int clientPort = 0;
String clientPortAddress = null;
for (Entry<Object, Object> entry : zkProp.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
if (key.equals("dataDir")) {
dataDir = value;
} else if (key.equals("dataLogDir")) {
dataLogDir = value;
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
} else if (key.equals("tickTime")) {
tickTime = Integer.parseInt(value);
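} // ... remaining keys omitted
}
// ... validation omitted
}
ServerConfig#readFrom then copies the fields the standalone server needs out of the QuorumPeerConfig: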
public void readFrom(QuorumPeerConfig config) {
clientPortAddress = config.getClientPortAddress();
dataDir = config.getDataDir();
dataLogDir = config.getDataLogDir();
tickTime = config.getTickTime();
maxClientCnxns = config.getMaxClientCnxns();
minSessionTimeout = config.getMinSessionTimeout();
maxSessionTimeout = config.getMaxSessionTimeout();
}
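The parsing pattern above (load into `Properties`, trim each key and value, pick out known keys) can be tried on its own. The mini-parser below is a hypothetical sketch that handles only four keys. The fallback of dataLogDir to dataDir and the default tickTime of 3000 ms are assumptions meant to mirror ZooKeeper's behavior.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;

// Hypothetical mini-parser following the same pattern as parseProperties.
class CfgSketch {
    String dataDir;
    String dataLogDir;
    int clientPort;
    int tickTime = 3000; // assumed default

    static CfgSketch parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        CfgSketch c = new CfgSketch();
        for (Map.Entry<Object, Object> e : props.entrySet()) {
            String key = e.getKey().toString().trim();
            String value = e.getValue().toString().trim();
            switch (key) {
                case "dataDir": c.dataDir = value; break;
                case "dataLogDir": c.dataLogDir = value; break;
                case "clientPort": c.clientPort = Integer.parseInt(value); break;
                case "tickTime": c.tickTime = Integer.parseInt(value); break;
                default: break; // other keys are ignored in this sketch
            }
        }
        if (c.dataLogDir == null) {
            c.dataLogDir = c.dataDir; // assumed fallback when dataLogDir is absent
        }
        return c;
    }
}
```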
With two to four arguments, they are the client port, dataDir, tickTime, and maxClientCnxns:
public void parse(String[] args) {
if (args.length < 2 || args.length > 4) { // this overload accepts 2-4 arguments: port, dataDir, tickTime, maxClientCnxns
throw new IllegalArgumentException("Invalid args:"
+ Arrays.toString(args));
}
clientPortAddress = new InetSocketAddress(Integer.parseInt(args[0]));
dataDir = args[1];
dataLogDir = dataDir;
if (args.length >= 3) { // tickTime is also present when maxClientCnxns is given
tickTime = Integer.parseInt(args[2]);
}
if (args.length == 4) {
maxClientCnxns = Integer.parseInt(args[3]);
}
}
Alerting on server errors
State listener
ZooKeeperServerListener and its implementation ZooKeeperServerListenerImpl:
void notifyStopping(String threadName, int errorCode); // notifies the server when a thread hits a critical error
private final ZooKeeperServer zkServer;
ZooKeeperServerListenerImpl(ZooKeeperServer zkServer) {
this.zkServer = zkServer;
}
@Override
public void notifyStopping(String threadName, int exitCode) {
LOG.info("Thread {} exits, error code {}", threadName, exitCode);
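zkServer.setState(State.ERROR); // ZooKeeperServer only needs to set the corresponding state
}
The listener only calls zkServer.setState(State.ERROR). What happens once the ERROR state is observed is decided inside ZooKeeperServer; the listener just listens and notifies. The threads that report these errors are ZooKeeperCriticalThread instances: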
/**
* Represents critical thread. When there is an uncaught exception thrown by the
* thread this will exit the system.
*/
public class ZooKeeperCriticalThread extends ZooKeeperThread {
private static final Logger LOG = LoggerFactory
.getLogger(ZooKeeperCriticalThread.class);
private final ZooKeeperServerListener listener;
public ZooKeeperCriticalThread(String threadName,
ZooKeeperServerListener listener) {
super(threadName); // the superclass ZooKeeperThread extends Thread
this.listener = listener; // store the listener instance
}
/**
* This will be used by the uncaught exception handler and make the system
* exit.
*
* @param threadName
* - thread name
* @param e
* - exception object
*/
@Override
protected void handleException(String threadName, Throwable e) { // handle the exception
LOG.error("Severe unrecoverable error, from thread : {}", threadName, e);
listener.notifyStopping(threadName, ExitCode.UNEXPECTED_ERROR); // observer pattern: delegate to the listener
}
}
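Here is a minimal sketch of the same observer wiring using only the JDK. A "critical" thread installs an uncaught-exception handler that forwards the failure to a listener, and the listener only records the new state, as ZooKeeperServerListenerImpl does. All names are hypothetical, and the exit code 1 is a placeholder rather than ZooKeeper's ExitCode value.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the listener pattern used by ZooKeeperCriticalThread.
class CriticalThreadSketch {
    enum State { RUNNING, ERROR }

    interface ServerListener {
        void notifyStopping(String threadName, int exitCode);
    }

    static final int UNEXPECTED_ERROR = 1; // placeholder exit code

    // Mirrors ZooKeeperServerListenerImpl: it only records the new state.
    static final class StateRecordingListener implements ServerListener {
        final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
        volatile String failedThread;

        @Override
        public void notifyStopping(String threadName, int exitCode) {
            failedThread = threadName;
            state.set(State.ERROR);
        }
    }

    // Any exception escaping `body` is forwarded to the listener.
    static Thread criticalThread(String name, Runnable body, ServerListener listener) {
        Thread t = new Thread(body, name);
        t.setUncaughtExceptionHandler((thread, e) ->
                listener.notifyStopping(thread.getName(), UNEXPECTED_ERROR));
        return t;
    }

    // Start the thread and wait for it to finish.
    static void startAndJoin(Thread t) {
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```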
Example walkthrough
1. An exception is raised
SessionTrackerImpl#run
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
2. ZooKeeperCriticalThread#handleException is called
@Override
protected void handleException(String threadName, Throwable e) {
LOG.error("Severe unrecoverable error, from thread : {}", threadName, e);
listener.notifyStopping(threadName, ExitCode.UNEXPECTED_ERROR); // invoke the listener's handling logic
}
3. The listener reports the stop: ZooKeeperServerListener#notifyStopping
@Override
public void notifyStopping(String threadName, int exitCode) {
LOG.info("Thread {} exits, error code {}", threadName, exitCode);
zkServer.setState(State.ERROR); // set the corresponding ZooKeeperServer state
}
4. zkServer sets its state: ZooKeeperServer#setState
protected void setState(State state) {
this.state = state;
// Notify server state changes to the registered shutdown handler, if any.
if (zkShutdownHandler != null) {
zkShutdownHandler.handle(state); // if a handler is registered, let it handle the change
} else {
LOG.debug("ZKShutdownHandler is not registered, so ZooKeeper server "
+ "won't take any action on ERROR or SHUTDOWN server state changes");
}
}
5. zkShutdownHandler handles the state: ZooKeeperServerShutdownHandler#handle
/**
* This will be invoked when the server transition to a new server state.
*
* @param state new server state
*/
void handle(State state) {
if (state == State.ERROR || state == State.SHUTDOWN) {
shutdownLatch.countDown();
}
}
6. The latch count reaches zero and the server stops
This code lives outside zkServer, in ZooKeeperServerMain#runFromConfig:
final ZooKeeperServer zkServer = new ZooKeeperServer();
// Registers shutdown handler which will be used to know the
// server error or shutdown state changes.
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
config.dataDir));
txnLog.setServerStats(zkServer.serverStats());
zkServer.setTxnLogFactory(txnLog);
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
// Watch status of ZooKeeper server. It will do a graceful shutdown
// if the server is not running or hits an internal error.
shutdownLatch.await(); // returns once the latch is counted down, then shuts down gracefully
shutdown();
cnxnFactory.join();
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
}
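In other words, at startup ZooKeeperServerMain creates a CountDownLatch with a count of 1 and hands it to zkServer through the registered ZooKeeperServerShutdownHandler. When a critical error occurs, the count drops from 1 to 0, await() returns, and ZooKeeperServerMain#shutdown is called.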
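The latch-based shutdown path can be reproduced end to end in a few classes. In this hypothetical sketch, `Server.setState` forwards to a registered handler, the handler counts the latch down on ERROR or SHUTDOWN, and the caller blocked in `await()` wakes up. A real server would then close its connection factory and shut down `zkServer`; here the final state is simply returned.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical end-to-end sketch of the CountDownLatch-based shutdown path.
class LatchShutdownSketch {
    enum State { INITIAL, RUNNING, ERROR, SHUTDOWN }

    static class ShutdownHandler {
        private final CountDownLatch latch;

        ShutdownHandler(CountDownLatch latch) {
            this.latch = latch;
        }

        // Only ERROR and SHUTDOWN release the waiting main thread.
        void handle(State state) {
            if (state == State.ERROR || state == State.SHUTDOWN) {
                latch.countDown();
            }
        }
    }

    static class Server {
        private volatile State state = State.INITIAL;
        private volatile ShutdownHandler handler;

        void registerShutdownHandler(ShutdownHandler h) {
            this.handler = h;
        }

        void setState(State s) {
            state = s;
            if (handler != null) {
                handler.handle(s); // notify only if a handler was registered
            }
        }

        State getState() {
            return state;
        }
    }

    // Blocks until the server reports ERROR or SHUTDOWN; returns the final state.
    static State runUntilStopped(Server server, Runnable failureSource) {
        CountDownLatch latch = new CountDownLatch(1);
        server.registerShutdownHandler(new ShutdownHandler(latch));
        server.setState(State.RUNNING); // RUNNING does not touch the latch
        new Thread(failureSource).start();
        try {
            latch.await(); // returns once the count drops from 1 to 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return server.getState();
    }
}
```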
Summary
Call chain for shutting down the server after a critical error:
Critical error -> ZooKeeperCriticalThread#handleException -> ZooKeeperServerListener#notifyStopping -> ZooKeeperServer#setState -> ZooKeeperServerShutdownHandler#handle -> latch count reaches 0 -> ZooKeeperServerMain#shutdown
Roles of ZooKeeperCriticalThread, ZooKeeperServerListener, and ZooKeeperServerShutdownHandler:
- ZooKeeperCriticalThread: marks which entry points' exceptions count as critical, meaning serious enough to shut the server down; its handleException method calls ZooKeeperServerListener.
- ZooKeeperServerListener: on receiving such an exception, changes the ZooKeeper server state to ERROR.
- ZooKeeperServerShutdownHandler: counts the latch down by one when the server enters the ERROR or SHUTDOWN state.