Zookeeper之集群数据同步源码分析

在完成领导者选举之后,确定服务器角色之后,需要进行数据同步。然后才能构建请求处理链RequestProcessor处理请求

集群数据同步

image.png

整体流程:当角色确立之后,leader调用leader.lead();方法运行,创建一个接收连接的LearnerCnxAcceptor线程,在LearnerCnxAcceptor线程内部又建立一个阻塞的LearnerCnxAcceptorHandler线程等待Learner端的连接。Learner端以follower为例,follower调用follower.followLeader();方法首先查找leader的Socket服务端,然后建立连接。当follower建立连接后,leader端会建立一个LearnerHandler线程相对应,用来处理follower与leader的数据包传输。
1、follower端封装当前zk服务器的Zxid和Leader.FOLLOWERINFO的LearnerInfo数据包发送给leader
2、leader端这时处于getEpochToPropose方法的阻塞时期,需要得到Learner端超过一半的服务器发送Epoch
3、getEpochToPropose解阻塞之后,LearnerHandler线程会把超过一半的Epoch与leader比较得到最新的newLeaderZxid,并封装成Leader.LEADERINFO包发送给Learner端
4、Learner端得到最新的Epoch,会更新当前服务器的Epoch。并把当前服务器所处的lastLoggedZxid位置封装成Leader.ACKEPOCH发送给leader
5、此时leader端处于waitForEpochAck方法的阻塞时期,需要得到Learner端超过一半的服务器发送EpochACK
6、当waitForEpochAck阻塞之后便可以在LearnerHandler线程内决定用那种方式进行同步。如果Learner端的lastLoggedZxid>leader端的,Learner端将会被删除多余的部分。如果小于leader端的,将会以不同方式进行同步
7、leader端发送Leader.NEWLEADER数据包给Learner端(6、7步骤都是另开一个线程来发送这些数据包)
8、Learner端同步之后,会在一个while循环内处理各种leader端发送数据包,包括两阶段提交的Leader.PROPOSAL、Leader.COMMIT、Leader.INFORM等。在同步数据后会处理Leader.NEWLEADER数据包,然后发送Leader.ACK给leader端
9、此时leader端处于waitForNewLeaderAck阻塞等待超过一半节点发送ACK。

当角色确立之后,需要结合着看

leader调用leader.lead();方法


image.png

follower调用follower.followLeader();方法


image.png

observer调用observer.observeLeader();方法
image.png

Leader

首先看看Leader做了些什么
org.apache.zookeeper.server.quorum.Leader#lead
1、loadData加载数据


image.png

2、创建LearnerCnxAcceptor线程,接收followers的连接


image.png

①、LearnerCnxAcceptorHandler接收连接线程
这里加了CountDownLatch根据配置的同步的地址的数量(例如:server.2=127.0.0.1:12882:13882 配置同步的端口是12882只有一个),所以serverSockets大小是一个
org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor#run
等待其他follower或observer连接
image.png

②、LearnerCnxAcceptorHandler
这里接收到连接后,便会调用latch.countDown()解阻塞
org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor.LearnerCnxAcceptorHandler#run


image.png

③、acceptConnections
在这里阻塞接收followers的连接,当有连接过来会生成一个socket对象。然后根据当前socket生成一个LearnerHandler线程
org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor.LearnerCnxAcceptorHandler#acceptConnections
每个Learner者都会开启一个LearnerHandler线程
image.png

④、LearnerHandler
org.apache.zookeeper.server.quorum.LearnerHandler#run
这里就是读取或写数据包与Learner交换数据包。如果没有数据包读取,则会阻塞当前方法ia.readRecord(qp, "packet");
image.png

3、getEpochToPropose
org.apache.zookeeper.server.quorum.Leader#lead
Leader节点在开启LearnerHandler线程之后会继续调用getEpochToPropose方法
image.png

org.apache.zookeeper.server.quorum.Leader#getEpochToPropose
判断connectingFollowers发给leader端的Epoch是否过半,如果过半则会解阻塞


image.png

