5. The watch mechanism and how it is implemented
Client side: sending the request
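```java
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ExistsWatchDemo {
    // Assumed connection settings; adjust for your ensemble
    private static final String ZK_ADDRESS = "127.0.0.1:2181";
    private static final int SESSION_TIMEOUT = 30000; // ms

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event -> {
            // The default watcher also receives connection-state events (type None)
            System.out.println("Received event: " + event.getType());
            if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                // The node's data changed
                System.out.println("Node data changed");
            } else if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                // The child list changed (only delivered for watches set via getChildren, not exists)
                System.out.println("Child list changed");
            }
        });
        // The path whose changes we want to watch
        String path = "/example";
        // Send the request; watch = true registers the default watcher passed to the constructor
        zk.exists(path, true);
        Thread.sleep(60_000); // keep the process alive long enough to observe an event
        zk.close();
    }
}
```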
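Calling the ZooKeeper API wraps the arguments into an ExistsRequest and sends it to the server. The server replies with the node's Stat and, if a watch was requested, registers a watch on that path. The client also keeps its own record of the watcher (a WatchRegistration), which it registers locally once the reply arrives.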
**The request is packaged as follows**
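```java
public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException {
    final String clientPath = path;
    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        // Kept on the client side; registered locally only after the server replies
        wcb = new ExistsWatchRegistration(watcher, clientPath);
    }
    final String serverPath = prependChroot(clientPath);
    // Request header: the op code for exists is 3 (ZooDefs.OpCode.exists)
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.exists);
    // Request body: this is the object the server finally deserializes;
    // the header above only tells the server to dispatch it to the exists handler
    ExistsRequest request = new ExistsRequest();
    request.setPath(serverPath);       // the path to check
    request.setWatch(watcher != null); // true when a watcher was supplied
    // Both this record and the exists reply consist of a single Stat, so SetDataResponse is reused
    SetDataResponse response = new SetDataResponse();
    // Send the request and block until the reply arrives
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        // A missing node is not an error for exists: it simply returns null
        if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
            return null;
        }
        throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
    }
    return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
```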
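The client keeps its own watch table, and the `wcb` above only lands there after the reply comes back. For exists, the table depends on the result: an existing node goes to the data watches, a missing one to the exist watches, and any other error registers nothing. The following is a minimal in-memory sketch of that bookkeeping, assuming watchers are plain strings. The class and field names are hypothetical; only the result-code rules and the NONODE value (-101) follow ZooKeeper.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the client-side registration done for exists(); not the real ZKWatchManager
class ExistsRegistrationSketch {
    static final int OK = 0;
    static final int NONODE = -101; // value of KeeperException.Code.NONODE

    final Map<String, Set<String>> dataWatches = new HashMap<>();  // path -> watchers on an existing node
    final Map<String, Set<String>> existWatches = new HashMap<>(); // path -> watchers waiting for creation

    // Called once the reply for the exists request arrives, with the reply's error code
    void register(String watcher, String path, int rc) {
        Map<String, Set<String>> table;
        if (rc == OK) {
            table = dataWatches;
        } else if (rc == NONODE) {
            table = existWatches;
        } else {
            return; // any other error: the watch is not registered on the client
        }
        table.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }
}
```

Because registration waits for the reply, a request that fails (for example on connection loss) leaves no stale watcher behind on the client.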
Server side: handling the request
The handling code is in org.apache.zookeeper.server.FinalRequestProcessor#processRequest:
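```java
case OpCode.exists: {
    lastOp = "EXIS";
    // The ExistsRequest sent by the client above. Note: this is only the body;
    // the request header was already consumed to select this case (request.type)
    ExistsRequest existsRequest = request.readRequestRecord(ExistsRequest::new); // deserialize the request body
    path = existsRequest.getPath(); // the path to check
    DataNode n = zks.getZKDatabase().getNode(path); // not used further in this abridged excerpt
    // Register a watch only if the client asked for one; the watcher is the
    // client's connection object itself (ServerCnxn is a Watcher)
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
    // Reply with the path's Stat
    rsp = new ExistsResponse(stat);
    requestPathMetricsCollector.registerRequest(request.type, path);
    break;
}
```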
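So when the server receives the request, it:
- reads the node for the path from the in-memory data tree,
- registers a watch if the request asked for one,
- returns the node's Stat.
The watch is registered as a side effect of the Stat lookup: the client's connection is recorded as a watcher on that path.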
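One consequence is that exists() can watch a node that does not exist yet: in DataTree#statNode the watch is recorded before the existence check, so it survives the NONODE reply and later fires NodeCreated. The sketch below models only that ordering. It is hypothetical: a map of versions stands in for DataNode, watchers are connection ids, and `create` is a toy stand-in for the real write path.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the ordering inside DataTree#statNode:
// the watch is recorded first, the existence check comes second.
class StatNodeSketch {
    static class NoNodeException extends Exception {}

    final Map<String, Integer> nodes = new HashMap<>();           // path -> data version (stand-in for DataNode)
    final Map<String, Set<String>> dataWatches = new HashMap<>(); // path -> watching connections

    int statNode(String path, String watcher) throws NoNodeException {
        if (watcher != null) {
            dataWatches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        }
        Integer version = nodes.get(path);
        if (version == null) {
            throw new NoNodeException(); // the reply carries NONODE, but the watch stays
        }
        return version;
    }

    // A later create on the watched path finds the watch and consumes it (one-shot)
    Set<String> create(String path) {
        nodes.put(path, 0);
        Set<String> fired = dataWatches.remove(path);
        if (fired == null) {
            return new HashSet<>();
        }
        return fired;
    }
}
```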
Registering the watch
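The watch ends up in the WatchManager, which records it in two maps: watchTable (path → set of watchers) and watch2Paths (watcher → the paths it watches, with a WatchStats per path). Simplified code: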
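```java
// DataTree#statNode
public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
    Stat stat = new Stat();
    DataNode n = nodes.get(path);
    if (watcher != null) {
        // Add the watch even if the node does not exist yet,
        // so that exists() can later fire NodeCreated
        dataWatches.addWatch(path, watcher);
    }
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        n.copyStat(stat);
    }
    return stat;
}

// WatchManager#addWatch (the two-argument addWatch(path, watcher) used above
// delegates here with WatcherMode.STANDARD)
public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) {
    // The watcher's connection is already closed
    if (isDeadWatcher(watcher)) {
        LOG.debug("Ignoring addWatch with closed cnxn");
        return false;
    }
    // path -> watchers
    Set<Watcher> list = watchTable.get(path);
    if (list == null) {
        list = new HashSet<>(4);
        watchTable.put(path, list);
    }
    list.add(watcher);
    // watcher -> (path -> watch modes)
    Map<String, WatchStats> paths = watch2Paths.get(watcher);
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashMap<>();
        watch2Paths.put(watcher, paths);
    }
    // Record the mode for this path; triggerWatch relies on this entry
    WatchStats stats = paths.getOrDefault(path, WatchStats.NONE);
    WatchStats newStats = stats.addMode(watcherMode);
    if (newStats != stats) {
        paths.put(path, newStats);
        if (watcherMode.isRecursive()) {
            ++recursiveWatchQty;
        }
        return true;
    }
    return false;
}
```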
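Why keep two maps? watchTable answers "who watches this path?" when a write happens, and watch2Paths answers "what does this connection watch?" when a connection goes away. The real WatchManager has a removeWatcher method for the second case. The sketch below is hypothetical and simplified in the spirit of that class: there is no WatcherMode/WatchStats, and watchers are plain connection ids.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified two-way index in the spirit of WatchManager
class TwoWayWatchIndex {
    final Map<String, Set<String>> watchTable = new HashMap<>();  // path -> watchers
    final Map<String, Set<String>> watch2Paths = new HashMap<>(); // watcher -> paths

    // Returns true only when this (path, watcher) pair is new
    synchronized boolean addWatch(String path, String watcher) {
        watchTable.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
        return watch2Paths.computeIfAbsent(watcher, w -> new HashSet<>()).add(path);
    }

    // On disconnect, watch2Paths lists exactly the paths to clean up,
    // so the whole watchTable never has to be scanned
    synchronized void removeWatcher(String watcher) {
        Set<String> paths = watch2Paths.remove(watcher);
        if (paths == null) {
            return;
        }
        for (String path : paths) {
            Set<String> watchers = watchTable.get(path);
            if (watchers != null) {
                watchers.remove(watcher);
                if (watchers.isEmpty()) {
                    watchTable.remove(path); // no empty sets left behind
                }
            }
        }
    }
}
```

The price is that every add and remove must update both maps under one lock, which is why the real methods are synchronized.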
So how does the server fire a watch and notify the client?
Firing the watch
WatchManager has a method for this, triggerWatch:
org.apache.zookeeper.server.watch.WatchManager#triggerWatch(java.lang.String, org.apache.zookeeper.Watcher.Event.EventType, long, java.util.List<org.apache.zookeeper.data.ACL>)
DataTree calls it from setData and from the other operations that modify data, such as node creation and deletion.
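DataTree holds two WatchManagers: dataWatches (set by exists/getData) and childWatches (set by getChildren). The table below is my reading of which triggerWatch calls the write operations make in recent versions, so treat it as an assumption to check against the source you run. The class and method names are hypothetical. In deleteNode, the watchers already notified through dataWatches are, as far as I can tell, passed as the `supress` argument to the childWatches call so that a connection is not told twice.

```java
import java.util.List;

// Hypothetical summary of which triggerWatch calls DataTree's write paths make
class WriteTriggers {
    enum Op { CREATE, DELETE, SET_DATA }

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }

    // Each entry reads "table:path:event"
    static List<String> triggersFor(Op op, String path) {
        switch (op) {
            case SET_DATA:
                return List.of("dataWatches:" + path + ":NodeDataChanged");
            case CREATE:
                return List.of("dataWatches:" + path + ":NodeCreated",
                        "childWatches:" + parentOf(path) + ":NodeChildrenChanged");
            case DELETE:
                return List.of("dataWatches:" + path + ":NodeDeleted",
                        "childWatches:" + path + ":NodeDeleted",
                        "childWatches:" + parentOf(path) + ":NodeChildrenChanged");
            default:
                throw new IllegalArgumentException(op.toString());
        }
    }
}
```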

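The code below first selects the watchers to fire. As the comments note, a standard watch is one-shot: once it fires it is removed (persistent watches, added in 3.6, stay registered). This version takes an extra supress argument, a set of watchers to skip; the four-argument overload above passes null for it.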
```java
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, List<ACL> acl, WatcherOrBitSet supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
    // The set of watchers that will actually be notified
    Set<Watcher> watchers = new HashSet<>();
    // Collect the watchers to fire; a watcher only counts if both watchTable and
    // watch2Paths have an entry for it. A standard watch is one-shot: it is removed once collected
    synchronized (this) {
        // Visits the path itself and, when recursive watches exist, its ancestors
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        for (String localPath : pathParentIterator.asIterable()) {
            Set<Watcher> thisWatchers = watchTable.get(localPath);
            if (thisWatchers == null || thisWatchers.isEmpty()) {
                continue;
            }
            Iterator<Watcher> iterator = thisWatchers.iterator();
            while (iterator.hasNext()) {
                Watcher watcher = iterator.next();
                Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap());
                WatchStats stats = paths.get(localPath);
                if (stats == null) {
                    LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath);
                    continue;
                }
                if (!pathParentIterator.atParentPath()) {
                    // The changed path itself: every watcher fires, and the STANDARD mode is consumed
                    watchers.add(watcher);
                    WatchStats newStats = stats.removeMode(WatcherMode.STANDARD);
                    if (newStats == WatchStats.NONE) {
                        iterator.remove();
                        paths.remove(localPath);
                    } else if (newStats != stats) {
                        paths.put(localPath, newStats);
                    }
                } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
                    // An ancestor path: only persistent recursive watches fire
                    watchers.add(watcher);
                }
            }
            // Drop the path entry once no watchers remain on it
            if (thisWatchers.isEmpty()) {
                watchTable.remove(localPath);
            }
        }
    }
    // Nothing to fire
    if (watchers.isEmpty()) {
        return null;
    }
    // Notify each watcher, skipping any listed in supress
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        if (w instanceof ServerWatcher) {
            ((ServerWatcher) w).process(e, acl);
        } else {
            w.process(e);
        }
    }
    return new WatcherOrBitSet(watchers);
}
```
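To see the filtering rules in isolation, here is a hypothetical, simplified model of the loop above. Modes are kept as an EnumSet per (path, watcher) instead of WatchStats. Unlike the real code, it always walks the ancestors rather than only when recursive watches exist. At the changed path every watcher fires and the STANDARD mode is consumed; at an ancestor only PERSISTENT_RECURSIVE watches fire.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of WatchManager.triggerWatch's filtering
class TriggerSketch {
    enum Mode { STANDARD, PERSISTENT, PERSISTENT_RECURSIVE }

    // path -> (watcher -> modes that watcher holds on that path)
    final Map<String, Map<String, EnumSet<Mode>>> table = new HashMap<>();

    void addWatch(String path, String watcher, Mode mode) {
        table.computeIfAbsent(path, p -> new HashMap<>())
             .computeIfAbsent(watcher, w -> EnumSet.noneOf(Mode.class))
             .add(mode);
    }

    // The path itself first, then each ancestor up to "/"
    static List<String> selfAndParents(String path) {
        List<String> out = new ArrayList<>();
        out.add(path);
        String p = path;
        while (!p.equals("/")) {
            int i = p.lastIndexOf('/');
            p = i == 0 ? "/" : p.substring(0, i);
            out.add(p);
        }
        return out;
    }

    Set<String> trigger(String path) {
        Set<String> fired = new HashSet<>();
        for (String local : selfAndParents(path)) {
            boolean atParent = !local.equals(path);
            Map<String, EnumSet<Mode>> watchers = table.get(local);
            if (watchers == null) {
                continue;
            }
            Iterator<Map.Entry<String, EnumSet<Mode>>> it = watchers.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, EnumSet<Mode>> entry = it.next();
                EnumSet<Mode> modes = entry.getValue();
                if (!atParent) {
                    fired.add(entry.getKey());
                    modes.remove(Mode.STANDARD); // one-shot: consumed
                    if (modes.isEmpty()) {
                        it.remove();
                    }
                } else if (modes.contains(Mode.PERSISTENT_RECURSIVE)) {
                    fired.add(entry.getKey());   // ancestors fire only for recursive watches
                }
            }
            if (watchers.isEmpty()) {
                table.remove(local);
            }
        }
        return fired;
    }
}
```

Triggering "/a/b" twice shows the one-shot rule: a standard watcher on "/a/b" fires only the first time, while persistent and recursive ones fire both times.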
**Callback logic**
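The watcher here is the client's connection, so the call lands in org.apache.zookeeper.server.NIOServerCnxn#process (NettyServerCnxn#process when the Netty connection factory is configured):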
```java
public void process(WatchedEvent event, List<ACL> znodeAcl) {
    // Reply header: xid = NOTIFICATION_XID (-1) marks the packet as a watch
    // notification rather than the reply to a pending request
    ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
    // Convert the WatchedEvent into WatcherEvent, its wire form
    WatcherEvent e = event.getWrapper();
    // The last parameter OpCode here is used to select the response cache.
    // Passing OpCode.error (with a value of -1) means we don't care, as we don't need
    // response cache on delivering watcher events.
    // Push the notification to the client
    int responseSize = sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
}
```
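On the client, the reserved xid is what separates a push from an ordinary reply. Below is a hypothetical sketch of that demultiplexing. Its wire layout is a simplification: the header fields are in ReplyHeader order (xid, zxid, err), but the body is just a length-prefixed UTF-8 path instead of a jute-encoded WatcherEvent. It assumes ClientCnxn's values of -1 for notifications and -2 for pings. As in the real client, ordinary replies are matched against a FIFO of pending requests.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Hypothetical model of how a client tells a watch notification apart from a normal reply
class ReplyDemux {
    static final int NOTIFICATION_XID = -1; // same value as ClientCnxn.NOTIFICATION_XID
    static final int PING_XID = -2;         // ClientCnxn's xid for pings

    final Deque<Map.Entry<Integer, String>> pendingQueue = new ArrayDeque<>(); // (xid, request) in send order
    final Deque<String> eventQueue = new ArrayDeque<>();

    // Simplified encoding (assumption): int xid, long zxid, int err, int length, UTF-8 path
    static ByteBuffer encode(int xid, long zxid, int err, String path) {
        byte[] raw = path.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 4 + 4 + raw.length);
        buf.putInt(xid).putLong(zxid).putInt(err).putInt(raw.length).put(raw);
        buf.flip();
        return buf;
    }

    String dispatch(ByteBuffer buf) {
        int xid = buf.getInt();
        long zxid = buf.getLong();
        int err = buf.getInt();
        byte[] raw = new byte[buf.getInt()];
        buf.get(raw);
        String path = new String(raw, StandardCharsets.UTF_8);
        if (xid == NOTIFICATION_XID) {
            // A push from the server: no request to match, hand it to the event queue
            eventQueue.add("event " + path + " @zxid " + zxid);
            return "notification";
        }
        if (xid == PING_XID) {
            return "ping";
        }
        // Otherwise it must answer the oldest outstanding request
        Map.Entry<Integer, String> head = pendingQueue.poll();
        if (head == null || head.getKey() != xid) {
            return "out-of-order xid " + xid;
        }
        return "reply to " + head.getValue() + " (err " + err + ")";
    }
}
```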
How does the client receive and handle this notification?
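The client's SendThread reads the packet off the network, recognizes the notification xid, turns the payload into a WatchedEvent and puts it on a queue. The call chain is:
org.apache.zookeeper.ClientCnxn.SendThread#run
org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
org.apache.zookeeper.ClientCnxnSocketNIO#doIO
org.apache.zookeeper.ClientCnxn.SendThread#readResponse (branch for xid == NOTIFICATION_XID)
org.apache.zookeeper.ClientCnxn.EventThread#queueEvent(org.apache.zookeeper.WatchedEvent)
org.apache.zookeeper.ClientCnxn.EventThread#queueEvent(org.apache.zookeeper.WatchedEvent, java.util.Set<org.apache.zookeeper.Watcher>)
ClientCnxnSocket#readConnectResult and SendThread#onConnected belong to the session handshake instead; onConnected queues the SyncConnected state event, not node notifications.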
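queueEvent collects (materializes) the client-side watchers registered for the event's path and type, removing the one-shot ones from the client's tables, and adds the event together with those watchers to org.apache.zookeeper.ClientCnxn.EventThread#waitingEvents.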
EventThread loops over this queue, taking one entry at a time:
org.apache.zookeeper.ClientCnxn.EventThread#run
org.apache.zookeeper.ClientCnxn.EventThread#processEvent: calls process() on each watcher
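The hand-off between the network thread and the EventThread is a plain producer/consumer queue. The following is a hypothetical sketch of that pattern with illustrative names: watchers are `Consumer<String>` callbacks and events are bare paths. It mirrors two details of the real client. Watchers are taken out of the table when the event is queued, so a second change finds nothing. A sentinel "event of death" stops the loop. Callbacks run one at a time on a single thread, which is why a slow watcher delays every later event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical producer/consumer model of ClientCnxn.EventThread
class EventThreadSketch {
    // An event plus the watchers materialized for it when it was queued
    private static final class Pending {
        final String path;
        final List<Consumer<String>> watchers;

        Pending(String path, List<Consumer<String>> watchers) {
            this.path = path;
            this.watchers = watchers;
        }
    }

    // Sentinel that tells the loop to stop
    private static final Pending EVENT_OF_DEATH = new Pending(null, List.of());

    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();
    private final LinkedBlockingQueue<Pending> waitingEvents = new LinkedBlockingQueue<>();
    private final Thread eventThread = new Thread(this::run, "event-thread");

    void start() {
        eventThread.start();
    }

    synchronized void watch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Producer side (the network thread): take the one-shot watchers out, then enqueue
    void queueEvent(String path) {
        List<Consumer<String>> materialized;
        synchronized (this) {
            materialized = watches.remove(path);
        }
        if (materialized != null) {
            waitingEvents.add(new Pending(path, materialized));
        }
    }

    void shutdown() throws InterruptedException {
        waitingEvents.add(EVENT_OF_DEATH);
        eventThread.join();
    }

    // Consumer side: the processEvent analogue, one event at a time, in arrival order
    private void run() {
        try {
            while (true) {
                Pending p = waitingEvents.take();
                if (p == EVENT_OF_DEATH) {
                    return;
                }
                for (Consumer<String> w : p.watchers) {
                    try {
                        w.accept(p.path);
                    } catch (RuntimeException ex) {
                        // a failing callback must not stop the loop
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```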
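That completes the round trip: the client sends the watch flag with its request, the server records the connection in WatchManager, a later write fires the watch and pushes a notification, and the client's EventThread runs the callback.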