zk源码阅读20:zk client之网络I/O(四) ClientCnxn.SendThread

概述

SendThread是客户端ClientCnxn内部的一个核心I/O调度线程,用于管理客户端与服务端之间的所有网络I/O操作,在Zookeeper客户端实际运行中,SendThread的作用如下:

  1. 维护了客户端与服务端之间的会话生命周期(通过一定周期频率内向服务端发送PING包检测心跳),如果会话周期内客户端与服务端出现TCP连接断开,那么就会自动且透明地完成重连操作。
  2. 管理了客户端所有的请求发送和响应接收操作,其将上层客户端API操作转换成相应的请求协议并发送到服务端,并完成对同步调用的返回和异步调用的回调。
  3. 将来自服务端的事件传递给EventThread去处理。

源码

属性

sendThread属性

意义如下

字段 意义
lastPingSentNs 上一次ping的 nano time
clientCnxnSocket 通信层ClientCnxnSocket
r 随机数生成器
isFirstConnect 是否第一次connect
rwServerAddress 读写server地址
minPingRwTimeout 最短ping 读写server的 timeout时间
maxPingRwTimeout 最长ping 读写server的 timeout时间
pingRwTimeout 默认ping 读写server的 timeout时间
saslLoginFailed sasl登录失败
RETRY_CONN_MSG 日志

函数

SendThread函数

简要介绍如下

方法 作用 备注
SendThread 构造函数
readResponse 读取server的回复,进行outgoingQueue以及pendingQueue的相关处理,事件触发等等 重要
getZkState 或者client状态
getClientCnxnSocket 获取通信层clientCnxnSocket
primeConnection 主要连接函数,用于将watches和authData传给server,允许clientCnxnSocket可读写 重要
prependChroot 根据clientPath以及chrootPath得到serverPath
sendPing ping命令,记录发出时间,生成请求,加入outgoingQueue待发送 重要
startConnect 开始连接,主要是和server的socket完成connect和accept 重要
logStartConnect log
run 线程方法,完成了连接验证,超时检测,ping命令,以及网络IO 重要
pingRwServer ping读写server 重要
cleanup socket清理以及通知两个queue失去连接 以及 清理两个队列
onConnected 读取server的connect response后,设置相关参数 重要
close 关闭socket
testableCloseSocket
clientTunneledAuthenticationInProgress 是否在验证sasl
sendPacket 发送packet

将几个重要的函数进行源码讲解


readResponse方法

可以拆成几部分,分别完成

1.处理ping命令,AuthPacket,WatcherEvent,验证sasl并返回
2.从pendingQueue取出packet进行验证(有顺序保证)
3.调用finishPacket完成AsyncCallBack处理以及watcher的注册

第一部分代码如下

    ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");//反序列化出 回复头
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket               
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;                    
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) ); //加入eventThread
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {//-1代表通知类型 即WatcherEvent

                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");//反序列化WatcherEvent

                // convert from a server path to a client path
                if (chrootPath != null) {//把serverPath转化成clientPath
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);//WatcherEvent还原成WatchedEvent
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                eventThread.queueEvent( we );//加入eventThread
                return;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (clientTunneledAuthenticationInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia,"token");
                zooKeeperSaslClient.respondToServer(request.getToken(),
                  ClientCnxn.this);
                return;
            }

可以看出都是直接return的

第2,3部分代码如下

    Packet packet;//auth和ping以及正在处理的sasl没有加入pendingQueue,触发的watch也没有在pendingQueue中(是server主动发过来的),而AsyncCallBack在(见finishPacket)
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                packet = pendingQueue.remove();//得到了response,从pendingQueue中移除
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            + replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet );
                }

                packet.replyHeader.setXid(replyHdr.getXid());//设置replyHeader
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                finishPacket(packet);
            }

这里可以看出
auth和ping以及正在处理的sasl没有加入pendingQueue,触发的watch也没有在pendingQueue中(是server主动发过来的),而AsyncCallBack在pendingQueue中(见finishPacket)


primeConnection方法

