杰瑞科技汇

python twisted 多线程

不要在 Twisted 的事件循环中运行阻塞代码

必须理解 Twisted 的基本工作原理:它在一个单线程中通过一个事件循环来高效地处理成千上万的并发连接,如果你在这个事件循环线程中执行任何会阻塞的操作(比如进行同步的 I/O、计算密集型任务、调用某些阻塞的第三方库),整个事件循环就会被“卡住”,导致所有其他连接都无法得到处理,服务器就失去了响应能力。

Twisted 的多线程策略不是用多线程来“分担”网络 I/O,而是用多线程来“隔离”和“执行”那些会阻塞事件循环的任务。

主要工具:twisted.internet.threads.deferToThread

Twisted 提供了最核心的工具 deferToThread 来解决这个问题,它的作用是:

  1. 将一个耗时的、可能会阻塞的函数提交到一个线程池(默认是 ThreadPool)中去执行。
  2. 立即返回一个 Deferred 对象,这样事件循环就可以继续处理其他任务,不会被阻塞。
  3. 当阻塞函数执行完毕后,deferToThread 会自动将结果(或异常)传递回原始的事件循环线程中,然后触发 Deferred 的回调链。

deferToThread 的签名

from twisted.internet import threads
def deferToThread(f, *args, **kwargs):
    """
    将函数 f 在一个单独的线程中执行,并返回一个 Deferred 对象。
    """
    # ...
  • f: 你想要执行的、可能会阻塞的函数。
  • *args, **kwargs: 传递给函数 f 的参数。

完整示例:一个会阻塞的 Web 服务器

让我们通过一个完整的例子来理解,假设我们有一个计算斐波那契数列的函数,这是一个非常典型的计算密集型、会阻塞的任务。

如果我们直接在 Twisted 的请求处理函数中调用它,整个服务器就会被冻结。

错误的用法(会阻塞事件循环)

# bad_server.py
from twisted.web.server import Site
from twisted.web.resource import Resource
from twisted.internet import reactor
# 一个非常耗时的计算函数
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
class BlockingResource(Resource):
    isLeaf = True
    def render_GET(self, request):
        try:
            # 直接调用,会阻塞事件循环!
            n = int(request.args.get(b'n', [35])[0])
            result = fibonacci(n)
            request.write(f"Result: {result}".encode('utf-8'))
            request.finish()
        except Exception as e:
            request.setResponseCode(500)
            request.write(f"Error: {e}".encode('utf-8'))
            request.finish()
if __name__ == '__main__':
    site = Site(BlockingResource())
    reactor.listenTCP(8080, site)
    print("Server running on http://localhost:8080")
    reactor.run()

运行这个错误版本,然后访问 http://localhost:8080/?n=35,你会发现服务器在计算期间完全无法响应任何新的请求。


