杰瑞科技汇

python调用zookeeper

下面我将为你详细介绍如何使用 kazoo 库来操作 Zookeeper,包括安装、连接、基本 API 使用、高级特性以及一个完整的示例。


安装 kazoo

你需要通过 pip 来安装 kazoo

pip install kazoo

连接到 Zookeeper

使用 kazoo 的第一步是创建一个 KazooClient 实例并连接到 Zookeeper 集群。

from kazoo.client import KazooClient
# 1. 创建一个 KazooClient 实例
# zk_host 可以是单个地址('127.0.0.1:2181')或多个地址组成的逗号分隔的字符串('127.0.0.1:2181,127.0.0.1:2182')
zk = KazooClient(hosts='127.0.0.1:2181')
# 2. 启动连接
# start() 方法是异步的,它会启动一个后台线程来维护连接状态
try:
    zk.start()
    print("成功连接到 Zookeeper!")
except Exception as e:
    print(f"连接 Zookeeper 失败: {e}")
# ... 在这里执行你的 Zookeeper 操作 ...
# 3. 关闭连接
# 当程序结束时,务必关闭连接以释放资源
# zk.stop()
# zk.close()

重要提示:

  • hosts 参数: 建议始终提供 Zookeeper 集群中多个节点的地址,这样即使一个节点宕机,kazoo 也能自动切换到其他可用节点,提高了连接的可靠性。
  • 连接状态: kazoo 提供了监听连接状态的机制,例如监听连接丢失、连接恢复等,这对于构建健壮的应用非常重要。

核心 API 操作

kazoo 的 API 设计非常直观,与 Zookeeper 的原生命令一一对应。

1 节点操作

创建节点

# 确保根节点 '/my_zk' 存在
zk.ensure_path('/my_zk')
# 1. 创建一个持久化节点
# ephemeral=False (默认), sequence=False (默认)
zk.create('/my_zk/node1', b'value1', makepath=True) # makepath=True 会自动创建不存在的父节点
# 2. 创建一个临时节点
# 客户端会话结束时,节点会被自动删除
zk.create('/my_zk/ephemeral_node', b'ephemeral_value', ephemeral=True)
# 3. 创建一个顺序持久化节点
# Zookeeper 会在节点名后附加一个 10 位数的递增序号
# 创建 '/my_zk/seq_node',实际可能创建出 '/my_zk/seq_node0000000001'
zk.create('/my_zk/seq_node', b'sequential_value', sequence=True)
print("节点创建成功")

获取节点数据

# 获取节点数据和节点状态信息
data, stat = zk.get('/my_zk/node1')
print(f"节点数据: {data.decode('utf-8')}")
print(f"节点状态: {stat}") # stat 包含版本号、创建时间等信息

更新节点数据

# 更新节点数据
# version 参数用于乐观锁,防止并发更新导致数据覆盖
# 如果提供的 version 与服务器上的 version 不匹配,更新会失败
# 可以先获取 stat 对象,然后使用其 version 属性
try:
    zk.set('/my_zk/node1', b'new_value1', version=stat.version)
    print("节点数据更新成功")
except Exception as e:
    print(f"更新失败: {e}")

删除节点

# 删除节点
# 同样需要提供正确的 version
try:
    zk.delete('/my_zk/node1', version=stat.version)
    print("节点删除成功")
except Exception as e:
    print(f"删除失败: {e}")

2 子节点操作

获取子节点列表

# 获取指定路径下的直接子节点列表
children = zk.get_children('/my_zk')
print(f"子节点列表: {children}")

高级特性:Watcher (事件监听)

这是 Zookeeper 最强大的功能之一。kazoo 提供了两种监听方式:

  • 一次性监听 (One-time Watchers): 触发一次后自动失效。
  • 持久监听 (Persistent Watchers): 触发后不会失效,需要手动取消。

1 一次性监听

最常见的用法是在 getget_children 时注册监听器。

# 定义一个回调函数,当事件发生时会被调用
def my_watch_handler(event):
    """
    event 对象包含:
    - event.type: 事件类型,如 'NODE_DATA_CHANGED', 'NODE_DELETED', 'NODE_CHILDREN_CHANGED'
    - event.path: 发生事件的节点路径
    - event.state: 连接状态
    """
    print(f"事件触发! 类型: {event.type}, 路径: {event.path}")
    # 事件触发后,可以在这里重新获取数据或执行其他逻辑
    if event.type == 'NODE_DATA_CHANGED':
        new_data, _ = zk.get('/my_zk/node1', watch=my_watch_handler)
        print(f"重新获取数据: {new_data.decode('utf-8')}")
    elif event.type == 'NODE_DELETED':
        print(f"节点 {event.path} 已被删除")
