在前面的文章中,我们分析了Cluster模式的启动过程,以及Zab的实现.
在这篇文章中,我们会详细介绍,ZooKeeper是如何处理每个请求的.
过程
在ZooKeeper源码解析(3)-Cluster启动过程解析这篇文章中,我们介绍了,Cluster模式在启动时,会创建一个用户创建ServerCnxn的ServerCnxnFactory.默认情况下是NIOServerCnxnFactory.
这不,在这里,我们再一次用到了NIOServerCnxnFactory.
在NIOServerCnxnFactory的run()方法中,
我们可以看到,NIOServerCnxnFactory会根据不同的事件来做不同的事:
- 如果是SelectionKey.OP_ACCEPT事件,代表客户端请求建立一个新的连接,那么就创建一个NIOServerCnxn并且将其注册到Selector中
- 如果是SelectionKey.OP_READ | SelectionKey.OP_WRITE事件,那么就找到对应的NIOServerCnxn并执行其doIO()方法.
我们来具体看一下NIOServerCnxn的doIO()方法的实现.
在这个方法中,会判断目前发生的到底是SelectionKey.OP_READ事件还是SelectionKey.OP_WRITE事件.
我们先看一下,当发生的是SelectionKey.OP_READ事件时,会如何处理.
其实这里还是蛮难理解的.特别是readLength()这个函数.
其实这段代码的功能,我在注释里已经写的很清楚了.
这段代码的功能,总的来说,就是:
- 判断客户端执行的是不是monitor command
- 如果是monitor command,那么在readLength()函数中就进行处理
- 如果是其他的命令,那么就处理用户请求
这里,我们需要清楚什么是monitor command.
monitor command就是监控命令,就是能够查看Server此时状态的一些命令.这些命令都是四个字符的.
从ZooKeeper Administrator's Guide中,我们可以看到,有这么一些Monitor Command:
全部的Monitor Command请参考这个列表.
我们可以看到,其中有两个很重要的变量,一个是incomingBuffer,一个是lenBuffer.
我们看一下其定义:
我们可以看到,它们开始都是一个capacity为4的ByteBuffer.
我们可以看到,当有读事件时,先将数据的前四个字节读入incomingBuffer.
我们可以看到,下面有一个if block,判断语句为是否incomingBuffer == lenBuffer,而从上面的定义中,我们看到,incomingBuffer就是== lenBuffer.
其实这里的重点是readLength()方法.
在readLength()方法中,我们可以看到,如果前面的条件都通过,那么最后会重新初始化incomingBuffer.
其实这段代码,只要记住我上面说的功能,就很容易理解了.
这里也吐槽一下,起的名字这么难理解,而且感觉简单的事情让它给搞复杂了.
如果是Monitor Command,那么在readLength()方法以及后续的方法中,就对它进行处理了.
而如果不是Monitor Command,就要进入到readPayload()方法中处理.
readPayload()方法的主体如下:
我们可以看到,进行如下处理:
- 如果此时Server还未初始化完成,那么就调用readConnectRequest()方法对ConnectRequest进行处理,然后将initialized设置为true.当然,这些都是readConnectRequest()方法内部做的事情.
- 如果Server已经初始化完成,那么就调用readRequest()方法进行处理
我们可以看到,在Server未初始化完成之前,还是可以处理ConnectRequest的.
这里我们不深入去看readConnectRequest()的源码,请读者自行查看源码来理解.
我们重点关注readRequest()方法.
在这个方法内部,会调用ZooKeeperServer的processPacket()方法.
而在processPacket()方法内部,会调用submitRequest()方法,让ZooKeeperServer中的RequestProcessor对请求进行处理.
我们看一下ZooKeeperServer的submitRequest()方法的主体:
那么,RequestProcessor是什么时候被配置的呢?
它是在启动ZooKeeperServer的时候,配置的.
我们看一下ZooKeeper默认配置的RequestProcessor都有哪些.
我们可以看到,其中并没有用于进行Zab算法提议的RequestProcessor.
还记得我们在ZooKeeper源码解析(3)-Cluster启动过程解析这篇文章中,提到的,Server在发现自己是Leader之后,会启动一个LeaderZooKeeperServer吗?
没错,就是这里.
我们看看,LeaderZooKeeperServer都设置了哪些RequestProcessor.
我们可以看到,其中就有用于Zab算法的提议的RequestProcessor.
关于不同的RequestProcessor的介绍,我们会在下一篇文章中介绍.
我们这里主要关注PreRequestProcessor.
我们可以看到,它的processRequest()方法的实现,特别简单:
就是非常简单的添加到submittedRequests这个集合中.
那么何时来处理呢?
其实PreRequestProcessor是一个线程,在它启动后,它就会不断从submittedRequest中读取数据,并处理:
主要的处理函数就是pRequest()方法,其实现如下:
上面我们主要介绍了读事件的处理过程,那么写事件呢?
过程也很简单,就是添加一些监控信息,然后将数据发送给客户端就好了.