primeConnection主要完成

  根据之前是否连接过设置sessId以及生成ConnectRequest
  根据disableAutoWatchReset将已有的watches和authData以及放入outgoingQueue准备发送
  允许clientCnxnSocket可读写,表示和server准备IO

代码如下

    void primeConnection() throws IOException {
            LOG.info("Socket connection established to "
                     + clientCnxnSocket.getRemoteSocketAddress()
                     + ", initiating session");
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;//如果之前见过读写server就设置sessionId,否则默认0
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);//生成ConnectRequest
            synchronized (outgoingQueue) {
                // We add backwards since we are pushing into the front
                // Only send if there's a pending watch
                // TODO: here we have the only remaining use of zooKeeper in
                // this class. It's to be eliminated!
                if (!disableAutoWatchReset) {
                    List<String> dataWatches = zooKeeper.getDataWatches();
                    List<String> existWatches = zooKeeper.getExistWatches();
                    List<String> childWatches = zooKeeper.getChildWatches();
                    if (!dataWatches.isEmpty()
                                || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                        Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();//根据chrootPath转化成serverPath
                        Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                        Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                        long setWatchesLastZxid = lastZxid;

                        while (dataWatchesIter.hasNext()
                                       || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                            List<String> dataWatchesBatch = new ArrayList<String>();
                            List<String> existWatchesBatch = new ArrayList<String>();
                            List<String> childWatchesBatch = new ArrayList<String>();
                            int batchLength = 0;

                            // Note, we may exceed our max length by a bit when we add the last
                            // watch in the batch. This isn't ideal, but it makes the code simpler.
                            while (batchLength < SET_WATCHES_MAX_LENGTH) {//限定长度
                                final String watch;
                                if (dataWatchesIter.hasNext()) {
                                    watch = dataWatchesIter.next();
                                    dataWatchesBatch.add(watch);
                                } else if (existWatchesIter.hasNext()) {
                                    watch = existWatchesIter.next();
                                    existWatchesBatch.add(watch);
                                } else if (childWatchesIter.hasNext()) {
                                    watch = childWatchesIter.next();
                                    childWatchesBatch.add(watch);
                                } else {
                                    break;
                                }
                                batchLength += watch.length();
                            }

                            SetWatches sw = new SetWatches(setWatchesLastZxid,
                                    dataWatchesBatch,
                                    existWatchesBatch,
                                    childWatchesBatch);//设置watches
                            RequestHeader h = new RequestHeader();
                            h.setType(ZooDefs.OpCode.setWatches);
                            h.setXid(-8);
                            Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                            outgoingQueue.addFirst(packet);//加入发送队列
                        }
                    }
                }

                for (AuthData id : authInfo) {
                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                            OpCode.auth), null, new AuthPacket(0, id.scheme,
                            id.data), null, null));//authInfo加入发送队列
                }
                outgoingQueue.addFirst(new Packet(null, null, conReq,
                            null, null, readOnly));//ConnectRequest确保在发送队列的第一个
            }
            clientCnxnSocket.enableReadWriteOnly();//开启读写,这样outgoingQueue内容就可以发出去了
            if (LOG.isDebugEnabled()) {
                LOG.debug("Session establishment request sent on "
                        + clientCnxnSocket.getRemoteSocketAddress());
            }
        }

