Preface
The previous article, "ZooKeeper ZAB Leader Election source code analysis", covered ZooKeeper's leader election process in detail. This article analyzes how the Leader and the Followers initialize after the election.
Initialization overview diagram
Once the leader has been elected, the leader and the followers move into cluster formation and data synchronization, which consists of the following steps:
1. Connection establishment
The leader starts the LearnerCnxAcceptorHandler connection acceptor on the first port of the address configured in zoo.cfg and listens for connection requests from followers.
2. Determining the new epoch
The newly formed cluster needs a new epoch to mark that all nodes are now working in the same round (a short sketch of the epoch/zxid layout follows this list).
3. Data synchronization
Once the cluster's new epoch is settled, data recovery begins; after recovery completes, the request-processing engines of the leader and the followers start up, and the cluster can begin serving clients.
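Throughout the article, epochs and zxids are converted back and forth via ZxidUtils. As a minimal sketch of that layout (the helpers below only mirror ZxidUtils' semantics, they are not the real class):
// Minimal sketch of the epoch/zxid relationship (mirrors ZxidUtils' semantics):
// a zxid is 64 bits; the high 32 bits are the epoch, the low 32 bits are a counter within that epoch.
long makeZxid(long epoch, long counter) {
    return (epoch << 32) | (counter & 0xffffffffL);
}
long getEpochFromZxid(long zxid) {
    return zxid >> 32;
}
// Example: the first zxid proposed in epoch 5 is makeZxid(5, 0) = 0x500000000L.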
Now let's walk through the source code in detail.
Leader
After several rounds of voting the Leader has been elected, and the QuorumPeer that ended up in the LEADING state enters the leading phase:
while (running) {
switch (getPeerState()) {
case LOOKING: .....
case FOLLOWING: .....
case OBSERVING: ......
case LEADING:
LOG.info("LEADING");
try {
//create the Leader object
setLeader(makeLeader(logFactory));
//start leading the cluster
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
}
}
QuorumPeer creates the Leader object
The Leader class extends LearnerMaster. Leader has too many fields to introduce here; we will explain the relevant ones as we encounter them later.
new Leader
// LeaderZooKeeperServer extends ZooKeeperServer; it represents the ZooKeeper server this node runs while in the Leader role
public Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new BufferStats();
//determine the IP addresses and ports the Leader will listen on
Set<InetSocketAddress> addresses;
if (self.getQuorumListenOnAllIPs()) {
addresses = self.getQuorumAddress().getWildcardAddresses();
} else {
addresses = self.getQuorumAddress().getAllAddresses();
}
addresses.stream()
.map(address -> createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum()))
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(serverSockets::add);
if (serverSockets.isEmpty()) {
throw new IOException("Leader failed to initialize any of the following sockets: " + addresses);
}
this.zk = zk;
}
new LeaderZooKeeperServer
Initializing LeaderZooKeeperServer eventually initializes ZooKeeperServer. The code below was already analyzed in the article on standalone server startup, so I won't repeat that discussion here.
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
serverStats = new ServerStats(this);
this.txnLogFactory = txnLogFactory;
this.txnLogFactory.setServerStats(this.serverStats);
this.zkDb = zkDb;
this.tickTime = tickTime;
setMinSessionTimeout(minSessionTimeout);
setMaxSessionTimeout(maxSessionTimeout);
this.listenBacklog = clientPortListenBacklog;
this.reconfigEnabled = reconfigEnabled;
listener = new ZooKeeperServerListenerImpl(this);
readResponseCache = new ResponseCache(Integer.getInteger(
GET_DATA_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));
getChildrenResponseCache = new ResponseCache(Integer.getInteger(
GET_CHILDREN_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));
this.initialConfig = initialConfig;
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
this.initLargeRequestThrottlingSettings();
LOG.info(
"Created server with"
+ " tickTime {}"
+ " minSessionTimeout {}"
+ " maxSessionTimeout {}"
+ " clientPortListenBacklog {}"
+ " datadir {}"
+ " snapdir {}",
tickTime,
getMinSessionTimeout(),
getMaxSessionTimeout(),
getClientPortListenBacklog(),
txnLogFactory.getDataDir(),
txnLogFactory.getSnapDir());
}
We will come back to LeaderZooKeeperServer when we analyze the leader's request processing chain; for now we set it aside.
Leader.lead
Once the Leader object has been created, it enters the leading phase via the lead() method. The method is fairly long, so we go through it segment by segment; here is the first segment:
//set the ZAB state to DISCOVERY
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);
//the ZooKeeperServer loads local data; normally the data was already loaded during leader election, so loadData here mostly just takes a local snapshot
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
//create the leader-side acceptor manager, which spawns the LearnerCnxAcceptorHandler acceptors that wait for follower connections
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
//lead() blocks here until a quorum of followers has connected; only then is the new epoch generated
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
LearnerCnxAcceptor
During lead() the Leader creates a LearnerCnxAcceptor; let's look at LearnerCnxAcceptor's run() implementation:
public void run() {
if (!stop.get() && !serverSockets.isEmpty()) {
ExecutorService executor = Executors.newFixedThreadPool(serverSockets.size());
CountDownLatch latch = new CountDownLatch(serverSockets.size());
//create one LearnerCnxAcceptorHandler per address the leader is bound to
serverSockets.forEach(serverSocket ->
executor.submit(new LearnerCnxAcceptorHandler(serverSocket, latch)));
try {
latch.await();
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping in LearnerCnxAcceptor.", ie);
} finally {
closeSockets();
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.error("not all the LearnerCnxAcceptorHandler terminated properly");
}
} catch (InterruptedException ie) {
LOG.error("Interrupted while terminating LearnerCnxAcceptor.", ie);
}
}
}
}
LearnerCnxAcceptorHandler
The thread on the leader side that accepts follower connection requests.
Let's look at LearnerCnxAcceptorHandler's run() method:
public void run() {
try {
Thread.currentThread().setName("LearnerCnxAcceptorHandler-" + serverSocket.getLocalSocketAddress());
while (!stop.get()) {
//acceptConnections uses the leader's ServerSocket to accept connection requests from followers
acceptConnections();
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e);
if (fail.compareAndSet(false, true)) {
handleException(getName(), e);
halt();
}
} finally {
latch.countDown();
}
}
LearnerCnxAcceptorHandler.acceptConnections
private void acceptConnections() throws IOException {
Socket socket = null;
boolean error = false;
try {
//accept a follower's connection request
socket = serverSocket.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
//set the socket timeout
socket.setSoTimeout(self.tickTime * self.initLimit);
socket.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(socket.getInputStream());
//create a LearnerHandler thread to represent this follower and process the requests it sends
LearnerHandler fh = new LearnerHandler(socket, is, Leader.this);
fh.start();
} catch (SocketException e) {
error = true;
if (stop.get()) {
LOG.warn("Exception while shutting down acceptor.", e);
} else {
throw e;
}
} catch (SaslException e) {
LOG.error("Exception while connecting to quorum learner", e);
error = true;
} catch (Exception e) {
error = true;
throw e;
} finally {
// Don't leak sockets on errors
if (error && socket != null && !socket.isClosed()) {
try {
socket.close();
} catch (IOException e) {
LOG.warn("Error closing socket: " + socket, e);
}
}
}
}
LearnerHandler is the follower's representative on the Leader side
We won't go into detail yet; we will come back to it when we discuss the Follower below.
Follower
We have only covered part of the leader's initialization so far. Why stop here? Because from this point on the leader's behavior is much easier to understand together with the follower's actions, so let's switch to the follower's initialization.
After the leader is elected, the non-leader nodes set themselves to the follower role (observers are not covered here) and enter the followLeader phase. Below is the FOLLOWING branch of the follower's QuorumPeer:
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
Creating the Follower
Creating the follower first creates a FollowerZooKeeperServer. FollowerZooKeeperServer extends ZooKeeperServer and represents the ZooKeeper instance running on a follower node, but it has its own request processing chain, which I will analyze in detail when we cover request processing.
new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
follower.followLeader
Once the Follower instance has been created, it interacts with the Leader through followLeader():
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken, QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
long connectionTime = 0;
boolean completedSync = false;
try {
//change the ZAB state from ELECTION to DISCOVERY
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
//locate the Leader node based on the election result
QuorumServer leaderServer = findLeader();
try {
//establish a socket connection to the Leader node
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime = System.currentTimeMillis();
//register with the Leader and obtain the new epoch; after this the node is a legitimate follower of the ZooKeeper cluster
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
}
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch "
+ ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch "
+ ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
long startTime = Time.currentElapsedTime();
try {
//the follower records the leader's address and sid
self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
// the follower switches the ZAB state to SYNCHRONIZATION and enters data synchronization
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
//synchronize data with the leader
syncWithLeader(newEpochZxid);
//after synchronization the follower switches the ZAB state to BROADCAST, the last ZAB phase; the node can now serve clients
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
} finally {
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
}
if (self.getObserverMasterPort() > 0) {
LOG.info("Starting ObserverMaster");
om = new ObserverMaster(self, fzk, self.getObserverMasterPort());
om.start();
} else {
om = null;
}
// create a reusable packet to reduce gc impact todo: at this point follower is ready to serve
QuorumPacket qp = new QuorumPacket();
//the follower now enters the loop below, receiving messages from the leader and processing them
while (this.isRunning()) {
//read a message from the leader
readPacket(qp);
//process the message; message handling is analyzed later
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
closeSocket();
// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
if (om != null) {
om.stop();
}
zk.unregisterJMX(this);
if (connectionTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectionTime;
LOG.info(
"Disconnected from leader (with address: {}). Was connected for {}ms. Sync state: {}",
leaderAddr,
connectionDuration,
completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}
connectToLeader
The follower connects to the Leader; let's look straight at the implementation:
protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {
this.leaderAddr = multiAddr;
Set<InetSocketAddress> addresses;
if (self.isMultiAddressReachabilityCheckEnabled()) {
// even if none of the addresses are reachable, we want to try to establish connection
// see ZOOKEEPER-3758
addresses = multiAddr.getAllReachableAddressesOrAll();
} else {
addresses = multiAddr.getAllAddresses();
}
ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
CountDownLatch latch = new CountDownLatch(addresses.size());
AtomicReference<Socket> socket = new AtomicReference<>(null);
//create one LeaderConnector per leader address; LeaderConnector is the thread class that actually establishes the connection to the Leader
addresses.stream().map(address -> new LeaderConnector(address, socket, latch)).forEach(executor::submit);
try {
latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while trying to connect to Leader", e);
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.error("not all the LeaderConnector terminated properly");
}
} catch (InterruptedException ie) {
LOG.error("Interrupted while terminating LeaderConnector executor.", ie);
}
}
if (socket.get() == null) {
throw new IOException("Failed connect to " + multiAddr);
} else {
sock = socket.get();
}
self.authLearner.authenticate(sock, hostname);
//at this point one of the LeaderConnectors has established the socket connection to the Leader
// leaderIs wraps the input stream of the leader socket
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
// leaderOs wraps the output stream of the leader socket
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
LeaderConnector
LeaderConnector's job is to establish the follower's socket connection to the Leader. It is a thread class: once the socket to the Leader has been established its work is done and the thread exits. Its implementation is straightforward, so we won't analyze it line by line here.
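Its idea, as a simplified sketch (the class and field names below are only illustrative, not the real implementation): each connector tries one of the leader's addresses, whichever thread connects first publishes its socket through the shared AtomicReference, and every connector counts down the latch so connectToLeader can proceed.
// Simplified sketch of LeaderConnector's idea (illustrative only, not the real implementation)
class LeaderConnectorSketch implements Runnable {
    private final InetSocketAddress address;
    private final AtomicReference<Socket> socketRef; // shared with connectToLeader
    private final CountDownLatch latch;
    LeaderConnectorSketch(InetSocketAddress address, AtomicReference<Socket> socketRef, CountDownLatch latch) {
        this.address = address;
        this.socketRef = socketRef;
        this.latch = latch;
    }
    @Override
    public void run() {
        try {
            Socket s = new Socket();
            // the real code derives the connect timeout from tickTime/initLimit; 5s is a placeholder
            s.connect(address, 5000);
            // only the first successful connection wins; the others are closed
            if (!socketRef.compareAndSet(null, s)) {
                s.close();
            }
        } catch (IOException e) {
            // this address failed; another connector may still succeed
        } finally {
            latch.countDown();
        }
    }
}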
Follower.registerWithLeader
The follower registers its own information with the Leader:
protected long registerWithLeader(int pktType) throws IOException {
/*
* Send follower info, including last zxid and sid
*/
//the follower fetches the largest transaction id (zxid) it has processed from its database
long lastLoggedZxid = self.getLastLoggedZxid();
//QuorumPacket is the serialization envelope for messages between the Leader and Followers
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
/*
* Add sid to payload
*/
//LearnerInfo describes this follower
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
//serialize the follower's info (sid, protocol version, quorum verifier version) into the ByteArrayOutputStream, then put it into the QuorumPacket's data field
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
//send the QuorumPacket to the leader over the socket
writePacket(qp, true);
//wait for the leader's reply; the leader will return the new epoch
readPacket(qp);
//extract the new epoch from the leader's reply
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
if (qp.getType() == Leader.LEADERINFO) {
// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte[] epochBytes = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
if (newEpoch > self.getAcceptedEpoch()) {
//write the new epoch into the local acceptedEpoch file, marking that this follower is now in the new round
wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
//if the new epoch equals acceptedEpoch, nothing needs to be updated
// since we have already acked an epoch equal to the leaders, we cannot ack
// again, but we still need to send our lastZxid to the leader so that we can
// sync with it if it does assume leadership of the epoch.
// the -1 indicates that this reply should not count as an ack for the new epoch
wrappedEpochBytes.putInt(-1);
} else {
//if the follower's acceptedEpoch is greater than the new epoch, this node is in a later round than the new cluster proposes, which is an error
throw new IOException("Leaders epoch, "
+ newEpoch
+ " is less than accepted epoch, "
+ self.getAcceptedEpoch());
}
//send an ACK for the new epoch to the leader (carrying our last zxid and our current epoch)
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
return ZxidUtils.makeZxid(newEpoch, 0);
} else {
if (newEpoch > self.getAcceptedEpoch()) {
self.setAcceptedEpoch(newEpoch);
}
if (qp.getType() != Leader.NEWLEADER) {
LOG.error("First packet should have been NEWLEADER");
throw new IOException("First packet should have been NEWLEADER");
}
return qp.getZxid();
}
}
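All of these exchanges travel inside a QuorumPacket (a jute-generated class implementing org.apache.jute.Record). As a reference, an abridged view of its fields:
// Abridged view of QuorumPacket (jute-generated; getters/setters and (de)serialization omitted)
public class QuorumPacket {
    private int type;          // message type: Leader.FOLLOWERINFO, LEADERINFO, ACKEPOCH, NEWLEADER, UPTODATE, ...
    private long zxid;         // zxid carried by the message, e.g. makeZxid(acceptedEpoch, 0) for FOLLOWERINFO
    private byte[] data;       // payload, e.g. the serialized LearnerInfo above
    private List<Id> authinfo; // authentication info (org.apache.zookeeper.data.Id)
}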
Let's pause here. The follower has just sent a message to the leader, so let's see how the leader handles it.
As mentioned above, the leader creates a LearnerHandler thread to handle the follower's requests; let's now look at LearnerHandler's run() method.
LearnerHandler.run
LearnerHandler handles all communication with its follower. The code is long, so I'll explain it in segments. The first segment receives the epoch sent by the follower:
public void run() {
try {
//register this LearnerHandler (which represents the follower) in the learnerMaster's learner list
learnerMaster.addLearnerHandler(this);
tickOfNextAckDeadline = learnerMaster.getTickOfInitialAckDeadline();
//initialize the input and output streams
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
QuorumPacket qp = new QuorumPacket();
//read the follower's first packet, which carries its epoch information
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp.toString());
return;
}
if (learnerMaster instanceof ObserverMaster && qp.getType() != Leader.OBSERVERINFO) {
throw new IOException("Non observer attempting to connect to ObserverMaster. type = " + qp.getType());
}
//get the message payload
byte[] learnerInfoData = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
//read the follower's sid from the payload
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
//read the protocolVersion
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
//read the configVersion
long configVersion = bbsid.getLong();
if (configVersion > learnerMaster.getQuorumVerifierVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
this.sid = learnerMaster.getAndDecrementFollowerCounter();
}
//look up the follower's configured address and port by its sid
String followerInfo = learnerMaster.getPeerInfo(this.sid);
if (followerInfo.isEmpty()) {
LOG.info(
"Follower sid: {} not in the current config {}",
this.sid,
Long.toHexString(learnerMaster.getQuorumVerifierVersion()));
} else {
LOG.info("Follower sid: {} : info : {}", this.sid, followerInfo);
}
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
learnerMaster.registerLearnerHandlerBean(this, sock);
//getEpochFromZxid extracts the epoch sent by the follower
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
// learnerMaster.getEpochToPropose: the leader's QuorumPeer thread blocks in getEpochToPropose until a quorum of participants has arrived before it executes the rest of lead()
//each arriving follower (as long as it is in the same round) is added to the set of connected participants; once the followers plus the leader form a quorum, the leader's QuorumPeer thread leaves the wait, continues, and the new epoch is returned, marking a new working round for the cluster
long newEpoch = learnerMaster.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
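getEpochToPropose itself is not quoted in this article; roughly, its logic looks like the sketch below (a simplified paraphrase of Leader.getEpochToPropose, following the same pattern as the waitForEpochAck method shown later): every caller contributes its acceptedEpoch, the proposed epoch becomes max(acceptedEpoch) + 1, and the call blocks until a quorum of participants has checked in or the initLimit timeout expires.
// Simplified paraphrase of Leader.getEpochToPropose (see the real source for the exact code)
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
            return epoch;
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1; // new epoch is one larger than any accepted epoch seen so far
        }
        if (isParticipant(sid)) {
            connectingFollowers.add(sid);
        }
        QuorumVerifier verifier = self.getQuorumVerifier();
        if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
            // quorum reached: freeze the epoch, persist it and wake up all waiters
            waitingForNewEpoch = false;
            self.setAcceptedEpoch(epoch);
            connectingFollowers.notifyAll();
        } else {
            // wait (bounded by initLimit * tickTime) for more participants to connect
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            while (waitingForNewEpoch && cur < end) {
                connectingFollowers.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (waitingForNewEpoch) {
                throw new InterruptedException("Timeout while waiting for epoch from quorum");
            }
        }
        return epoch;
    }
}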
Segment 2: the Leader sends the newly generated epoch to the follower
//build the leader's new zxid from the newly generated epoch
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// fake the message
learnerMaster.waitForEpochAck(this.getSid(), ss);
} else {
byte[] ver = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
//build the LEADERINFO packet that carries the new leader zxid to the follower
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
//write the packet to the socket
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
//read the follower's ACK for the newly generated epoch
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error("{} is not ACKEPOCH", ackEpochPacket.toString());
return;
}
Now let's go back to the Leader's lead() method and see what happens after the main thread returns from getEpochToPropose.
//get the newly generated epoch
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
//build a zxid from the epoch
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized (this) {
lastProposed = zk.getZxid();
}
//build the packet that announces itself as the leader (NEWLEADER)
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(), null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of {}", Long.toHexString(newLeaderProposal.packet.getZxid()));
}
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
try {
LOG.debug(String.format("set lastSeenQuorumVerifier to currentQuorumVerifier (%s)", curQV.toString()));
QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV, true);
} catch (Exception e) {
throw new IOException(e);
}
}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
//wait for a quorum of participants to ACK the new epoch
waitForEpochAck(self.getId(), leaderStateSummary);
At this point the leader is waiting for a quorum of participants to ACK the new epoch.
Let's go back to LearnerHandler and see what happens after it reads the follower's ACK of the new epoch.
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
//after receiving the follower's ACK for the new epoch, call the leader's waitForEpochAck method
learnerMaster.waitForEpochAck(this.getSid(), ss);
waitForEpochAck
Let's analyze waitForEpochAck. Its code is fairly simple: it checks whether a quorum of participants has ACKed the new epoch; if so, the new epoch takes effect across the whole cluster and becomes the agreed epoch everyone uses in this round.
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
// electingFollowers holds the set of participants that have already ACKed the new epoch
synchronized (electingFollowers) {
if (electionFinished) {
return;
}
if (ss.getCurrentEpoch() != -1) {
if (ss.isMoreRecentThan(leaderStateSummary)) {
throw new IOException("Follower is ahead of the leader, leader summary: "
+ leaderStateSummary.getCurrentEpoch()
+ " (current epoch), "
+ leaderStateSummary.getLastZxid()
+ " (last zxid)");
}
if (ss.getLastZxid() != -1 && isParticipant(id)) {
electingFollowers.add(id);
}
}
QuorumVerifier verifier = self.getQuorumVerifier();
//if a quorum of participants has ACKed the new epoch, this round of election is officially finished
if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
electionFinished = true;
electingFollowers.notifyAll();
} else {
//otherwise wait until a quorum of participants has ACKed the new epoch
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit() * self.getTickTime();
while (!electionFinished && cur < end) {
electingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!electionFinished) {
throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
}
}
}
}
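For the default majority-based QuorumVerifier (QuorumMaj), containsQuorum is just a "more than half" check; roughly:
// Roughly what QuorumMaj.containsQuorum does in the default (non-hierarchical) configuration:
// half = votingMembers.size() / 2, so e.g. in a 5-node ensemble 3 ACKs form a quorum.
public boolean containsQuorum(Set<Long> ackSet) {
    return ackSet.size() > half;
}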
Back on the follower side: after the follower has sent its ACK for the new epoch, it enters the ZAB synchronization phase. Let's look at the follower's syncWithLeader implementation; this method is also long, so we analyze it in segments.
protected void syncWithLeader(long newLeaderZxid) throws Exception {
//pre-build the ACK packet that will be sent back to the leader at the end of the sync
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
QuorumVerifier newLeaderQV = null;
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
boolean syncSnapshot = false;
//read the data synchronization instruction from the leader
readPacket(qp);
Now back to LearnerHandler: once the new epoch has been accepted by a quorum of participants, LearnerHandler enters syncFollower.
LearnerHandler syncFollower
syncFollower decides how to bring the follower's data up to date based on the follower's last zxid (peerLastZxid) and the zxids the leader itself has already processed.
boolean syncFollower(long peerLastZxid, LearnerMaster learnerMaster) {
/*
* When leader election is completed, the leader will set its
* lastProcessedZxid to be (epoch < 32). There will be no txn associated
* with this zxid.
*
* The learner will set its lastProcessedZxid to the same value if
* it get DIFF or SNAP from the learnerMaster. If the same learner come
* back to sync with learnerMaster using this zxid, we will never find this
* zxid in our history. In this case, we will ignore TRUNC logic and
* always send DIFF if we have old enough history
*/
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
// Keep track of the latest zxid which already queued
long currentZxid = peerLastZxid;
boolean needSnap = true;
//get the leader's ZK database
ZKDatabase db = learnerMaster.getZKDatabase();
boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
ReentrantReadWriteLock lock = db.getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
//the largest committed transaction id currently held in the leader's committedLog
long maxCommittedLog = db.getmaxCommittedLog();
//the smallest transaction id still kept in the leader's committedLog
long minCommittedLog = db.getminCommittedLog();
//the latest transaction id the leader has applied and that has been acked
//when maxCommittedLog > lastProcessedZxid,
//the leader still has some transactions in its log that have not yet been acked by the rest of the cluster
long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
LOG.info("Synchronizing with Learner sid: {} maxCommittedLog=0x{}"
+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ " peerLastZxid=0x{}",
getSid(),
Long.toHexString(maxCommittedLog),
Long.toHexString(minCommittedLog),
Long.toHexString(lastProcessedZxid),
Long.toHexString(peerLastZxid));
if (db.getCommittedLog().isEmpty()) {
/*
* It is possible that committedLog is empty. In that case
* setting these value to the latest txn in learnerMaster db
* will reduce the case that we need to handle
*
* Here is how each case handle by the if block below
* 1. lastProcessZxid == peerZxid -> Handle by (2)
* 2. lastProcessZxid < peerZxid -> Handle by (3)
* 3. lastProcessZxid > peerZxid -> Handle by (5)
*/
minCommittedLog = lastProcessedZxid;
maxCommittedLog = lastProcessedZxid;
}
/*
* The English comments below list the data-synchronization cases syncFollower has to handle
* Here are the cases that we want to handle
*
* 1. Force sending snapshot (for testing purpose)
//if the follower's zxid already equals the leader's, just send the follower an empty DIFF
* 2. Peer and learnerMaster is already sync, send empty diff
//if the follower's zxid is greater than the leader's, send a TRUNC message telling the follower to drop the transactions beyond that zxid;
//but if the zxid the follower sent is a newEpochZxid, the follower has not processed any transactions yet, so TRUNC cannot be sent
* 3. Follower has txn that we haven't seen. This may be old leader
* so we need to send TRUNC. However, if peer has newEpochZxid,
* we cannot send TRUNC since the follower has no txnlog
//if the follower's zxid falls within the leader's committedLog range, send a DIFF
* 4. Follower is within committedLog range or already in-sync.
* We may need to send DIFF or TRUNC depending on follower's zxid
* We always send empty DIFF if follower is already in-sync
//if the follower's zxid is smaller than minCommittedLog, use the leader's committedLog plus the on-disk txn log to restore the follower's data;
//if that fails, fall back to a full SNAP sync
* 5. Follower missed the committedLog. We will try to use on-disk
* txnlog + committedLog to sync with follower. If that fail,
* we will send snapshot
*/
//the code below implements the 5 synchronization rules described above
if (forceSnapSync) {
// Force learnerMaster to use snapshot to sync with follower
LOG.warn("Forcing snapshot sync - should not see this in production");
} else if (lastProcessedZxid == peerLastZxid) {
// Follower is already sync with us, send empty diff
//the follower's zxid equals the leader's, so just send a DIFF instruction with no data after it
LOG.info(
"Sending DIFF zxid=0x{} for peer sid: {}",
Long.toHexString(peerLastZxid),
getSid());
queueOpPacket(Leader.DIFF, peerLastZxid);
needOpPacket = false;
needSnap = false;
} else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
//the follower's zxid is greater than the largest zxid the leader has committed, so send a TRUNC command
//telling the follower to delete its extra transactions and keep only those up to zxid maxCommittedLog
// Newer than committedLog, send trunc and done
LOG.debug(
"Sending TRUNC to follower zxidToSend=0x{} for peer sid:{}",
Long.toHexString(maxCommittedLog),
getSid());
queueOpPacket(Leader.TRUNC, maxCommittedLog);
currentZxid = maxCommittedLog;
needOpPacket = false;
needSnap = false;
} else if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
//the follower's zxid lies between minCommittedLog and maxCommittedLog, so the follower is behind the leader; the leader's transactions in (peerLastZxid, maxCommittedLog] must be sent to the follower
// Follower is within commitLog range
LOG.info("Using committedLog for peer sid: {}", getSid());
Iterator<Proposal> itr = db.getCommittedLog().iterator();
//queue the transactions the follower is missing for sending
currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
needSnap = false;
} else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
// Use txnlog and committedLog to sync
//the follower's zxid is smaller than minCommittedLog, so it is far behind the leader; the leader's transactions in (peerLastZxid, maxCommittedLog] must be sent to the follower:
//part of them, [minCommittedLog, maxCommittedLog], live in the in-memory committedLog, while the rest, (peerLastZxid, minCommittedLog), live in the on-disk txn log, so data is recovered from both sources
// Calculate sizeLimit that we allow to retrieve txnlog from disk
long sizeLimit = db.calculateTxnLogSizeLimit();
// This method can return empty iterator if the requested zxid
// is older than on-disk txnlog
//get an iterator over the on-disk txn log based on peerLastZxid and sizeLimit
Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
if (txnLogItr.hasNext()) {
LOG.info("Use txnlog and committedLog for peer sid: {}", getSid());
//queue the txn-log proposals and record the last zxid queued so far
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
if (currentZxid < minCommittedLog) {
//if currentZxid is still smaller than minCommittedLog there is a gap, so fall back to sending a SNAP instead of a DIFF
LOG.info(
"Detected gap between end of txnlog: 0x{} and start of committedLog: 0x{}",
Long.toHexString(currentZxid),
Long.toHexString(minCommittedLog));
currentZxid = peerLastZxid;
// Clear out currently queued requests and revert
// to sending a snapshot.
queuedPackets.clear();
needOpPacket = true;
} else {
LOG.debug("Queueing committedLog 0x{}", Long.toHexString(currentZxid));
Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
//queueCommittedProposals normally queues every transaction in the committedLog with a zxid greater than peerLastZxid for the follower
//it also handles some corner cases: if peerLastZxid is not found among the queued transactions a TRUNC has to be sent to the follower first, while if it is found a plain DIFF is enough
//and if the transactions are newer than peerLastZxid but belong to a different epoch, the follower has to be synchronized with a full SNAP instead
currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
needSnap = false;
}
}
// closing the resources
if (txnLogItr instanceof TxnLogProposalIterator) {
TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
txnProposalItr.close();
}
} else {
LOG.warn(
"Unhandled scenario for peer sid: {} maxCommittedLog=0x{}"
+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ " peerLastZxid=0x{} txnLogSyncEnabled={}",
getSid(),
Long.toHexString(maxCommittedLog),
Long.toHexString(minCommittedLog),
Long.toHexString(lastProcessedZxid),
Long.toHexString(peerLastZxid),
txnLogSyncEnabled);
}
if (needSnap) {
currentZxid = db.getDataTreeLastProcessedZxid();
}
LOG.debug("Start forwarding 0x{} for peer sid: {}", Long.toHexString(currentZxid), getSid());
leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
} finally {
rl.unlock();
}
if (needOpPacket && !needSnap) {
//fall back to a SNAP to do a full data restore for the follower
// This should never happen, but we should fall back to sending
// snapshot just in case.
LOG.error("Unhandled scenario for peer sid: {} fall back to use snapshot", getSid());
needSnap = true;
}
//return whether a SNAP sync is needed
return needSnap;
}
Back in the LearnerHandler main flow, let's see what happens once it knows whether the follower must be restored via SNAP.
//syncFollower (analyzed above) returns whether the follower's data must be restored with a snapshot
//if needSnap is false, the leader has already queued the transactions to replay (the DIFF/TRUNC path) in queuedPackets
boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
// syncs between followers and the leader are exempt from throttling because it
// is importatnt to keep the state of quorum servers up-to-date. The exempted syncs
// are counted as concurrent syncs though
//only observer syncs are throttled; follower syncs are exempt
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
/* if we are not truncating or sending a diff just send a snapshot */
if (needSnap) {
//needSnap is true: sync via snapshot
// syncThrottler is the snapshot-sync throttler
syncThrottler = learnerMaster.getLearnerSnapSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
try {
//the latest zxid the leader has applied
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
//send the SNAP sync instruction to the follower
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush();
LOG.info(
"Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshot sync, "
+ "snapshot sync was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
syncThrottler.getSyncInProgress(),
exemptFromThrottle ? "exempt" : "not exempt");
// Dump data to peer
//the leader serializes its local database and sends it to the follower
learnerMaster.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
//flush the data onto the wire
bufferedOutput.flush();
} finally {
ServerMetrics.getMetrics().SNAP_COUNT.add(1);
}
} else {
syncThrottler = learnerMaster.getLearnerDiffSyncThrottler();
syncThrottler.beginSync(exemptFromThrottle);
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
ServerMetrics.getMetrics().DIFF_COUNT.add(1);
}
LOG.debug("Sending NEWLEADER message to {}", sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
//below, the Leader sends its "I am the Leader" (NEWLEADER) notification to the follower and waits for the follower's ACK
// we got here, so the version was set
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
// Start thread that blast packets in the queue to learner
//start a dedicated thread that drains queuedPackets and sends them to the follower
startSendingPackets();
//read the ACK sent back by the follower
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK, but received packet: {}", packetToString(qp));
return;
}
LOG.debug("Received NEWLEADER-ACK message from {}", sid);
//wait until a quorum of participants has ACKed the Leader's NEWLEADER proposal;
//at this moment the leader's QuorumPeer thread is also waiting for the NEWLEADER ACKs
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
syncLimitCheck.start();
// sync ends when NEWLEADER-ACK is received
syncThrottler.endSync();
if (needSnap) {
ServerMetrics.getMetrics().INFLIGHT_SNAP_COUNT.add(syncThrottler.getSyncInProgress());
} else {
ServerMetrics.getMetrics().INFLIGHT_DIFF_COUNT.add(syncThrottler.getSyncInProgress());
}
syncThrottler = null;
// now that the ack has been processed expect the syncLimit
sock.setSoTimeout(learnerMaster.syncTimeout());
/*
* Wait until learnerMaster starts up
*/
//once a quorum of participants has acknowledged the Leader via NEWLEADER, the LearnerHandler waits for the Leader's request-processing engine to start
learnerMaster.waitForStartup();
//once the leader's server has started, the code that follows handles the actual business requests; we analyze it later
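The waitForNewLeaderAck call above is the NEWLEADER counterpart of waitForEpochAck. A paraphrased sketch of Leader.waitForNewLeaderAck (simplified; see the real source for the exact checks):
// Simplified paraphrase of Leader.waitForNewLeaderAck
public void waitForNewLeaderAck(long sid, long zxid) throws InterruptedException {
    synchronized (newLeaderProposal.qvAcksetPairs) {
        if (quorumFormed) {
            return;
        }
        // ignore ACKs that do not match the NEWLEADER zxid
        if (zxid != newLeaderProposal.packet.getZxid()) {
            return;
        }
        newLeaderProposal.addAck(sid);
        if (newLeaderProposal.hasAllQuorums()) {
            // a quorum has acknowledged NEWLEADER: wake up the waiting LearnerHandlers and the leader thread
            quorumFormed = true;
            newLeaderProposal.qvAcksetPairs.notifyAll();
        } else {
            // wait, bounded by initLimit * tickTime, for more ACKs to arrive
            long start = Time.currentElapsedTime();
            long cur = start;
            long end = start + self.getInitLimit() * self.getTickTime();
            while (!quorumFormed && cur < end) {
                newLeaderProposal.qvAcksetPairs.wait(end - cur);
                cur = Time.currentElapsedTime();
            }
            if (!quorumFormed) {
                throw new InterruptedException("Timeout while waiting for NEWLEADER to be acked by quorum");
            }
        }
    }
}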
Follower.syncWithLeader
Now back to the Follower: let's see what happens next in the follower's syncWithLeader. This method is also very long; we start with the data-synchronization part.
protected void syncWithLeader(long newLeaderZxid) throws Exception {
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
QuorumVerifier newLeaderQV = null;
// In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
// For SNAP and TRUNC the snapshot is needed to save that history
boolean snapshotNeeded = true;
boolean syncSnapshot = false;
//read the data synchronization instruction sent by the Leader
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
//received the DIFF sync instruction from the leader
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
self.setSyncMode(QuorumPeer.SyncMode.DIFF);
snapshotNeeded = false;
} else if (qp.getType() == Leader.SNAP) {
//received the SNAP sync instruction from the leader
self.setSyncMode(QuorumPeer.SyncMode.SNAP);
LOG.info("Getting a snapshot from leader 0x{}", Long.toHexString(qp.getZxid()));
// The leader is going to dump the database
// db is clear as part of deserializeSnapshot()
//the follower's local ZK database is deserialized directly from the snapshot stream sent by the leader
zk.getZKDatabase().deserializeSnapshot(leaderIs);
// ZOOKEEPER-2819: overwrite config node content extracted
// from leader snapshot with local config, to avoid potential
// inconsistency of config node content during rolling restart.
if (!self.isReconfigEnabled()) {
LOG.debug("Reset config node content from local config after deserialization of snapshot.");
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
}
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got {}", signature);
throw new IOException("Missing signature");
}
//set the latest processed zxid of the local database
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
// immediately persist the latest snapshot when there is txn log gap
syncSnapshot = true;
} else if (qp.getType() == Leader.TRUNC) {
//received the TRUNC instruction from the leader: delete the invalid transactions
//we need to truncate the log to the lastzxid of the leader
self.setSyncMode(QuorumPeer.SyncMode.TRUNC);
LOG.warn("Truncating log to get in sync with the leader 0x{}", Long.toHexString(qp.getZxid()));
//truncate the log to drop the invalid transactions
boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log 0x{}", Long.toHexString(qp.getZxid()));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
} else {
LOG.error("Got unexpected packet from leader: {}, exiting ... ", LearnerHandler.packetToString(qp));
ServiceUtils.requestSystemExit(ExitCode.QUORUM_PACKET_ERROR.getValue());
}
//initialize the /zookeeper/config node in the local ZK database
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
//create the LearnerSessionTracker; the role of SessionTracker was analyzed in an earlier article
zk.createSessionTracker();
long lastQueued = 0;
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean isPreZAB1_0 = true;
//If we are not going to take the snapshot be sure the transactions are not applied in memory
// but written out to the transaction log
boolean writeToTxnLog = !snapshotNeeded;
TxnLogEntry logEntry;
// we are now going to start getting transactions to apply followed by an UPTODATE
//this loop processes the packets the leader sends to restore the follower's data: PROPOSALs/COMMITs, NEWLEADER and finally UPTODATE
outerLoop:
while (self.isRunning()) {
//wait for the next packet from the leader; the stream of sync packets ends with UPTODATE
readPacket(qp);
switch (qp.getType()) {
case Leader.PROPOSAL:
PacketInFlight pif = new PacketInFlight();
logEntry = SerializeUtils.deserializeTxn(qp.getData());
pif.hdr = logEntry.getHeader();
pif.rec = logEntry.getTxn();
pif.digest = logEntry.getDigest();
if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn(
"Got zxid 0x{} expected 0x{}",
Long.toHexString(pif.hdr.getZxid()),
Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
if (pif.hdr.getType() == OpCode.reconfig) {
SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
packetsNotCommitted.add(pif);
break;
//normally every PROPOSAL is followed by a COMMIT instruction
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));
boolean majorChange = self.processReconfig(
qv,
ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(),
true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
if (!writeToTxnLog) {
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn(
"Committing 0x{}, but next proposal is 0x{}",
Long.toHexString(qp.getZxid()),
Long.toHexString(pif.hdr.getZxid()));
} else {
//apply the proposal to the local database
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
case Leader.INFORMANDACTIVATE:
PacketInFlight packet = new PacketInFlight();
if (qp.getType() == Leader.INFORMANDACTIVATE) {
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
logEntry = SerializeUtils.deserializeTxn(remainingdata);
packet.hdr = logEntry.getHeader();
packet.rec = logEntry.getTxn();
packet.digest = logEntry.getDigest();
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) packet.rec).getData()));
boolean majorChange = self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
} else {
logEntry = SerializeUtils.deserializeTxn(qp.getData());
packet.rec = logEntry.getTxn();
packet.hdr = logEntry.getHeader();
packet.digest = logEntry.getDigest();
// Log warning message if txn comes out-of-order
if (packet.hdr.getZxid() != lastQueued + 1) {
LOG.warn(
"Got zxid 0x{} expected 0x{}",
Long.toHexString(packet.hdr.getZxid()),
Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
}
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.UPTODATE:
//received the UPTODATE message from the leader
LOG.info("Learner received UPTODATE message");
if (newLeaderQV != null) {
boolean majorChange = self.processReconfig(newLeaderQV, null, null, true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
if (isPreZAB1_0) {
zk.takeSnapshot(syncSnapshot);
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
//break out of the loop: data sync is done, ready to start the local ZK server
break outerLoop;
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
//received the leader's NEWLEADER message
LOG.info("Learner received NEWLEADER message");
if (qp.getData() != null && qp.getData().length > 1) {
try {
//parse the cluster membership (quorum verifier) from the leader's message
QuorumVerifier qv = self.configFromString(new String(qp.getData()));
self.setLastSeenQuorumVerifier(qv, true);
newLeaderQV = qv;
} catch (Exception e) {
e.printStackTrace();
}
}
if (snapshotNeeded) {
zk.takeSnapshot(syncSnapshot);
}
//set currentEpoch
self.setCurrentEpoch(newEpoch);
writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
isPreZAB1_0 = false;
//send an ACK for the leader's NEWLEADER
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
//send the ACK for the leader's UPTODATE
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
//start the follower's processing engine
zk.startup();
Follower processing engine startup
After receiving UPTODATE from the leader, the follower starts its processing engine: it initializes the request processing chain and starts the sessionTracker.
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor(getFollower()));
syncProcessor.start();
}
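setupRequestProcessors above is invoked from ZooKeeperServer.startup(), the zk.startup() call at the end of syncWithLeader. A rough, heavily simplified paraphrase of what startup() does (the exact steps vary between ZooKeeper versions):
// Rough paraphrase of ZooKeeperServer.startup() (simplified; not the exact source)
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();     // start session expiry management
    setupRequestProcessors();  // build the request processing chain (the follower chain shown above)
    registerJMX();
    setState(State.RUNNING);   // from now on the server can process requests
    notifyAll();
}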
Follower handling requests from the leader
Below is the snippet of followLeader that handles requests coming from the leader; I will analyze it in the article on cluster request processing.
while (this.isRunning()) {
//read a message sent by the leader
readPacket(qp);
//process the received leader message
processPacket(qp);
}
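processPacket is covered in the next article; as a rough paraphrase of what it dispatches on (simplified from Follower.processPacket, with some packet types omitted):
// Simplified paraphrase of Follower.processPacket (not the full source)
protected void processPacket(QuorumPacket qp) throws Exception {
    switch (qp.getType()) {
    case Leader.PING:
        ping(qp);                                 // answer the leader's heartbeat with session touch info
        break;
    case Leader.PROPOSAL:
        // deserialize the txn and hand it to the SyncRequestProcessor -> SendAckRequestProcessor chain,
        // which logs it to disk and replies with an ACK
        TxnLogEntry logEntry = SerializeUtils.deserializeTxn(qp.getData());
        fzk.logRequest(logEntry.getHeader(), logEntry.getTxn(), logEntry.getDigest());
        break;
    case Leader.COMMIT:
        fzk.commit(qp.getZxid());                 // apply the previously logged txn to the in-memory database
        break;
    case Leader.REVALIDATE:
        revalidate(qp);                           // session revalidation result coming back from the leader
        break;
    case Leader.SYNC:
        fzk.sync();                               // flush a pending sync() request back to the client
        break;
    default:
        LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
    }
}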
Leader processing engine startup
After the leader has received NEWLEADER ACKs from a quorum, it starts its ZK processing engine, which includes creating the session tracker and initializing the request processing chain. Below is the leader's request processing chain initialization; I will explain these processors in detail in the next article on request processing.
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor);
proposalProcessor.initialize();
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
prepRequestProcessor.start();
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
//sets up the thread that cleans up container-type nodes
setupContainerManager();
}
Leader QuorumPeer
After the leader's processing engine has started, the leader's main thread enters a loop that periodically checks whether each follower is still in sync.
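That periodic check is the tail of Leader.lead(); a simplified paraphrase is shown below (the real code also uses a tickSkip flag so the quorum check only runs every other half-tick):
// Simplified paraphrase of the tail of Leader.lead(): every tickTime/2 the leader verifies that a
// quorum of learners is still synced and pings every LearnerHandler; if the quorum is lost it
// shuts itself down and goes back to leader election.
while (true) {
    synchronized (this) {
        wait(self.tickTime / 2);
        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
        syncedAckSet.addAck(self.getId());
        for (LearnerHandler f : getLearners()) {
            if (f.synced()) {
                syncedAckSet.addAck(f.getSid());
            }
        }
        if (!this.isRunning() || !syncedAckSet.hasAllQuorums()) {
            shutdown("Not sufficient followers synced, only synced with a minority");
            return;
        }
    }
    // heartbeat every learner so followers do not time out on syncLimit
    for (LearnerHandler f : getLearners()) {
        f.ping();
    }
}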
LearnerHandler request processing
Once the Leader has fully started, each LearnerHandler enters its request-processing loop; I will analyze this process in the next article.
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = learnerMaster.getTickOfNextAckDeadline();
packetsReceived.incrementAndGet();
ByteBuffer bb;
long sessionId;
int cxid;
int type;
switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
LOG.debug("Received ACK from Observer {}", this.sid);
}
syncLimitCheck.updateAck(qp.getZxid());
learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
case Leader.PING:
// Process the touches
ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
learnerMaster.touch(sess, to);
}
break;
case Leader.REVALIDATE:
ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
learnerMaster.revalidateSession(qp, this);
break;
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
learnerMaster.submitLearnerRequest(si);
requestsReceived.incrementAndGet();
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch (IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
} catch (SyncThrottleException e) {
LOG.error("too many concurrent sync.", e);
syncThrottler = null;
} catch (Exception e) {
LOG.error("Unexpected exception in LearnerHandler.", e);
throw e;
} finally {
if (syncThrottler != null) {
syncThrottler.endSync();
syncThrottler = null;
}
String remoteAddr = getRemoteAddress();
LOG.warn("******* GOODBYE {} ********", remoteAddr);
messageTracker.dumpToLog(remoteAddr);
shutdown();
}
This concludes the analysis of ZooKeeper Leader and Follower initialization.