Zookeeper
Apache ZooKeeper 是分布式协调服务,提供命名服务、配置管理、分布式锁、Leader 选举等原语。基于 ZAB(ZooKeeper Atomic Broadcast)协议保证强一致性,是 Kafka、Hadoop、HBase 等众多中间件的依赖组件。
数据模型
ZooKeeper 以树形结构组织数据,每个节点称为 ZNode:
/
├── /config
│ └── /config/db
├── /locks
│ └── /locks/order-lock
└── /services
└── /services/user-service
ZNode 类型
| 类型 | 说明 |
|---|---|
| 持久节点(PERSISTENT) | 默认类型,客户端断连后仍存在 |
| 临时节点(EPHEMERAL) | 客户端会话结束自动删除,用于服务注册 |
| 持久顺序节点(PERSISTENT_SEQUENTIAL) | 创建时自动追加递增序号 |
| 临时顺序节点(EPHEMERAL_SEQUENTIAL) | 临时 + 顺序,用于分布式锁、选举 |
核心机制
Watcher 监听
客户端可对 ZNode 注册 Watcher,节点数据变更或子节点变化时收到一次性通知(触发后需重新注册):
zk.getData("/config/db", event -> {
if (event.getType() == EventType.NodeDataChanged) {
byte[] data = zk.getData("/config/db", true, null);
// 重新加载配置
}
}, null);ZAB 协议
集群中只有一个 Leader 处理写请求,写操作广播到所有 Follower,超过半数确认后才返回成功,保证强一致性(非最终一致)。
集群部署
Client
│
├── Server 1(Leader,处理写)
├── Server 2(Follower,处理读)
└── Server 3(Follower,处理读)
配置文件 zoo.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888每台服务器在 dataDir 下创建 myid 文件,内容为对应的 server.id。
常用命令(zkCli)
zkCli.sh -server localhost:2181
ls / # 列出子节点
get /config/db # 读取节点数据
create /locks "" # 创建持久节点
create -e /services/node1 "192.168.1.1" # 创建临时节点
set /config/db "jdbc:mysql://new-host" # 更新数据
delete /locks/order-lock # 删除节点
deleteall /locks # 递归删除
get -w /config/db # 监听数据变化(触发一次)
ls -w /services # 监听子节点变化典型应用场景
分布式锁
利用临时顺序节点实现公平锁:
1. 创建临时顺序节点 /locks/lock-000000001
2. 获取 /locks 下所有子节点并排序
3. 若自己序号最小 → 获得锁
4. 否则 → 监听前一个节点,等待其删除后重试
5. 释放锁:删除自己的节点(会话断开自动删除)
Leader 选举
各节点创建临时顺序节点,序号最小者成为 Leader;Leader 下线后序号次小者接任。
配置中心
将配置写入 ZNode,所有服务注册 Watcher,配置变更时收到通知并热加载。
服务注册与发现
服务启动时创建临时节点(如 /services/user/192.168.1.1:8080),宕机后临时节点自动消失,消费方通过监听子节点感知上下线。
Java 客户端(Curator)
Apache Curator 是 ZooKeeper 的高级封装,简化了重连、Watcher 重注册等操作:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.5.0</version>
</dependency>CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
client.start();
// 分布式锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/order-lock");
lock.acquire();
try {
// 临界区
} finally {
lock.release();
}
// 持久监听节点变化(自动重注册)
NodeCache nodeCache = new NodeCache(client, "/config/db");
nodeCache.getListenable().addListener(() -> {
byte[] data = nodeCache.getCurrentData().getData();
System.out.println("配置变更:" + new String(data));
});
nodeCache.start();注意事项
- 适合小数据、低频写场景,不适合存储大量业务数据
- 集群节点数建议为奇数(3/5/7),超过半数存活才能提供服务
- 会话超时(
sessionTimeout)决定临时节点存活时间,默认 30s - 生产环境建议使用 Curator,原生客户端 API 较底层
相关链接
- Seata — 分布式事务,HA 模式依赖 ZooKeeper 存储事务日志
- Canal — binlog 订阅,HA 模式依赖 ZooKeeper 进行主备切换
- ShardingSphere — 分库分表,集群模式使用 ZooKeeper 做配置中心