来源: https://www.cnblogs.com/f1194361820/p/5519227.html
https://www.cnblogs.com/duanxz/p/3783266.html
zookeeper结构图
创建ZooKeeper对象时,应会创建一个ClientCnxn(代表了客户端连接对象)。与此同时启动了两个线程:SendThread、EventThread。两个队列:outgoingQueue和pendingQueue。
模块:
我们可以认为ZK的Client由三个主要模块组成:Zookeeper, WatcherManager, ClientCnxn
Zookeeper是ZK Client端的真正接口,用户可以操作的最主要的类,当用户创建一个Zookeeper实例以后,几乎所有的操作都被这个实例包办了,用户不用关心怎么连接到Server,Watcher什么时候被触发等等令人伤神的问题。
WatcherManager,顾名思义,它是用来管理Watcher的,Watcher是ZK的一大特色功能,允许多个Client对一个或多个ZNode进行监控,当ZNode有变化时能够通知到监控这个ZNode的各个Client。我们把一个ZK Client简单看成一个Zookeeper实例,那么这个实例内部的WatcherManager就管理了ZK Client绑定的所有Watcher。
ClientCnxn是管理所有网络IO的模块,所有和ZK Server交互的信息和数据都经过这个模块,包括给ZK Server发送Request,从ZK Server接受Response,以及从ZK Server接受Watcher Event。ClientCnxn完全管理了网络,从外部看来网络操作是透明的。
线程:
每当我们创建一个Zookeeper实例的时候,会有两个线程被创建:SendThread和EventThread。所以当我们使用ZK Client端的时候应该尽量只创建一个Zookeeper实例并反复使用。大量的创建销毁Zookeeper实例不仅会反复的创建和销毁线程,而且会在Server端创建大量的Session。
SendThread是真正处理网络IO的线程,所有通过网络发送和接受的数据包都在这个线程中处理。这个线程的主体是一个while循环:
while (zooKeeper.state.isAlive()) {
try {
if (sockKey == null) {
// don’t re-establish connection if we are closing
if (closing) {
break;
}
startConnect();
lastSend = now;
lastHeard = now;
}
… ….
selector.select(to);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
now = System.currentTimeMillis();
for (SelectionKey k : selected) {
… …
if (doIO()) {
lastHeard = now;
}
… …
}
}
catch() {
… …
}
}
这里用了java的nio功能,当selector侦测到事件发生的时候就会触发一次循环,主要的操作会在doIO()里面完成:
boolean doIO() throws InterruptedException, IOException {
boolean packetReceived = false;
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException(“Socket is null!”);
}
if (sockKey.isReadable()) {
… …
}
if (sockKey.isWritable()) {
… …
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else {
enableWrite();
}
return packetReceived;
}
这个过程大概是这样的:
如果有数据可读,则读取数据包,如果数据包是先前发出去的Request的Response,那么这个数据包一定在Pending Queue里面。将它从Pending Queue里面移走,并将此信息添加到Waiting Event Queue 里面,如果数据包是一个Watcher Event,将此信息添加到Waiting Event Queue里面。
如果OutgoingQueue里面有数据需要发送,则发送数据包并把数据包从Outgoing Queue移至Pending Queue,意思是数据我已经发出去了,但还要等待Server端的回复,所以这个请求现在是Pending 的状态。
另外一个线程EventThread是用来处理Event的。前面提到SendThread从Server收到数据的时候会把一些信息添加到Event Thread里面,比如Finish Event和Watcher Event。EventThread就是专门用来处理这些Event的,收到Finish Event的时候会把相对应的Package置成Finish状态,这样等待结果的Client函数就能得以返回。收到Watcher Event的时候会联系WatcherManager找到相对应的Watcher,从WatcherManager里面移除这个Watcher(因为每个Watcher只会被通知一次) 并回调Watcher的process函数。所以所有Watcher的process函数是运行在EventThread里面的。
保持连接:
到目前为止应该已经大概介绍了ZK Client端的大致结构和处理流程。还剩下一个问题就是当网络出问题时ZK Client是如何处理的。其实这个过程并不复杂,大概是执行以下步骤:
网络发生故障,网络操作抛出的异常被捕获。
确认网络操作失败,清除当前与Server相关的网络资源,包括Socket等等。
在Server列表中逐个尝试链接Server。
这个过程从外界看来是透明的,外界并不会觉察到ZK Client已经悄悄地更换了一个连接的Server。
注意点, SendThread 处理流程
版本 zk3.5.5
public void run() {
....
// don't re-establish connection if we are closing
// determine whether we need to send an AuthFailed event.
// An authentication error occurred during authentication with the Zookeeper Server.
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this); // 主要是这里,这里会发送
...
}
clientCnxnSocket 是一个借口, 有两个实现类ClientCnxnSocketNIO 和 ClientCnxnSocketNetty
如何选择NIO的,(注意这里默认选择NIO)
ClientCnxnSocketNIO .doTransport()
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut); // nio的selector 选择器选择
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue,
sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
ClientCnxnSocketNetty,
ClientCnxnSocketNetty implements ClientCnxnSocket abstract methods.
It's responsible for connecting to server, reading/writing network traffic and
being a layer between network data and higher level packets.
这是负责为联机阅读/写作到服务器,和网络交流数据包。
@Override
void doTransport(int waitTimeOut,
List<Packet> pendingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
try {
if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
Packet head = null;
if (needSasl.get()) {
if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
return;
}
} else {
head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
// adding back the packet to notify of failure in conLossPacket().
addBack(head);
return;
}
// channel disconnection happened
if (disconnected.get()) {
addBack(head);
throw new EndOfStreamException("channel for sessionid 0x"
+ Long.toHexString(sessionId)
+ " is lost");
}
if (head != null) {
doWrite(pendingQueue, head, cnxn);
}
} finally {
updateNow();
}
}
同步调用:
同步调用,就是客户端成功发送请求后,才继续执行。例如:zk.create(path,data,acl,createMode);
这行代码会发起一个同步调用。一个线程A执行这个create时,会创建一个表示create动作的packet,放到数据发送队列outgoingQueue。之后当线程A就开始等待,直到SendThread线程从outgoingQueue队列取出该packet,并将其成功发送(已收到服务端的回应为准)。然后线程A才继续执行。
异步调用:
异步调用,就是客户端不会管请求是否发送成功,都会继续执行。例如:zk.create(path,data,acl,createMode,stringCallback);
这行代码会发起一个异步调用。一个线程A执行这个create时,会创建一个表示create动作的packet,放到数据发送队列outgoingQueue。线程A接着就去执行下一行代码了,而不会去管数据packet是否由SendThread线程发送到服务端了。
SendThread的职责:
1:创建一个长连接,用于会话保持
通过周期性的发送ping packet到当前连接的ZooKeeper服务器实例。这个过程,我们通常称为心跳。每当客户端与服务端的连接断开后,会自动重新连接到下一个服务器。如果断开的是最后与一个服务器的连接,那么会重新连接到第一个服务器。
2:使用这个长连接与服务器通信
1)发送客户调用
不断的从outgoingQueue取出packet发给服务端。当发送的是Client的同步调用的packet,则在发送packet后,立即通知客户端同步调用线程继续执行。当发送的是Client的异步调用,则会将packet发给服务端,并保存到pendingQueue。当从服务端发回响应后,生成一个packet 完成事件交给EventThread,由Event执行CallBack调用。
2)处理服务端响应
对服务端响应反序列化后,根据响应分类进行处理如下:
Ping的响应:不做处理
认证失败的响应:创建认证失败的WatchedEvent,并将event交给EventThread处理。
服务端的数据变更通知:生成相应的数据变更WatchedEvent,并将event交给EventThread处理。
-
服务端对调用的回应:不论是同步调用还是异步调用,服务端都会给出回应。收到此类回应后,先是将watcher放到watcherManager中。然后对同步、异步做后续处理。
如果是同步调用,则通知发起调用的线程继续处理。 如果是异步调用,则将该packet交由EventThread来处理。例如对create、delete、exists、getData、getChildren方法调用的响应。
EventThread做了什么事呢?
从上述描述中,也可以看出EventThread用于对接收到的packet或者event进行处理:
- 如果是event,则从WatcherManager中取出相应的Watcher进行处理。
- 如果是packet,则执行相关联的AsyncCallback。
通过源码的阅读,知道在使用ZooKeeper客户端时要注意以下两点:
- 在进行Watcher回调时,会从WatchManager取出与相关path关联的多个Watcher(此时WatchManager中就不会再有这个path相关的Watcher了),然后串行的调用这多个Watcher#process方法。所以在编程时,会根据业务的需要,有可能会反复注册Watcher。
- 另外因为多个Watcher的调用是串行的,所以不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。
好了,对于ZK Client的介绍大概就这么多了,希望这样的介绍对于大家学习和使用Zookeeper有一些帮助。对于文章中没有介绍或者没有说清楚的地方需要进一步查看源码来解决。
PS: 若你觉得可以、还行、过得去、甚至不太差的话,可以“关注”或者“点赞”一下,就此谢过!