每探索一门新技术的时候,我们都会从方法的入口开始探索,对于zookeeper也一样,zookeeper在启动时候是通过QuorumPeerMain来作为启动入口类。我们有必要知道在启动类启动时,zookeeper做了哪些初始化和准备工作。
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
//zookeeper启动入口,初始化参数配置并启动zookeeper
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.exit(2);
} catch (DatadirException e) {
LOG.error("Unable to access datadir, exiting abnormally", e);
System.exit(3);
} catch (AdminServerException e) {
LOG.error("Unable to start AdminServer, exiting abnormally", e);
System.exit(4);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
我们看到main方法里通过调用initializeAndRun开始进行初始化配置和启动,所以我们跟进去看下该方法的具体行为是做了什么?
protected void initializeAndRun(String[] args)
throws ConfigException, IOException, AdminServerException
{
// 1、解析zoo.cfg配置文件,读取配置文件中的KEY-value键值对,初始化QuorumPeerConfig
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
//创建文件清理管理器,该清理器主要负责定期清理内存快照文件和日志文件
//snapRetainCount:至少保留文件个数
//purgeInterval:定时任务间隔时间,只有purgeInterval大于0,该文件清理器才会开启文件清理的定时任务
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
//quorumVerifier!=null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
//上面表示如果是集群配置的话,服务器将会集群方式启动
/**zoo.cfg集群配置servers配置如下:
servers.1=127.0.0.1:2287:3387
servers.2=127.0.0.1:2288:3388
servers.3=127.0.0.1:2289:3389
***/
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);//集群方式启动
} else {
ZooKeeperServerMain.main(args);//单机版启动
}
}
然后我们看下,当zookeeper以集群方式启动时,具体做了哪些准备工作?
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
//注册日志管理(不用管)
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
//我们可以在zoo.cfg里通过clientPortAddress属性来配置,如果没有配置clientPortAddress的话,默认采用clientPort端口地址
if (config.getClientPortAddress() != null) {
//创建ServerCnxnFactory实例,ServerCnxnFactory主要负责跟客户端进行网络通信,接收客户端网络请求,比如提交事务等,默认采用NIO。
// 可以通过配置系统参数zookeeper.serverCnxnFactory来配置它实际实现的方式
cnxnFactory = ServerCnxnFactory.createFactory();
//根据端口号和maxClientCnxns初始化配置ServerCnxnFactory ,这两个参数都可以通过zoo.cfg配置,分别是:
//clientPortAddress||ClientPort:客户端发送请求的端口
//maxClientCnxns:最大并发数,用来控制客户端高并发
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(),false);
}
//和ServerCnxnFactory 一样,通过在ZOO.CFG中配置secureClientPortAddress或secureClientPort,可都不配置
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
quorumPeer = getQuorumPeer();
//创建内存快照文件和事务文件日志文件的管理器FileTxnSnapLog,该管理器提供了zookeeper上层服务跟底层数据库存储的对接入口,
// 他提供了一系列接口,用来访问日志文件和内存快站文件(具体提供了哪些访问底层存储文件的方法,我们后面会单独抽出来探究一番)
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//配置选举算法,不过现在一般都不配,默认采用3
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());//设置心跳时间
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());//设置最小超时时间
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());//设置最小超时时间
quorumPeer.setInitLimit(config.getInitLimit());//初时心跳次数
quorumPeer.setSyncLimit(config.getSyncLimit());//同步心跳次数
quorumPeer.setConfigFileName(config.getConfigFilename());
//创建内存数据库ZKDatabase实例,创建该数据库内存实例时会注入一个DataTree,
// DataTree为内存数据库内真正作为保存内存数据的数据结构,维护了整个内存数据库中节点的数据结构,
// 同时在创建DataTree时候会初始化创建根路径/zookeeper和配额管理节点/zookeeper/quota
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
//初始化QuorumPeer的一些其它属性,包括quorumCnxnThreadsSize(线程池QuerumServer manager线程池的初始线程数)
quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
...
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
//
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.initialize();
//启动quorumPeer并开始投票选举
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
我们再来看下它是如何创建ServerCnxnFactory呢?
//创建ServerCnxnFactory实例,ServerCnxnFactory主要负责跟客户端进行网络通信,接收客户端网络请求,比如提交事务等,默认采用NIO。
// 可以通过配置系统参数zookeeper.serverCnxnFactory来配置它实际实现的方式
static public ServerCnxnFactory createFactory() throws IOException {
//获取系统参数zookeeper.serverCnxnFactory配置
String serverCnxnFactoryName = System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {//没有配置则默认采用NIOServerCnxnFactory
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
}
try {
//创建NIOServerCnxnFactory
ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.getDeclaredConstructor().newInstance();
LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
return serverCnxnFactory;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
我么再看下quorumPeer.start做了哪些工作
public synchronized void start() {
//判断集群配置是否包含本机myid
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//恢复内存数据库的数据
loadDataBase();
//启动网络通信服务,并监听客户端的事务请求
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
//开始进行leader选举
startLeaderElection();
//启动当前线程(QuorumPeer本身也是一个线程).具体run做了哪些,我们后面会单独讨论,只需要知道它主要用来统计票数,更新节点服务器的状态
super.start();
}
总结
通过上面的代码,大概可以知道,zookeeper服务器在启动时候主要做了以下工作:
1、解析zoo.cfg配置文件,并初始化QuorumPeerConfig.
2、创建文件清理器,根据配置参数purgeInterval决定是否开启任务定时清理内存快照文件和日志文件。
3、创建并开启用于和客户端进行网络通信的ServerCnxnFactory,默认是NIO的方式。
4、创建日志快照和内存快照的管理器FileTxnSnapLog,用来访问日志文件和内存快站文件。
5、创建并初始化内存数据库ZKDatabase
6、恢复内存数据库数据
7、开始执行leader选举。