Overview
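The previous article analyzed how transaction requests are processed in standalone mode. There the processor chain is PrepRequestProcessor → SyncRequestProcessor → FinalRequestProcessor, and the flow is fairly straightforward. This article analyzes how transaction requests are processed in cluster mode, which involves the implementation of the ZAB protocol, communication between the leader and the followers, and how a Request moves between the various Processors.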
Flow analysis
0. Setting up the processor chains
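- The chains are set up in FollowerZooKeeperServer.setupRequestProcessors and LeaderZooKeeperServer.setupRequestProcessors:
  - Leader: PrepRequestProcessor → ProposalRequestProcessor → CommitProcessor → ToBeAppliedRequestProcessor → FinalRequestProcessor; ProposalRequestProcessor also feeds SyncRequestProcessor → AckRequestProcessor;
  - Follower: FollowerRequestProcessor → CommitProcessor → FinalRequestProcessor, plus a separate SyncRequestProcessor → SendAckRequestProcessor chain for proposals received from the leader.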
The analysis below follows the flowchart (the step numbers in the flowchart match the numbering below).
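As a reading aid, the leader's main chain can be modelled as a plain chain of responsibility. This is a minimal sketch, not ZooKeeper code: SimpleProcessor, NamedProcessor and LeaderChainSketch are hypothetical names, the chain runs synchronously (the real PrepRequestProcessor and CommitProcessor are threads fed by queues), and the SyncRequestProcessor → AckRequestProcessor branch that ProposalRequestProcessor also feeds is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ZooKeeper's RequestProcessor interface.
interface SimpleProcessor {
    void processRequest(String request, List<String> trace);
}

// Records its own name, then forwards to the next processor (null marks the end of the chain).
final class NamedProcessor implements SimpleProcessor {
    private final String name;
    private final SimpleProcessor next;

    NamedProcessor(String name, SimpleProcessor next) {
        this.name = name;
        this.next = next;
    }

    @Override
    public void processRequest(String request, List<String> trace) {
        trace.add(name);
        if (next != null) {
            next.processRequest(request, trace);
        }
    }
}

final class LeaderChainSketch {
    // Built back to front: each processor is created with a reference to the one after it.
    static SimpleProcessor buildLeaderMainChain() {
        SimpleProcessor fin = new NamedProcessor("FinalRequestProcessor", null);
        SimpleProcessor toBeApplied = new NamedProcessor("ToBeAppliedRequestProcessor", fin);
        SimpleProcessor commit = new NamedProcessor("CommitProcessor", toBeApplied);
        SimpleProcessor proposal = new NamedProcessor("ProposalRequestProcessor", commit);
        return new NamedProcessor("PrepRequestProcessor", proposal);
    }

    // Returns the names of the processors a request passes through, in order.
    static List<String> trace(String request) {
        List<String> trace = new ArrayList<>();
        buildLeaderMainChain().processRequest(request, trace);
        return trace;
    }
}
```

Building the chain back to front mirrors setupRequestProcessors, where each processor is constructed with a reference to the processor that follows it.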
1. PrepRequestProcessor.run
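The entry point is the same as in standalone mode: the server listens on the client port (2181 by default), and PrepRequestProcessor validates the request (path, version, ACL, etc.) before passing it to ProposalRequestProcessor. See the previous article, Zookeeper(五)-服务端单机模式-事务请求处理 (standalone-mode transaction request processing).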
2. ProposalRequestProcessor.processRequest
public void processRequest(Request request) throws RequestProcessorException {
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        // pass on to CommitProcessor
        nextProcessor.processRequest(request);
        // transaction request: hdr is not set by the Request constructor; it is assigned
        // in PrepRequestProcessor.pRequest2Txn when a transaction request is processed
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            try {
                // broadcast the proposal to the followers
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // write the txn log / snapshot, then pass on to AckRequestProcessor
            syncProcessor.processRequest(request);
        }
    }
}
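- nextProcessor.processRequest(request): the Request is passed on to CommitProcessor;
- request.hdr != null: hdr is not set by the Request constructor but is assigned in PrepRequestProcessor.pRequest2Txn for transaction requests, so a non-null hdr means this is a transaction request;
- zks.getLeader().propose(request): builds a PROPOSAL and broadcasts it to the followers;
- syncProcessor.processRequest(request): writes the txn log / snapshot, then passes the request on to AckRequestProcessor.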
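The branching above reduces to a few lines. In this hypothetical sketch (SketchRequest and ProposalDispatchSketch are illustrative names, and the three downstream calls are replaced by entries in a log), every request is queued in CommitProcessor, and only requests with a non-null hdr are proposed and logged:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal request: hdr stays null unless the request is a transaction,
// mirroring how PrepRequestProcessor.pRequest2Txn assigns Request.hdr.
final class SketchRequest {
    final long zxid;
    final Object hdr;

    SketchRequest(long zxid, Object hdr) {
        this.zxid = zxid;
        this.hdr = hdr;
    }
}

// Records, in order, which downstream calls ProposalRequestProcessor would make.
final class ProposalDispatchSketch {
    final List<String> actions = new ArrayList<>();

    void processRequest(SketchRequest request) {
        // every request is queued in CommitProcessor first
        actions.add("commitProcessor " + request.zxid);
        if (request.hdr != null) {
            // transactions only: broadcast the PROPOSAL, then log locally (-> AckRequestProcessor)
            actions.add("propose " + request.zxid);
            actions.add("syncProcessor " + request.zxid);
        }
    }
}
```

Handing the request to CommitProcessor before proposing it means it is already waiting there as nextPending when the commit arrives.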
3.1. CommitProcessor.run
public void run() {
    try {
        Request nextPending = null;
        while (!finished) {
            // toProcess is initially empty
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                // next processor: ToBeAppliedRequestProcessor on the leader, FinalRequestProcessor on a follower
                nextProcessor.processRequest(toProcess.get(i));
            }
            toProcess.clear();
            synchronized (this) {
                if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() == 0) {
                    // nothing new can be taken and nothing has been committed: block
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending request
                // On the first pass queuedRequests is non-empty and nextPending is null, so this branch is
                // skipped and nextPending is taken from queuedRequests below; on later passes nextPending
                // holds that request and r is taken from committedRequests.
                // When the leader sends a COMMIT to a follower for a request submitted to the leader,
                // queuedRequests on the follower is empty.
                if ((queuedRequests.size() == 0 || nextPending != null) && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    if (nextPending != null && nextPending.sessionId == r.sessionId && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request. the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        // use nextPending because it carries cnxn (the client connection)
                        toProcess.add(nextPending);
                        nextPending = null;
                    } else {
                        // this request came from someone else so just send the commit packet
                        toProcess.add(r);
                    }
                }
            }
            if (nextPending != null) {
                continue;
            }
            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request;
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:
                        toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}
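- nextProcessor.processRequest(toProcess.get(i)): toProcess holds the Requests to be handed to the next Processor; when it is non-empty, each entry is passed on (to ToBeAppliedRequestProcessor on the leader, to FinalRequestProcessor on a follower);
- committedRequests: requests that have been committed, i.e. the leader has received ACKs from a quorum and broadcast COMMIT. While it is empty the thread calls wait(); once a proposal has a quorum of ACKs (4-5), it is moved from outstandingProposals into committedRequests, which wakes this thread, and execution continues here (5.1);
- queuedRequests: the list of requests passed in by the previous Processor (ProposalRequestProcessor on the leader). Note that when the leader sends a COMMIT to a follower for a request submitted to the leader, queuedRequests on the follower is empty at this point;
- nextPending: initially null; it is set from queuedRequests (in the second synchronized block) when a write request has to wait for its commit;
- nextPending != null && nextPending.sessionId == r.sessionId && nextPending.cxid == r.cxid: the committed request is the one this server received from its own client; in the scenario analyzed here that happens on the leader, not when the leader syncs a commit to a follower. In that case nextPending is what goes into toProcess for the next Processor, because nextPending carries cnxn (the connection to the client, which FinalRequestProcessor uses to send the response). On the follower, the Request from committedRequests is put into toProcess directly.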
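The matching rule can be exercised in isolation. The sketch below is a simplified, single-threaded model under stated assumptions: CpRequest and CommitMatcherSketch are hypothetical, step() performs one pass of the loop without blocking, and a boolean write flag stands in for the OpCode switch (sync handling is omitted).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simplified request: (sessionId, cxid) identifies it; "local" marks a copy that
// still carries this server's client connection (cnxn).
final class CpRequest {
    final long sessionId;
    final int cxid;
    final boolean write;
    final boolean local;

    CpRequest(long sessionId, int cxid, boolean write, boolean local) {
        this.sessionId = sessionId;
        this.cxid = cxid;
        this.write = write;
        this.local = local;
    }
}

// Single-threaded model of the CommitProcessor matching rule (no wait()/notify, no sync handling).
final class CommitMatcherSketch {
    final Deque<CpRequest> queuedRequests = new ArrayDeque<>();
    final Deque<CpRequest> committedRequests = new ArrayDeque<>();
    CpRequest nextPending;

    // One pass of the loop; returns what would be handed to the next processor.
    List<CpRequest> step() {
        List<CpRequest> toProcess = new ArrayList<>();
        if ((queuedRequests.isEmpty() || nextPending != null) && !committedRequests.isEmpty()) {
            CpRequest r = committedRequests.poll();
            if (nextPending != null && nextPending.sessionId == r.sessionId && nextPending.cxid == r.cxid) {
                toProcess.add(nextPending); // our own copy, which still has the client connection
                nextPending = null;
            } else {
                toProcess.add(r);           // committed request that came from someone else
            }
        }
        // take new requests until a write has to wait for its commit
        while (nextPending == null && !queuedRequests.isEmpty()) {
            CpRequest request = queuedRequests.poll();
            if (request.write) {
                nextPending = request;
            } else {
                toProcess.add(request);
            }
        }
        return toProcess;
    }
}
```

A read queued behind a pending write is only released after the write's commit arrives; stopping the inner loop at the first pending write is what keeps the requests this server received in order.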
3.2. Leader.propose (broadcasting the proposal)
public Proposal propose(Request request) throws XidRolloverException {
    // The low 32 bits of the zxid have reached their maximum: force a re-election
    // so that the epoch in the high 32 bits is incremented
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    try {
        request.hdr.serialize(boa, "hdr");
        if (request.txn != null) {
            request.txn.serialize(boa, "txn");
        }
        baos.close();
    } catch (IOException e) {
        LOG.warn("This really should be impossible", e);
    }
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
            baos.toByteArray(), null);
    // Proposal wraps the QuorumPacket (pp) and the original Request
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    synchronized (this) {
        lastProposed = p.packet.getZxid();
        // track the proposal in outstandingProposals until a quorum ACKs it
        outstandingProposals.put(lastProposed, p);
        // iterate over forwardingFollowers and queue the proposal for each one
        sendPacket(pp);
    }
    return p;
}
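- (request.zxid & 0xffffffffL) == 0xffffffffL: the low 32 bits of the zxid have reached their maximum, so a re-election is forced to increment the epoch in the high 32 bits;
- outstandingProposals.put(lastProposed, p): the Request, serialized into a QuorumPacket, is wrapped in a Proposal and stored in outstandingProposals;
- sendPacket(pp): iterates over forwardingFollowers and puts the PROPOSAL into each follower's queuedPackets (the packets are sent by LearnerHandler.sendPackets, analyzed in the previous article).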
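To make the bit layout concrete: a zxid keeps the epoch in its high 32 bits and a per-epoch counter in its low 32 bits. The helper names below are hypothetical (ZooKeeper's ZxidUtils provides similar ones); counterExhausted is the same test Leader.propose performs.

```java
// Hypothetical helpers; the layout (epoch in the high 32 bits, counter in the low 32 bits) is ZooKeeper's.
final class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >>> 32;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Same condition Leader.propose checks before issuing a new proposal.
    static boolean counterExhausted(long zxid) {
        return (zxid & 0xffffffffL) == 0xffffffffL;
    }
}
```

For example, makeZxid(3, 5) has epoch 3 and counter 5. A new leader starts its epoch with counter 0 (the NEWLEADER zxid), which is why processAck can recognize the NEWLEADER ACK by an all-zero low half.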
4-1. Follower.followLeader (receiving the proposal)
while (this.isRunning()) {
    // after startup, block waiting for packets from the leader
    readPacket(qp);
    processPacket(qp);
}
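This loop was analyzed in the previous article. For a PROPOSAL packet, processPacket deserializes the TxnHeader and the txn from the packet and passes them to FollowerZooKeeperServer.logRequest.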
4-2. FollowerZooKeeperServer.logRequest
public void logRequest(TxnHeader hdr, Record txn) {
    // cnxn (first argument) is null: there is no client connection for this request
    Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
            hdr.getType(), null, null);
    request.hdr = hdr;
    request.txn = txn;
    request.zxid = hdr.getZxid();
    if ((request.zxid & 0xffffffffL) != 0) {
        // keep it in pendingTxns; it is needed when the COMMIT arrives
        pendingTxns.add(request);
    }
    // hand it to SyncRequestProcessor -> SendAckRequestProcessor
    syncProcessor.processRequest(request);
}
- pendingTxns.add(request): the request is kept in pendingTxns so it can later be passed to CommitProcessor (5-2 in the flowchart);
- syncProcessor.processRequest(request): on the follower, the request is passed to SyncRequestProcessor.
4-3. SyncRequestProcessor.run (writing the log)
4-4. SendAckRequestProcessor.processRequest (replying with an ACK)
public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.hdr.getZxid(), null, null);
        try {
            // send the ACK to the leader
            learner.writePacket(qp, false);
        }
        ......
    }
}
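- learner.writePacket(qp, false): once the transaction log has been written, the ACK packet is sent back to the leader through leaderOs (the false argument means the stream is not flushed immediately).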
4-5. LearnerHandler.run (receiving the ACK)
// LearnerHandler.run
case Leader.ACK:
    ......
    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
    break;

// Leader.processAck
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    if ((zxid & 0xffffffffL) == 0) {
        // low 32 bits all zero: this is the ACK for NEWLEADER, ignore it here
        return;
    }
    if (outstandingProposals.size() == 0) {
        return;
    }
    if (lastCommitted >= zxid) {
        // ignore zxids that have already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        return;
    }
    // record the sid that sent the ACK in this proposal's ackSet, for the quorum check
    p.ackSet.add(sid);
    // has this proposal been ACKed by a quorum?
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)) {
        // zxids are normally committed in order
        if (zxid != lastCommitted + 1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }
        outstandingProposals.remove(zxid);
        if (p.request != null) {
            // add the proposal to toBeApplied (a ConcurrentLinkedQueue)
            toBeApplied.add(p);
        }
        // broadcast COMMIT to the followers
        commit(zxid);
        // send INFORM to the observers
        inform(p);
        // put the committed Request into committedRequests for CommitProcessor
        zk.commitProcessor.commit(p.request);
        // LearnerSyncRequests are parked in pendingSyncs until this zxid commits
        if (pendingSyncs.containsKey(zxid)) {
            for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
}
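- (zxid & 0xffffffffL) == 0: the low 32 bits are all zero, i.e. this is the ACK for NEWLEADER, which is ignored here;
- lastCommitted >= zxid: proposals that have already been committed are ignored;
- self.getQuorumVerifier().containsQuorum(p.ackSet): checks whether the proposal has been ACKed by a quorum (a majority of the voting members, the leader's own ACK included);
- toBeApplied.add(p): the same toBeApplied queue is handed to ToBeAppliedRequestProcessor when it is constructed; after COMMIT, that processor checks it and removes the entry (see 5.2);
- commit(zxid): broadcasts COMMIT to the followers by iterating over forwardingFollowers and putting the COMMIT into each follower's queuedPackets (sent by LearnerHandler.sendPackets, analyzed in the previous article); this corresponds to 5-1 below;
- inform(p): sends INFORM to the observers by iterating over observingLearners and putting an INFORM packet, which carries the proposal's data, into each observer's queuedPackets (also sent by LearnerHandler.sendPackets);
- zk.commitProcessor.commit(p.request): puts the committed Request into committedRequests, which wakes up CommitProcessor.run (i.e. 5.1).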
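The quorum bookkeeping can be sketched as follows, assuming a plain majority configuration, for which QuorumMaj.containsQuorum checks that the ACK set is larger than half the number of voters. AckTrackerSketch is hypothetical: it drops toBeApplied, the COMMIT/INFORM broadcasts and all synchronization, and, like Leader.commit, it records lastCommitted when a proposal commits.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of Leader.processAck with a simple-majority quorum.
final class AckTrackerSketch {
    private final int voters;                                        // voting members, leader included
    private final Map<Long, Set<Long>> outstanding = new HashMap<>(); // zxid -> ackSet
    long lastCommitted = 0;

    AckTrackerSketch(int voters) {
        this.voters = voters;
    }

    void propose(long zxid) {
        outstanding.put(zxid, new HashSet<>());
    }

    // Returns true exactly when this ACK completes a quorum and the proposal commits.
    boolean processAck(long sid, long zxid) {
        if ((zxid & 0xffffffffL) == 0) {
            return false;                     // ACK for NEWLEADER
        }
        if (lastCommitted >= zxid) {
            return false;                     // already committed
        }
        Set<Long> ackSet = outstanding.get(zxid);
        if (ackSet == null) {
            return false;
        }
        ackSet.add(sid);
        if (ackSet.size() > voters / 2) {     // strict majority
            outstanding.remove(zxid);
            lastCommitted = zxid;             // Leader.commit(zxid) records this
            return true;
        }
        return false;
    }
}
```

With three voters, the leader's own ACK (sent through AckRequestProcessor) plus one follower ACK is enough to commit; the third ACK arrives too late and is ignored.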
5.1. CommitProcessor.run (after COMMIT has been sent)
committedRequests is no longer empty, so the wait() ends; this was analyzed in 3.1 above.
5-1. Follower.followLeader (receiving COMMIT)
// Follower.followLeader
while (this.isRunning()) {
    // after startup, block waiting for packets from the leader
    readPacket(qp);
    processPacket(qp);
}

// Follower.processPacket
case Leader.COMMIT:
    fzk.commit(qp.getZxid());
    break;

// FollowerZooKeeperServer.commit
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    Request request = pendingTxns.remove();
    commitProcessor.commit(request);
}
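- firstElementZxid != zxid: the COMMIT's zxid does not match the head of pendingTxns, i.e. commits are arriving in a different order from the proposals, so the process exits immediately (System.exit(12));
- commitProcessor.commit(request): submits the request to CommitProcessor (5-2).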
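The follower-side ordering check can be modelled with a FIFO of zxids. This is a hypothetical sketch (PendingTxnsSketch is not a ZooKeeper class): it throws IllegalStateException where FollowerZooKeeperServer.commit calls System.exit(12), and returns false where the real method only logs a warning.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of FollowerZooKeeperServer's pendingTxns bookkeeping, tracking zxids only.
final class PendingTxnsSketch {
    private final Deque<Long> pendingTxns = new ArrayDeque<>();

    // logRequest: remember every proposal whose counter (low 32 bits) is non-zero
    void logRequest(long zxid) {
        if ((zxid & 0xffffffffL) != 0) {
            pendingTxns.add(zxid);
        }
    }

    // commit: a COMMIT must match the oldest pending proposal
    boolean commit(long zxid) {
        Long first = pendingTxns.peek();
        if (first == null) {
            return false; // the real method logs "Committing ... without seeing txn" and returns
        }
        if (first.longValue() != zxid) {
            // the real method logs an error and calls System.exit(12)
            throw new IllegalStateException("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x" + Long.toHexString(first));
        }
        pendingTxns.remove();
        return true; // the real method hands the Request to commitProcessor.commit here
    }
}
```

Because ZAB delivers PROPOSALs and COMMITs over a single FIFO TCP connection in zxid order, a mismatch here means the follower's state can no longer be trusted, which explains the hard exit in the real code.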
5-2. CommitProcessor.run (the follower executes the COMMIT)
Same logic as CommitProcessor.run on the leader, analyzed in 3.1 above.
5-3. FinalRequestProcessor.processRequest (executed on the follower)
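The overall logic was analyzed for standalone mode: it consists of applying the transaction to the in-memory database and building the response. In cluster mode, however, requests synchronized from the leader do not need a response: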
if (Request.isQuorum(request.type)) {
    zks.getZKDatabase().addCommittedProposal(request);
}
if (request.cnxn == null) {
    return;
}
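- Request.isQuorum: for transaction requests, the proposal is added to committedLog, which is used to synchronize data to learners after a leader election. (Why does a follower also maintain committedLog? Because any follower may later be elected leader, and it then needs committedLog to bring lagging learners up to date.)
- request.cnxn == null: as analyzed in 3.1, in the scenario analyzed here a null cnxn means the Request came from the leader (FollowerZooKeeperServer.logRequest creates it without a client connection), i.e. the current server is a follower, so no Response is built and the method returns.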
This completes the follower-side flow.
Now let's go back to 4.1.
4.1. SyncRequestProcessor.run (the leader writes the log)
Same as 4-3; see also Zookeeper(五)-服务端单机模式-事务请求处理.
4.2+4.3. AckRequestProcessor.processRequest (the leader's own ACK)
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null) {
        leader.processAck(self.getId(), request.zxid, null);
    } else {
        LOG.error("Null QuorumPeer");
    }
}
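- Handles the leader's own ACK: it calls Leader.processAck with the leader's own server id, so the leader effectively acknowledges its own proposal; the processing is the same as in 4-5.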
Next, let's step back and look at 5.2.
5.2. ToBeAppliedRequestProcessor.processRequest
public void processRequest(Request request) throws RequestProcessorException {
    // request.addRQRec(">tobe");
    // does nothing itself: passes the request straight to FinalRequestProcessor
    // (which updates the DataTree and builds the client response)
    next.processRequest(request);
    Proposal p = toBeApplied.peek();
    // remove it from toBeApplied (it was added in Leader.processAck)
    if (p != null && p.request != null && p.request.zxid == request.zxid) {
        toBeApplied.remove();
    }
}
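The peek-then-remove pattern above can be modelled with zxids only. ToBeAppliedSketch is hypothetical; an ArrayList stands in for FinalRequestProcessor, and ConcurrentLinkedQueue is the queue type Leader uses for toBeApplied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of ToBeAppliedRequestProcessor, with requests and proposals reduced to zxids.
final class ToBeAppliedSketch {
    // filled by Leader.processAck once a proposal has a quorum (see 4-5)
    final ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();
    // stands in for FinalRequestProcessor: records what was applied
    final List<Long> applied = new ArrayList<>();

    void processRequest(long zxid) {
        applied.add(zxid);                  // next.processRequest(request)
        Long head = toBeApplied.peek();
        if (head != null && head.longValue() == zxid) {
            toBeApplied.remove();           // applied, so no longer "to be applied"
        }
    }
}
```

Only the head is compared, because commits and therefore applications happen in zxid order; a request that does not match the head (for example a read) leaves the queue untouched.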
A few more points about toBeApplied:
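1. In Leader.processAck, once a quorum of ACKs has been received, the current Proposal is added to the leader's toBeApplied member;
2. When LearnerHandler.run synchronizes a follower after leader election, the leader does not stop processing requests before it sends NEWLEADER; the requests committed between the end of committedLog and NEWLEADER are exactly those placed in toBeApplied in the previous step. During synchronization, the startForwarding method below iterates over toBeApplied and queues a PROPOSAL and a COMMIT for each of them to the follower (and, for participants, a PROPOSAL for every outstanding proposal):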
synchronized public long startForwarding(LearnerHandler handler, long lastSeenZxid) {
    // Queue up any outstanding requests enabling the receipt of new requests
    if (lastProposed > lastSeenZxid) {
        for (Proposal p : toBeApplied) {
            if (p.packet.getZxid() <= lastSeenZxid) {
                continue;
            }
            handler.queuePacket(p.packet);
            // Since the proposal has been committed we need to send the commit message also
            QuorumPacket qp = new QuorumPacket(Leader.COMMIT, p.packet
                    .getZxid(), null, null);
            handler.queuePacket(qp);
        }
        // Only participant need to get outstanding proposals
        if (handler.getLearnerType() == LearnerType.PARTICIPANT) {
            List<Long> zxids = new ArrayList<Long>(outstandingProposals.keySet());
            Collections.sort(zxids);
            for (Long zxid : zxids) {
                if (zxid <= lastSeenZxid) {
                    continue;
                }
                handler.queuePacket(outstandingProposals.get(zxid).packet);
            }
        }
    }
    if (handler.getLearnerType() == LearnerType.PARTICIPANT) {
        addForwardingFollower(handler);
    } else {
        addObserverLearnerHandler(handler);
    }
    return lastProposed;
}
3. When LeaderZooKeeperServer.setupRequestProcessors builds the Processor chain, it passes the leader's toBeApplied to ToBeAppliedRequestProcessor:
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader().toBeApplied);
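4. ToBeAppliedRequestProcessor.processRequest then cleans up: after the request has been passed to FinalRequestProcessor, the head of toBeApplied is removed if it has the same zxid as the request.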
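The packet order that startForwarding produces can be checked with a small model. In this hypothetical sketch (StartForwardingSketch is not ZooKeeper code), proposals are reduced to zxids and packets to labels; it follows the code above: committed proposals from toBeApplied get a PROPOSAL plus a COMMIT, outstanding proposals get only a PROPOSAL, in sorted order and only for participants.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical model of the packets Leader.startForwarding queues for a new learner; zxids only.
final class StartForwardingSketch {
    static List<String> packetsFor(List<Long> toBeApplied, Set<Long> outstandingZxids,
                                   long lastProposed, long lastSeenZxid, boolean participant) {
        List<String> queued = new ArrayList<>();
        if (lastProposed > lastSeenZxid) {
            // committed but possibly not yet applied: PROPOSAL followed by COMMIT
            for (long zxid : toBeApplied) {
                if (zxid <= lastSeenZxid) {
                    continue;
                }
                queued.add("PROPOSAL " + Long.toHexString(zxid));
                queued.add("COMMIT " + Long.toHexString(zxid));
            }
            // still waiting for a quorum: PROPOSAL only, in zxid order, participants only
            if (participant) {
                List<Long> zxids = new ArrayList<>(outstandingZxids);
                Collections.sort(zxids);
                for (long zxid : zxids) {
                    if (zxid <= lastSeenZxid) {
                        continue;
                    }
                    queued.add("PROPOSAL " + Long.toHexString(zxid));
                }
            }
        }
        return queued;
    }
}
```

Outstanding proposals get no COMMIT here; once the learner is registered as a forwarding follower, it receives their COMMITs through the normal broadcast in processAck.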
5.3. FinalRequestProcessor.processRequest (executed on the leader)
Same as in standalone mode: after updating the in-memory database, it builds the response and returns it to the client (5.4); see Zookeeper(五)-服务端单机模式-事务请求处理.
Summary
This article only analyzed the flow for requests received directly by the leader.
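1. When a follower receives a transaction request, FollowerRequestProcessor.run builds a REQUEST packet and sends it to the leader (Learner.request). When the leader's LearnerHandler.run receives the REQUEST, it builds a Request and submits it to the leader's firstProcessor (ZooKeeperServer.submitRequest); from there on, the flow is the same as in this article.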
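2. An Observer processes requests the same way as a Follower (non-transaction requests are handled locally, transaction requests are forwarded to the leader). The difference is that an Observer takes part in neither transaction commits nor leader election: instead of the PROPOSAL/ACK/COMMIT exchange, it only receives INFORM messages from the leader and applies them to its local store. Its main purpose is to improve read performance, for example across data centers.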