杰瑞科技汇

Python asyncio教程,如何快速上手异步编程?

Python asyncio 完整教程

asyncio 是 Python 3.4+ 中用于编写并发代码的标准库,它使用一种称为 async/await 的新语法,使得编写高并发、高 I/O 密集型的应用程序变得异常简单。

目录

  1. 为什么需要 asyncio - 解决什么问题
  2. 核心概念:事件循环、协程、任务
  3. async/await 语法详解
  4. asyncio 的基本使用
    • 运行一个协程
    • 并发运行多个协程
    • 创建和管理任务
  5. asyncio 常用对象
    • asyncio.sleep(): 模拟 I/O 操作
    • asyncio.gather(): 并发执行并收集结果
    • asyncio.Queue: 异步队列
    • asyncio.Lock: 异步锁
  6. 与外部世界交互:asyncioaiohttp
  7. 最佳实践与常见陷阱

为什么需要 asyncio?- 解决什么问题

想象一下,你的程序需要执行以下任务:

  • 从网站 A 下载一个大文件。
  • 从网站 B 获取一些用户数据。
  • 向数据库写入一条记录。

传统的同步方式会是这样:

  1. 开始下载文件 A。
  2. 阻塞:程序卡住,等待下载完成,在此期间,CPU 是空闲的,你无法做任何其他事情。
  3. 下载完成后,开始获取数据 B。
  4. 阻塞:程序再次卡住,等待网络响应。
  5. 获取完成后,开始写入数据库。
  6. 阻塞:等待数据库操作完成。

整个过程非常耗时,总时间是 T_A + T_B + T_DB

而使用 asyncio 的异步方式会是这样的:

  1. 启动下载任务 A,并告诉它:“下载好了就通知我。”
  2. 立即启动获取数据任务 B,同样告诉它:“获取好了就通知我。”
  3. 立即启动写入数据库任务
  4. 主程序进入“事件循环”(Event Loop),不断地检查:“任务 A 好了吗?任务 B 好了吗?数据库操作好了吗?”
  5. 假设任务 B 最先完成,事件循环会处理它的结果。
  6. 然后任务 A 完成,事件循环处理它的结果。
  7. 最后数据库操作完成。

总时间接近于 max(T_A, T_B, T_DB),因为多个 I/O 操作是并发执行的。

asyncio 的核心优势:

  • 高并发:特别适合处理大量 I/O 密集型任务(如网络请求、文件读写、数据库操作)。
  • 高效率:在等待 I/O 操作时,CPU 不会空闲,可以去执行其他任务,极大地利用了资源。
  • 单线程模型:避免了多线程编程中的锁、死锁、线程切换等复杂问题。

注意asyncio 不适合 CPU 密集型任务(如大量数学计算、图像处理),对于 CPU 密集型任务,应该使用多进程(multiprocessing)。


核心概念

在深入代码之前,必须理解三个核心概念:

a. 事件循环

事件循环是 asyncio心脏,你可以把它看作是一个无限循环的调度器,负责运行、暂停和恢复协程,它不断地检查哪些协程已经准备好被执行(即 I/O 操作已完成),然后调度它们继续运行。

在 Python 3.7+ 中,获取和启动事件循环变得非常简单。

b. 协程

协程是 asyncio 中的基本执行单元,它是一个使用 async def 关键字定义的函数,协程函数在被调用时不会立即执行,而是返回一个协程对象

协程可以暂停执行(使用 await),并在将来某个点恢复执行,这个暂停和恢复的过程不会阻塞事件循环。

c. 任务

任务是对协程的进一步封装,它将协程对象包装成一个可以在事件循环中调度的“,当你创建一个任务时,事件循环会在某个时刻运行这个协程,并可以在它暂停时去运行其他任务。

任务让多个协程能够真正地并发执行。


async/await 语法详解

这是 asyncio 的灵魂。

async def

用于定义一个协程函数。

import asyncio
async def say_hello():
    print("Hello")
    # 暂停这个协程,让其他协程有机会运行
    await asyncio.sleep(1)
    print("World")
# 调用 say_hello() 不会立即打印 "Hello"
# 它返回的是一个协程对象
coro = say_hello()
print(coro)
# 输出: <coroutine object say_hello at 0x...>

await

用于在一个协程中暂停执行,并等待另一个“可等待”对象(通常是另一个协程或任务)完成。

