Zookeeper之Session源码分析

Zookeeper之Session会话。
在服务端通过SessionTrackerImpl和ExpiryQueue来保存Session会话信息


image.png
SessionTrackerImpl有以下属性:

sessionsById:用来存储ConcurrentHashMap<Long, SessionImpl> {sessionId:SessionImpl}
sessionExpiryQueue:ExpiryQueue<SessionImpl>失效队列
sessionsWithTimeout: ConcurrentMap<Long, Integer>存储的是{sessionId: sessionTimeout}
nextSessionId: 下一个sessionId

ExpiryQueue失效队列有以下属性:

1、elemMap:ConcurrentHashMap<E, Long> 存储的是{SessionImpl: newExpiryTime} Session实例对象,失效时间
2、expiryMap:ConcurrentHashMap<Long, Set<E>>存储的是{time: set<SessionImp>}
失效时间,当前失效时间的Session对象集合
3、nextExpirationTime:下一次失效时间
{(System.nanoTime() / 1000000)/expirationInterval+1}*expirationInterval
当前系统时间毫秒值ms=System.nanoTime() / 1000000。
nextExpirationTime=当前系统时间毫秒值+expirationInterval(失效间隔)
4、expirationInterval:失效间隔,默认是10s,可以通过sessionlessCnxnTimeout修改。即是通过配置文件的tickTime修改

Session的创建

1、ConnectRequest发送连接请求
在客户端的SendThread线程创建Socket连接后,会尝试连接,如果连接成功成功或其他时机都会调用到primeConnection方法用来发送ConnectRequest连接请求,这里便是设置session会话
org.apache.zookeeper.ClientCnxn.SendThread#primeConnection
创建ConnectRequest对象


image.png

放入到outgoingQueue队列等待发送


image.png

2、服务端读取请求数据
在服务端经过org.apache.zookeeper.server.NIOServerCnxn#doIO方法会调用到读取客户端请求的readPayload
org.apache.zookeeper.server.NIOServerCnxn#readPayload
①、通过initialized属性判断是否是第一次连接
此时initialized=false,表示第一次连接 需要创建Session(createSession)
image.png

org.apache.zookeeper.server.NIOServerCnxn#readConnectRequest
只有在处理完连接请求之后才会把initialized设置为true,才可以处理客户端其他命令


image.png

②、processConnectRequest处理连接请求
org.apache.zookeeper.server.ZooKeeperServer#processConnectRequest
第一次从请求中获取的sessionId=0
image.png

根据两端的sessionTimeout来设置ServerCnxn的sessionTimeout是较大的一个
image.png

image.png

③、createSession创建Session请求
org.apache.zookeeper.server.ZooKeeperServer#createSession
创建一个OpCode.createSession的请求并提交
image.png

org.apache.zookeeper.server.SessionTrackerImpl#createSession
image.png

④、createSession请求经过请求处理链(RequestProcessor)

ZooKeeperServer#submitRequest后便经过RequestProcessor请求处理链处理连接请求
a、在PrepRequestProcessor中设置Reqeust的txn
org.apache.zookeeper.server.PrepRequestProcessor#pRequestHelper


image.png

org.apache.zookeeper.server.PrepRequestProcessor#pRequest2Txn
设置txn
image.png

b、在SyncRequestProcessor对txn(创建session的操作)进行持久化
c、在FinalRequestProcessor会对Session进行提交,其实就是把Session的ID和Timeout存到sessionsWithTimeout中去
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
image.png

org.apache.zookeeper.server.FinalRequestProcessor#applyRequest
image.png

org.apache.zookeeper.server.ZooKeeperServer#processTxn(org.apache.zookeeper.server.Request)
image.png

org.apache.zookeeper.server.ZooKeeperServer#processTxnForSessionEvents
image.png

org.apache.zookeeper.server.SessionTrackerImpl#commitSession
到这里便把客户端连接session放入到SessionTrackerImpl#sessionsWithTimeout中,ConcurrentMap<Long, Integer>以{sessionId:timeout}存在
image.png

后续处理,发送响应
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
image.png

org.apache.zookeeper.server.FinalRequestProcessor#updateStats
image.png

org.apache.zookeeper.server.ZooKeeperServer#finishSessionInit
image.png

image.png

Session的刷新

服务端无论接受什么请求命令(增删或ping等请求)都会更新Session的过期时间
这里是RequestThrottler请求限流器内处理ping请求为例
1、RequestThrottler#run
org.apache.zookeeper.server.RequestThrottler#run


image.png

