Zab系列博客
Raft Vs Zab
https://www.jianshu.com/p/24307e7ca9da
Zab系列1 核心概念
https://www.jianshu.com/p/76e5dba31ea4
Zab系列2 角色和存储
https://www.jianshu.com/p/d80f9250ffd1
Zab系列3 选举
https://www.jianshu.com/p/0d2390c242f6
Zab系列4 zookeeper特性
https://www.jianshu.com/p/08b62ca1fe4e
Zab系列5 选举恢复(源码分析)
https://www.jianshu.com/p/b6acd99921b7
Zab系列6 zk单机版工作原理
https://www.jianshu.com/p/ed45982b18b4
Zab系列7 集群工作原理Leader篇
https://www.jianshu.com/p/59240c36ba1b
Zab系列8 集群工作原理Follower篇
https://www.jianshu.com/p/8d7c7f1b2838
Zab系列9 消息顺序性
https://www.jianshu.com/p/0aa96b6a2070
概述
Follower完成一个事务请求完整的流程:
- 通过请求转发器,把所有事务请求转发给Leader
- 每个Follower在身份确定之后,开启一个循环,监听和处理Leader发过来的消息(Follower.followLeader()),最终调用processPacket(QuorumPacket)
- follower监听到leader的proposal消息的时候,会将该proposal进行持久化,持久化完成之后,反馈一个ACK消息给leader
- 当leader统计ack过半的时候,再发一个commit的消息过来,follower监听到之后,会把该proposal在本地提交,更新 lastProcessZxid,把事务结果apply到自己的内存树当中
Follower的监听方法followLeader()
循环监听leader的消息,并且处理
void followLeader() throws InterruptedException {
QuorumServer leaderServer = findLeader();
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
}
根据消息的类型的不同,处理的方式不同
- ping消息
- Proposal消息
- commit消息
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
lastQueued = hdr.getZxid();
case Leader.COMMIT:
fzk.commit(qp.getZxid());
核心流程代码
- 当follower监听到Leader的Proposal消息时,最终会调用 syncProcessor.processRequest(request);
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
pendingTxns.add(request);
syncProcessor.processRequest(request);
}
syncProcessor完成持久化之后之后,根据Follower初始化时,把下一个处理器绑定为SendAckRequestProcessor。
SendAckRequestProcessor再调用learner.writePacket,告知到leader
public void processRequest(Request si) {
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
learner.writePacket(qp, false);
}
- leader收集到ack时触发Leader.tryToCommit,当收到过半的Ack后,再广播commit
synchronized public boolean tryToCommit() {
if (!p.hasAllQuorums()) {
return false;
}
commit(zxid);
inform(p);
zk.commitProcessor.commit(p.request);
FollowerZooKeeperServer的责任链拼装方式
有两条责任链:
- 第一条既可以处理非事务请求,又可以处理leaderCommit之后的请求。firstProcessor :FollowerRequestProcessor -->CommitProcessor-->FinalRequestProcessor
- 第二条是用来申明SyncRequestProcessor的下一个处理器为SendAckRequestProcessor的,这个在leader发起proposal时会用到
FollowerZooKeeperServer.setupRequestProcessors()
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
参考
这个大佬很牛逼,一篇博客讲明白了整个事务操作的流程,以及各个Processor的各个细节
https://www.jianshu.com/p/64f6e8124625
大佬视角很高,简单明了
https://www.jianshu.com/p/a8b5783eec63