2.Zookeeper server 启动
zk 客户端工具:https://github.com/vran-dev/PrettyZoo/releases (已停止维护)
1. 拉起代码
GitHub
2. 导入idea
2.1 编译
导入idea之后,先不要进行启动代码,先进行编译一次:
mvn clean package -DskipTests
2.2更新pom
更新 server 模块下的pom,因有些依赖的scope是provide,将其注释掉,直接导入
- jetty 相关
- snappy-java
- metrics-core
2.3 配置文件
打开conf目录,将zoo_sample.cfg 复制一份,更新其中的dataDir,然后复制绝对路径。
3 运行Zk
根据启动脚本,知道zK的主启动类是:
org.apache.zookeeper.server.quorum.QuorumPeerMain#main
运行前,将 配置文件路径拷贝到程序启动参数中。
流程解析
org.apache.zookeeper.server.quorum.QuorumPeerMain#main
org.apache.zookeeper.server.quorum.QuorumPeerMain#initializeAndRun
- 初始化配置 QuorumPeerConfig
- 初始化默认配置
- 读取刚刚传入的配置文件中的配置,已配置文件中的配置覆盖默认配置,如果有。
- 创建和启动:DatadirCleanupManager 数据文件清理器
- 用于清理快照和事务日志
- 清理任务实现:org.apache.zookeeper.server.DatadirCleanupManager.PurgeTask#PurgeTask
- 这里只创建了没有启动
- 判断是分布式还是单机?这里是单机模式
- 启动单机模式:org.apache.zookeeper.server.ZooKeeperServerMain#main
- 启动单机模式:org.apache.zookeeper.server.ZooKeeperServerMain#initializeAndRun
- 初始化配置对象:ServerConfig
- 启动单机模式:org.apache.zookeeper.server.ZooKeeperServerMain#runFromConfig
4.启动单机模式
代码入口位置:org.apache.zookeeper.server.ZooKeeperServerMain#runFromConfig
4.1指标相关
忽略不看,主要就是采集zk的信息,方便后期发送给promthes之类
4.2 权限认证
加载Zk的权限认证组件
ProviderRegistry.initialize();
4.3 FileTxnSnapLog 工具类
FileTxnSnapLog 是一个工具类用于协助管理 操作 dataDir
, snapDir
两个类
分别存放:
// 事务日志文件
- txnLog = new FileTxnLog(this.dataDir);
// 快照文件
- snapLog = new FileSnap(this.snapDir);
4.4 创建zk实例
org.apache.zookeeper.server.ZooKeeperServer#ZooKeeperServer(org.apache.zookeeper.server.util.JvmPauseMonitor, org.apache.zookeeper.server.persistence.FileTxnSnapLog, int, int, int, int, org.apache.zookeeper.server.ZKDatabase, java.lang.String)
ZooKeeperServer 430 行
/**
* * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
* actually start listening for clients until run() is invoked.
*
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int clientPortListenBacklog, ZKDatabase zkDb, String initialConfig, boolean reconfigEnabled) {
// 用于记录服务器运行时的各种统计信息(如处理请求数量、延迟等)。
serverStats = new ServerStats(this);
// 管理事务日志和快照的工厂类。
this.txnLogFactory = txnLogFactory;
this.txnLogFactory.setServerStats(this.serverStats);
// 存储 ZooKeeper 数据树和事务日志的数据库对象
this.zkDb = zkDb;
// ZooKeeper 的基本时间单位(以毫秒为单位),用于控制心跳、超时等。
this.tickTime = tickTime;
// 客户端会话最小超时时间。
setMinSessionTimeout(minSessionTimeout);
// 客户端会话最大超时时间。
setMaxSessionTimeout(maxSessionTimeout);
// 客户端连接请求的最大排队数。 TCP/IP 协议中的 listen backlog 参数。
this.listenBacklog = clientPortListenBacklog;
//是否启用动态重新配置。
this.reconfigEnabled = reconfigEnabled;
// 用于监听服务器状态变化(比如启动、停止等事件)
listener = new ZooKeeperServerListenerImpl(this);
// 初始化读取响应缓存(用于 getData 请求),大小从系统属性或默认值获取。
readResponseCache = new ResponseCache(Integer.getInteger(
GET_DATA_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getData");
// 初始化获取子节点响应缓存(用于 getChildren 请求),大小从系统属性或默认值获取。
getChildrenResponseCache = new ResponseCache(Integer.getInteger(
GET_CHILDREN_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE), "getChildren");
// 保存初始配置字符串,可能用于后续配置恢复或重载。
this.initialConfig = initialConfig;
// 初始化一个路径级别的请求度量收集器,用于监控不同路径的访问情况。
this.requestPathMetricsCollector = new RequestPathMetricsCollector();
// 调用方法初始化大请求限流设置,防止某些请求影响整体性能。
this.initLargeRequestThrottlingSettings();
}
4.5 创建、启动adminServer 实例
// Start Admin server
adminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();
这个东西我就不看了. 管理控制台
4.6 创建、启动 服务端网络连接工厂 ServerCnxnFactory
if (config.getClientPortAddress() != null) {
// 服务端网络连接工厂, 默认是:NIOServerCnxnFactory,可以指定netty的实现 NettyServerCnxnFactory
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
config.getClientPortListenBacklog(),
false);
cnxnFactory.startup(zkServer);
// zkServer has been started. So we don't need to start it again in secureCnxnFactory.
needStartZKServer = false;
}
- 如果配置文件
zoo.cfg
中配置了网络相关部分:如:0.0.0.0:2181
则开始进行网络配置 - 初始化网络请求服务工厂类,默认是使用NIO,也可以通过参数:
zookeeper.serverCnxnFactory
指定netty 对服务端网络进行配置:cnxnFactory.configure
- 启动网络服务
- 启动IO读取线程
- accept线程
- expire线程
- 启动zkServer
- 启动ZkDb,初始化或者加载本地数据到内存
- setupRequestProcessors 请求器链
- PrepRequestProcessor:进行预处理,生成事务头zxid,日志记录等,读请求直接跳过,写请求必须被处理
- SyncRequestProcessor:负责写事务日志
- FinalRequestProcessor:响应数据给客户,更新内存树
- startRequestThrottler 限流器
在一个典型的 ZooKeeper 请求处理流程中,这些处理器按照如下顺序协同工作:
- PrepRequestProcessor首先接收到来自客户端的请求,并对其进行预处理,包括检查和生成必要的元数据。
- 然后,它将请求转发给SyncRequestProcessor,后者负责将所有相关的写操作安全地记录到磁盘上。
- 最后,经过同步处理的请求会被传递给FinalRequestProcessor,在这里完成最终的数据更新并将结果反馈给客户端。
这种设计模式允许 ZooKeeper 实现高效且可靠的服务,同时保持良好的扩展性和性能。每种处理器专注于自己的职责,使得整个系统既灵活又强大。
4.7 ContainerManager:ZNode容器管理者创建与启动
containerManager = new ContainerManager(
zkServer.getZKDatabase(),
zkServer.firstProcessor,
Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
Integer.getInteger("znode.container.maxPerMinute", 10000),
Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
);
containerManager.start();
这个组件的代码狠简单,创建了一个Java的定时器timer,然后start的时候,创建了一个任务timerTask提交到timer中,定时循环执行,通过不断调用 org.apache.zookeeper.server.ContainerManager#checkContainers
达到清理的目的,而这个清理,又调用了
- 清理目标:zkDb
- 清理手段:FinalRequestProcessor#processRequest