2、submitRequestNow
org.apache.zookeeper.server.ZooKeeperServer#submitRequestNow
此时还没有交给第一个请求处理器处理


image.png

3、touchSession
org.apache.zookeeper.server.SessionTrackerImpl#touchSession
image.png

4、updateSessionExpiry
org.apache.zookeeper.server.SessionTrackerImpl#updateSessionExpiry

更新sessionExpiryQueue失效队列中的失效时间


image.png

org.apache.zookeeper.server.ExpiryQueue#update
获取新的失效时间newExpiryTime
image.png

image.png

image.png

这里就是根据now和timeout不断获取新的失效时间newExpiryTime,并更新elemMap集合和expiryMap过期时间的集合(需要了解存储的具体数据值是什么)。

Session过期

SessionTrackerImpl是一个线程类,继承了ZooKeeperCriticalThread
org.apache.zookeeper.server.SessionTrackerImpl#run

image.png

org.apache.zookeeper.server.ExpiryQueue#getWaitTime
如果expirationTime>now,表示下一个失效时间在当前时间后面,只需要休眠到达失效时间点。
假如当前时间是 20:00:00 失效间隔是2s
那么失效时间点就是:20:00:02 20:00:04 20:00:06.......
image.png

org.apache.zookeeper.server.ExpiryQueue#poll
这里就是获取键是失效时间的客户端实现类的集合,这就是expiryMap集合类的意义。
image.png

Session关闭

从sessionExpiryQueue获取到失效的实现类SessionImpl,有哪些客户端已经关闭了或其他原因达到失效点。其实这里的实现类是从expiryMap集合中获取的
1、run
org.apache.zookeeper.server.SessionTrackerImpl#run


image.png

2、setSessionClosing
当SessionImpl不为空
org.apache.zookeeper.server.SessionTrackerImpl#setSessionClosing
把isClosing 关闭状态设为true


image.png

3、expire
org.apache.zookeeper.server.ZooKeeperServer#expire
关闭当前sessionId
image.png

4、close
org.apache.zookeeper.server.ZooKeeperServer#close
提交了一个closeSession的请求


image.png

5、这里也会经过请求处理链RequestProcessor
①、PrepRequestProcessor
org.apache.zookeeper.server.PrepRequestProcessor#pRequest2Txn
获取当前客户端的临时节点ephemerals,并删除
image.png

image.png

image.png

②、移除sessionsById、sessionsWithTimeout和sessionExpiryQueue
a、processRequest
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
image.png

b、applyRequest
org.apache.zookeeper.server.FinalRequestProcessor#applyRequest
image.png

c、processTxn
org.apache.zookeeper.server.ZooKeeperServer#processTxn


image.png

d、processTxnForSessionEvents
org.apache.zookeeper.server.ZooKeeperServer#processTxnForSessionEvents
image.png

e、removeSession
org.apache.zookeeper.server.SessionTrackerImpl#removeSession
image.png

Session关闭就是提交关闭请求,删除相应缓存。

Session的恢复

由于在服务端打快照时,会对sessionsWithTimeout进行持久化。启动时也会加载这个集合的数据。
1、sessionsWithTimeout持久化
org.apache.zookeeper.server.SyncRequestProcessor#run
通过ZooKeeperThread线程对快照持久化


image.png

org.apache.zookeeper.server.ZooKeeperServer#takeSnapshot()


image.png

image.png

2、启动加载sessionsWithTimeout
怎么加载是FileTxnSnapLog工具类序列化的问题
org.apache.zookeeper.server.NIOServerCnxnFactory#startup
image.png

org.apache.zookeeper.server.ZooKeeperServer#startdata


image.png

org.apache.zookeeper.server.ZooKeeperServer#loadData
image.png

org.apache.zookeeper.server.ZKDatabase#loadDataBase
image.png

org.apache.zookeeper.server.persistence.FileTxnSnapLog#restore
image.png

3、重连
在处理连接请求processConnectRequest时,如果得到的sessionId不等于0则会重连。与创建createSession对应。
org.apache.zookeeper.server.ZooKeeperServer#processConnectRequest
image.png

总计:

对于Session会话,是通过服务端SessionTrackerImpl(会话跟踪实现类)来保存会话数据。提供会话类的创建、刷新、删除(过期)、关闭、恢复等流程。Session的实现也是通过请求的提交。提供了持久化和初始化的加载,所以Session也提供了恢复这样的功能。如果需要了解细节,则需要清楚SessionTrackerImpl的属性值保存了怎么样的数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
禁止转载,如需转载请通过简信或评论联系作者。