对于客户端消息,zk创建了一系列的RequestProcessor
来对消息进行链式处理。zk服务承担不同角色时,消息处理链是不同的。
一、Standalone
PrepRequestProcessor ->SyncRequestProcessor ->FinalRequestProcessor
PrepRequestProcessor:
主要工作是做check,并在请求中填充一些必须的数据:
校验session是否合法,检查是否有acl权限,对于创建目录的请求,检查父节点是否是临时节点等。。
对于写操作,给请求附加上
TxnHeader
(带请求时间,zxid等)。
...
SyncRequestProcessor
逻辑比较多,如在PrepRequestProcessor
所说的,对于会改变数据内容的请求或者事务请求,zk会在request头部增加一个TxnHeader。
1.对于带TxnHeader的消息,会附加到日志中,并增加到SyncRequestProcessor#toFlush
队列中,当toFlush超限,或者从上游RequestProcessor中获取不到新的消息时,开始做flush。
flush:
将日志整体flush到磁盘
逐一将消息转给下个RequestProcessor
2.对于存粹的读请求,可以直接放行给下个RequestProcessor,但是有要求,toFlush
需要是空的,这个比较好理解,如果toFlush
不为空,那么可能之前有写操作改变数据库内容,导致脏读。
3.当日志数量超过一个随机值,(用随机值的目的是为了让zk集群不同时dump快照),创建一个线程固化一个快照到磁盘。
FinalRequestProcessor
这是最后一步了,主要操作就是对zk数据库的增删改查。注意到,单机系统中,可以直接改,但是在集群中,leader需要等过半节点确认过了之后才可以修改。同时附加下面两部操作:
1.维护一个内存日志队列,作用是便于leader给掉线不久的节点同步DIFF数据。
2.组装response报文返回给客户端
二、Leader
LeaderRequestProcessor->PrepRequestProcessor ->ProposalRequestProcessor -> CommitProcessor-> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor
ProposalRequestProcessor
提交议案的处理器,主要是针对写请求;对于写操作,将这个请求广播给所有follower。
同时ProposalRequestProcessor内部有一个SyncRequestProcessor->AckRequestProcessor处理链,处理follower的回复,当ack过半时,commit这次request,CommitProcessor才会继续执行。
CommitProcessor
顾名思义,这个处理器时处理需要决议的请求内容,对于读请求直接放行。其内部有3个队列:
CommitProcessor#queuedRequests
消息队列,存放所有从ProposalRequestProcessor到达的消息。
CommitProcessor#pendingRequests
等待队列,CommitProcessor#queuedRequests中的请求分为两类,读请求直接放行,写请求会放到这个队列中去。
CommitProcessor#committedRequests
存放已提交的消息,和上文中的ProposalRequestProcessor联系起来,当收到过半follower的ack回复时,会有如下的函数调用发生:AckRequestProcessor#processRequest->Leader#tryToCommit->CommitProcessor#commit.
Leader.ToBeAppliedRequestProcessor
代码很简单,就是把CommitProcessor已确认的消息清除了,不太明白有什么作用,可能就是做个记录。
三、Follower
Follower在确定了自己为follower之后,会从leader处同步log(全量的情况是snapshot),同步完成以后,再启动下面的处理器链:
FollowerRequestProcessor->CommitProcessor->FinalRequestProcessor
FollowerRequestProcessor
主要任务就是调用Learner#request
将写请求转发给leader
四、实例举例
假设有一条客户端的写请求发给了Follower节点A。 A会在FollowerRequestProcessor#run中将请求发给leader,Leader接收到这条写请求,在ProposalRequestProcessor中将消息进行广播,follower节点收到消息以后,在选举线程调用FollowerZooKeeperServer#logRequest
将日志写到本地,然后SyncRequestProcessor对日志进行提交后通过下个处理器SendAckRequestProcessor
给leader返回ack。
leader收到过半回复,对这条日志进行提交固化到磁盘。同时同步给follower们,过程如下:产生一条携带原始zxid的Leader.COMMIT
类型的消息,先放入LearnerHandler#queuedPackets
(每个follower均会有一个LearnerHandler实例对接),LearnerHandler#run
将队列中的记录发给follower.
follower对消息进行提交,存放到CommitProcessor#committedRequests
,接着follower的处理链CommitProcessor调用下个处理器将消息存入zkdb中,最后返回给客户端结果。