关键点

  • await 只能在 async def 函数内部使用。
  • 当执行到 await 时,当前协程会暂停,并将控制权交还给事件循环,事件循环会利用这个时间去执行其他已经准备好的任务。
  • 当被 await 的对象完成后,当前协程会从暂停的地方继续执行。

asyncio 的基本使用

a. 运行一个协程

要运行一个协程,你需要将它提交给事件循环,在 Python 3.7+ 中,asyncio.run() 是最简单、最推荐的方式,它会创建一个新的事件循环,运行你传入的协程,直到完成,然后关闭事件循环。

import asyncio
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
async def main():
    print("Hello")
    # say_after 是一个协程,await 会等待它执行完毕
    await say_after(1, "World")
    print("Finished")
# asyncio.run 会启动事件循环并运行 main() 协程
asyncio.run(main())

输出:

Hello
(等待 1 秒后)
World
Finished

b. 并发运行多个协程

这是 asyncio 的威力所在,我们不再需要一个接一个地 await 协程,而是可以创建多个任务,让它们并发执行。

asyncio.create_task() 可以将一个协程包装成一个任务,并“安排”它在事件循环中尽快运行。

import asyncio
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
async def main():
    print("Start")
    # 创建两个任务,但它们不会立即开始运行
    # 事件循环会在稍后调度它们
    task1 = asyncio.create_task(say_after(1, "Hello"))
    task2 = asyncio.create_task(say_after(2, "World"))
    # await 会等待所有任务完成
    # 在等待期间,事件循环会运行其他任务
    await task1
    await task2
    print("End")
asyncio.run(main())

输出:

Start
(等待 1 秒后)
Hello
(再等待 1 秒后)
World
End

注意:总耗时约 2 秒,而不是 3 秒。say_after(1, ...)say_after(2, ...) 是并发执行的。

c. asyncio.gather() - 更优雅的并发

asyncio.gather(*coroutines_or_futures) 是一个更强大、更常用的函数,用于并发地运行多个可等待对象,并收集它们的返回值。

import asyncio
async def count(name, delay):
    print(f"Count {name} started")
    await asyncio.sleep(delay)
    print(f"Count {name} finished")
    return delay # 返回延迟时间
async def main():
    # 创建多个协程对象
    coro1 = count("A", 1)
    coro2 = count("B", 2)
    coro3 = count("C", 1)
    # gather 会并发运行它们,并等待全部完成
    # results 是一个列表,包含了各个协程的返回值,顺序与输入一致
    results = await asyncio.gather(coro1, coro2, coro3)
    print(f"All results: {results}")
asyncio.run(main())

输出:

Count A started
Count C started
(等待 1 秒后)
Count A finished
Count C finished
(再等待 1 秒后)
Count B finished
All results: [1, 2, 1]

gather 不仅让代码更整洁,还能自动处理结果的收集,非常方便。


asyncio 常用对象

a. asyncio.sleep()

这是 asyncio 中最常用的工具,用于模拟一个 I/O 操作,它会让当前协程暂停指定的时间,而不会阻塞整个事件循环。

b. asyncio.Queue

异步队列,用于在线程或任务之间安全地传递数据,它的 get()put() 方法都是异步的。

import asyncio
async def producer(queue):
    for i in range(5):
        await asyncio.sleep(0.5)
        print(f"Producer putting item {i}")
        await queue.put(i)
    await queue.put(None) # 发送信号,表示生产完成
async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break # 收到结束信号
        print(f"Consumer got item {item}")
        # 模拟处理时间
        await asyncio.sleep(1)
        queue.task_done() # 标记任务完成
async def main():
    q = asyncio.Queue()
    # 创建生产者和消费者任务
    p_task = asyncio.create_task(producer(q))
    c_task = asyncio.create_task(consumer(q))
    # 等待生产者完成
    await p_task
    # 等待队列被清空
    await q.join()
    # 取消消费者任务
    c_task.cancel()
    await c_task
asyncio.run(main())

c. asyncio.Lock

异步锁,用于保护共享资源,防止多个任务同时修改它,造成数据竞争。

import asyncio
counter = 0
lock = asyncio.Lock()
async def safe_counter():
    global counter
    async with lock: # 类似于 with lock:,但适用于异步上下文
        print(f"Task {asyncio.current_task().get_name()} is counting")
        current_value = counter
        await asyncio.sleep(0.1) # 模拟耗时操作
        counter = current_value + 1
        print(f"Task {asyncio.current_task().get_name()} set counter to {counter}")