如果不过半则会一直阻塞在这里,直到Follower把自己的Epoch数据包发送过来并符合过半机制
image.png

4、发送的Epoch过半之后,把当前zxid设置到zk
image.png

5、waitForEpochAck
等待EpochAck
image.png

org.apache.zookeeper.server.quorum.Leader#waitForEpochAck
是electingFollowers符合过半后唤醒,在Follower发送过半的Leader.ACKEPOCH数据包
image.png

image.png

6、等待EpochAck解阻塞后
把得到最新的epoch更新到当前服务,设置当前leader节点的zab状态是SYNCHRONIZATION
image.png

7、等待NewLeaderAck
等待同步数据结束
org.apache.zookeeper.server.quorum.Leader#waitForNewLeaderAck
image.png

image.png

8、startZkServer
启动zk服务端,并设置ZAB是ZabState.BROADCAST
image.png

org.apache.zookeeper.server.quorum.Leader#startZkServer
reconfigEnabled开启
image.png

启动ZkServer服务端,更新currentVote、设置数据树dataTree.lastProcessedZxid
image.png

9、ping和不超过过半停止运行
image.png

image.png

image.png

LearnerHandler

LearnerHandler线程是对应于Learner连接Leader端后,建立的一个与Learner端交换数据的线程。每一个Learner端都会创建一个
org.apache.zookeeper.server.quorum.LearnerHandler#run
1、readRecord读取数据包
不断从learner节点读数据,如果没读到将会阻塞readRecord


image.png

2、FOLLOWERINFO或OBSERVERINFO的数据包
如果数据包类型不是Leader.FOLLOWERINFO或Leader.OBSERVERINFO将会返回


image.png

3、获取到learnerInfoData
image.png

4、得到followerInfo和lastAcceptedEpoch
image.png

5、获取newEpoch通过getEpochToPropose方法
image.png

6、发送Leader.LEADERINFO数据包和waitForEpochAck


image.png

7、syncFollower同步
needSnap表示是否需要快照同步
image.png

org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower
如果是新机器启动isPeerNewEpochZxid=0
image.png

8、数据树DataTree同步到其他节点
反序列化serializeSnapshot 数据树DataTree到其他节点
image.png

9、Leader.NEWLEADER数据包
把Leader.NEWLEADER数据包放入到queuedPackets
image.png

10、startSendingPackets开始发送数据包
org.apache.zookeeper.server.quorum.LearnerHandler#startSendingPackets
image.png

11、waitForNewLeaderAck


image.png

12、发送Leader.ACK数据包
image.png

image.png

Follower

然后再看看Follower节点做了些那些事情
org.apache.zookeeper.server.quorum.Follower#followLeader
1、这里首先会寻找领导者leader的findLeader


image.png

org.apache.zookeeper.server.quorum.Learner#findLeader
根据投票信息currentVote得到那台是领导者


image.png

连接到leader端
image.png

2、registerWithLeader
在建立连接后,会把follower的信息发送给lead。这些信息包括zxid(就是epoch届数)和sid。目的就是为了统一Epoch

org.apache.zookeeper.server.quorum.Learner#registerWithLeader


image.png

image.png

当有数据包传输,LearnerHandler的readRecord将会被解阻塞。会获取到这个数据包
org.apache.zookeeper.server.quorum.LearnerHandler#run
image.png

获取到follower节点的followerInfo信息之后,便会调用到getEpochToPropose方法。
org.apache.zookeeper.server.quorum.LearnerHandler#run
image.png

3、把得到的Epoch数据包发送
此时数据包类型参数是Leader.LEADERINFO,newLeaderZxid最新的Leader的Zxid
image.png

4、读取lead返回的Leader.LEADERINFO数据包
org.apache.zookeeper.server.quorum.Learner#registerWithLeader
image.png

5、向leader写Leader.ACKEPOCH数据包
org.apache.zookeeper.server.quorum.Learner#registerWithLeader


image.png

6、syncWithLeader
设置当前Follower的ZAB状态是SYNCHRONIZATION
image.png

org.apache.zookeeper.server.quorum.Learner#syncWithLeader
image.png

