I/O 多路复用
在深入 register 之前,必须理解它背后的核心思想:I/O 多路复用。
想象一下,你是一个餐厅服务员,需要同时服务多个客人,传统的做法是:
- 去客人 A 那里问:“您需要点什么?”(
accept或recv) - 然后傻傻地等 A 回答,期间不能做任何事。
- A 说好了,你再跑去客人 B 那里问同样的问题。
- A 和 B 都没准备好,你就干等着,时间都浪费了。
这种低效的方式就像传统的 阻塞 I/O。
而 I/O 多路复用就像这样:
- 你给所有客人发一张卡片,上面写着:“准备好了就按铃。”
- 你站在一个总控制台前(这就是
Selector对象)。 - 你可以去做其他事,比如整理餐具。
- 突然,铃响了! 控制台显示是客人 A 的铃响了。
- 你就知道:“哦,A 准备好了,我现在可以去服务 A 了。”
selectors 模块就是那个“总控制台”,register 方法就是给客人的“卡片注册”过程,它允许你监控多个文件描述符(网络套接字、文件、管道等),并等待其中任何一个变得“可读”、“可写”或出现“异常”,而不是阻塞在任何一个上。
Selector 对象的 register 方法
register 方法的作用是将一个文件描述符及其感兴趣的事件注册到 Selector 对象中。
方法签名
selector.register(fileobj, events, data=None)
参数详解
-
fileobj(文件对象)- 类型: 任何拥有
fileno()方法的对象,或者是一个整数文件描述符。 - 常见类型:
socket.socket对象 (最常用)file对象 (普通文件)pipe对象- 任何实现了
fileno()方法的自定义对象。
- 作用: 你想要监控的 I/O 对象。
- 类型: 任何拥有
-
events(事件)- 类型: 一个或多个预定义的事件常量的按位或 组合。
- 常用常量 (定义在
selectors模块中):selectors.EVENT_READ(值为 1): 对象变为可读。- 对于服务器套接字:意味着有新的连接到达 (
accept)。 - 对于客户端套接字:意味着有数据到达 (
recv)。 - 对于管道:意味着另一端有数据写入。
- 对于服务器套接字:意味着有新的连接到达 (
selectors.EVENT_WRITE(值为 4): 对象变为可写。- 对于套接字:意味着可以安全地发送数据 (
send),而不会阻塞。 - 对于文件:通常总是可写的。
- 对于套接字:意味着可以安全地发送数据 (
- 示例:
selectors.EVENT_READ | selectors.EVENT_WRITE: 表示我既关心这个对象何时可读,也关心它何时可写。selectors.EVENT_READ: 只关心何时可读。
-
data(可选数据)- 类型: 任意 Python 对象。
- 作用: 这是一个非常强大的功能,当你通过
select()方法获得“就绪”事件时,SelectorKey对象会包含你当初注册时传入的data,这可以帮助你记住与这个文件描述符相关的上下文信息,比如连接地址、用户数据、缓冲区等,而无需维护一个额外的字典来映射fileobj和数据。
register 的完整工作流程
一个典型的使用 selectors 的程序流程如下:
-
创建一个 Selector 实例:
import selectors sel = selectors.DefaultSelector() # 会自动选择当前最高效的 I/O 多路复用实现 (epoll, kqueue, select)
-
创建并注册监听套接字:
import socket # 创建一个 TCP 服务器套接字 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', 65432)) sock.listen() sock.setblocking(False) # 必须设置为非阻塞模式! # 注册监听套接字,我们只关心它何时可读(即有新连接) sel.register(sock, selectors.EVENT_READ, data=None) -
进入主循环,等待事件:
print("服务器启动,等待连接...") while True: # 阻塞,直到至少一个注册的文件描述符就绪 # 返回一个 (key, events) 列表 events = sel.select() for key, mask in events: # key 是一个 SelectorKey 对象 # mask 是一个位掩码,表示实际发生的事件 (EVENT_READ, EVENT_WRITE 等) process_events(key, mask) -
处理就绪事件:
def process_events(key, mask): # 如果是监听套接字就绪 if key.fileobj is sock: # 接受新连接 conn, addr = sock.accept() print(f"已接受来自 {addr} 的连接") conn.setblocking(False) # 将新连接的套接字也注册到 selector,并附带地址作为 data sel.register(conn, selectors.EVENT_READ, data=addr) # 如果是客户端套接字就绪 else: # 从 key.data 中获取我们注册时附加的数据 conn = key.fileobj addr = key.data try: data = conn.recv(1024) if data: print(f"从 {addr} 收到数据: {data.decode()}") # 告诉 selector,我们现在也关心这个套接字何时可写 sel.modify(conn, selectors.EVENT_READ | selectors.EVENT_WRITE) else: # 连接已关闭 print(f"来自 {addr} 的连接已关闭") sel.unregister(conn) conn.close() except ConnectionResetError: print(f"来自 {addr} 的连接意外断开") sel.unregister(conn) conn.close()
SelectorKey 对象
当你调用 register 后,它会返回一个 SelectorKey 对象,这个对象包含了注册的所有信息,并且在 select() 返回的事件中也会提供它。
SelectorKey 对象有以下属性:
fileobj: 被注册的原始文件对象。fd: 文件对象的文件描述符 (整数)。events: 注册时感兴趣的事件 (EVENT_READ,EVENT_WRITE等)。data: 注册时传入的data参数。
其他相关方法
除了 register,Selector 对象还有几个常用的方法:
-
modify(fileobj, events, data=None)- 修改一个已注册文件描述符的事件或数据。
- 比如一个客户端刚连接时你只关心它是否可读,当它发送数据后,你可能想同时关心它是否可写以便响应,这时就可以用
modify。
-
unregister(fileobj)- 取消注册一个文件描述符。
- 当一个连接关闭时,必须调用此方法,否则 selector 会继续尝试处理一个无效的文件描述符。
-
select(timeout=None)- 核心“等待”方法。
- 阻塞,直到至少一个已注册的文件描述符就绪,或者超时。
- 返回一个
(SelectorKey, int)元组的列表。int是实际发生的事件掩码。
-
close()关闭 selector 对象,释放所有资源。
代码示例:一个简单的 Echo Server
下面是一个完整的、可运行的 echo server 例子,它清晰地展示了 register 的用法。
import selectors
import socket
import types
# 1. 创建 Selector
sel = selectors.DefaultSelector()
# 2. 定义服务函数
def accept_wrapper(sock):
"""接受新连接并注册到 selector"""
conn, addr = sock.accept() # 阻塞操作,但 sock 是监听套接字,很快就会就绪
print(f"已接受来自 {addr} 的连接")
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
# 注册新连接,我们关心它是否可读
sel.register(conn, selectors.EVENT_READ, data=data)
def service_connection(key, mask):
"""处理已就绪的连接"""
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
# 接收数据
recv_data = sock.recv(1024) # 阻塞操作,但 sock 已就绪
if recv_data:
data.outb += recv_data
print(f"从 {data.addr} 收到数据: {recv_data.decode()}")
else:
# 对方关闭了连接
print(f"来自 {data.addr} 的连接已关闭")
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
# 发送数据
if data.outb:
print(f"向 {data.addr} 发送数据: {data.outb.decode()}")
sent = sock.send(data.outb) # 阻塞操作,但 sock 已就绪
data.outb = data.outb[sent:]
# 3. 启动服务器
host = '127.0.0.1'
port = 65432
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
lsock.setblocking(False) # 设置为非阻塞
print(f"服务器启动在 {host}:{port}")
# 注册监听套接字
sel.register(lsock, selectors.EVENT_READ, data=None)
# 4. 主事件循环
try:
while True:
events = sel.select() # 阻塞,等待事件
for key, mask in events:
if key.data is None:
# data 是 None,说明是监听套接字就绪了
accept_wrapper(key.fileobj)
else:
# 否则是客户端套接字就绪了
service_connection(key, mask)
except KeyboardInterrupt:
print("服务器关闭。")
finally:
sel.close()
lsock.close()
如何测试这个服务器:
- 运行上面的 Python 脚本。
- 打开另一个终端,使用
telnet或nc(netcat) 连接:telnet 127.0.0.1 65432。 - 在
telnet界面输入任何文本,然后按回车,你会在服务器终端看到接收和发送的消息,同时在telnet界面看到你输入的内容被“回显”了。
sel.register(sock, EVENT_READ, data=addr)是selectors模块的基石。- 它将一个 I/O 对象(如
socket)和一组你关心的事件(EVENT_READ/EVENT_WRITE)告诉Selector。 data参数是可选但极其有用的,它允许你将上下文信息与文件描述符关联起来。register之后,Selector就会在其select()循环中监控这个对象,并在它就绪时通知你的程序。- 与
selectors配合使用的文件描述符必须是非阻塞的,否则select()的优势就失去了。