# 1. 监听节点数据变化
# 在第一次 get 时注册监听器
data, stat = zk.get('/my_zk/node1', watch=my_watch_handler)
print(f"初始数据: {data.decode('utf-8')}")
# 在另一个终端或脚本中修改节点数据
# zk.set('/my_zk/node1', b'value_changed_in_another_process')
# 2. 监听子节点变化
# children = zk.get_children('/my_zk', watch=my_watch_handler)
# print(f"初始子节点列表: {children}")
# 在另一个终端或脚本中创建或删除子节点
# zk.create('/my_zk/new_child', b'new_child_data')

注意: kazoowatch 是一次性的,如果你想在事件发生后继续监听,必须在回调函数中再次调用 getget_children 并传入 watch 参数,如上面的 my_watch_handler 函数所示。

2 持久监听

# 持久监听器
def persistent_watch_handler(event):
    print(f"持久监听器触发! 类型: {event.type}, 路径: {event.path}")
# 创建一个持久监听
zk.get('/my_zk/node1', watch=persistent_watch_handler)
# 手动取消持久监听
# zk.stop_watch()
# 修改数据,监听器会持续触发
# zk.set('/my_zk/node1', b'value_for_persistent_watch')

完整示例:分布式锁

Zookeeper 经典的应用场景之一是实现分布式锁,下面是一个使用 kazooLock 对象实现分布式锁的简单示例。

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NodeExistsError
from kazoo.recipe.lock import Lock
import time
import threading
# Zookeeper 地址
ZK_HOST = '127.0.0.1:2181'
LOCK_PATH = '/my_distributed_lock'
def worker(worker_id):
    zk = KazooClient(hosts=ZK_HOST)
    zk.start()
    print(f"Worker {worker_id} 启动并尝试获取锁...")
    # 创建一个锁对象
    lock = Lock(zk, LOCK_PATH)
    try:
        # 尝试获取锁,阻塞直到获取成功
        # with lock 语句会在获取锁后执行代码块,并在退出时自动释放锁
        with lock:
            print(f"Worker {worker_id} 成功获取锁!开始执行任务...")
            # 模拟任务执行
            time.sleep(5)
            print(f"Worker {worker_id} 任务完成,释放锁。")
    except Exception as e:
        print(f"Worker {worker_id} 获取锁时发生错误: {e}")
    finally:
        zk.stop()
        print(f"Worker {worker_id} 已关闭 Zookeeper 连接。")
# 模拟多个工作线程竞争同一个锁
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print("所有工作线程已完成。")

如何运行这个示例:

  1. 启动你的 Zookeeper 服务。
  2. 运行上面的 Python 脚本。
  3. 你会看到,只有一个线程能成功打印 "成功获取锁!",其他线程会阻塞,当第一个线程释放锁后,下一个线程才能获取到锁。

其他库和注意事项

  • python-zookeeper: 这是一个更底层的库,直接是 Zookeeper C 库的 Python 封装,它功能强大但 API 相对复杂,并且需要手动编译安装 C 库,不推荐在新的项目中使用。
  • kazoo vs zookeeper: kazoo 是纯 Python 实现的,对开发者更友好,功能也更高级(如提供了分布式锁、队列等现成的 recipe)。python-zookeeper 是 C 绑定,性能理论上更高,但使用起来更麻烦。

最佳实践:

  1. 总是使用集群地址:在 hosts 参数中提供多个 Zookeeper 节点。
  2. 处理异常:网络不稳定、Zookeeper 宕机等情况都可能抛出异常,代码需要有完善的异常处理机制。
  3. 理解 Watcher 的生命周期:记住一次性 Watcher 的特性,并在需要时重新注册。
  4. 管理连接:程序结束时调用 stop()close() 来释放资源。
  5. 使用 ensure_path:在创建节点前,使用 ensure_path 可以避免因父节点不存在而创建失败,简化代码。

希望这份详细的指南能帮助你顺利地在 Python 中使用 Zookeeper!

分享:
扫描分享到社交APP
上一篇
下一篇