image.png

Leader.TRUNC删除超过的数据zxid
image.png

7、Leader.NEWLEADER数据包
发送Leader.ACK数据包给leader
image.png

数据同步

image.png

同步数据可以从CommittedLog(提交的日志)、TxnLog日志记录、ZKDatabase zk内存数据数DataTree进行同步
TxnLog日志记录和zk内存数据数DataTree是全的,而CommittedLog是只有一部分
org.apache.zookeeper.server.quorum.LearnerHandler#syncFollower


image.png

这里首先判断这台Learner是否第一次启动,里面是否没有任何数据。如果没有isPeerNewEpochZxid=0
把同步的位置peerLastZxid赋值给currentZxid。默认情况下needSnap=true进行快照同步


image.png

获取CommittedLog最大maxCommittedLog和最小的minCommittedLog位置,并且获取DataTree的lastProcessedZxid
image.png

如果CommittedLog是空的,则把DataTree的lastProcessedZxid赋值给最大maxCommittedLog和最小的minCommittedLog
1、forceSnapSync和lastProcessedZxid == peerLastZxid
image.png

forceSnapSync=true是强制快照同步,这里不符合。
lastProcessedZxid == peerLastZxid说明Leader节点与需要同步Learner节点的相等,则不需要同步。Leader.DIFF类型数据包放入queuedPackets队列中
2、peerLastZxid>maxCommittedLog和minCommittedLog<peerLastZxid<maxCommittedLog


image.png

如果Learner节点需要同步的的位置比Leader节点的maxCommittedLog还大,说明Learner节点数据比较多(可能peerLastZxid=120),要把这部分多的给删除掉(100到120)。发送Leader.TRUNC数据包到queuedPackets队列中,把超过maxCommittedLog的都给删了
如果同步的位置peerLastZxid在maxCommittedLog和minCommittedLog之间(peerLastZxid=85),可以通过CommittedLog直接进行同步。只需要同步peerLastZxid到maxCommittedLog最大位置zxid的数据(85到100)。
image.png

3、peerLastZxid<minCommittedLog
如果同步的位置比CommittedLog最小的minCommittedLog还要小(peerLastZxid=60),那么将会要从TxnLog日志中同步peerLastZxid到minCommittedLog和CommittedLog的所有日志。如果TxnLog日志也不全,那么只能通过快照同步needSnap=true,这里没有把needSnap设置为false
image.png

org.apache.zookeeper.server.quorum.LearnerHandler#run
image.png

needSnap=true就是把数据树DataTree反序列封装成Leader.SNAP数据包发送给Learner节点。

committedLog

committedLog表示处理过的写请求。committedLog不是一个无限的队列commitLogCount表示存储多少个默认commitLogCount=DEFAULT_COMMIT_LOG_COUNT=500,用两个值minCommittedLog、maxCommittedLog表示这个区间[minCommittedLog,maxCommittedLog],如果超过commitLogCount大小committedLog将会被替换。committedLog的添加是在FinalRequestProcessor处理的
1、processRequest
org.apache.zookeeper.server.FinalRequestProcessor#processRequest


image.png

2、applyRequest
这里也是更新ZkDataBase,触发Watch,集群模式下把Request添加到committedLog
org.apache.zookeeper.server.FinalRequestProcessor#applyRequest


image.png

3、processTxn
org.apache.zookeeper.server.ZooKeeperServer#processTxn(org.apache.zookeeper.server.Request)
image.png

boolean quorumRequest = request.isQuorum();
image.png

image.png

4、addCommittedProposal
org.apache.zookeeper.server.ZKDatabase#addCommittedProposal(org.apache.zookeeper.server.Request)
把协议Proposal添加到committedLog队列中,


image.png

总结:

Zookeeper集群数据同步是在确立角色之后进行的,其关键步骤是通过Epoch和EpochACK的过半验证阻塞之后进行的。在同步之后也需要收到NewLeaderAck超过一半才能正常启动初始化zookeeper的Leader端。
同步的原理是根据Zxid所处的位置来寻找最佳的同步方式

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350