下面我将为你详细介绍如何使用 kazoo 库来操作 Zookeeper,包括安装、连接、基本 API 使用、高级特性以及一个完整的示例。
安装 kazoo 库
你需要通过 pip 来安装 kazoo。
pip install kazoo
连接到 Zookeeper
使用 kazoo 的第一步是创建一个 KazooClient 实例并连接到 Zookeeper 集群。
from kazoo.client import KazooClient
# 1. 创建一个 KazooClient 实例
# zk_host 可以是单个地址('127.0.0.1:2181')或多个地址组成的逗号分隔的字符串('127.0.0.1:2181,127.0.0.1:2182')
zk = KazooClient(hosts='127.0.0.1:2181')
# 2. 启动连接
# start() 方法是异步的,它会启动一个后台线程来维护连接状态
try:
zk.start()
print("成功连接到 Zookeeper!")
except Exception as e:
print(f"连接 Zookeeper 失败: {e}")
# ... 在这里执行你的 Zookeeper 操作 ...
# 3. 关闭连接
# 当程序结束时,务必关闭连接以释放资源
# zk.stop()
# zk.close()
重要提示:
hosts参数: 建议始终提供 Zookeeper 集群中多个节点的地址,这样即使一个节点宕机,kazoo也能自动切换到其他可用节点,提高了连接的可靠性。- 连接状态:
kazoo提供了监听连接状态的机制,例如监听连接丢失、连接恢复等,这对于构建健壮的应用非常重要。
核心 API 操作
kazoo 的 API 设计非常直观,与 Zookeeper 的原生命令一一对应。
1 节点操作
创建节点
# 确保根节点 '/my_zk' 存在
zk.ensure_path('/my_zk')
# 1. 创建一个持久化节点
# ephemeral=False (默认), sequence=False (默认)
zk.create('/my_zk/node1', b'value1', makepath=True) # makepath=True 会自动创建不存在的父节点
# 2. 创建一个临时节点
# 客户端会话结束时,节点会被自动删除
zk.create('/my_zk/ephemeral_node', b'ephemeral_value', ephemeral=True)
# 3. 创建一个顺序持久化节点
# Zookeeper 会在节点名后附加一个 10 位数的递增序号
# 创建 '/my_zk/seq_node',实际可能创建出 '/my_zk/seq_node0000000001'
zk.create('/my_zk/seq_node', b'sequential_value', sequence=True)
print("节点创建成功")
获取节点数据
# 获取节点数据和节点状态信息
data, stat = zk.get('/my_zk/node1')
print(f"节点数据: {data.decode('utf-8')}")
print(f"节点状态: {stat}") # stat 包含版本号、创建时间等信息
更新节点数据
# 更新节点数据
# version 参数用于乐观锁,防止并发更新导致数据覆盖
# 如果提供的 version 与服务器上的 version 不匹配,更新会失败
# 可以先获取 stat 对象,然后使用其 version 属性
try:
zk.set('/my_zk/node1', b'new_value1', version=stat.version)
print("节点数据更新成功")
except Exception as e:
print(f"更新失败: {e}")
删除节点
# 删除节点
# 同样需要提供正确的 version
try:
zk.delete('/my_zk/node1', version=stat.version)
print("节点删除成功")
except Exception as e:
print(f"删除失败: {e}")
2 子节点操作
获取子节点列表
# 获取指定路径下的直接子节点列表
children = zk.get_children('/my_zk')
print(f"子节点列表: {children}")
高级特性:Watcher (事件监听)
这是 Zookeeper 最强大的功能之一。kazoo 提供了两种监听方式:
- 一次性监听 (One-time Watchers): 触发一次后自动失效。
- 持久监听 (Persistent Watchers): 触发后不会失效,需要手动取消。
1 一次性监听
最常见的用法是在 get 和 get_children 时注册监听器。
# 定义一个回调函数,当事件发生时会被调用
def my_watch_handler(event):
"""
event 对象包含:
- event.type: 事件类型,如 'NODE_DATA_CHANGED', 'NODE_DELETED', 'NODE_CHILDREN_CHANGED'
- event.path: 发生事件的节点路径
- event.state: 连接状态
"""
print(f"事件触发! 类型: {event.type}, 路径: {event.path}")
# 事件触发后,可以在这里重新获取数据或执行其他逻辑
if event.type == 'NODE_DATA_CHANGED':
new_data, _ = zk.get('/my_zk/node1', watch=my_watch_handler)
print(f"重新获取数据: {new_data.decode('utf-8')}")
elif event.type == 'NODE_DELETED':
print(f"节点 {event.path} 已被删除")
# 1. 监听节点数据变化
# 在第一次 get 时注册监听器
data, stat = zk.get('/my_zk/node1', watch=my_watch_handler)
print(f"初始数据: {data.decode('utf-8')}")
# 在另一个终端或脚本中修改节点数据
# zk.set('/my_zk/node1', b'value_changed_in_another_process')
# 2. 监听子节点变化
# children = zk.get_children('/my_zk', watch=my_watch_handler)
# print(f"初始子节点列表: {children}")
# 在另一个终端或脚本中创建或删除子节点
# zk.create('/my_zk/new_child', b'new_child_data')
注意: kazoo 的 watch 是一次性的,如果你想在事件发生后继续监听,必须在回调函数中再次调用 get 或 get_children 并传入 watch 参数,如上面的 my_watch_handler 函数所示。
2 持久监听
# 持久监听器
def persistent_watch_handler(event):
print(f"持久监听器触发! 类型: {event.type}, 路径: {event.path}")
# 创建一个持久监听
zk.get('/my_zk/node1', watch=persistent_watch_handler)
# 手动取消持久监听
# zk.stop_watch()
# 修改数据,监听器会持续触发
# zk.set('/my_zk/node1', b'value_for_persistent_watch')
完整示例:分布式锁
Zookeeper 经典的应用场景之一是实现分布式锁,下面是一个使用 kazoo 的 Lock 对象实现分布式锁的简单示例。
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NodeExistsError
from kazoo.recipe.lock import Lock
import time
import threading
# Zookeeper 地址
ZK_HOST = '127.0.0.1:2181'
LOCK_PATH = '/my_distributed_lock'
def worker(worker_id):
zk = KazooClient(hosts=ZK_HOST)
zk.start()
print(f"Worker {worker_id} 启动并尝试获取锁...")
# 创建一个锁对象
lock = Lock(zk, LOCK_PATH)
try:
# 尝试获取锁,阻塞直到获取成功
# with lock 语句会在获取锁后执行代码块,并在退出时自动释放锁
with lock:
print(f"Worker {worker_id} 成功获取锁!开始执行任务...")
# 模拟任务执行
time.sleep(5)
print(f"Worker {worker_id} 任务完成,释放锁。")
except Exception as e:
print(f"Worker {worker_id} 获取锁时发生错误: {e}")
finally:
zk.stop()
print(f"Worker {worker_id} 已关闭 Zookeeper 连接。")
# 模拟多个工作线程竞争同一个锁
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("所有工作线程已完成。")
如何运行这个示例:
- 启动你的 Zookeeper 服务。
- 运行上面的 Python 脚本。
- 你会看到,只有一个线程能成功打印 "成功获取锁!",其他线程会阻塞,当第一个线程释放锁后,下一个线程才能获取到锁。
其他库和注意事项
python-zookeeper: 这是一个更底层的库,直接是 Zookeeper C 库的 Python 封装,它功能强大但 API 相对复杂,并且需要手动编译安装 C 库,不推荐在新的项目中使用。kazoovszookeeper:kazoo是纯 Python 实现的,对开发者更友好,功能也更高级(如提供了分布式锁、队列等现成的 recipe)。python-zookeeper是 C 绑定,性能理论上更高,但使用起来更麻烦。
最佳实践:
- 总是使用集群地址:在
hosts参数中提供多个 Zookeeper 节点。 - 处理异常:网络不稳定、Zookeeper 宕机等情况都可能抛出异常,代码需要有完善的异常处理机制。
- 理解 Watcher 的生命周期:记住一次性 Watcher 的特性,并在需要时重新注册。
- 管理连接:程序结束时调用
stop()和close()来释放资源。 - 使用
ensure_path:在创建节点前,使用ensure_path可以避免因父节点不存在而创建失败,简化代码。
希望这份详细的指南能帮助你顺利地在 Python 中使用 Zookeeper!
