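Overview
SendThread is the core I/O scheduling thread inside the client-side ClientCnxn. It manages all network I/O between the client and the server. While a ZooKeeper client is running, SendThread does the following:
1. Maintains the session lifecycle between client and server by periodically sending PING packets to the server as a heartbeat. If the TCP connection drops within the session timeout, it reconnects automatically and transparently.
2. Manages all request sending and response receiving for the client: it converts upper-level client API calls into the corresponding protocol requests, sends them to the server, and completes the return of synchronous calls and the callbacks of asynchronous calls.
3. Passes events from the server to EventThread for processing.
Source code
Fields
The fields mean the following: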
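Field | Meaning |
---|---|
lastPingSentNs | nanoTime at which the last ping was sent |
clientCnxnSocket | The communication layer, ClientCnxnSocket |
r | Random number generator |
isFirstConnect | Whether this is the first connect |
rwServerAddress | Address of a read/write server |
minPingRwTimeout | Minimum timeout for pinging a read/write server |
maxPingRwTimeout | Maximum timeout for pinging a read/write server |
pingRwTimeout | Default timeout for pinging a read/write server |
saslLoginFailed | Whether SASL login failed |
RETRY_CONN_MSG | Log message text |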
Methods
A brief summary:
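Method | Purpose | Notes |
---|---|---|
SendThread | Constructor | |
readResponse | Reads the server's reply, does the related outgoingQueue and pendingQueue handling, triggers events, and so on | Important |
getZkState | Gets the client state | |
getClientCnxnSocket | Gets the communication layer clientCnxnSocket | |
primeConnection | Main connection function; passes watches and authData to the server and enables read/write on clientCnxnSocket | Important |
prependChroot | Derives the serverPath from the clientPath and chrootPath | |
sendPing | Ping command: records the send time, builds the request, and adds it to outgoingQueue for sending | Important |
startConnect | Starts connecting; mainly completes the socket connect (accept on the server side) with the server | Important |
logStartConnect | Logging | |
run | Thread body: connection checks, timeout detection, pings, and network I/O | Important |
pingRwServer | Pings a read/write server | Important |
cleanup | Cleans up the socket, notifies both queues of the connection loss, and clears both queues | |
onConnected | Sets the related parameters after reading the server's connect response | Important |
close | Closes the socket | |
testableCloseSocket | | |
clientTunneledAuthenticationInProgress | Whether SASL authentication is in progress | |
sendPacket | Sends a packet | |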
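The important methods are walked through below.
readResponse method
It can be split into three parts:
1. Handle ping replies, AuthPacket replies, WatcherEvent notifications, and in-progress SASL authentication, returning directly in each case
2. Take a packet from pendingQueue and verify it (ordering is guaranteed)
3. Call finishPacket to complete AsyncCallback handling and watcher registration
The code for part 1: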
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");// deserialize the reply header
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) ); // queue onto eventThread
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {// -1 marks a notification, i.e. a WatcherEvent
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");// deserialize the WatcherEvent
// convert from a server path to a client path
if (chrootPath != null) {// convert the serverPath to a clientPath
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);// turn the WatcherEvent back into a WatchedEvent
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
eventThread.queueEvent( we );// queue onto eventThread
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (clientTunneledAuthenticationInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia,"token");
zooKeeperSaslClient.respondToServer(request.getToken(),
ClientCnxn.this);
return;
}
As can be seen, every one of these branches returns directly.
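The chroot branch of the notification handling is easy to misread, so here is a minimal standalone sketch of the path conversion. The class ChrootPaths and both method names are hypothetical. toClientPath mirrors the branch shown above; toServerPath is an assumption about what prependChroot does for a single path, based only on its description in the method table.

```java
final class ChrootPaths {
    /** Server path to client path, mirroring the chroot branch in readResponse. */
    static String toClientPath(String chrootPath, String serverPath) {
        if (chrootPath == null) {
            return serverPath; // no chroot configured, nothing to strip
        }
        if (serverPath.equals(chrootPath)) {
            return "/"; // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        // Too short for the chroot: the real code only logs a warning
        // and leaves the path unchanged.
        return serverPath;
    }

    /** Client path to server path (assumed behaviour of prependChroot for one path). */
    static String toServerPath(String chrootPath, String clientPath) {
        if (chrootPath == null) {
            return clientPath;
        }
        return clientPath.equals("/") ? chrootPath : chrootPath + clientPath;
    }

    public static void main(String[] args) {
        System.out.println(toClientPath("/app", "/app/config"));
        System.out.println(toServerPath("/app", "/config"));
    }
}
```

With a chroot of /app, the two conversions are inverses of each other for any path under the chroot.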
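The code for parts 2 and 3:
Packet packet;// auth, ping and in-progress SASL packets are never added to pendingQueue; triggered watches are not in pendingQueue either (the server pushes them); packets carrying an AsyncCallback are (see finishPacket)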
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();// a response arrived, so remove the packet from pendingQueue
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
packet.replyHeader.setXid(replyHdr.getXid());// fill in the replyHeader
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
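This shows that:
auth, ping and in-progress SASL packets are never added to pendingQueue, and triggered watches are not in pendingQueue either (the server pushes them on its own initiative), whereas packets carrying an AsyncCallback are in pendingQueue (see finishPacket)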
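The dispatch logic of readResponse can be condensed into a small sketch. The class ReplyDispatch and its methods are hypothetical names; the xid values -2, -4 and -1 come from the code above. The in-progress SASL branch is left out, and the sketch throws an unchecked IllegalStateException where the real code throws IOException.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

final class ReplyDispatch {
    enum Kind { PING, AUTH, NOTIFICATION, QUEUED_REQUEST }

    /** Classifies a reply by the xid in its header. */
    static Kind classify(int xid) {
        switch (xid) {
            case -2: return Kind.PING;          // handled and returned directly
            case -4: return Kind.AUTH;          // handled and returned directly
            case -1: return Kind.NOTIFICATION;  // pushed by the server, queued to EventThread
            default: return Kind.QUEUED_REQUEST; // must match the head of pendingQueue
        }
    }

    /** Removes the head of the pending queue and checks that it matches the reply. */
    static int takeMatching(Deque<Integer> pendingXids, int replyXid) {
        if (pendingXids.isEmpty()) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        int expected = pendingXids.remove();
        if (expected != replyXid) {
            throw new IllegalStateException(
                    "Xid out of order. Got Xid " + replyXid + " expected Xid " + expected);
        }
        return expected;
    }

    public static void main(String[] args) {
        Deque<Integer> pending = new ArrayDeque<>(Arrays.asList(1, 2));
        System.out.println(classify(-2));
        System.out.println(takeMatching(pending, 1));
    }
}
```

Because requests are answered in order, only the head of the queue ever needs to be inspected; any mismatch is treated as a broken connection rather than searched for further down the queue.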
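primeConnection method
primeConnection mainly does the following:
Sets sessId depending on whether the client has connected before, and builds the ConnectRequest
Depending on disableAutoWatchReset, puts the existing watches, together with the authData, into outgoingQueue ready to be sent
Enables read/write on clientCnxnSocket, meaning the client is ready to do I/O with the server
The code: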
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", initiating session");
isFirstConnect = false;
long sessId = (seenRwServerBefore) ? sessionId : 0;// reuse sessionId if a read/write server has been seen before, otherwise default to 0
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
sessionTimeout, sessId, sessionPasswd);// build the ConnectRequest
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
// TODO: here we have the only remaining use of zooKeeper in
// this class. It's to be eliminated!
if (!disableAutoWatchReset) {
List<String> dataWatches = zooKeeper.getDataWatches();
List<String> existWatches = zooKeeper.getExistWatches();
List<String> childWatches = zooKeeper.getChildWatches();
if (!dataWatches.isEmpty()
|| !existWatches.isEmpty() || !childWatches.isEmpty()) {
Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();// convert to serverPath using chrootPath
Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
long setWatchesLastZxid = lastZxid;
while (dataWatchesIter.hasNext()
|| existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
List<String> dataWatchesBatch = new ArrayList<String>();
List<String> existWatchesBatch = new ArrayList<String>();
List<String> childWatchesBatch = new ArrayList<String>();
int batchLength = 0;
// Note, we may exceed our max length by a bit when we add the last
// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batchLength < SET_WATCHES_MAX_LENGTH) {// cap the batch length
final String watch;
if (dataWatchesIter.hasNext()) {
watch = dataWatchesIter.next();
dataWatchesBatch.add(watch);
} else if (existWatchesIter.hasNext()) {
watch = existWatchesIter.next();
existWatchesBatch.add(watch);
} else if (childWatchesIter.hasNext()) {
watch = childWatchesIter.next();
childWatchesBatch.add(watch);
} else {
break;
}
batchLength += watch.length();
}
SetWatches sw = new SetWatches(setWatchesLastZxid,
dataWatchesBatch,
existWatchesBatch,
childWatchesBatch);// set the watches
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setWatches);
h.setXid(-8);
Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
outgoingQueue.addFirst(packet);// add to the send queue
}
}
}
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));// add authInfo to the send queue
}
outgoingQueue.addFirst(new Packet(null, null, conReq,
null, null, readOnly));// make sure the ConnectRequest is first in the send queue
}
clientCnxnSocket.enableReadWriteOnly();// enable read/write so the contents of outgoingQueue can be sent
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request sent on "
+ clientCnxnSocket.getRemoteSocketAddress());
}
}
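Points to note:
1. sessId: if the client has connected before, the reconnect reuses the previous sessionId, otherwise it defaults to 0. For reconnects, see the call sites of ClientCnxn.SendThread#startConnect
2. When does a connection have watches to register? On a reconnect, when disableAutoWatchReset is false
3. The ConnectRequest is placed at the head of outgoingQueue, which ensures the connect request is the first thing sent (and so guarantees that the first response is handled by ClientCnxnSocket#readConnectResult)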
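The batching loop inside primeConnection is worth isolating. The following is a simplified sketch that batches one list of paths instead of three; WatchBatcher and batch are hypothetical names, and maxLength stands in for SET_WATCHES_MAX_LENGTH. As the comment in the original code notes, a batch may overshoot the limit by its last element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class WatchBatcher {
    /** Splits paths into batches whose summed path length reaches at least maxLength. */
    static List<List<String>> batch(List<String> paths, int maxLength) {
        List<List<String>> batches = new ArrayList<>();
        Iterator<String> it = paths.iterator();
        while (it.hasNext()) {
            List<String> current = new ArrayList<>();
            int batchLength = 0;
            // The check happens before adding, so the last path added
            // may push the batch slightly over maxLength.
            while (batchLength < maxLength && it.hasNext()) {
                String path = it.next();
                current.add(path);
                batchLength += path.length();
            }
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(batch(Arrays.asList("/a", "/bb", "/ccc", "/d"), 5));
    }
}
```

Each batch would become one SetWatches packet in the real client, which keeps any single packet from growing without bound when a client holds a very large number of watches.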
sendPing method
This is simply an asynchronous call:
private void sendPing() {// ping command: record the send time, build the request, add it to outgoingQueue for sending
lastPingSentNs = System.nanoTime();
RequestHeader h = new RequestHeader(-2, OpCode.ping);
queuePacket(h, null, null, null, null, null, null, null, null);
}
Note that the contents of outgoingQueue are sent out from the run method, inside ClientCnxnSocketNIO#doIO,
and that the ping packet never enters pendingQueue.
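startConnect method
What it does is simple:
1. Determines the server address, either from hostProvider or from the read/write server address that has already been set
2. Does the SASL-related handling
3. Calls clientCnxnSocket.connect
The source:
private void startConnect() throws IOException {// start connecting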
state = States.CONNECTING;
InetSocketAddress addr;
if (rwServerAddress != null) {
addr = rwServerAddress;// use rwServerAddress if one is set
rwServerAddress = null;
} else {
addr = hostProvider.next(1000);// otherwise take the next one from the server address list
}
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));// set the thread name
if (ZooKeeperSaslClient.isEnabled()) {// if SASL is enabled; this part was not studied and is skipped here
try {
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);// log
clientCnxnSocket.connect(addr);// socket connect
}
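run method
This method is very important:
1. Initialize clientCnxnSocket
2. Keep checking whether clientCnxnSocket is connected to the server, and connect if it is not
3. Check for timeouts: when connected, check for a read timeout; when not connected, check for a connect timeout
4. Keep sending pings. Each time the server receives a ping, it recomputes the session expiry from the current time, so as long as the client pings at a regular interval its session will not expire
5. If the client is currently read-only, keep looking for a server that supports read/write
6. Keep doing I/O: send the requests in the request queue and read the server's responses
7. When !state.isAlive(), do the related cleanup
Part 1: initialize clientCnxnSocket
clientCnxnSocket.introduce(this,sessionId);// initialize clientCnxnSocket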
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
Part 2: keep checking whether clientCnxnSocket is connected to the server, and connect if it is not
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {// i.e. the SelectionKey of clientCnxnSocket is null
if(!isFirstConnect){// if this is not the first connect, sleep for a bit
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
startConnect();// start connecting
clientCnxnSocket.updateLastSendAndHeard();
}
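Part 3: check for timeouts: when connected, check for a read timeout; when not connected, check for a connect timeout
// check for a timeout, either a read timeout or a connect timeout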
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
// connected: the read timeout minus the time since we last heard from the server
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
// not connected: the connect timeout minus the time since we last heard from the server
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {// the read timeout or connect timeout has expired
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
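The timeout check above boils down to one subtraction. A minimal sketch, with hypothetical names (TimeoutBudget, remaining, timedOut); all values are in milliseconds and idleRecv stands for clientCnxnSocket.getIdleRecv():

```java
final class TimeoutBudget {
    /** Time left before the client gives up on the server. */
    static int remaining(boolean connected, int readTimeout, int connectTimeout, int idleRecv) {
        // Connected sessions are measured against readTimeout,
        // connection attempts against connectTimeout.
        return (connected ? readTimeout : connectTimeout) - idleRecv;
    }

    /** A non-positive budget is what triggers SessionTimeoutException in run. */
    static boolean timedOut(int remaining) {
        return remaining <= 0;
    }

    public static void main(String[] args) {
        int to = remaining(true, 20000, 10000, 5000);
        System.out.println(to + " ms left, timed out: " + timedOut(to));
    }
}
```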
Part 4: keep sending pings
if (state.isConnected()) {
//1000(1 second) is to prevent race condition missing to send the second ping
//also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {// nearly half of readTimeout has passed since the last send, or more than 10 s have passed since the last request was sent
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {// the next ping is due before the timeout would expire, so wait only until the ping is due
to = timeToNextPing;
}
}
}
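To see how the ping schedule behaves for concrete numbers, here is the same arithmetic as a standalone sketch. PingSchedule and its method names are hypothetical; the formula and the 10-second MAX_SEND_PING_INTERVAL are taken from the code above, and all values are in milliseconds.

```java
final class PingSchedule {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds

    /** Milliseconds until the next ping is due; <= 0 means ping now. */
    static int timeToNextPing(int readTimeout, int idleSend) {
        // Once more than a second has passed since the last send,
        // one extra second is subtracted as a safety margin.
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        System.out.println(timeToNextPing(20000, 500));
        System.out.println(shouldPing(20000, 9000));
    }
}
```

With readTimeout = 20000, a ping goes out once the connection has been send-idle for 9 seconds. With a large readTimeout such as 40000, the 10-second cap takes over and forces a ping earlier than the half-timeout rule would.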
Part 5: if the client is currently read-only, keep looking for a server that supports read/write
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {// if the client is in read-only mode
long now = System.currentTimeMillis();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
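The probe interval for a read/write server doubles on every attempt up to a cap, and pingRwServer resets it to the minimum once a read/write server is found. A minimal sketch of that backoff follows; RwPingBackoff and its method names are hypothetical, and the bounds 100 and 60000 used in main are illustrative assumptions, since the section does not show the actual values of minPingRwTimeout and maxPingRwTimeout.

```java
final class RwPingBackoff {
    private final int minTimeout;
    private final int maxTimeout;
    private int current;

    RwPingBackoff(int minTimeout, int maxTimeout) {
        this.minTimeout = minTimeout;
        this.maxTimeout = maxTimeout;
        this.current = minTimeout;
    }

    int current() {
        return current;
    }

    /** Called before each probe: double the interval, capped at the maximum. */
    int onProbe() {
        current = Math.min(2 * current, maxTimeout);
        return current;
    }

    /** Called when a read/write server answers "rw": start over from the minimum. */
    void onRwServerFound() {
        current = minTimeout;
    }

    public static void main(String[] args) {
        RwPingBackoff backoff = new RwPingBackoff(100, 60000); // assumed bounds
        for (int i = 0; i < 12; i++) {
            System.out.println(backoff.onProbe());
        }
    }
}
```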
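Part 6: keep doing I/O: send the requests in the request queue and read the server's responses
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);// do network I/O on the two queues, waiting at most `to` ms
For this, see the source inside clientCnxnSocket, which was covered earlier.
Part 7: cleanup when !state.isAlive()
// below: after the while loop has exited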
cleanup();
clientCnxnSocket.close();// close the socket
if (state.isAlive()) {// ??? when can this happen
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
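This part is not fully clear to me: why can if (state.isAlive()) still be true here? One path visible in the part 2 code is that the loop also exits through if (closing || !state.isAlive()) break; so when closing is true the state may still be alive at this point.
pingRwServer method
This method is used when the client is connected to a read-only server: it keeps using hostProvider to look for a server that supports read/write
private void pingRwServer() throws RWServerFoundException {// find a read/write server and update rwServerAddress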
String result = null;
InetSocketAddress addr = hostProvider.next(0);
LOG.info("Checking server " + addr + " for being r/w." +
" Timeout " + pingRwTimeout);
Socket sock = null;
BufferedReader br = null;
try {
sock = new Socket(addr.getHostName(), addr.getPort());
sock.setSoLinger(false, -1);
sock.setSoTimeout(1000);
sock.setTcpNoDelay(true);
sock.getOutputStream().write("isro".getBytes());
sock.getOutputStream().flush();
sock.shutdownOutput();
br = new BufferedReader(
new InputStreamReader(sock.getInputStream()));
result = br.readLine();
} catch (ConnectException e) {
// ignore, this just means server is not up
} catch (IOException e) {
// some unexpected error, warn about it
LOG.warn("Exception while seeking for r/w server " +
e.getMessage(), e);
} finally {
if (sock != null) {
try {
sock.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
if (br != null) {
try {
br.close();
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}
}
}
if ("rw".equals(result)) {
pingRwTimeout = minPingRwTimeout;
// save the found address so that it's used during the next
// connection attempt
rwServerAddress = addr;
throw new RWServerFoundException("Majority server found at "
+ addr.getHostName() + ":" + addr.getPort());
}
}
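Looking at the final if condition is enough.
Note that once rwServerAddress has been updated, an exception is thrown. The run method handles the exception, cleans up, and reconnects, so that the client ends up connected to the read/write server.
onConnected method
This method sets a number of parameters after the connect response from the zk server has been received, and changes the zk state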
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {// set the related parameters after reading the server's connect response
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
String warnInfo;
warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired";
LOG.warn(warnInfo);
throw new SessionExpiredException(warnInfo);
}
if (!readOnly && isRO) {// the client does not allow read-only, but the server is read-only
LOG.error("Read/write client got connected to read-only server");
}
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();// update the index of hostProvider's circular list
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
state = (isRO) ?
States.CONNECTEDREADONLY : States.CONNECTED;// set state according to isRO
seenRwServerBefore |= !isRO;// whether a read/write server has been seen
LOG.info("Session establishment complete on server "
+ clientCnxnSocket.getRemoteSocketAddress()
+ ", sessionid = 0x" + Long.toHexString(sessionId)
+ ", negotiated timeout = " + negotiatedSessionTimeout
+ (isRO ? " (READ-ONLY mode)" : ""));
KeeperState eventState = (isRO) ?
KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
eventState, null));// queue the WatchedEvent
}
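The two timeouts derived in onConnected are plain integer arithmetic on the negotiated session timeout. A small sketch with hypothetical names (NegotiatedTimeouts and its methods); serverCount stands in for hostProvider.size():

```java
final class NegotiatedTimeouts {
    /** Two thirds of the negotiated session timeout, using integer division. */
    static int readTimeout(int negotiatedSessionTimeout) {
        return negotiatedSessionTimeout * 2 / 3;
    }

    /** The session timeout split evenly across the known servers. */
    static int connectTimeout(int negotiatedSessionTimeout, int serverCount) {
        return negotiatedSessionTimeout / serverCount;
    }

    /** A non-positive negotiated timeout means the server has expired the session. */
    static boolean sessionExpired(int negotiatedSessionTimeout) {
        return negotiatedSessionTimeout <= 0;
    }

    public static void main(String[] args) {
        System.out.println(readTimeout(30000));
        System.out.println(connectTimeout(30000, 3));
    }
}
```

Dividing by the server count means that, in the worst case, the client can try every server once within a single session timeout.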
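Reflections
The main things the run method does
1. Initialize clientCnxnSocket
2. Keep checking whether clientCnxnSocket is connected to the server, and connect if it is not
3. Check for timeouts: when connected, check for a read timeout; when not connected, check for a connect timeout
4. Keep sending pings. Each time the server receives a ping, it recomputes the session expiry from the current time, so as long as the client pings at a regular interval its session will not expire
5. If the client is currently read-only, keep looking for a server that supports read/write
6. Keep doing I/O: send the requests in the request queue and read the server's responses
7. When !state.isAlive(), do the related cleanup
What primeConnection does, and how it guarantees that the first request sent is the connect request
Sets sessId depending on whether the client has connected before, and builds the ConnectRequest
Depending on disableAutoWatchReset, puts the existing watches, together with the authData, into outgoingQueue ready to be sent
Enables read/write on clientCnxnSocket, meaning the client is ready to do I/O with the server
The first item in the queue is the connect request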
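The ordering guarantee follows purely from the order of the addFirst calls. A minimal sketch using strings in place of Packet objects; PrimeOrder and prime are hypothetical names, and getData stands for any request that was already queued before the reconnect.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class PrimeOrder {
    /** Pushes packets to the front in the same order primeConnection does. */
    static List<String> prime(Deque<String> outgoing, boolean hasWatches, List<String> authSchemes) {
        if (hasWatches) {
            outgoing.addFirst("setWatches");
        }
        for (String scheme : authSchemes) {
            outgoing.addFirst("auth:" + scheme);
        }
        // Added last, so it ends up at the head of the queue.
        outgoing.addFirst("connect");
        return new ArrayList<>(outgoing);
    }

    public static void main(String[] args) {
        Deque<String> outgoing = new ArrayDeque<>();
        outgoing.addLast("getData"); // queued by the application before the reconnect
        System.out.println(prime(outgoing, true, Arrays.asList("digest")));
    }
}
```

Because every call pushes to the front, the packets leave the queue in reverse order of insertion: connect, then auth, then setWatches, then whatever the application had queued.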
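sessId and sessionId
Decided by seenRwServerBefore
If the client has not connected before, sessId is 0
If it has, sessId is the sessionId of the previous connection
The client connects with sessId and the global sessionPasswd
Which replies are not present in pendingQueue
auth, ping and in-progress SASL packets are never added to pendingQueue, and triggered watches are not in pendingQueue either
What is the difference between startConnect and primeConnection
The difference lies in the NIO SelectionKey
The former is limited to connect and accept
The latter runs once the connection is established and switches to write and read, ready to do socket I/O with the zk server
What is the difference between pingRwServer and sendPing
The former applies when the client is connected only to a read-only zk server; it is called repeatedly and updates rwServerAddress
The latter is the heartbeat, which is needed regardless of which mode the client is in
Why are there both clientCnxnSocket.isConnected() and isFirstConnect
isFirstConnect marks the client's first connect. If it is not the first connect, the client sleeps for a while and then picks the next server addr from hostProvider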
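Overall connection process
Connecting to a ZooKeeper server involves establishing two layers of connection.
1. The TCP connection between client and server
2. The session association built on top of the TCP connection
Once the TCP connection is established, the client sends a ConnectRequest to ask for a session. The server allocates a sessionId and a password for the client and starts monitoring the session for timeout.
If the TCP connection drops within sessionTimeout, that is, before the session has timed out, the server still considers the sessionId alive.
The client then picks the next ZooKeeper server address and establishes a TCP connection to it. Once that is done, it sends a ConnectRequest carrying the previous sessionId and password. If the sessionId has not yet timed out, the automatic reconnect has succeeded.
All of this is transparent to the client user and happens silently in the background; the ZooKeeper object remains valid.
If the timeout of the sessionId has already been reached by the time the TCP connection is re-established (the server will have cleaned up the data associated with that sessionId), the server returns a sessionTimeout of 0, a sessionId of 0, and an empty byte array as the password.
On receiving this, the client checks whether the negotiated sessionTimeout is less than or equal to 0. If so, it first uses the eventThread to emit a KeeperState.Expired event to notify the relevant Watchers.
It then ends the EventThread loop and begins shutting down. At that point the ZooKeeper object is no longer valid; a new ZooKeeper object must be created, which will be assigned a new sessionId.
How the client reconnects after first connecting to a read-only server and later finding an rwServerAddress
ClientCnxn.SendThread#run catches the RWServerFoundException and then calls cleanup
After cleanup has run, ClientCnxnSocketNIO#isConnected returns false
So ClientCnxn.SendThread#run enters the connect path again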
if (!clientCnxnSocket.isConnected()) {// i.e. the SelectionKey of clientCnxnSocket is null
if(!isFirstConnect){// if this is not the first connect, sleep for a bit
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// don't re-establish connection if we are closing
if (closing || !state.isAlive()) {
break;
}
startConnect();// start connecting
clientCnxnSocket.updateLastSendAndHeard();
}
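Open questions
A puzzle in SendThread#run
Why, after breaking out of while(state.isAlive()),
is there still an if (state.isAlive())
Notes
These need to be worked out later, after reading the server's network I/O code
How the server guarantees the order of processing and sending
When a server is read-only and when it is read/write
How the server allocates the sessionId and pwd
How the server distinguishes connect requests with different sessId values; to be looked at later
References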
https://my.oschina.net/pingpangkuangmo/blog/486780 the run method and the overall process
http://www.cnblogs.com/leesf456/p/6098255.html concepts
http://www.voidcn.com/blog/aBOUNTWINTER/article/p-6400711.html concepts
http://shift-alt-ctrl.iteye.com/blog/1846971 RwServer and seenRwServerBefore, notes on read/write servers
《paxos到zk》 (book, "From Paxos to ZooKeeper")