杰瑞科技汇

Twisted异步如何实现高效并发?

这是一个非常经典且强大的异步框架,理解它对于掌握 Python 异步编程的演进历史和核心思想至关重要。

Twisted异步如何实现高效并发?-图1
(图片来源网络,侵删)

目录

  1. 什么是 Twisted?
  2. 核心思想:事件循环与回调
  3. Twisted 的核心组件
    • reactor (反应堆)
    • ProtocolFactory (协议与工厂)
    • Deferred (延迟对象)
  4. 一个简单的 Twisted 服务器示例
  5. 深入 Deferred
    • 为什么需要 Deferred?
    • Deferred 的链式调用
    • 错误处理
  6. Twisted vs. asyncio

什么是 Twisted?

Twisted 是一个用 Python 编写的事件驱动网络编程框架,它诞生于 2000 年左右,比 Python 官方推出的 asyncio 框架早了十多年。

它的主要目标是让你能够高效地编写同时处理成千上万个网络连接的应用程序,而无需为每个连接创建一个操作系统线程(这会消耗大量资源),它通过一个单线程的事件循环机制来处理所有并发操作。

核心特点:

  • 事件驱动: 程序的执行流程由外部事件(如网络数据到达、定时器超时)驱动,而不是由代码的线性顺序驱动。
  • 非阻塞 I/O: 当需要进行 I/O 操作(如读写网络、文件)时,Twisted 不会等待操作完成,而是立即返回,当 I/O 操作就绪时,事件循环会触发一个回调函数来处理结果。
  • 基于回调: 这是 Twisted 最经典的编程模式,你将“接下来要做什么”(一个函数)作为参数传递给进行 I/O 操作的函数,当 I/O 完成时,这个函数就会被调用。

核心思想:事件循环与回调

想象一下一个繁忙的餐厅前台(这就是事件循环 reactor)。

Twisted异步如何实现高效并发?-图2
(图片来源网络,侵删)
  • 阻塞式(糟糕的方式): 前台服务员(线程)接到一个客人点餐的电话,他拿起电话,一直等待客人说完、报完地址、挂掉电话,然后他才能去处理下一个电话,如果这个电话打了很久,其他所有打电话的客人都在等待。
  • 非阻塞式(Twisted 的方式): 前台服务员接到电话,快速记下客人的号码和需求(发起 I/O 操作),然后告诉客人:“请稍等,我记下了,一会打给您。”(注册一个回调),他立刻挂断这个电话,去接听下一个电话,当厨房准备好客人的餐点(I/O 操作完成)时,有一个提示灯会亮起(事件触发),服务员看到灯亮,就拿起之前记下的号码,回电给客人(执行回调)。

