zk的数据同步,主要有以下三种方式,diff,snap,trunc。
learner侧的代码
首先,同步的入口是QuorumPeer的run方法:
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
先实例化follower对象,然后调用followLeader方法执行同步:
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
"""
注册同步请求,FOLLOWERINFO,如上图1的1
"""
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
"check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check"
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
其中registerWithLeader方法很重要,相当于同步协议的handshake:follower先把自己的acceptedEpoch发给leader,收到LEADERINFO后确定新的epoch,再通过ACKEPOCH把自己的最大zxid(lastLoggedZxid)告诉leader。
protected long registerWithLeader(int pktType) throws IOException{
"
* Send follower info, including last zxid and sid
"
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
"
* Add sid to payload
"
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
writePacket(qp, true);
readPacket(qp);
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
"这里接收到来自leader的同步协议handshake的响应。LEADERINFO "
if (qp.getType() == Leader.LEADERINFO) {
" we are connected to a 1.0 server so accept the new epoch and read the next packet "
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
if (newEpoch > self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
" since we have already acked an epoch equal to the leaders, we cannot ack
// again, but we still need to send our lastZxid to the leader so that we can
// sync with it if it does assume leadership of the epoch.
// the -1 indicates that this reply should not count as an ack for the new epoch "
wrappedEpochBytes.putInt(-1);
} else {
throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
}
" 发送ACKEPOCH同步他的初始zxid 。有点像tcp协议的初始化ISN。 "
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
return ZxidUtils.makeZxid(newEpoch, 0);
} else {
if (newEpoch > self.getAcceptedEpoch()) {
self.setAcceptedEpoch(newEpoch);
}
if (qp.getType() != Leader.NEWLEADER) {
LOG.error("First packet should have been NEWLEADER");
throw new IOException("First packet should have been NEWLEADER");
}
return qp.getZxid();
}
}
紧接着调用syncWithLeader方法
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
"如果是snapshot的话,先清空自己的dataTree,然后从snapshot日志文件中反序列化,
//然后设定lastProcessedZxid "
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
" The leader is going to dump the database
// clear our own database and read "
zk.getZKDatabase().clear();
zk.getZKDatabase().deserializeSnapshot(leaderIs);
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
throw new IOException("Missing signature");
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
} else if (qp.getType() == Leader.TRUNC) {
" we need to truncate the log to the lastzxid of the leader "
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
" not able to truncate the log "
LOG.error("Not able to truncate the log "
+ Long.toHexString(qp.getZxid()));
System.exit(13);
}
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
}
else {
LOG.error("Got unexpected packet from leader "
+ qp.getType() + " exiting ... " );
System.exit(13);
}
zk.createSessionTracker();
long lastQueued = 0;
// in Zab V1.0 (ZK 3.4+) we might take a snapshot when we get the NEWLEADER message, but in pre V1.0
// we take the snapshot on the UPDATE message, since Zab V1.0 also gets the UPDATE (after the NEWLEADER)
// we need to make sure that we don't take the snapshot twice.
boolean isPreZAB1_0 = true;
//If we are not going to take the snapshot be sure the transactions are not applied in memory
// but written out to the transaction log
boolean writeToTxnLog = !snapshotNeeded;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
readPacket(qp);
"如果是DIff的方式同步,那么leader会不断发送PROPOSAL,COMMIT数据包。这里涉及到二阶段提交。packetsNotCommitted加一条 "
switch(qp.getType()) {
case Leader.PROPOSAL:
PacketInFlight pif = new PacketInFlight();
pif.hdr = new TxnHeader();
pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(pif.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
if (!writeToTxnLog) {
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
/*
* Only observer get this type of packet. We treat this
* as receiving PROPOSAL and COMMMIT.
*/
PacketInFlight packet = new PacketInFlight();
packet.hdr = new TxnHeader();
packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
// Log warning message if txn comes out-of-order
if (packet.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(packet.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
if (!writeToTxnLog) {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
"退出循环,说明已经同步完成,break之后,zk.startup(); "
case Leader.UPTODATE:
if (isPreZAB1_0) {
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;
case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery
// means this is Zab 1.0
// Create updatingEpoch file and remove it after current
// epoch is set. QuorumPeer.loadDataBase() uses this file to
// detect the case where the server was terminated after
// taking a snapshot but before setting the current epoch.
File updating = new File(self.getTxnFactory().getSnapDir(),
QuorumPeer.UPDATING_EPOCH_FILENAME);
if (!updating.exists() && !updating.createNewFile()) {
throw new IOException("Failed to create " +
updating.toString());
}
if (snapshotNeeded) {
zk.takeSnapshot();
}
self.setCurrentEpoch(newEpoch);
if (!updating.delete()) {
throw new IOException("Failed to delete " +
updating.toString());
}
writeToTxnLog = true; "Anything after this needs to go to the transaction log, not applied directly in memory "
isPreZAB1_0 = false;
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
zk.startup();
leader侧的代码
入口是org.apache.zookeeper.server.quorum.QuorumPeer#run方法的leader.lead();
1 首先是loadData加载数据
if(zkDb.isInitialized()){
setZxid(zkDb.getDataTreeLastProcessedZxid());
}
else {
setZxid(zkDb.loadDataBase()); "加载数据"
}
org.apache.zookeeper.server.ZKDatabase#loadDataBase
/**
 * Restores the database by delegating to {@code snapLog.restore}, passing the data tree,
 * the session-timeout map and the commit-proposal playback listener, then marks this
 * database as initialized.
 *
 * @return the zxid value returned by {@code snapLog.restore}
 * @throws IOException propagated from the restore call
 */
public long loadDataBase() throws IOException {
    final long restoredZxid =
            snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
    // Only reached when restore() returned normally.
    initialized = true;
    return restoredZxid;
}
我们看到这里传入了commitProposalPlaybackListener:加载数据时,每回放一条事务都会回调它,把事务加入committedLog,并由此确定minCommittedLog和maxCommittedLog,也就是图1中leader框里的那个队列。
/**
 * Playback listener passed to the restore call; every transaction reported through
 * {@code onTxnLoaded} is forwarded to {@code addCommittedProposal}.
 */
private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
    public void onTxnLoaded(TxnHeader header, Record txnRecord) {
        // Forward the replayed transaction to the committed-proposal bookkeeping.
        addCommittedProposal(header, txnRecord);
    }
};
"这个方法是在快速恢复最新事物的时候调用FileTxnSnapLog#fastForwardFromEdits"
public void addCommittedProposal(Request request) {
WriteLock wl = logLock.writeLock();
try {
wl.lock();
"LinkedList<Proposal> committedLog 是个链表,最大长度是500"
if (committedLog.size() > commitLogCount=500) {
committedLog.removeFirst();
minCommittedLog = committedLog.getFirst().packet.getZxid();
}
if (committedLog.size() == 0) {
minCommittedLog = request.zxid;
maxCommittedLog = request.zxid;
}
2 启动learnerHandler线程
在org.apache.zookeeper.server.quorum.LearnerHandler#run方法(约358行)中,leader只有在收到过半learner的注册包(FOLLOWERINFO,负载为LearnerInfo)之后,才会发送LEADERINFO。是否过半由getEpochToPropose判断:
private HashSet<Long> connectingFollowers = new HashSet<Long>();
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized(connectingFollowers) {
#### 省略n行代码
#### 只有超过一半的connectingFollowers ,才会notifyAll唤醒线程
if (connectingFollowers.contains(self.getId()) &&
verifier.containsQuorum(connectingFollowers)) {
waitingForNewEpoch = false;
self.setAcceptedEpoch(epoch);
connectingFollowers.notifyAll();
} else {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
connectingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
if (waitingForNewEpoch) {
throw new InterruptedException("Timeout while waiting for epoch from quorum");
}
}
return epoch;
}
}
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
#### 等待接收learner的ACKEPOCH包
####
####
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
前面 图1 的第一步是 Learner类的registerWithLeader方法,发送LearnerInfo。
现在是图1 的第二步,发送LEADERINFO。
然后是图1 的第三步,接收到ACKEPOCH包,同样要等过半之后,才走后面的流程:
private HashSet<Long> electingFollowers = new HashSet<Long>();
private boolean electionFinished = false;
public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
synchronized(electingFollowers) {
#### 省略n行代码
QuorumVerifier verifier = self.getQuorumVerifier();
#### 过半唤醒线程
if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
electionFinished = true;
electingFollowers.notifyAll();
} else {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(!electionFinished && cur < end) {
electingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
3 同步:leader根据learner的peerLastZxid,决定采用DIFF、TRUNC还是SNAP
#### org.apache.zookeeper.server.quorum.LearnerHandler#run
/* the default to send to the follower */ "默认是SNAP"
int packetToSend = Leader.SNAP;
long zxidToSend = 0;
long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;
/* we are sending the diff check if we have proposals in memory to be able to
* send a diff to the
*/
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
#### 拿到最小的minZxid 和最大的maxZxid
#### test
####
####
final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
LOG.info("Synchronizing with Follower sid: " + sid
+" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
+" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
+" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
"这里的proposals 的packetType都是PROPOSAL"
LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
"这里的peerLastZxid 就是前面ACKEPOCH包,发送过来的follower或者observer的lastZxid"
if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
// Follower is already sync with us, send empty diff
"已经同步,发送空的DIFF包"
LOG.info("leader and follower are in sync, zxid=0x{}",
Long.toHexString(peerLastZxid));
packetToSend = Leader.DIFF;
zxidToSend = peerLastZxid;
} else if (proposals.size() != 0) {
LOG.debug("proposal size is {}", proposals.size());
"如果peerLastZxid介于leader的【minCommittedLog ,maxCommittedLog 】之间"
if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
LOG.debug("Sending proposals to follower");
// as we look through proposals, this variable keeps track of previous
// proposal Id.
"因为我们遍历proposals队列,prevProposalZxid 记录上一个proposalZxid"
long prevProposalZxid = minCommittedLog;
// Keep track of whether we are about to send the first packet.
// Before sending the first packet, we have to tell the learner
// whether to expect a trunc or a diff
"先告诉对端是TRUNC还是DIFF同步"
boolean firstPacket=true;
// If we are here, we can use committedLog to sync with
// follower. Then we only need to decide whether to
// send trunc or not
packetToSend = Leader.DIFF;
zxidToSend = maxCommittedLog;
for (Proposal propose: proposals) {
// skip the proposals the peer already has
"小于对端的peerLastZxid,直接略过"
if (propose.packet.getZxid() <= peerLastZxid) {
prevProposalZxid = propose.packet.getZxid();
continue;
} else {
// If we are sending the first packet, figure out whether to trunc
// in case the follower has some proposals that the leader doesn't
if (firstPacket) {
firstPacket = false;
// Does the peer have some proposals that the leader hasn't seen yet
if (prevProposalZxid < peerLastZxid) {
// send a trunc message before sending the diff
packetToSend = Leader.TRUNC;
zxidToSend = prevProposalZxid;
updates = zxidToSend;
}
}
"一个PROPOSAL,一个COMMIT包,间隔着发过去,同步"
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
}
}
"如果对端的zxid大于leader的maxCommittedLog,发送TRUNC指令"
} else if (peerLastZxid > maxCommittedLog) {
LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
Long.toHexString(maxCommittedLog),
Long.toHexString(updates));
packetToSend = Leader.TRUNC;
zxidToSend = maxCommittedLog;
updates = zxidToSend;
} else {
LOG.warn("Unhandled proposal scenario");
}
} else {
// just let the state transfer happen
LOG.debug("proposals is empty");
}
LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates);
} finally {
rl.unlock();
}
4 发送newLeader 包,如果是SNAP,序列化snapshot文件,并且发送给learner
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
ZxidUtils.makeZxid(newEpoch, 0), null, null);
if (getVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
//Need to set the zxidToSend to the latest zxid
if (packetToSend == Leader.SNAP) {
zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
}
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
if (packetToSend == Leader.SNAP) {
LOG.info("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
+ " zxid of leader is 0x"
+ Long.toHexString(leaderLastZxid)
+ "sent zxid of db as 0x"
+ Long.toHexString(zxidToSend));
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
}
bufferedOutput.flush();
最后启动发送线程,真正通过网络把排队的数据包发送给learner:
// Start sending packets
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption",e);
}
}
}.start();
5 等待learner的ACK
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.ACK){
LOG.error("Next packet was supposed to be an ACK");
return;
}
LOG.info("Received NEWLEADER-ACK message from " + getSid());
leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
6 启动zkServer()
"Leader.lead()"
startZkServer();
"LearnerHandler#run()"
/*
* Wait until leader starts up
*/
synchronized(leader.zk){
while(!leader.zk.isRunning() && !this.isInterrupted()){
leader.zk.wait(20);
}
}
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
7 可以响应客户端的请求,正常提供服务
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
附上类图:
最后总结:
1 集群启动时,当选举完成之后,进入数据同步过程。由QuorumPeer线程,进入同步过程。
2 learner侧,首先和leader建立连接,然后发送注册请求(FOLLOWERINFO或者OBSERVERINFO),其中携带自己的acceptedEpoch和sid。
3 leader侧等待过半的learner完成注册之后,才发送LEADERINFO
4 learner在registerWithLeader方法中收到LEADERINFO后回复ACKEPOCH,并带上自己的lastLoggedZxid(即leader侧的peerLastZxid)
5 leader侧 waitForEpochAck ,过半之后,进行同步
6 先获取leader的最小的minZxid 和最大的maxZxid,然后作比较,分别进行DIFF,SNAP或者TRUNC同步
7 等待learner的ACK
8 启动实例,响应客户端的请求,正常提供服务