async def main():
    # 创建多个任务,它们会竞争 counter
    tasks = [asyncio.create_task(safe_counter(), name=f"Task-{i}") for i in range(3)]
    await asyncio.gather(*tasks)
asyncio.run(main())
# 输出会是安全的,counter 最终会等于 3

与外部世界交互:asyncioaiohttp

asyncio 本身不包含网络客户端库,你需要使用支持 asyncio 的第三方库。aiohttp 是最流行的异步 HTTP 客户端/服务器库。

安装: pip install aiohttp

示例:并发下载多个网页

import asyncio
import aiohttp
async def fetch_url(session, url):
    try:
        print(f"Fetching {url}")
        async with session.get(url, timeout=10) as response:
            # response.text() 也是一个异步操作
            html = await response.text()
            print(f"Finished fetching {url}, length: {len(html)}")
            return html
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None
async def main():
    urls = [
        "http://python.org",
        "http://github.com",
        "http://stackoverflow.com",
        "http://non-existent-website.xyz" # 这个会失败
    ]
    # aiohttp.ClientSession 管理连接池,是推荐的用法
    async with aiohttp.ClientSession() as session:
        # 为每个 URL 创建一个任务
        tasks = [fetch_url(session, url) for url in urls]
        # 并发执行所有任务
        htmls = await asyncio.gather(*tasks)
        # 打印结果
        for i, html in enumerate(htmls):
            if html:
                print(f"URL {i+1} fetched successfully.")
asyncio.run(main())

这个例子清晰地展示了 asyncio + aiohttp 在处理多个网络请求时的强大能力。


最佳实践与常见陷阱

最佳实践

  1. 使用 asyncio.run() 作为程序的入口点:除非你有特殊需求,否则在主脚本中,用 asyncio.run(your_main_coroutine()) 来启动。
  2. 避免在协程中使用阻塞 I/O:绝对不要在 async def 函数中使用 time.sleep()requests.get() 等同步阻塞函数,它们会阻塞整个事件循环,让你的异步程序失去所有优势,请使用它们的异步替代品(asyncio.sleep(), aiohttp)。
  3. async with 管理资源:对于像 aiohttp.ClientSession 这样的异步上下文管理器,始终使用 async with 来确保资源被正确释放。
  4. asyncio.gather() 用于独立的并发任务:当你有一组可以独立运行的任务,并且需要它们全部完成时,gather 是最佳选择。
  5. 合理划分任务粒度:任务太小,调度开销会变大;任务太大,并发度会降低,找到平衡点。

常见陷阱

  1. 忘记 await:如果你只是调用了另一个协程函数而没有 await 它,它只会返回一个协程对象,并不会被执行,这是最常见的错误。
    # 错误示范
    async def foo(): print("foo")
    async def bar():
        foo() # 这不会打印 "foo"
        await foo() # 这样才会
  2. 在同步代码中混用 asyncio.run()asyncio.run() 会创建一个新的事件循环并在运行后关闭它,如果你在已经运行着事件循环的线程中调用它(在另一个异步函数中),会抛出 RuntimeError,在这种情况下,应该使用 asyncio.get_event_loop().run_until_complete()
  3. 过度使用 asyncio:如果你的应用主要是 CPU 密集型的,或者任务数量很少,使用 asyncio 可能比简单的同步代码更复杂,且性能提升不明显,选择正确的工具。

asyncio 是 Python 中处理高并发 I/O 操作的利器,通过掌握 async/await 语法、事件循环、协程和任务等核心概念,你可以编写出简洁、高效且易于维护的并发代码。

学习路径建议:

  1. async defawait 开始,理解协程的暂停和恢复机制。
  2. 使用 asyncio.run() 运行简单的协程。
  3. 学习使用 asyncio.create_task()asyncio.gather() 实现并发。
  4. 尝试使用 asyncio.sleep() 模拟真实场景。
  5. 学习使用 aiohttp 进行异步网络请求,这是 asyncio 最经典的应用场景。
  6. 探索更高级的对象,如 Queue, Lock, Semaphore 等。

希望这份教程能帮助你顺利入门 asyncio

分享:
扫描分享到社交APP
上一篇
下一篇