在这个比喻中:

  • 服务员 = Twisted 的事件循环 (reactor)
  • 接电话 = 接受网络连接
  • 记下号码和需求 = 发起一个非阻塞的 I/O 操作(如 reactor.callWhenRunningtransport.write
  • 一会打给您 = 注册一个回调函数
  • 提示灯亮 = I/O 操作完成的事件
  • 回电客人 = 执行之前注册的回调函数

Twisted 的核心组件

a. reactor (反应堆)

reactor 是 Twisted 的心脏,也就是事件循环,一旦启动,reactor 就会进入一个无限循环,不断地监听各种事件(如新的网络连接、数据到达、定时器等),并执行对应的回调函数。

启动 reactor 是一个程序的终点,因为 reactor.run() 会阻塞当前线程,直到 reactor.stop() 被调用。

from twisted.internet import reactor
def print_and_quit():
    print("Hello from the reactor!")
    reactor.stop() # 停止事件循环
# 告诉reactor,在它启动后执行 print_and_quit 函数
reactor.callWhenRunning(print_and_quit)
print("Starting the reactor...")
reactor.run() # 程序会在这里阻塞,直到 reactor.stop() 被调用
print("Reactor stopped.")

b. ProtocolFactory

这是 Twisted 用于处理网络通信的核心设计模式,类似于 GoF 的工厂模式策略模式

  • Protocol (协议):

    • 定义了如何与一个具体的网络连接进行交互。
    • 它是一个类,当你与一个客户端建立连接或收到数据时,reactor 会自动创建一个 Protocol 实例,并调用其特定方法。
    • 关键方法:
      • connectionMade(): 连接成功建立时调用。
      • dataReceived(data): 收到数据时调用。
      • connectionLost(reason): 连接断开时调用。
  • Factory (工厂):

    • 负责创建 Protocol 实例。
    • 当有新的连接进入时,reactor 会调用 FactorybuildProtocol() 方法来创建一个新的 Protocol 对象。
    • 这使得你可以管理共享的资源(如数据库连接池),并将它们传递给每个新创建的 Protocol 实例。

c. Deferred (延迟对象)

Deferred 是 Twisted 中最强大也最复杂的部分,它解决了深度嵌套回调(也称为“回调地狱”)的问题。

一个 Deferred 对象代表了一个尚未完成的操作,你可以把它想象成一个未来的结果的占位符,你可以为这个未来的结果注册成功时的回调和失败时的回调(errback)。


一个简单的 Twisted 服务器示例

下面是一个使用 Twisted 编写的简单回显服务器,它接收客户端发来的任何数据,然后原样发回。

# -*- coding: utf-8 -*-
from twisted.internet.protocol import Protocol, Factory
from twisted.internet import reactor
# 1. 定义 Protocol
class EchoProtocol(Protocol):
    """
    每当一个新客户端连接时,reactor 会创建一个 EchoProtocol 实例。
    """
    def connectionMade(self):
        """连接建立时调用"""
        client_ip = self.transport.getPeer().host
        print(f"客户端 {client_ip} 已连接。")
        self.transport.write(b"欢迎来到回显服务器!\n")
    def dataReceived(self, data):
        """当收到数据时调用"""
        # data 是字节流,我们解码并打印
        message = data.decode('utf-8').strip()
        print(f"收到消息: '{message}'")
        # 将收到的数据原样发回
        self.transport.write(data)
    def connectionLost(self, reason):
        """连接断开时调用"""
        client_ip = self.transport.getPeer().host
        print(f"客户端 {client_ip} 已断开,连接原因: {reason.getErrorMessage()}")
# 2. 定义 Factory
class EchoFactory(Factory):
    """
    当有新的连接请求时,reactor 会调用这个 Factory 的 buildProtocol 方法。
    """
    def buildProtocol(self, addr):
        """
        addr 是客户端的地址信息。
        这个方法必须返回一个 Protocol 实例。
        """
        print(f"为 {addr.host} 建立新的协议实例。")
        return EchoProtocol()
# 3. 启动服务器
if __name__ == '__main__':
    # 在 8888 端口上监听,使用 TCP 协议
    # reactor 会监听这个端口,当有连接进来时,就会用 EchoFactory 来处理
    port = 8888
    print(f"服务器启动,正在监听端口 {port}...")
    reactor.listenTCP(port, EchoFactory())
    # 启动 reactor,程序会在这里运行
    reactor.run()
    print("服务器已停止。")

如何运行:

  1. 保存代码为 echo_server.py
  2. 在终端运行:python echo_server.py
  3. 打开另一个终端,使用 telnet 连接:telnet 127.0.0.1 8888
  4. telnet 窗口中输入任何文本,服务器会将其回显,并且服务器的终端会打印连接和消息信息。

深入 Deferred

为什么需要 Deferred?

假设你需要执行一系列有依赖关系的异步操作:

  1. 从数据库获取用户信息(异步)。
  2. 获取信息后,根据用户 ID 获取其订单列表(异步)。
  3. 获取订单列表后,计算总金额(同步)。

不使用 Deferred (回调地狱):

def get_user_info_cb(user_id):
    db_pool.runQuery("SELECT * FROM users WHERE id=?", (user_id,)).addCallback(
        lambda user_info: get_orders_cb(user_info[0]['id'])
    ).addCallback(
        lambda orders: calculate_total_cb(orders)
    ).addErrback(
        handle_error
    )
def get_orders_cb(user_id):
    db_pool.runQuery("SELECT * FROM orders WHERE user_id=?", (user_id,)).addCallback(
        lambda orders: process_orders_cb(orders)
    )
# ... 代码嵌套越来越深,难以维护

使用 Deferred (链式调用):

Deferred 允许你将这一系列操作优雅地链接起来。

from twisted.internet import defer
def get_user_info(user_id):
    # 这是一个异步操作,它返回一个 Deferred 对象
    # 模拟从数据库获取用户信息
    d = defer.Deferred()
    # 在实际应用中,当数据库查询完成时,你会调用 d.callback(result) 或 d.errback(failure)
    # 这里我们模拟 2 秒后完成
    reactor.callLater(2, d.callback, {"id": user_id, "name": "Alice"})
    return d
def get_orders(user):
    # 同样,这是一个返回 Deferred 的异步操作
    d = defer.Deferred()
    reactor.callLater(1, d.callback, [{"id": 1, "amount": 100}, {"id": 2, "amount": 50}])
    print(f"正在获取用户 {user['name']} 的订单...")
    return d
def calculate_total(orders):
    # 这是一个同步操作,但它可以返回一个值,这个值会自动传递给下一个回调
    total = sum(order['amount'] for order in orders)
    print(f"订单总金额计算完成: {total}")
    return total
def handle_error(failure):
    """错误处理函数"""
    print(f"发生错误: {failure.getTraceback()}")
    # 可以选择继续将错误传递下去,或者在这里处理并返回一个默认值
    return 0 # 返回一个默认值,让链条可以继续
def final_result(result):
    """最终的回调函数"""
    print(f"最终得到的结果是: {result}")
    reactor.stop()
# --- 主程序 ---
if __name__ == '__main__':
    user_id = 123
    d = get_user_info(user_id)
    # 链式调用
    # .addCallback 会将前一个回调的返回值作为参数传递给下一个回调
    d.addCallback(get_orders)           # get_orders 的参数是 get_user_info 的结果
    d.addCallback(calculate_total)      # calculate_total 的参数是 get_orders 的结果
    d.addCallback(final_result)         # final_result 的参数是 calculate_total 的结果
    # .addErrback 用于处理链条中任何地方发生的错误
    # 它会跳过所有正常的回调,直接执行第一个匹配的 errback
    d.addErrback(handle_error)
    print("开始执行异步操作链...")
    reactor.run()

运行这段代码,你会看到输出清晰地展示了操作链的执行过程,并且错误处理也得到了优雅地管理。


Twisted vs. asyncio

这是现代 Python 异步编程中一个重要的话题。

特性 Twisted asyncio (Python 3.4+)
起源 2002年,社区驱动 2025年,官方标准库
核心模型 基于回调 基于 async/await 语法糖
I/O 多路复用 自定义,跨平台(epoll, kqueue 使用操作系统的原生 API (epoll, kqueue, IOCP)
协议支持 极其丰富,内置了大量协议的实现(HTTP, SSH, SMTP, DNS 等) 核心库较简洁,但可通过 aiohttp 等第三方库扩展
学习曲线 陡峭,需要理解 DeferredProtocolFactory 等概念 平缓async/await 语法更接近同步代码,易于上手
生态系统 成熟稳定,尤其在特定领域(如游戏服务器、IM) 极其活跃,已成为 Python 异步编程的事实标准
适用场景 维护旧项目、需要 Twisted 特定协议、对性能有极致要求的网络服务 新项目、Web 服务、网络爬虫、绝大多数异步 I/O 场景
  • asyncio 是现代 Python 异步编程的首选。 它语法更友好,生态更繁荣,是官方推荐的方向。
  • Twisted 依然强大且有价值。 对于一些需要处理大量并发连接且需要使用 Twisted 内置的复杂协议(如 SSH, XMPP)的场景,它依然是不可替代的选择,理解 Twisted 的思想有助于你更深入地理解事件驱动编程。

Twisted 是 Python 异步编程历史上的一个里程碑,它通过事件循环基于回调的模式,为 Python 提供了强大而成熟的网络编程框架。

  • 核心是 reactor,它驱动着整个事件系统。
  • ProtocolFactory 是处理网络通信的基石,将连接逻辑和协议细节解耦。
  • Deferred 是解决异步操作链和错误处理的利器,避免了回调地狱。

虽然现在 asyncio 更为流行,但学习 Twisted 能够让你深刻理解异步编程的底层原理,并且在你遇到需要处理高并发、特定复杂协议的场景时,它仍然是一个非常有价值的工具。

分享:
扫描分享到社交APP
上一篇
下一篇