主要注意:
1.sessId:如果之前连接过,那么重连用之前的sessionId,否则默认0,重连参见ClientCnxn.SendThread#startConnect的调用
2.什么时候连接会有watches需要去注册?重连且disableAutoWatchReset为false的时候
3.ConnectRequest是放在outgoingQueue第一个的,确保最先发出去的是连接请求(保证了第一个response是被ClientCnxnSocket#readConnectResult处理)


sendPing方法

这个就是个异步调用

    private void sendPing() {//ping命令,记录发出时间,生成请求,加入outgoingQueue待发送
            lastPingSentNs = System.nanoTime();
            RequestHeader h = new RequestHeader(-2, OpCode.ping);
            queuePacket(h, null, null, null, null, null, null, null, null);
        }

注意一点,在run方法会将outgoingQueue的内容发送出去,在ClientCnxnSocketNIO#doIO中,
ping命令的packet是没有进入pendingQueue的


startConnect方法
干的事情很简单

    1.根据hostProvider或者已经设置的读写服务器地址确定server 地址
    2.sasl相关处理
    3.调用clientCnxnSocket.connect

源码如下

    private void startConnect() throws IOException {//开始连接
            state = States.CONNECTING;

            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;//有rwServerAddress 就设置
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);//没有就从服务器地址列表取出来一个
            }

            setName(getName().replaceAll("\\(.*\\)",
                    "(" + addr.getHostName() + ":" + addr.getPort() + ")"));//设置线程名字
            if (ZooKeeperSaslClient.isEnabled()) {//如果开启了sasl,这部分不清楚,忽略
                try {
                    String principalUserName = System.getProperty(
                            ZK_SASL_CLIENT_USERNAME, "zookeeper");
                    zooKeeperSaslClient =
                        new ZooKeeperSaslClient(
                                principalUserName+"/"+addr.getHostName());
                } catch (LoginException e) {
                    // An authentication error occurred when the SASL client tried to initialize:
                    // for Kerberos this means that the client failed to authenticate with the KDC.
                    // This is different from an authentication error that occurs during communication
                    // with the Zookeeper server, which is handled below.
                    LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                      + "SASL authentication, if Zookeeper server allows it.");
                    eventThread.queueEvent(new WatchedEvent(
                      Watcher.Event.EventType.None,
                      Watcher.Event.KeeperState.AuthFailed, null));
                    saslLoginFailed = true;
                }
            }
            logStartConnect(addr);//log

            clientCnxnSocket.connect(addr);//socket连接
        }

run方法

这个方法很重要

  1.clientCnxnSocket相关初始化
  2.不断检测clientCnxnSocket是否和服务器处于连接状态,没有连接则进行连接
  3.检测是否超时:当处于连接状态时,检测是否读超时,当处于未连接状态时,检测是否连接超时
  4.不断的发送ping通知,服务器端每接收到ping请求,就会从当前时间重新计算session过期时间,所以当客户端按照一定时间间隔不断的发送ping请求,就能保证客户端的session不会过期
  5.如果当前是只读的话,不断去找有没有支持读写的server
  6.不断进行IO操作,发送请求队列中的请求和读取服务器端的响应数据
  7.!state.isAlive()时,进行相关清理工作

第1部分,clientCnxnSocket相关初始化

            clientCnxnSocket.introduce(this,sessionId);//clientCnxnSocket初始化
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = System.currentTimeMillis();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds

第2部分,不断检测clientCnxnSocket是否和服务器处于连接状态,没有连接则进行连接

        while (state.isAlive()) {
                try {
                    if (!clientCnxnSocket.isConnected()) {//如果clientCnxnSocket的SelectionKey为null
                        if(!isFirstConnect){//如果不是第一次连接就sleep一下
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        startConnect();//开始连接
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

第3部分,检测是否超时:当处于连接状态时,检测是否读超时,当处于未连接状态时,检测是否连接超时

    //检测是否超时,分为读超时和连接超时
                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent == true) {
                                eventThread.queueEvent(new WatchedEvent(
                                      Watcher.Event.EventType.None,
                                      authState,null));
                            }
                        }
                        //如果已经连接上,预计读超时时间 - 距离上次读已经过去的时间
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        //如果没连接上,预计连接时间 - 上次读已经过去的时间
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    
                    if (to <= 0) {//代表读超时或连接超时
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }

第4部分,不断的发送ping通知

    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small 
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        System.out.println("org.apache.zookeeper.ClientCnxn.SendThread.run readTimeout = " + readTimeout);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {//readTimeout已经过了近一半的时间,或者距离上次发送请求已过过了10s
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {//如果预计下次ping的时间 < 实际距离下次ping的时间
                                to = timeToNextPing;
                            }
                        }
                    }

第5部分,如果当前是只读的话,不断去找有没有支持读写的server

                // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {//如果是只读的话
                        long now = System.currentTimeMillis();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }

第6部分,不断进行IO操作,发送请求队列中的请求和读取服务器端的响应数据

      clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);//在特定时间内,根据两个queue进行网络传输

这个看clientCnxnSocket内的源码,已经在之前讲过了

第7部分,!state.isAlive()时,进行相关清理工作

    //下面是state is not alive的情况
            cleanup();
            clientCnxnSocket.close();//关闭socket
            if (state.isAlive()) {//???什么时候会出现这种情况
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
            }
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                    "SendThread exited loop for session: 0x"
                           + Long.toHexString(getSessionId()));

这个地方有点不理解,为什么还会出现 if (state.isAlive()) 的情况


pingRwServer方法

这个方法是client连接了只读的server时,不断根据hostProvider找到一个可读写的server

      private void pingRwServer() throws RWServerFoundException {//找到读写server,更新rwServerAddress
            String result = null;
            InetSocketAddress addr = hostProvider.next(0);
            LOG.info("Checking server " + addr + " for being r/w." +
                    " Timeout " + pingRwTimeout);

            Socket sock = null;
            BufferedReader br = null;
            try {
                sock = new Socket(addr.getHostName(), addr.getPort());
                sock.setSoLinger(false, -1);
                sock.setSoTimeout(1000);
                sock.setTcpNoDelay(true);
                sock.getOutputStream().write("isro".getBytes());
                sock.getOutputStream().flush();
                sock.shutdownOutput();
                br = new BufferedReader(
                        new InputStreamReader(sock.getInputStream()));
                result = br.readLine();
            } catch (ConnectException e) {
                // ignore, this just means server is not up
            } catch (IOException e) {
                // some unexpected error, warn about it
                LOG.warn("Exception while seeking for r/w server " +
                        e.getMessage(), e);
            } finally {
                if (sock != null) {
                    try {
                        sock.close();
                    } catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                if (br != null) {
                    try {
                        br.close();
                    } catch (IOException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
            }

            if ("rw".equals(result)) {
                pingRwTimeout = minPingRwTimeout;
                // save the found address so that it's used during the next
                // connection attempt
                rwServerAddress = addr;
                throw new RWServerFoundException("Majority server found at "
                        + addr.getHostName() + ":" + addr.getPort());
            }
        }

主要看最后的if条件就够了
注意的是,如果更新了rwServerAddress 会抛异常,run方法处理异常,会清理后进行重连,来让client重连到读写server上去

异常处理

onConnected方法

这个方法是收到了zk server的连接回复后的一些参数设置,以及zk state的状态改变

    void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {//读取server的connect response后,设置相关参数
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();

                String warnInfo;
                warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
                    + Long.toHexString(sessionId) + " has expired";
                LOG.warn(warnInfo);
                throw new SessionExpiredException(warnInfo);
            }
            if (!readOnly && isRO) {//如果client不允许只读,但是目前是只读
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;/
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();//更新hostProvider循环列表的index
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;//根据isRO设置state
            seenRwServerBefore |= !isRO;//是否见过读写server
            LOG.info("Session establishment complete on server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", sessionid = 0x" + Long.toHexString(sessionId)
                    + ", negotiated timeout = " + negotiatedSessionTimeout
                    + (isRO ? " (READ-ONLY mode)" : ""));
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));//加入watcherEvent
        }

思考

run方法主要干的几件事情

  1.clientCnxnSocket相关初始化
  2.不断检测clientCnxnSocket是否和服务器处于连接状态,没有连接则进行连接
  3.检测是否超时:当处于连接状态时,检测是否读超时,当处于未连接状态时,检测是否连接超时
  4.不断的发送ping通知,服务器端每接收到ping请求,就会从当前时间重新计算session过期时间,所以当客户端按照一定时间间隔不断的发送ping请求,就能保证客户端的session不会过期
  5.如果当前是只读的话,不断去找有没有支持读写的server
  6.不断进行IO操作,发送请求队列中的请求和读取服务器端的响应数据
  7.!state.isAlive()时,进行相关清理工作

主要连接primeConnection干了什么,如果保证第一个发出去的请求是connect请求

  根据之前是否连接过设置sessId以及生成ConnectRequest
  根据disableAutoWatchReset将已有的watches和authData以及放入outgoingQueue准备发送
  允许clientCnxnSocket可读写,表示和server准备IO

队列里面第一个就是connect请求

sessId和sessionId

根据seenRwServerBefore判断
没有连接过 sessId就是0
有连接过 则sessId就是上次连接的sessionId
用sessId和全局的sessionPasswd去连接

哪些回复是不存在于pendingQueue当中的

auth和ping以及正在处理的sasl没有加入pendingQueue,触发的watch也没有在pendingQueue中

startConnect和primeConnection区别是什么

两者的区别在于NIO的SelectionKey
前者限于connect和accept
后者已经连接完成,开始了write和read,准备开始和zk server完成socket io

pingRwServer和sendPing两个函数区别是什么

前者是目前client只连接了只读的zk server,会不断地调用,更新rwServerAddress
后者是不论client处于什么模式,都要进行的心跳验证

clientCnxnSocket.isConnected()和isFirstConnect为什么有这两个参数

isFirstConnect代表client 第一次连接,如果不是第一次连接,就sleep一段时间,然后从hostProvider选出下一个server addr

大体连接过程

首先与ZooKeeper服务器建立连接,有两层连接要建立。

1.客户端与服务器端的TCP连接
2.在TCP连接的基础上建立session关联

建立TCP连接之后,客户端发送ConnectRequest请求,申请建立session关联,此时服务器端会为该客户端分配sessionId和密码,同时开启对该session是否超时的检测。

当在sessionTimeout时间内,即还未超时,此时TCP连接断开,服务器端仍然认为该sessionId处于存活状态。
此时,客户端会选择下一个ZooKeeper服务器地址进行TCP连接建立,TCP连接建立完成后,拿着之前的sessionId和密码发送ConnectRequest请求,如果还未到该sessionId的超时时间,则表示自动重连成功。
对客户端用户是透明的,一切都在背后默默执行,ZooKeeper对象是有效的。

如果重新建立TCP连接后,已经达到该sessionId的超时时间了(服务器端就会清理与该sessionId相关的数据),则返回给客户端的sessionTimeout时间为0,sessionid为0,密码为空字节数组。
客户端接收到该数据后,会判断协商后的sessionTimeout时间是否小于等于0,如果小于等于0,则使用eventThread线程先发出一个KeeperState.Expired事件,通知相应的Watcher。
然后结束EventThread线程的循环,开始走向结束。此时ZooKeeper对象就是无效的了,必须要重新new一个新的ZooKeeper对象,分配新的sessionId了。

client一开始连接到了ReadOnly的server,后续找到rwServerAddress如何完成的重新连接

ClientCnxn.SendThread#run接收到RWServerFoundException异常,然后调用了


相关清理操作

cleanUp调用后使得ClientCnxnSocketNIO#isConnected为false
因此ClientCnxn.SendThread#run方法又进入了连接的操作

                      if (!clientCnxnSocket.isConnected()) {//如果clientCnxnSocket的SelectionKey为null
                        if(!isFirstConnect){//如果不是第一次连接就sleep一下
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
                        startConnect();//开始连接
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

问题

SendThread#run的疑惑

为什么while(state.isAlive()) break出去之后
还有 if (state.isAlive())

备注

这是后续看server的网络IO代码后需要搞清楚的

怎么保证server的处理和发送顺序
什么时候server是只可读的,什么时候是读写的
server如何分配的sessionId和pwd
server是如何区分连接请求的不同sessId的,后续待看

refer

https://my.oschina.net/pingpangkuangmo/blog/486780 run方法以及大体过程
http://www.cnblogs.com/leesf456/p/6098255.html 概念
http://www.voidcn.com/blog/aBOUNTWINTER/article/p-6400711.html 概念
http://shift-alt-ctrl.iteye.com/blog/1846971 RwServer,seenRwServerBefore读写server相关说明
《paxos到zk》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容