To create a session, the client first sends a session-creation request; once the server ensemble has successfully created the session, it sends the response back to the client.
The client's session request
In zookeeper源码分析(2)-客户端启动流程 (part 2 of this series, on the client startup flow) we saw that the very first request a client sends is the session-creation request. On the client side, the bytes actually sent over the wire are all built by ClientCnxn.Packet.createBB:
static class Packet {
    // request header; only the session-creation (connect) request omits it
    RequestHeader requestHeader;
    // reply header, filled in from the server's response when it arrives
    ReplyHeader replyHeader;
    // request body
    Record request;
    // response body
    Record response;
    // the actual bytes sent over the wire
    ByteBuffer bb;
    /** Client's view of the path (may differ due to chroot) **/
    String clientPath;
    /** Servers's view of the path (may differ due to chroot) **/
    String serverPath;
    // whether the request has completed
    boolean finished;
    // async callback invoked when the request completes
    AsyncCallback cb;
    Object ctx;
    // watch registration
    WatchRegistration watchRegistration;
    // whether the client is only allowed to read from the server
    public boolean readOnly;
    // build the bytes to send for this request
    public void createBB() {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            // 1. write the total length "len" first (patched below)
            boa.writeInt(-1, "len"); // We'll fill this in later
            // 2. if requestHeader is non-null, serialize it
            if (requestHeader != null) {
                requestHeader.serialize(boa, "header");
            }
            // 3. serialize the request body
            if (request instanceof ConnectRequest) {
                request.serialize(boa, "connect");
                // append "am-I-allowed-to-be-readonly" flag
                // for a connect request, also write the readOnly flag
                boa.writeBool(readOnly, "readOnly");
            } else if (request != null) {
                request.serialize(boa, "request");
            }
            baos.close();
            this.bb = ByteBuffer.wrap(baos.toByteArray());
            // 4. go back and fill in the real "len" value
            this.bb.putInt(this.bb.capacity() - 4);
            this.bb.rewind();
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }
}
In fact, the Packet for a session-creation request is constructed as follows:
//in SendThread.primeConnection
//protocolVersion: protocol version, 0 by default
//lastZxidSeen: the largest server zxid the client has seen, lastZxid
//timeOut: the session timeout, sessionTimeout
//sessionId: 0 at this point
//passwd: the session password, also empty at this point
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
//the request body is conReq
Packet packet = new Packet(null, null, conReq, null, null, readOnly);
So the actual wire content is len + ConnectRequest (protocolVersion + lastZxidSeen + timeOut + sessionId + passwd) + readOnly.
After sending this connect request the client waits for the server's response; it deserializes the response, stores the newly assigned sessionId and related fields, and the session is established.
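To make that layout concrete, here is a minimal standalone sketch (my own demo, not ZooKeeper source; it assumes the ZooKeeper client jar is on the classpath) that produces the same byte layout createBB builds for a connect request:
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.proto.ConnectRequest;

public class ConnectWireDemo {
    public static void main(String[] args) throws Exception {
        // protocolVersion=0, lastZxidSeen=0, timeOut=30000, sessionId=0, empty 16-byte passwd
        ConnectRequest req = new ConnectRequest(0, 0L, 30000, 0L, new byte[16]);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        boa.writeInt(-1, "len");          // length placeholder, patched below
        req.serialize(boa, "connect");    // the five ConnectRequest fields
        boa.writeBool(false, "readOnly"); // the trailing read-only flag
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.capacity() - 4);     // patch in the real payload length
        bb.rewind();
        System.out.println("total bytes = " + bb.capacity());
    }
}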
Server-side session creation
First, note that session creation is a transactional request. It can be broken into six stages: request reception, session creation, pre-processing, transaction processing, transaction application, and session response. The server a client connects to may be the Leader, a Follower, or an Observer; a Follower or Observer that receives a transactional request forwards it to the Leader. Here we only analyze the case where the client is connected to a Follower.
Before diving in, it helps to understand how the cluster servers' request-processor chains are initialized; see zookeeper源码分析(7)-服务器请求处理链的初始化 (part 7 of this series).
The server-side session-creation flow is as follows:
The Follower receives the request
1. The I/O layer receives the client request
When a client's connection request first arrives, an NIOServerCnxn instance is initialized for that client; it is responsible for all subsequent requests from that client. When the server detects a read event on the client's channel, control eventually reaches NIOServerCnxn.doIO, which handles that client's reads and writes.
NIOServerCnxn.doIO
void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
//handle the incoming data
if (isPayload) { // not the case for 4letterword
readPayload();
}
else {
// four letter words take care
// need not do anything else
return;
}
}
}
....................write-request handling and exception handling omitted..................
}
As you can see, data from the channel is read into incomingBuffer; if the request is not a four-letter command, readPayload is called.
private void readPayload() throws IOException, InterruptedException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
}
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
packetReceived();
incomingBuffer.flip();
if (!initialized) {
readConnectRequest();
} else {
//a non-connect request from the client
readRequest();
}
lenBuffer.clear();
//reset incomingBuffer to receive the next read
incomingBuffer = lenBuffer;
}
}
If the NIOServerCnxn has not been initialized yet (!initialized), the request must be the very first one, i.e. a ConnectRequest.
2. Deserialize the ConnectRequest
NIOServerCnxn.readConnectRequest
private void readConnectRequest() throws IOException, InterruptedException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
zkServer.processConnectRequest(this, incomingBuffer);
//mark the connection as initialized
initialized = true;
}
This calls zkServer.processConnectRequest:
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
boolean readOnly = false;
try {
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
// this is ok -- just a packet from an old client which
// doesn't contain readOnly field
LOG.warn("Connection request from old client "
+ cnxn.getRemoteSocketAddress()
+ "; will be dropped if server is in r-o mode");
}
if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client "
+ cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg);
}
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg);
}
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the
// session is setup
cnxn.disableRecv();
long sessionId = connReq.getSessionId();
if (sessionId == 0) {
//first connection from this client
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);
} else {
//client is reconnecting
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId);
}
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
}
The main steps are:
1. Deserialize the client's bytes into a ConnectRequest connReq object.
2. Check whether the server is running in read-only mode, in which case it cannot serve write-related requests.
3. Check whether the client's zxid is larger than the server's; if so, an exception is thrown.
4. Validate the session timeout, clamping it into the range minSessionTimeout ~ maxSessionTimeout.
5. Use the sessionId to decide whether this is a first connection or a reconnect: on a first connection sessionId == 0, and a new session must be created.
createSession
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    if (passwd == null) {
        // Possible since it's just deserialized from a packet on the wire.
        passwd = new byte[0];
    }
    // generate the sessionId by incrementing from the tracker's base sessionId
    long sessionId = sessionTracker.createSession(timeout);
    // derive the session password deterministically from the sessionId
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    // wrap as an OpCode.createSession request and submit it to the processor chain
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
    setLocalSessionFlag(si);
    submitRequest(si);
    return sessionId;
}
- The sessionId is generated by sessionTracker.createSession(timeout). Every server initializes a session tracker at startup; on a Follower this is the LearnerSessionTracker. At the same time it initializes the server's own base sessionId, and every client connection created afterwards simply gets a sessionId incremented from that base.
Since the sessionId is the key identifier of a ZooKeeper session, it must be globally unique. Its initialization algorithm is:
SessionTrackerImpl.initializeNextSession
//id is the server's myid value
public static long initializeNextSession(long id) {
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id <<56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid; // this is an unlikely edge case, but check it just in case
}
return nextSid;
}
nextSid is the base sessionId: the current time fills the low 56 bits and the server id fills the high 8 bits.
First the current time in milliseconds, Time.currentElapsedTime(), is shifted left by 24 bits so the significant (non-zero) bits move toward the high end, then unsigned-shifted right by 8 bits so the top 8 bits are guaranteed to be zero and won't interfere with the server id.
Then the server id (myid) is shifted left by 56 bits into the high 8 bits and combined with the time value by a bitwise OR, nextSid | (id << 56), producing the base sessionId.
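To see the layout in action, here is a tiny self-contained demo (mine, not ZooKeeper source; it substitutes System.currentTimeMillis() for Time.currentElapsedTime() and assumes myid = 2):
public class SessionIdDemo {
    static long initializeNextSession(long id) {
        long nextSid = (System.currentTimeMillis() << 24) >>> 8; // time in the low 56 bits
        return nextSid | (id << 56);                             // myid in the high 8 bits
    }
    public static void main(String[] args) {
        long sid = initializeNextSession(2);
        System.out.println(String.format("%016x", sid)); // e.g. 02xxxxxxxxxxxxxx
        System.out.println(sid >>> 56);                  // recovers the myid: 2
    }
}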
- On a Follower, the session is not registered or activated at this point; the request is wrapped as a Request si
- and handed to the processor chain:
submitRequest
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
1. touch(si.cnxn) records the session in the global session map
LearnerSessionTracker.touchSession
public boolean touchSession(long sessionId, int sessionTimeout) {
if (localSessionsEnabled) {
if (localSessionTracker.touchSession(sessionId, sessionTimeout)) {
return true;
}
if (!isGlobalSession(sessionId)) {
return false;
}
}
touchTable.get().put(sessionId, sessionTimeout);
return true;
}
AtomicReference<Map<Long, Integer>> touchTable stores the global sessions known to the current Follower server.
2. Hand the request to the processor chain; on a Follower the first processor is FollowerRequestProcessor
FollowerRequestProcessor.processRequest
public void processRequest(Request request) {
if (!finished) {
// Before sending the request, check if the request requires a
// global session and what we have is a local session. If so do
// an upgrade.
//If the request is transactional but the session is only a local session, upgrade it to a global session
Request upgradeRequest = null;
try {
upgradeRequest = zks.checkUpgradeSession(request);
} catch (KeeperException ke) {
if (request.getHdr() != null) {
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(ke.code().intValue()));
}
request.setException(ke);
LOG.info("Error creating upgrade request", ke);
} catch (IOException ie) {
LOG.error("Unexpected error in upgrade", ie);
}
if (upgradeRequest != null) {
queuedRequests.add(upgradeRequest);
}
queuedRequests.add(request);
}
}
The main logic is simply to enqueue the request into queuedRequests; the run method keeps draining and processing the queued requests.
FollowerRequestProcessor.run
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
case OpCode.delete:
case OpCode.deleteContainer:
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
zks.getFollower().request(request);
break;
case OpCode.createSession:
case OpCode.closeSession:
// Don't forward local sessions to the leader.
if (!request.isLocalSession()) {
zks.getFollower().request(request);
}
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}
1. The request is handed to the next processor, CommitProcessor (analyzed below).
2. Transactional requests are forwarded to the Leader.
Since a session-creation request has request.type == OpCode.createSession and normally does not use a local session, it calls
FollowerZooKeeperServer.getFollower().request
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) {
request.request.rewind();
int len = request.request.remaining();
byte b[] = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
.toByteArray(), request.authInfo);
//send the QuorumPacket qp to the leader and flush it out immediately
writePacket(qp, true);
}
When the Leader and a Follower synchronize they establish a socket channel, and the leader continuously receives the Follower's packets through a LearnerHandler. When the packet type is Leader.REQUEST, the request is handed to the processor chain:
LearnerHandler.run
public void run() {
··········unrelated code omitted··········
while (true) {
··········unrelated code omitted··········
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
leader.zk.submitLearnerRequest(si);
break;
·············unrelated code omitted············
}
}
public void submitLearnerRequest(Request request) {
//Because the session-creation request has already passed the Follower's validation and session upgrade (if needed),
//and the leader's processor chain is fully initialized by now, it can go straight to the prepRequestProcessor
prepRequestProcessor.processRequest(request);
}
PrepRequestProcessor.processRequest
public void processRequest(Request request) {
submittedRequests.add(request);
}
The request is put into the storage queue submittedRequests and processed continuously by the run method:
PrepRequestProcessor.run
public void run() {
try {
while (true) {
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (Request.requestOfDeath == request) {
break;
}
pRequest(request);
}
}
··········exception and logging handling omitted··········
}
pRequest(request) is then called to handle the session request:
protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.setHdr(null);
request.setTxn(null);
try {
switch (request.type) {
·············unrelated request types omitted····························
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
pRequest2Txn(request.type, zks.getNextZxid(), request,
null, true);
}
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
}
··········exception and logging handling omitted··········
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
The main steps:
1. Pre-process the session request
protected void pRequest2Txn(int type, long zxid, Request request,
Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type));
switch (type) {
·············unrelated request types omitted····························
case OpCode.createSession:
request.request.rewind();
int to = request.request.getInt();
request.setTxn(new CreateSessionTxn(to));
request.request.rewind();
if (request.isLocalSession()) {
// This will add to local session tracker if it is enabled
zks.sessionTracker.addSession(request.sessionId, to);
} else {
// Explicitly add to global session if the flag is not set
zks.sessionTracker.addGlobalSession(request.sessionId, to);
}
zks.setOwner(request.sessionId, request.getOwner());
break;
default:
LOG.warn("unknown type " + type);
break;
}
}
The main steps are:
- Set the request's transaction header, TxnHeader, which contains the following fields:
//the client's sessionId, uniquely identifying which client this request belongs to
private long clientId;
//the client's request sequence number
private int cxid;
//the transaction ZXID assigned to this request
private long zxid;
//the time at which the Leader began processing this transaction
private long time;
//the type of the transaction, e.g. OpCode.createSession
private int type;
The zxid here grows from the base zxid fixed when the servers synchronized; it is incremented for each transaction to produce the current transaction's zxid. (A zxid packs the leader epoch into its high 32 bits and a per-epoch counter into its low 32 bits; see the sketch after Leader.propose below.)
- Set the request's transaction body, Txn, to a CreateSessionTxn, which contains just the session timeout:
private int timeOut;
- Register and activate the session
The session is added to the global sessions and managed by the session tracker.
LeaderZooKeeperServer.sessionTracker.addGlobalSession
public boolean addGlobalSession(long sessionId, int sessionTimeout) {
boolean added =
globalSessionTracker.addSession(sessionId, sessionTimeout);
return added;
}
SessionTrackerImpl.addSession
public synchronized boolean addSession(long id, int sessionTimeout) {
sessionsWithTimeout.put(id, sessionTimeout);
boolean added = false;
SessionImpl session = sessionsById.get(id);
if (session == null){
session = new SessionImpl(id, sessionTimeout);
}
// findbugs2.0.3 complains about get after put.
// long term strategy would be use computeIfAbsent after JDK 1.8
SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
if (existedSession != null) {
session = existedSession;
} else {
added = true;
LOG.debug("Adding session 0x" + Long.toHexString(id));
}
updateSessionExpiry(session, sessionTimeout);
return added;
}
The session is stored in SessionTrackerImpl.sessionsWithTimeout and SessionTrackerImpl.sessionsById, and placed into SessionTrackerImpl.sessionExpiryQueue, which activates it for expiry management. See the session-management post for details.
2. Hand the request to the next processor, ProposalRequestProcessor, for transaction processing:
public void processRequest(Request request) throws RequestProcessorException {
if (request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
//pass the request on: the Commit flow
nextProcessor.processRequest(request);
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
try {
//the Proposal flow
zks.getLeader().propose(request);
} catch (XidRolloverException e) {
throw new RequestProcessorException(e.getMessage(), e);
}
//persist the transaction log: the Sync flow
syncProcessor.processRequest(request);
}
}
}
This covers three flows:
1. The Proposal flow
In ZooKeeper every transactional request must be acknowledged by more than half of the voting machines in the ensemble before it can be applied to the in-memory database; this voting-and-counting process is the Proposal flow.
2. The Sync flow
Every voting server records the transaction in its transaction log; once the log write completes it sends an ACK to the Leader as its vote.
3. The Commit flow
Every transactional request must be committed on all servers; once a quorum of votes is reached, all servers are told to commit the request.
(The original post shows a transaction-flow diagram here.)
The Commit flow
ProposalRequestProcessor passes the request on to CommitProcessor, which controls when transactions are committed.
CommitProcessor.processRequest
public void processRequest(Request request) {
if (stopped) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
queuedRequests.add(request);
wakeup();
}
The request is put into the storage queue queuedRequests and the CommitProcessor thread is woken up, signalling that a request has arrived and the run method can continue its work.
CommitProcessor.run
public void run() {
try {
//Each loop iteration takes the requests currently in queuedRequests and records how many there are. Only this batch is processed first,
//so that we don't keep reading from the queue forever and never get around to committing requests.
int requestsToProcess = 0;
boolean commitIsWaiting = false;
do {
//this commit processor is shared by follower and leader servers
//on a follower we reach here after receiving a client request; we also loop back here after the leader's commit notice arrives, in which case committedRequests is non-empty
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
// Avoid sync if we have something to do
if (requestsToProcess == 0 && !commitIsWaiting){
// Waiting for requests to process
synchronized (this) {
while (!stopped && requestsToProcess == 0
&& !commitIsWaiting) {
//wait for requests to process
wait();
//there is now either a committable request or a newly arrived request
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
}
}
}
/*
* Processing up to requestsToProcess requests from the incoming
* queue (queuedRequests), possibly less if a committed request
* is present along with a pending local write. After the loop,
* we process one committed request if commitIsWaiting.
*/
Request request = null;
while (!stopped && requestsToProcess > 0
&& (request = queuedRequests.poll()) != null) {
requestsToProcess--;
if (needCommit(request)
|| pendingRequests.containsKey(request.sessionId)) {
// transactional requests are parked in pendingRequests, keyed by the client's sessionId
LinkedList<Request> requests = pendingRequests
.get(request.sessionId);
if (requests == null) {
requests = new LinkedList<Request>();
pendingRequests.put(request.sessionId, requests);
}
requests.addLast(request);
}
else {
//non-transactional requests go straight to the nextProcessor
sendToNextProcessor(request);
}
//check whether there is a committable request
if (!pendingRequests.isEmpty() && !committedRequests.isEmpty()){
/*
* We set commitIsWaiting so that we won't check
* committedRequests again.
*/
//if so, stop draining queuedRequests and handle the committable request
commitIsWaiting = true;
break;
}
}
// Handle a single committed request
if (commitIsWaiting && !stopped){
//a transactional request must wait until all earlier requests have been processed
waitForEmptyPool();
if (stopped){
return;
}
// Process committed head
if ((request = committedRequests.poll()) == null) {
throw new IOException("Error: committed head is null");
}
/*
* Check if request is pending, if so, update it with the committed info
*/
LinkedList<Request> sessionQueue = pendingRequests
.get(request.sessionId);
if (sessionQueue != null) {
// If session queue != null, then it is also not empty.
Request topPending = sessionQueue.poll();
if (request.cxid != topPending.cxid) {
//If the session moved from server A to server B but A's transactional request was still submitted to the leader,
//that request gets processed here, and its cxid may be smaller than the cxid at the head of the pending queue.
LOG.warn("Got request " + request +
" but we are expecting request " + topPending);
sessionQueue.addFirst(topPending);
} else {
//the normal case
topPending.setHdr(request.getHdr());
topPending.setTxn(request.getTxn());
topPending.zxid = request.zxid;
request = topPending;
}
}
sendToNextProcessor(request);
//wait until the current transactional request has been fully executed
waitForEmptyPool();
/*
* Process following reads if any, remove session queue if
* empty.
*/
if (sessionQueue != null) {
while (!stopped && !sessionQueue.isEmpty()
&& !needCommit(sessionQueue.peek())) {
sendToNextProcessor(sessionQueue.poll());
}
// Remove empty queues
if (sessionQueue.isEmpty()) {
pendingRequests.remove(request.sessionId);
}
}
}
} while (!stoppedMainLoop);
} catch (Throwable e) {
handleException(this.getName(), e);
}
LOG.info("CommitProcessor exited loop!");
}
The rough flow:
Requests are continuously taken off the queue. A non-transactional request is wrapped as a task via nextProcessor and handed to the workerPool thread pool.
A transactional request is first parked in the pendingRequests queue to wait for a commit notice. Once it can be committed it is placed into the committedRequests queue and then handed to nextProcessor as a task, but only after the thread pool has drained all of its current tasks (similar to a read-write lock).
Assuming this client session request is the only request in the cluster, the CommitProcessor flow on the Follower that received it is roughly: the request moves from the queuedRequests queue to the pendingRequests queue, then the thread waits until the leader's commit notice arrives and committedRequests becomes non-empty, at which point the processor chain continues. The leader's CommitProcessor flow is essentially the same. The needCommit check that separates the two paths is sketched below.
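For reference, here is the logic of CommitProcessor.needCommit used in run() above (paraphrased from the 3.5.x source from memory, so details may differ slightly across versions):
protected boolean needCommit(Request request) {
    switch (request.type) {
        // writes always need a quorum commit
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer:
        case OpCode.delete:
        case OpCode.deleteContainer:
        case OpCode.setData:
        case OpCode.reconfig:
        case OpCode.multi:
        case OpCode.setACL:
            return true;
        case OpCode.sync:
            return matchSyncs;                // true on a learner, false on the leader
        case OpCode.createSession:
        case OpCode.closeSession:
            return !request.isLocalSession(); // global sessions need a commit
        default:
            return false;                     // reads go straight through
    }
}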
On the leader, the nextProcessor is ToBeAppliedProcessor, which maintains a queue of committable transactions, leader.toBeApplied. At this point the session request is removed from that queue and handed on to its next processor, FinalRequestProcessor:
public void processRequest(Request request) {
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
// Need to process local session requests
rc = zks.processTxn(request);
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (request.getHdr() != null) {
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
long zxid = hdr.getZxid();
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = zks.outstandingChanges.remove();
if (cr.zxid < zxid) {
LOG.warn("Zxid outstanding " + cr.zxid
+ " is less than current " + zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (request.isQuorum()) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
}
//this was a REQUEST forwarded to the leader by another server: return without building a response
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
String lastOp = "NA";
zks.decInProcess();
Code err = Code.OK;
Record rsp = null;
switch (request.type) {
case OpCode.createSession: {
zks.serverStats().updateLatency(request.createTime);
lastOp = "SESS";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, Time.currentElapsedTime());
zks.finishSessionInit(request.cnxn, true);
return;
}
}
}
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
ReplyHeader hdr =
new ReplyHeader(request.cxid, lastZxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, Time.currentElapsedTime());
·················unrelated code and exception handling omitted·············
}
The main job here is to apply the session transaction to the in-memory database ZKDatabase, update the server's transaction bookkeeping (zxid and so on) and its session-management state, and then build the response for the client.
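For the createSession branch above, the response itself is built inside zks.finishSessionInit. A rough sketch of what that method does (abridged from memory of the 3.5.x source, so treat details as approximate); note how it mirrors the client-side wire layout from the first section:
//inside ZooKeeperServer.finishSessionInit (abridged sketch, not verbatim)
ConnectResponse rsp = new ConnectResponse(0,                         // protocolVersion
        valid ? cnxn.getSessionTimeout() : 0,                        // negotiated timeout
        valid ? cnxn.getSessionId() : 0,                             // the new sessionId
        valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]); // session password
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");        // length placeholder, patched below
rsp.serialize(bos, "connect");  // same field order the client expects
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4);  // patch in the real length
bb.rewind();
cnxn.sendBuffer(bb);            // queue the response for the client
cnxn.enableRecv();              // start accepting further packets on this connection
The client deserializes this ConnectResponse, stores the sessionId and password it was assigned, and session creation is complete.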
The Proposal flow && the Sync flow
Leader.propose
public Proposal propose(Request request) throws XidRolloverException {
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg =
"zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastProposalSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized(this) {
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig){
self.setLastSeenQuorumVerifier(request.qv, true);
}
if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Proposing:: " + request);
}
lastProposed = p.packet.getZxid();
// record the latest outstanding Proposal
outstandingProposals.put(lastProposed, p);
sendPacket(pp);
}
return p;
}
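The rollover guard at the top of propose() works because a zxid packs the leader epoch into its high 32 bits and a per-epoch counter into its low 32 bits; when the low 32 bits are exhausted the leader forces a re-election so a new epoch begins. A minimal illustration (my own, with assumed values):
long epoch = 5L, counter = 42L;
long zxid = (epoch << 32) | counter;     // how a zxid is composed
assert (zxid >> 32) == epoch;            // high 32 bits: leader epoch
assert (zxid & 0xffffffffL) == counter;  // low 32 bits: per-epoch transaction counter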
The main flow: the request is wrapped in a QuorumPacket and sent to the Followers eligible to vote; the packet carries the packet type Leader.PROPOSAL, the transaction zxid, and the serialized content of the session request.
When a Follower receives this PROPOSAL, it syncs the transaction to its log.
Follower.followLeader
void followLeader() throws InterruptedException {
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
··········unrelated code omitted··········
}
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
if (hdr.getType() == OpCode.reconfig){
SetDataTxn setDataTxn = (SetDataTxn) txn;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
//sync the transaction to the log
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
//commit the transaction
fzk.commit(qp.getZxid());
break;
··········unrelated code omitted··········
}
FollowerZooKeeperServer.logRequest is then called to log the transaction:
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request);
}
syncProcessor.processRequest(request);
}
SyncRequestProcessor begins the sync-processing chain: syncProcessor.processRequest puts the request into the storage queue queuedRequests, and the processor thread's run method continuously processes the queued sync requests.
SyncRequestProcessor.run
public void run() {
try {
int logCount = 0;
//randomized snapshot threshold, to keep multiple Followers from snapshotting at the same time
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
//flush what has been buffered
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
//append to the transaction log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
//snapshot handling
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
//take the snapshot asynchronously
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
//
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
The flow: append the request to the transaction log and add it to the toFlush queue; the flush method then pushes the log out to disk.
private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
//flush the log to disk
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
if (nextProcessor != null) {
//answer the vote via nextProcessor
nextProcessor.processRequest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
//force the ACK out
((Flushable)nextProcessor).flush();
}
}
The flow: flush the transaction log to disk, then answer the vote through nextProcessor. On a Follower the nextProcessor is SendAckRequestProcessor, which implements the Flushable interface and force-flushes the ACK out to the Leader.
SendAckRequestProcessor.processRequest:
sends the ACK response back to the Leader
public void processRequest(Request si) {
if(si.type != OpCode.sync){
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
null);
try {
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug("Ignoring error closing the connection", e1);
}
}
}
}
Likewise, for a transactional request the Leader also logs it locally via SyncRequestProcessor.processRequest; the difference from a Follower is that the leader's nextProcessor is AckRequestProcessor, which only needs to answer the vote locally.
AckRequestProcessor.processRequest
public void processRequest(Request request) {
QuorumPeer self = leader.self;
if(self != null)
leader.processAck(self.getId(), request.zxid, null);
else
LOG.error("Null QuorumPeer");
}
Locally, the Leader answers its own vote by calling leader.processAck to tally it.
When the Leader receives a vote ACK from a Follower, the handling is:
LearnerHandler.run
public void run() {
···········unrelated code omitted·················
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
ByteBuffer bb;
long sessionId;
int cxid;
int type;
switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
···········unrelated code omitted·················
}
}
As you can see, this likewise calls leader.processAck to tally the vote.
leader.processAck
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
if (!allowedToCommit) return; // last op committed was a leader change - from now on
// the new leader should commit
if (outstandingProposals.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
}
return;
}
if (lastCommitted >= zxid) {
if (LOG.isDebugEnabled()) {
LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted), Long.toHexString(zxid));
}
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid);
if (p == null) {
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
Long.toHexString(zxid), followerAddr);
return;
}
p.addAck(sid);
//if more than half of the servers have answered the vote, the request can be committed
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p!=null){
curZxid++;
p = outstandingProposals.get(curZxid);
if (p !=null) hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
The main flow: fetch the current transaction's Proposal from outstandingProposals, record this ACK against it, and check whether more than half of the servers have responded; if so, call tryToCommit to commit the Proposal.
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
if (outstandingProposals.containsKey(zxid - 1)) return false;
// in order to be committed, a proposal must be accepted by a quorum.
//
// getting a quorum from all necessary configurations.
if (!p.hasAllQuorums()) {
return false;
}
// commit proposals in order
if (zxid != lastCommitted+1) {
LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
+ " from " + followerAddr + " not first!");
LOG.warn("First is "
+ (lastCommitted+1));
}
outstandingProposals.remove(zxid);
if (p.request != null) {
//add to the to-be-applied queue
toBeApplied.add(p);
}
if (p.request == null) {
LOG.warn("Going to commmit null: " + p);
} else if (p.request.getHdr().getType() == OpCode.reconfig) {
LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());
QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
} else {
//send COMMIT to the Followers, telling them to commit the request
commit(zxid);
//send to the Observers, including the transaction data
inform(p);
}
//commit the request locally
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
return true;
}
The main steps:
1. Notify all Followers that the request can be committed
public void commit(long zxid) {
synchronized(this){
lastCommitted = zxid;
}
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
sendPacket(qp);
}
Leader.COMMIT is sent to all Followers carrying only the zxid, since the request data was already shipped with the PROPOSAL.
2. Notify all Observers that the request can be committed
public void inform(Proposal proposal) {
QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
proposal.packet.getData(), null);
sendObserverPacket(qp);
}
Leader.INFORM is sent to all Observers, carrying both the zxid and the request data.
3. Notify the local Leader that the transaction can be committed
CommitProcessor.commit
public void commit(Request request) {
if (stopped || request == null) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
committedRequests.add(request);
wakeup();
}
That is, the request is added to the CommitProcessor.committedRequests queue; from there it is handed to the next processor, ToBeAppliedProcessor, for processing.
Thanks for reading! I'm Monica23334 || Monica2333, a writer who has pledged to publish one original article a week. Follow me and hold me to it~