1. ZooKeeper Startup Modes
I have recently been refactoring a project, mainly re-examining and redesigning its business model, which also calls for adjustments to our cluster deployment. While drafting the design I surveyed the clustering schemes of several open-source components. This article walks through ZooKeeper's cluster startup and leader-election flow, analyzed from the source code.
[Figure: flowchart of the cluster startup flow]
ZooKeeper can start in standalone mode or in cluster mode; in both cases the entry point is QuorumPeerMain, whose main method is shown below:
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // parse the configuration and run the service
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
    LOG.info("Exiting normally");
    System.exit(ExitCode.EXECUTION_FINISHED.getValue());
}
The main processing happens in initializeAndRun(args); the startup parameters are passed in through args. The method looks like this:
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    String path = "/Users/fzz/Desktop/zookeeperData/zoo.cfg";
    // args[0] is the path of the zoo.cfg configuration file; parameters are loaded from it
    args = new String[1];
    args[0] = path; // local-debugging hack: in the real ZooKeeper the path is passed in at startup
    if (args.length == 1) {
        config.parse(args[0]);
    }
    //config.parse(path);
    // Start and schedule the purge task
    // A scheduled task, configured from zoo.cfg, whose job is to purge old snapshots and txn logs.
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),          // data directory of this ZooKeeper node
        config.getDataLogDir(),       // directory where transaction log files are stored
        config.getSnapRetainCount(),  // number of recent snapshots (and matching txn logs) to retain
        config.getPurgeInterval());   // interval, in hours, between purge runs
    purgeMgr.start();
    if (args.length == 1 && config.isDistributed()) {
        // cluster-mode startup
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
The code above includes a few local tweaks I made for debugging: args[0] is the path of the zoo.cfg configuration file, and a normal startup likewise loads its parameters into memory from that file. DatadirCleanupManager is a scheduled task that periodically purges old snapshot data. When the server is started from a config file that defines multiple server nodes, i.e. when config.isDistributed() returns true, it starts in cluster mode. This article focuses on the cluster startup path; the standalone path is comparatively simple, and interested readers can follow the code themselves.
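For reference, a minimal zoo.cfg that would make config.isDistributed() return true might look like the following; the hostnames, paths, and ports here are placeholders:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

Each server.N line declares a voting member (by convention 2888 is the quorum port and 3888 the election port), and each node's dataDir must contain a myid file holding that node's own N.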
The cluster startup entry is runFromConfig(config). Its main job is to create the transaction-log/snapshot manager FileTxnSnapLog and a QuorumPeer instance. QuorumPeer is the host process for the ZooKeeperServer: while ZooKeeper runs it keeps checking the server's state, and when the state changes it starts a new round of leader election accordingly.
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }
    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        // The connection factory owns the IO and worker threads that talk to clients.
        // Two implementations exist, NIOServerCnxnFactory and NettyServerCnxnFactory;
        // NIOServerCnxnFactory is the default.
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;
        if (config.getClientPortAddress() != null) {
            // NIOServerCnxnFactory by default; NIO does not support SSL
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }
        if (config.getSecureClientPortAddress() != null) {
            // for SSL, NettyServerCnxnFactory is used
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        // The QuorumPeer instance is created only in cluster mode. While ZooKeeper runs,
        // it keeps checking the server state and, after a state change, decides whether
        // to start a new round of leader election.
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
        quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }
        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        if (quorumPeer.isQuorumSaslAuthEnabled()) {
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        quorumPeer.initialize();
        if (config.jvmPauseMonitorToRun) {
            quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
        }
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    } finally {
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
QuorumPeer's start() method loads and initializes the in-memory database and kicks off leader election:
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // load the in-memory database (ZKDatabase) from snapshots and txn logs
    loadDataBase();
    // start the factory that accepts client connections
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // begin leader election
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}
ZooKeeper implements the ZAB protocol, so during an election the nodes exchange votes with one another, and the candidate that gathers a majority of the votes becomes the leader.
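As an aside, "majority" here means a strict majority. Below is a minimal sketch of the rule, mirroring what ZooKeeper's default QuorumMaj verifier computes; hasQuorum is an illustrative name, not the real API:

class MajorityRule {
    // Strict majority: strictly more than half of the voting members must agree.
    // With 3 voters a quorum is 2; with 5 voters it is 3.
    static boolean hasQuorum(int ackCount, int votingMembers) {
        return ackCount > votingMembers / 2;
    }
}

This is why a three-node ensemble tolerates one failure and a five-node ensemble tolerates two.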
The election proceeds as follows:
1. The server first loads its latest data;
2. the ServerCnxnFactory main thread is started;
3. the election method is entered.
3.1) The vote carries the following fields:
this.version = 0x0;
this.id = id; // id of the server being proposed as leader, i.e. myid from the config file
this.zxid = zxid; // transaction id (zxid) of the leader proposed in this round
this.electionEpoch = -1; // election epoch, incremented on every round; used to check whether an incoming vote belongs to the same round as ours
this.peerEpoch = peerEpoch; // epoch of the leader proposed in this round
this.state = ServerState.LOOKING; // current server state; a server votes only while in LOOKING
synchronized public void startLeaderElection() {
    try {
        // LOOKING means an election is in progress
        if (getPeerState() == ServerState.LOOKING) {
            // build the current vote
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
The run() method of the QuorumPeer instance first registers some JMX services and then loops, continually acting on the election result; getPeerState() returns the current server state.
LOOKING state: the server initiates another round of voting;
OBSERVING state: the server waits to learn the election result; note that nodes in OBSERVING do not vote, they serve only non-transactional requests and forward transactional requests to the leader;
FOLLOWING state: a follower; it updates its record of the current leader and runs the follower startup flow;
LEADING state: the leader; it updates the current leader information and runs the leader startup flow.
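For reference, these four states are the constants of the ServerState enum nested in QuorumPeer:

public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING
}

The main loop of run() then dispatches on these states: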
while (running) {
    switch (getPeerState()) {
    case LOOKING:
        LOG.info("LOOKING");
        ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
        if (Boolean.getBoolean("readonlymode.enabled")) {
            LOG.info("Attempting to start ReadOnlyZooKeeperServer");
            // Create read-only server but don't start it immediately
            final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
            // Instead of starting roZk immediately, wait some grace
            // period before we decide we're partitioned.
            //
            // Thread is used here because otherwise it would require
            // changes in each of election strategy classes which is
            // unnecessary code coupling.
            Thread roZkMgr = new Thread() {
                public void run() {
                    try {
                        // lower-bound grace period to 2 secs
                        sleep(Math.max(2000, tickTime));
                        if (ServerState.LOOKING.equals(getPeerState())) {
                            roZk.startup();
                        }
                    } catch (InterruptedException e) {
                        LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                    } catch (Exception e) {
                        LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                    }
                }
            };
            try {
                roZkMgr.start();
                reconfigFlagClear();
                if (shuttingDownLE) {
                    shuttingDownLE = false;
                    // start the election
                    startLeaderElection();
                }
                // cast our vote and look for a leader
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                setPeerState(ServerState.LOOKING);
            } finally {
                // If the thread is in the grace period, interrupt
                // to come out of waiting.
                roZkMgr.interrupt();
                roZk.shutdown();
            }
        } else {
            try {
                reconfigFlagClear();
                if (shuttingDownLE) {
                    shuttingDownLE = false;
                    startLeaderElection();
                }
                setCurrentVote(makeLEStrategy().lookForLeader());
            } catch (Exception e) {
                LOG.warn("Unexpected exception", e);
                setPeerState(ServerState.LOOKING);
            }
        }
        break;
    case OBSERVING:
        try {
            LOG.info("OBSERVING");
            setObserver(makeObserver(logFactory));
            observer.observeLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            observer.shutdown();
            setObserver(null);
            updateServerState();
            // Add delay jitter before we switch to LOOKING
            // state to reduce the load of ObserverMaster
            if (isRunning()) {
                Observer.waitForObserverElectionDelay();
            }
        }
        break;
    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            follower.shutdown();
            setFollower(null);
            updateServerState();
        }
        break;
    case LEADING:
        LOG.info("LEADING");
        try {
            setLeader(makeLeader(logFactory));
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            updateServerState();
        }
        break;
    }
}
In addition, a QuorumCnxManager is created during the election, as the code below shows:
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // created during the election; this manager handles the election traffic between servers
        QuorumCnxManager qcm = createCnxnManager();
        // fetch the old instance, if any
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        // if an old instance exists, halt it; the new instance replaces it
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}
QuorumCnxManager wraps the SendWorker and RecvWorker threads, which handle the communication between the servers.
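Before reading the two run() methods, here is an illustrative sketch of the shared structures they operate on, modeled on the QuorumCnxManager fields used in the code below; the class and the queueForPeer helper are made up for illustration:

import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

class VoteQueues {
    // one bounded send queue per peer server id (sid)
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<>();
    // the last message actually sent per sid, so a reconnecting peer
    // can be re-sent the latest vote
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent = new ConcurrentHashMap<>();

    void queueForPeer(long sid, ByteBuffer msg) {
        // offer() drops the new message when the queue is full; the real
        // QuorumCnxManager evicts the oldest entry instead, since only
        // the most recent vote matters.
        queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(1)).offer(msg);
    }
}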
How SendWorker works: messages to be sent are stored in a per-server blocking queue, which the worker keeps draining:
public void run() {
    threadCnt.incrementAndGet();
    try {
        /**
         * If there is nothing in the queue to send, then we
         * send the lastMessage to ensure that the last message
         * was received by the peer. The message could be dropped
         * in case self or the peer shutdown their connection
         * (and exit the thread) prior to reading/processing
         * the last message. Duplicate messages are handled correctly
         * by the peer.
         *
         * If the send queue is non-empty, then we have a recent
         * message than that stored in lastMessage. To avoid sending
         * stale message, we should send the message in the send queue.
         */
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || isSendQueueEmpty(bq)) {
            // nothing in the queue, so fall back to lastMessageSent
            // and resend the most recent message, if any
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null) {
                LOG.debug("Attempting to send lastMessage to sid={}", sid);
                send(b);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    try {
        while (running && !shutdown && sock != null) {
            ByteBuffer b = null;
            try {
                // look up the send queue for this server id (sid) in queueSendMap
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq != null) {
                    // if the queue exists, poll the next message to send
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for server " + sid);
                    break;
                }
                // if we got a message, record it in lastMessageSent and send it
                if (b != null) {
                    lastMessageSent.put(sid, b);
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue", e);
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception when using channel: for id " + sid
                + " my id = " + QuorumCnxManager.this.mySid
                + " error = " + e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}
How RecvWorker works:
RecvWorker keeps reading messages sent over by the other servers and stores them in a blocking queue via addToRecvQueue:
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            /**
             * Reads the first int to determine the length of the
             * message
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            /**
             * Allocates a new ByteBuffer to receive the message
             */
            final byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid
                + ", my id = " + QuorumCnxManager.this.mySid
                + ", error = ", e);
    } finally {
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        closeSocket(sock);
    }
}
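The reader above expects a length-prefixed frame: din.readInt() gives the message length, followed by exactly that many bytes. For context, here is a sketch of the matching framing on the send side; the real send(ByteBuffer) in QuorumCnxManager does the equivalent over its DataOutputStream, and writeFramed is a made-up name:

import java.io.DataOutputStream;
import java.io.IOException;

class Framing {
    // Length-prefixed framing: the 4-byte length written here is what
    // RecvWorker reads back with din.readInt() before readFully().
    static void writeFramed(DataOutputStream dout, byte[] payload) throws IOException {
        dout.writeInt(payload.length);
        dout.write(payload);
        dout.flush();
    }
}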
The election flow is as follows.
First, the comparison rule: votes are ordered by election epoch, then ZXID, then SID. The fields are compared in that priority order, and whichever vote is larger wins, its candidate being preferred as leader (see the sketch after this list).
1. At the start of each round of voting the server increments its election epoch; only votes from the same epoch are valid.
2. The vote is assembled. One point worth noting: in the very first round, every server proposes itself as leader.
3. The vote is sent out. The server broadcasts its vote to every server entitled to take part in the election, via the send and receive workers described above, and keeps polling its own state to decide its next step (as discussed earlier).
4. Votes from the other servers are received through the receive service.
5. The election epoch of each valid incoming vote is examined:
5.1) if the external epoch is larger, the server updates its own epoch, clears the votes it has collected from other servers, and then compares its re-initialized vote with the incoming one;
5.2) if the external epoch is smaller than its own, the incoming vote is ignored outright;
5.3) if the external epoch equals its own, the two votes are compared by the rule above.
6. Based on these comparisons each server proposes a leader, and the candidate holding more than half of the votes becomes leader. Once all votes of the round have been received and the server stops accepting further votes, a leader has been elected, and every node updates its server state to leading or following accordingly.
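Here is a minimal sketch of that comparison rule, modeled on FastLeaderElection.totalOrderPredicate; the method name below is illustrative, and the real method also consults the quorum verifier's voting weights:

class VoteOrder {
    // Returns true if the incoming (new) vote should replace our current
    // proposal: compare election epoch first, then zxid, then server id.
    static boolean newVoteWins(long newEpoch, long newZxid, long newId,
                               long curEpoch, long curZxid, long curId) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch;
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;
        }
        return newId > curId;
    }
}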
That covers the ZooKeeper cluster startup process and the leader election process.
The ZAB protocol that ZooKeeper implements provides partition tolerance and data consistency, i.e. it guarantees CP. In line with the goals of our refactoring, we likewise use different deployment schemes for standalone and cluster setups, selected through configuration. For the cluster mode, our candidate design borrows ZooKeeper's cluster bootstrap approach to assemble the cluster; it is a fairly common pattern, and it is well worth studying how these experts think and how they structure their code.
Thank you for reading. I'm 神奇的胖子, someone who loves life and believes in it.