Foreword
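A distributed system is a group of processes working together to get one job done. To coordinate those processes we usually rely on ZooKeeper or a similar alternative, and one of ZooKeeper's most important mechanisms is the watcher. To understand how ZooKeeper watchers work, I spent some time digging through the ZooKeeper source code.
What follows is a record of that walkthrough.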
Code walkthrough
The overall watcher flow
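To use a ZooKeeper watcher, the client has to register it with the server. The flow is:
The ZooKeeper client registers the watcher with the server and also stores it in its own ZKWatchManager. When something changes on the server, the server checks whether a watcher is registered on the affected node; if one is, it sends a WatchedEvent to the client. The client uses the path carried in that event to find the matching watcher and run the corresponding callback.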
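Client-side watcher registration walkthrough
Reading ZooKeeper.java shows that only three operations can register a watcher: exists, getChildren and getData.
We use getData as the example for walking through the registration process.
First, the code: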
public void getData(final String path, Watcher watcher,
DataCallback cb, Object ctx)
{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// wrap the watcher in a WatchRegistration
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
// queue the request to be sent to the server
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
clientPath, serverPath, ctx, wcb);
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
synchronized (outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// put the request into the outgoing (to-be-sent) queue
outgoingQueue.add(packet);
}
}
// wake up the send thread so it sends the request to the server
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
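One point worth making concrete: the GetDataRequest only carries the path and a boolean watch flag (request.setWatch(watcher != null)); the Watcher object itself stays on the client inside the Packet (as its WatchRegistration) and never goes over the wire. The Java sketch below models that split with hypothetical, simplified classes (SketchWatcher, SketchGetDataRequest, SketchPacket, SketchClientCnxn); they only mimic the shape of getData plus queuePacket and are not ZooKeeper's API.

```java
import java.util.LinkedList;

// Hypothetical stand-in for a client-side Watcher callback.
interface SketchWatcher {
    void process(String eventPath);
}

// What goes over the wire for getData: just the path and a boolean watch flag.
final class SketchGetDataRequest {
    final String path;
    final boolean watch;

    SketchGetDataRequest(String path, boolean watch) {
        this.path = path;
        this.watch = watch;
    }
}

// Client-side packet: the request plus the watcher, which never leaves the client.
final class SketchPacket {
    final SketchGetDataRequest request;
    final SketchWatcher pendingWatcher; // null when no watch was requested

    SketchPacket(SketchGetDataRequest request, SketchWatcher pendingWatcher) {
        this.request = request;
        this.pendingWatcher = pendingWatcher;
    }
}

final class SketchClientCnxn {
    final LinkedList<SketchPacket> outgoingQueue = new LinkedList<>();

    // Same shape as getData + queuePacket: set the flag, keep the watcher
    // in the packet, and enqueue the packet under the queue's lock.
    SketchPacket getData(String path, SketchWatcher watcher) {
        SketchGetDataRequest request = new SketchGetDataRequest(path, watcher != null);
        SketchPacket packet = new SketchPacket(request, watcher);
        synchronized (outgoingQueue) {
            outgoingQueue.add(packet);
        }
        // The real client would now call wakeupCnxn() to wake the send thread.
        return packet;
    }
}
```

Keeping the watcher out of the request is what lets the client register it locally only after the server has answered (see finishPacket further down).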
Now let's see what the send thread, SendThread, does.
In SendThread's run method, the actual sending is handled by this call:
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
The code above uses NIO multiplexing (a Selector) to send requests to the server.
The part of doIO that handles writes looks like this:
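For readers less familiar with NIO, here is a minimal, self-contained illustration of the same "select, walk the selected keys, clear the set" loop that doTransport uses. It is a sketch under simplifying assumptions: instead of a socket connected to a ZooKeeper server it uses a java.nio.channels.Pipe inside one thread, so it only works for short messages that fit in the pipe's buffer, and the class SelectorDemo is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Set;

final class SelectorDemo {
    // Sends a short ASCII message through a Pipe and reads it back via a Selector.
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false); // only non-blocking channels can be registered
                source.register(selector, SelectionKey.OP_READ);

                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.US_ASCII));
                while (out.hasRemaining()) {
                    sink.write(out); // sink is still blocking; fine for a short message
                }

                ByteBuffer in = ByteBuffer.allocate(out.capacity());
                for (int round = 0; round < 10 && in.hasRemaining(); round++) {
                    selector.select(1000); // like selector.select(waitTimeOut)
                    Set<SelectionKey> selected = selector.selectedKeys();
                    for (SelectionKey k : selected) {
                        if (k.isReadable()) {
                            ((Pipe.SourceChannel) k.channel()).read(in);
                        }
                    }
                    selected.clear(); // same as selected.clear() at the end of doTransport
                }
                in.flip();
                return StandardCharsets.US_ASCII.decode(in).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Clearing the selected-key set after each round matters: the Selector never removes keys from it on its own, so a key left there would be processed again on the next pass.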
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
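The write branch boils down to a small piece of bookkeeping: stamp the packet with the next xid (ping and auth packets keep their fixed xids), write it, and once it is fully written move it from outgoingQueue to pendingQueue, where it waits for its reply. The sketch below models only that bookkeeping; XidSketch and its members are hypothetical, and it assumes every write drains the buffer in one go (the real code keeps p.bb and finishes partial writes on later passes).

```java
import java.util.LinkedList;

final class XidSketch {
    enum Op { GET_DATA, PING, AUTH }

    static final class Pkt {
        final Op op;
        int xid; // PING and AUTH packets arrive with a fixed xid (-2 / -4)

        Pkt(Op op, int xid) {
            this.op = op;
            this.xid = xid;
        }
    }

    private int nextXid = 1; // hypothetical counter standing in for cnxn.getXid()
    final LinkedList<Pkt> outgoingQueue = new LinkedList<>();
    final LinkedList<Pkt> pendingQueue = new LinkedList<>();

    // One pass of the write branch, assuming the whole packet is written at once.
    void writeOne() {
        Pkt p = outgoingQueue.peekFirst();
        if (p == null) {
            return;
        }
        // Ping and auth replies are recognized by their fixed xid,
        // so those packets are neither renumbered nor tracked.
        boolean tracked = p.op != Op.PING && p.op != Op.AUTH;
        if (tracked) {
            p.xid = nextXid++; // like p.requestHeader.setXid(cnxn.getXid())
        }
        // ... sock.write(p.bb) would happen here ...
        outgoingQueue.removeFirstOccurrence(p);
        if (tracked) {
            pendingQueue.add(p); // replies arrive in order and are matched against this queue
        }
    }
}
```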
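That completes the first step of registration. Once the client has sent the request, the server replies, and the data travels back along the reverse path: the client reads the stream from the channel and deserializes it. For a regular reply, finishPacket then stores the watcher in the client's ZKWatchManager; a notification (xid -1) is deserialized into a WatcherEvent and handed to the EventThread, which notifies the client's watchers.
First, reading the stream:
// the channel is ready for reading; read data from it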
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
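// ... omitted: code not needed for this analysis (reading the length prefix, handling the connect result)
} else {
// otherwise this is a regular response: process it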
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
Here is the relevant part of readResponse:
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
// response to a ping
if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x"
+ Long.toHexString(sessionId)
+ " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000)
+ "ms");
}
return;
}
// response to an authentication request
if (replyHdr.getXid() == -4) {
// -4 is the xid for AuthPacket
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x"
+ Long.toHexString(sessionId));
}
return;
}
// a notification that something changed on the server
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
WatcherEvent event = new WatcherEvent();
// deserialize the event; deserialize() shows that the server sends only the event type, the keeper state and the path of the changed node
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
// queue the event; the EventThread finds the watchers for this path and invokes their callbacks
eventThread.queueEvent( we );
return;
}
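// ... omitted: for a regular reply, the matching Packet is taken from pendingQueue
// and its response is deserialized inside a try block
} finally {
// register the watcher (if any)
finishPacket(packet);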
}
}
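Two rules in readResponse are easy to isolate: the reply's xid decides how it is handled (-2 ping, -4 auth, -1 watcher notification, anything else a regular reply matched against pendingQueue), and with a chroot the notification path is shortened by stripping the chroot prefix. The hypothetical ReplyDispatch class below captures both rules in isolation; it is a sketch that follows the excerpt above, not ZooKeeper code.

```java
final class ReplyDispatch {
    enum Kind { PING, AUTH, NOTIFICATION, REPLY }

    // Same xid checks as the top of readResponse.
    static Kind classify(int xid) {
        switch (xid) {
            case -2:
                return Kind.PING;
            case -4:
                return Kind.AUTH;
            case -1:
                return Kind.NOTIFICATION;
            default:
                return Kind.REPLY; // matched against the head of pendingQueue
        }
    }

    // Server path -> client path under a chroot, following readResponse.
    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.compareTo(chrootPath) == 0) {
            return "/";
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath; // the real code only logs a warning in this case
    }
}
```

For example, with the chroot /app, a change on the server path /app/a is reported to the client as /a, and a change on /app itself as /.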
The registration code (ClientCnxn.finishPacket and WatchRegistration.register):
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
// register the watcher that travelled with this packet
p.watchRegistration.register(p.replyHeader.getErr());
}
//...
}
public void register(int rc) {
if (shouldAddWatch(rc)) {
// get the watcher map cached in ZKWatchManager (which map depends on rc)
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
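The code above shows that once the server has answered, the client binds the watcher to its path and adds it to a map in ZKWatchManager (ZKWatchManager.dataWatches for getData), which completes client-side registration.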
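As far as I can tell from the 3.4 source, shouldAddWatch(rc) is simply rc == 0 for a plain registration, while the exists registration also accepts NONODE and then files the watcher under existWatches instead of dataWatches (this is what getWatches(rc) selects), so that the watch can fire when the node is later created. The hypothetical ClientWatchSketch below models those two rules with strings in place of Watcher objects; treat the exists rule as my reading of that version rather than a guarantee for every release.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ClientWatchSketch {
    static final int OK = 0;
    static final int NONODE = -101; // value of KeeperException.Code.NONODE

    final Map<String, Set<String>> dataWatches = new HashMap<>();
    final Map<String, Set<String>> existWatches = new HashMap<>();

    // getData-style registration: the watcher is kept only if the call succeeded.
    void registerData(int rc, String path, String watcher) {
        if (rc == OK) {
            add(dataWatches, path, watcher);
        }
    }

    // exists-style registration: a watch on a missing node is kept too,
    // but in existWatches.
    void registerExists(int rc, String path, String watcher) {
        if (rc == OK) {
            add(dataWatches, path, watcher);
        } else if (rc == NONODE) {
            add(existWatches, path, watcher);
        }
    }

    // Same pattern as register(): path -> set of watchers, created on demand.
    private static void add(Map<String, Set<String>> watches, String path, String watcher) {
        synchronized (watches) {
            watches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        }
    }
}
```

Because each path maps to a Set, registering the same watcher twice on the same path leaves a single entry, so it is called once when the event fires.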
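Server-side watcher registration walkthrough
The client sends the registration request to the server through doIO. Here is what happens on the server once the request arrives.
NIOServerCnxnFactory.java is mainly responsible for accepting client requests. Its code: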
public void run() {
for (SelectionKey k : selectedList) {
// ... (other cases omitted) if the channel is ready and readable or writable
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
// handle the network read/write
c.doIO(k);
} else {
// ...
}
The server can only read the client's registration request once its channel is ready for reading.
Next, NIOServerCnxn's doIO:
void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
// the length is known; read and process the payload
readPayload();
}
else {
// four letter words take care
// need not do anything else
return;
}
}
}
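//... omitted
private void readPayload() throws IOException, InterruptedException {
// ... omitted: finish reading the payload bytes from the socket
if (incomingBuffer.remaining() == 0) { // the whole request body has been read
packetReceived();
incomingBuffer.flip();
if (!initialized) {
readConnectRequest();
} else {
// read the request
readRequest();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}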
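The doIO code above relies on length-prefixed framing: lenBuffer first collects a 4-byte length, then incomingBuffer is allocated for the payload, and once the payload is complete both are reset to read the next length. Below is a standalone sketch of that framing with a hypothetical FrameDecoder class; it deliberately ignores the four-letter-word check and the maximum buffer size that the real server enforces.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameDecoder {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // start by reading a length
    final List<byte[]> frames = new ArrayList<>();

    // Sender side: 4-byte big-endian length followed by the payload.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Feed bytes as they arrive; a chunk may split a frame anywhere.
    void feed(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // copy as much as fits into the buffer we are currently filling
            int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
            ByteBuffer part = chunk.duplicate();
            part.limit(part.position() + n);
            incomingBuffer.put(part);
            chunk.position(chunk.position() + n);

            if (incomingBuffer.hasRemaining()) {
                continue; // chunk exhausted; wait for more bytes
            }
            if (incomingBuffer == lenBuffer) { // length prefix complete
                lenBuffer.flip();
                int len = lenBuffer.getInt();
                if (len < 0) {
                    throw new IllegalArgumentException("bad length " + len);
                }
                if (len == 0) {
                    frames.add(new byte[0]);
                    lenBuffer.clear();
                } else {
                    incomingBuffer = ByteBuffer.allocate(len); // like readLength()
                }
            } else { // payload complete: hand it on, then read the next length
                frames.add(incomingBuffer.array());
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }
}
```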
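Eventually this reaches the submitRequest method of the ZooKeeperServer class, and after passing through the request processor chain the request is handled by FinalRequestProcessor.processRequest.
Since this article uses getData as the example for watcher registration, here is the getData branch of processRequest: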
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
Long aclL;
synchronized(n) {
aclL = n.acl;
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
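The call zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null) is where the watcher is registered: if the client asked for a watch, the server connection cnxn itself is passed in as the Watcher. Let's follow it further: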
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
// if a watcher was given, register it
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
public synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
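The code above shows that when a watcher is registered, it is put into both watchTable and watch2Paths in the WatchManager class: watchTable maps a path to its watchers, while watch2Paths maps a watcher to the paths it watches.
This completes server-side watcher registration, summarized in the following flowchart:
[Figure: server-side watcher registration flowchart]
That concludes the walkthrough of the watcher registration code.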
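Why keep two maps? watchTable answers "who is watching this path?" when a node changes, while watch2Paths answers "which paths does this watcher (connection) hold?", so that all of a connection's watches can be dropped when it closes without scanning every path. Here is a minimal generic sketch; TwoWayWatchIndex is a hypothetical name, and its removeWatcher is written in the spirit of WatchManager.removeWatcher rather than copied from it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TwoWayWatchIndex<W> {
    final Map<String, Set<W>> watchTable = new HashMap<>();   // path -> watchers
    final Map<W, Set<String>> watch2Paths = new HashMap<>();  // watcher -> paths

    // Same two updates as addWatch: one entry in each direction.
    synchronized void addWatch(String path, W watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // When a connection goes away, watch2Paths says exactly which
    // watchTable entries to clean up.
    synchronized void removeWatcher(W watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String p : paths) {
            Set<W> set = watchTable.get(p);
            if (set != null) {
                set.remove(watcher);
                if (set.isEmpty()) {
                    watchTable.remove(p);
                }
            }
        }
    }
}
```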
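Watcher triggering
Let's look at the flow through a sequence diagram; the relevant code has already been covered above.
[Figure: watcher triggering sequence diagram]
With that, the walkthrough of the watcher-related code is complete.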
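To complement the sequence diagram, here is a code sketch of the server side of triggering. A change such as setData calls triggerWatch on the WatchManager for that path; triggerWatch removes the path's watchers from watchTable (and the path from each watcher's watch2Paths entry) and only then calls process on each one, outside the lock. That removal is why ZooKeeper watches fire only once. On the server, the watcher is the client connection, whose process method sends the event behind a reply header with xid -1. FakeServerCnxn and TriggerSketch are hypothetical simplifications that record the notification as a string instead of sending it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a server connection acting as a Watcher.
final class FakeServerCnxn {
    final List<String> sent = new ArrayList<>();

    // The real connection serializes the event behind a ReplyHeader with xid -1;
    // here we only record what would be sent.
    void process(String eventType, String path) {
        sent.add("xid=-1 " + eventType + " " + path);
    }
}

final class TriggerSketch {
    private final Map<String, Set<FakeServerCnxn>> watchTable = new HashMap<>();
    private final Map<FakeServerCnxn, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, FakeServerCnxn watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    // Remove first, then notify: this is what makes a watch one-shot.
    int triggerWatch(String path, String eventType) {
        Set<FakeServerCnxn> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                return 0;
            }
            for (FakeServerCnxn w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (FakeServerCnxn w : watchers) {
            w.process(eventType, path); // notifications are sent outside the lock
        }
        return watchers.size();
    }
}
```

On the client side the EventThread then looks up, and removes, the matching watchers in ZKWatchManager by path, so a client that wants further notifications has to register the watcher again.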