3.zk 客户端启动
2025/8/4大约 2 分钟
3.zk 客户端启动
创建 zookeeper client:
public static void main(String[] args) throws Exception {
// 创建ZooKeeper实例
ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, event->{
System.out.println("收到事件:" + event.getType());
try {
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
// 如果是节点数据变化事件,可以在这里处理
System.out.println("节点数据发生变化");
} else if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
// 如果是子节点列表变化事件,可以在这里处理
System.out.println("子节点列表发生变化");
}
} catch (Exception e) {
e.printStackTrace();
}
});
// 监听某个路径下的数据变化
String path = "/example";
zk.exists(path, true);
// 或者监听子节点变化
zk.getChildren(path, true);
// 防止程序立即退出,等待接收事件
Thread.sleep(Long.MAX_VALUE);
}
2 重点看下zookeeper 的启动代码
org.apache.zookeeper.ZooKeeper#ZooKeeper(java.lang.String, int, org.apache.zookeeper.Watcher, boolean, org.apache.zookeeper.client.HostProvider, org.apache.zookeeper.client.ZKClientConfig)
- connectString:连接参数
- sessionTimeout:超时事件
- watcher:默认的watch,通过构造参数添加的就是默认watch
- clientConfig 配置对象,这个再配置网络方面有用
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider hostProvider, ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, sessionTimeout, watcher});
this.clientConfig = clientConfig != null ? clientConfig : new ZKClientConfig();
this.hostProvider = hostProvider;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
// 核心,创建底层的网络连接管理对象
this.cnxn = this.createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this.clientConfig, watcher, this.getClientCnxnSocket(), canBeReadOnly);
// 启动客户端
this.cnxn.start();
}
2.1 createConnection
这个函数会创建:ClientCnxn ,这个类用于管理zk客户端的网络请求,对应的服务端用于处理网络的类是:ServerCnxn
2.2 创建 ClientCnxn
org.apache.zookeeper.ClientCnxn#ClientCnxn
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
// 认证队列
this.authInfo = new CopyOnWriteArraySet();
// 待处理请求队列????
this.pendingQueue = new ArrayDeque();
// 待发送请求队列
this.outgoingQueue = new LinkedBlockingDeque();
this.closing = false;
this.seenRwServerBefore = false;
// 干啥的?
this.eventOfDeath = new Object();
this.xid = 1; // id
this.state = States.NOT_CONNECTED;
this.chrootPath = chrootPath;
this.hostProvider = hostProvider;
this.sessionTimeout = sessionTimeout;
this.clientConfig = clientConfig;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.readOnly = canBeReadOnly;
// watch管理器,我们再zookeeper中传入的watch会作为默认watch
this.watchManager = new ZKWatchManager(clientConfig.getBoolean("zookeeper.disableAutoWatchReset"), defaultWatcher);
this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;
// 读写线程,这里
this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
this.initRequestTimeout();
}
SendThread中的构造参数:clientCnxnSocket,根据配置自动选择是netty实现,还是NIO实现的客户端,默认是NIO客户端:org.apache.zookeeper.ClientCnxnSocketNIO
2.3 启动zk 网络管理器 cnxn
zk的客户端的网络管理器的启动很简单,就是启动 sendThread 和 eventThread
public void start() {
this.sendThread.start();
this.eventThread.start();
}