正确的用法(使用 deferToThread

我们使用 deferToThread 来修复这个问题。

# good_server.py
from twisted.web.server import Site, NOT_DONE_YET
from twisted.web.resource import Resource
from twisted.internet import reactor, threads
from twisted.internet.defer import inlineCallbacks
# 同样的耗时计算函数
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
class GoodResource(Resource):
    isLeaf = True
    def render_GET(self, request):
        try:
            n = int(request.args.get(b'n', [35])[0])
            # 使用 deferToThread 将任务提交到线程池
            d = threads.deferToThread(fibonacci, n)
            # 为 Deferred 添加回调函数
            d.addCallback(self._write_result, request)
            # 如果发生错误,添加错误回调
            d.addErrback(self._handle_error, request)
            # 返回 NOT_DONE_YET 告诉 Twisted 我们稍后手动完成请求
            return NOT_DONE_YET
        except Exception as e:
            self._handle_error(e, request)
            return NOT_DONE_YET
    def _write_result(self, result, request):
        """在线程池中执行完毕后,这个回调会在主事件循环线程中被调用"""
        request.write(f"Result: {result}".encode('utf-8'))
        request.finish()
    def _handle_error(self, failure, request):
        """发生错误时调用"""
        request.setResponseCode(500)
        request.write(f"Error: {failure.getErrorMessage()}".encode('utf-8'))
        request.finish()
if __name__ == '__main__':
    site = Site(GoodResource())
    reactor.listenTCP(8081, site)
    print("Server running on http://localhost:8081")
    reactor.run()

分析这个正确的版本:

  1. render_GET:

    • 它不再直接调用 fibonacci
    • 它调用 threads.deferToThread(fibonacci, n),这个调用会立即返回一个 Deferred 对象 d,而 fibonacci 的计算任务已经被丢给后台的线程池去执行了,事件循环完全不会被阻塞。
    • 我们返回 twisted.web.server.NOT_DONE_YET,这是一个特殊的值,告诉 Twisted:“这个请求我正在处理,但还没完成,我稍后会手动调用 request.finish()”。
  2. d.addCallback(self._write_result, request):

    • 这是最关键的一步,我们告诉 Deferred:“当 fibonacci 计算完成并返回结果时,请调用 _write_result 这个方法”。
    • _write_result 的第一个参数 result fibonacci 的返回值,第二个参数 request 是我们通过 addCallback 传递进来的。
  3. _write_result(self, result, request):

    • 这个函数是在主事件循环线程中执行的,这是 deferToThread 的魔力所在:它会自动将结果从工作线程“带回来”到主线程。
    • 因为我们在主线程,所以可以安全地操作 request 对象,向客户端写入响应并调用 request.finish() 来结束请求。
  4. d.addErrback(self._handle_error, request):

    • 这是一个良好的实践。fibonacci 函数抛出异常,deferToThread 会捕获它,并让 Deferred 触发一个错误。addErrback 会捕获这个错误,并调用 _handle_error 来返回一个 500 错误给客户端。

inlineCallbacksyield 的优雅写法

对于习惯协程风格的开发者,Twisted 也提供了 @inlineCallbacks 装饰器和 yield 关键字,可以让代码看起来更同步、更线性。

# good_server_inlinecallbacks.py
from twisted.web.server import Site, NOT_DONE_YET
from twisted.web.resource import Resource
from twisted.internet import reactor, threads
from twisted.internet.defer import inlineCallbacks, returnValue
def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
class InlineCallbackResource(Resource):
    isLeaf = True
    @inlineCallbacks
    def render_GET(self, request):
        try:
            n = int(request.args.get(b'n', [35])[0])
            # yield 一个 Deferred 对象,函数会在这里暂停,直到 Deferred 触发
            # deferToThread 返回的 Deferred 会被 yield 捕获
            result = yield threads.deferToThread(fibonacci, n)
            # 当 deferToThread 完成后,代码会从这里继续执行
            request.write(f"Result: {result}".encode('utf-8'))
            request.finish()
        except Exception as e:
            request.setResponseCode(500)
            request.write(f"Error: {e}".encode('utf-8'))
            request.finish()
# ... reactor.run() 部分与上面相同 ...

@inlineCallbacks 的工作原理:

  • 它会将你的函数转换成一个生成器。
  • 当函数 yield 一个 Deferred 对象时,函数会暂停执行。
  • Deferred 被触发(回调或错误回调)时,@inlineCallbacks 会恢复生成器的执行,并将 Deferred 的结果(或异常)作为 yield 表达式的值返回。
  • 这种写法极大地简化了处理异步流程的代码,避免了回调地狱。

线程池管理

deferToThread 默认使用一个全局的线程池,对于大多数应用来说这已经足够了,但如果你有特殊需求,比如需要为不同类型的任务创建不同大小的线程池,你可以手动管理线程池。

from twisted.internet import reactor, threads
from twisted.python.threadpool import ThreadPool
# 创建一个自定义的线程池
# 参数: 最小线程数, 最大线程数
my_thread_pool = ThreadPool(minthreads=2, maxthreads=10)
my_thread_pool.start() # 启动线程池
# 将线程池“安装”到 reactor 上
reactor.addSystemEventTrigger('before', 'shutdown', my_thread_pool.stop)
# 使用自定义线程池
def my_blocking_task(data):
    # ... 耗时操作 ...
    return processed_data
def handle_request(request):
    d = threads.deferToThread(my_blocking_task, "some data", threadPool=my_thread_pool)
    d.addCallback(lambda result: request.write(result.encode('utf-8')))
    d.addBoth(lambda _: request.finish())
    return NOT_DONE_YET

何时需要自定义线程池?

  • 当你的应用中有多种不同类型的阻塞任务,并且你想为它们设置不同的并发限制时。
  • 当你需要精确控制后台线程的生命周期和资源使用时。
场景 解决方案 核心思想
在 Twisted 中执行阻塞代码 deferToThread 将阻塞任务放到后台线程池执行,立即返回 Deferred,让事件循环不被阻塞。
简化异步代码逻辑 @inlineCallbacksyield 使用生成器语法,让异步代码看起来像同步代码,提高可读性。
精细化管理线程资源 自定义 ThreadPool 当需要为不同任务隔离或限制并发度时,手动创建和管理线程池。
与线程安全的数据结构交互 callInThread 如果只是想在后台线程中执行一些代码(不关心返回值),可以使用 reactor.callInThread,但对于大多数需要返回值的场景,deferToThread 是更好的选择。

关键原则回顾:

  1. 事件循环是神圣的:永远不要在事件循环线程中做任何会阻塞它的事情。
  2. 线程是工具,不是目的:在 Twisted 中使用多线程的目的是为了处理阻塞,而不是为了提高网络 I/O 的并发度。
  3. Deferred 是桥梁deferToThread@inlineCallbacks 是连接阻塞世界和非阻塞世界的桥梁,确保结果能安全地回到主事件循环。
分享:
扫描分享到社交APP
上一篇
下一篇