Python asyncio 完整教程
asyncio 是 Python 3.4+ 中用于编写并发代码的标准库,它使用一种称为 async/await 的新语法,使得编写高并发、高 I/O 密集型的应用程序变得异常简单。
目录
- 为什么需要
asyncio? - 解决什么问题 - 核心概念:事件循环、协程、任务
async/await语法详解asyncio的基本使用- 运行一个协程
- 并发运行多个协程
- 创建和管理任务
asyncio常用对象asyncio.sleep(): 模拟 I/O 操作asyncio.gather(): 并发执行并收集结果asyncio.Queue: 异步队列asyncio.Lock: 异步锁
- 与外部世界交互:
asyncio和aiohttp - 最佳实践与常见陷阱
为什么需要 asyncio?- 解决什么问题
想象一下,你的程序需要执行以下任务:
- 从网站 A 下载一个大文件。
- 从网站 B 获取一些用户数据。
- 向数据库写入一条记录。
传统的同步方式会是这样:
- 开始下载文件 A。
- 阻塞:程序卡住,等待下载完成,在此期间,CPU 是空闲的,你无法做任何其他事情。
- 下载完成后,开始获取数据 B。
- 阻塞:程序再次卡住,等待网络响应。
- 获取完成后,开始写入数据库。
- 阻塞:等待数据库操作完成。
整个过程非常耗时,总时间是 T_A + T_B + T_DB。
而使用 asyncio 的异步方式会是这样的:
- 启动下载任务 A,并告诉它:“下载好了就通知我。”
- 立即启动获取数据任务 B,同样告诉它:“获取好了就通知我。”
- 立即启动写入数据库任务。
- 主程序进入“事件循环”(Event Loop),不断地检查:“任务 A 好了吗?任务 B 好了吗?数据库操作好了吗?”
- 假设任务 B 最先完成,事件循环会处理它的结果。
- 然后任务 A 完成,事件循环处理它的结果。
- 最后数据库操作完成。
总时间接近于 max(T_A, T_B, T_DB),因为多个 I/O 操作是并发执行的。
asyncio 的核心优势:
- 高并发:特别适合处理大量 I/O 密集型任务(如网络请求、文件读写、数据库操作)。
- 高效率:在等待 I/O 操作时,CPU 不会空闲,可以去执行其他任务,极大地利用了资源。
- 单线程模型:避免了多线程编程中的锁、死锁、线程切换等复杂问题。
注意:asyncio 不适合 CPU 密集型任务(如大量数学计算、图像处理),对于 CPU 密集型任务,应该使用多进程(multiprocessing)。
核心概念
在深入代码之前,必须理解三个核心概念:
a. 事件循环
事件循环是 asyncio 的心脏,你可以把它看作是一个无限循环的调度器,负责运行、暂停和恢复协程,它不断地检查哪些协程已经准备好被执行(即 I/O 操作已完成),然后调度它们继续运行。
在 Python 3.7+ 中,获取和启动事件循环变得非常简单。
b. 协程
协程是 asyncio 中的基本执行单元,它是一个使用 async def 关键字定义的函数,协程函数在被调用时不会立即执行,而是返回一个协程对象。
协程可以暂停执行(使用 await),并在将来某个点恢复执行,这个暂停和恢复的过程不会阻塞事件循环。
c. 任务
任务是对协程的进一步封装,它将协程对象包装成一个可以在事件循环中调度的“,当你创建一个任务时,事件循环会在某个时刻运行这个协程,并可以在它暂停时去运行其他任务。
任务让多个协程能够真正地并发执行。
async/await 语法详解
这是 asyncio 的灵魂。
async def
用于定义一个协程函数。
import asyncio
async def say_hello():
print("Hello")
# 暂停这个协程,让其他协程有机会运行
await asyncio.sleep(1)
print("World")
# 调用 say_hello() 不会立即打印 "Hello"
# 它返回的是一个协程对象
coro = say_hello()
print(coro)
# 输出: <coroutine object say_hello at 0x...>
await
用于在一个协程中暂停执行,并等待另一个“可等待”对象(通常是另一个协程或任务)完成。
关键点:
await只能在async def函数内部使用。- 当执行到
await时,当前协程会暂停,并将控制权交还给事件循环,事件循环会利用这个时间去执行其他已经准备好的任务。 - 当被
await的对象完成后,当前协程会从暂停的地方继续执行。
asyncio 的基本使用
a. 运行一个协程
要运行一个协程,你需要将它提交给事件循环,在 Python 3.7+ 中,asyncio.run() 是最简单、最推荐的方式,它会创建一个新的事件循环,运行你传入的协程,直到完成,然后关闭事件循环。
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print("Hello")
# say_after 是一个协程,await 会等待它执行完毕
await say_after(1, "World")
print("Finished")
# asyncio.run 会启动事件循环并运行 main() 协程
asyncio.run(main())
输出:
Hello
(等待 1 秒后)
World
Finished
b. 并发运行多个协程
这是 asyncio 的威力所在,我们不再需要一个接一个地 await 协程,而是可以创建多个任务,让它们并发执行。
asyncio.create_task() 可以将一个协程包装成一个任务,并“安排”它在事件循环中尽快运行。
import asyncio
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print("Start")
# 创建两个任务,但它们不会立即开始运行
# 事件循环会在稍后调度它们
task1 = asyncio.create_task(say_after(1, "Hello"))
task2 = asyncio.create_task(say_after(2, "World"))
# await 会等待所有任务完成
# 在等待期间,事件循环会运行其他任务
await task1
await task2
print("End")
asyncio.run(main())
输出:
Start
(等待 1 秒后)
Hello
(再等待 1 秒后)
World
End
注意:总耗时约 2 秒,而不是 3 秒。say_after(1, ...) 和 say_after(2, ...) 是并发执行的。
c. asyncio.gather() - 更优雅的并发
asyncio.gather(*coroutines_or_futures) 是一个更强大、更常用的函数,用于并发地运行多个可等待对象,并收集它们的返回值。
import asyncio
async def count(name, delay):
print(f"Count {name} started")
await asyncio.sleep(delay)
print(f"Count {name} finished")
return delay # 返回延迟时间
async def main():
# 创建多个协程对象
coro1 = count("A", 1)
coro2 = count("B", 2)
coro3 = count("C", 1)
# gather 会并发运行它们,并等待全部完成
# results 是一个列表,包含了各个协程的返回值,顺序与输入一致
results = await asyncio.gather(coro1, coro2, coro3)
print(f"All results: {results}")
asyncio.run(main())
输出:
Count A started
Count C started
(等待 1 秒后)
Count A finished
Count C finished
(再等待 1 秒后)
Count B finished
All results: [1, 2, 1]
gather 不仅让代码更整洁,还能自动处理结果的收集,非常方便。
asyncio 常用对象
a. asyncio.sleep()
这是 asyncio 中最常用的工具,用于模拟一个 I/O 操作,它会让当前协程暂停指定的时间,而不会阻塞整个事件循环。
b. asyncio.Queue
异步队列,用于在线程或任务之间安全地传递数据,它的 get() 和 put() 方法都是异步的。
import asyncio
async def producer(queue):
for i in range(5):
await asyncio.sleep(0.5)
print(f"Producer putting item {i}")
await queue.put(i)
await queue.put(None) # 发送信号,表示生产完成
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break # 收到结束信号
print(f"Consumer got item {item}")
# 模拟处理时间
await asyncio.sleep(1)
queue.task_done() # 标记任务完成
async def main():
q = asyncio.Queue()
# 创建生产者和消费者任务
p_task = asyncio.create_task(producer(q))
c_task = asyncio.create_task(consumer(q))
# 等待生产者完成
await p_task
# 等待队列被清空
await q.join()
# 取消消费者任务
c_task.cancel()
await c_task
asyncio.run(main())
c. asyncio.Lock
异步锁,用于保护共享资源,防止多个任务同时修改它,造成数据竞争。
import asyncio
counter = 0
lock = asyncio.Lock()
async def safe_counter():
global counter
async with lock: # 类似于 with lock:,但适用于异步上下文
print(f"Task {asyncio.current_task().get_name()} is counting")
current_value = counter
await asyncio.sleep(0.1) # 模拟耗时操作
counter = current_value + 1
print(f"Task {asyncio.current_task().get_name()} set counter to {counter}")
async def main():
# 创建多个任务,它们会竞争 counter
tasks = [asyncio.create_task(safe_counter(), name=f"Task-{i}") for i in range(3)]
await asyncio.gather(*tasks)
asyncio.run(main())
# 输出会是安全的,counter 最终会等于 3
与外部世界交互:asyncio 和 aiohttp
asyncio 本身不包含网络客户端库,你需要使用支持 asyncio 的第三方库。aiohttp 是最流行的异步 HTTP 客户端/服务器库。
安装:
pip install aiohttp
示例:并发下载多个网页
import asyncio
import aiohttp
async def fetch_url(session, url):
try:
print(f"Fetching {url}")
async with session.get(url, timeout=10) as response:
# response.text() 也是一个异步操作
html = await response.text()
print(f"Finished fetching {url}, length: {len(html)}")
return html
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def main():
urls = [
"http://python.org",
"http://github.com",
"http://stackoverflow.com",
"http://non-existent-website.xyz" # 这个会失败
]
# aiohttp.ClientSession 管理连接池,是推荐的用法
async with aiohttp.ClientSession() as session:
# 为每个 URL 创建一个任务
tasks = [fetch_url(session, url) for url in urls]
# 并发执行所有任务
htmls = await asyncio.gather(*tasks)
# 打印结果
for i, html in enumerate(htmls):
if html:
print(f"URL {i+1} fetched successfully.")
asyncio.run(main())
这个例子清晰地展示了 asyncio + aiohttp 在处理多个网络请求时的强大能力。
最佳实践与常见陷阱
最佳实践
- 使用
asyncio.run()作为程序的入口点:除非你有特殊需求,否则在主脚本中,用asyncio.run(your_main_coroutine())来启动。 - 避免在协程中使用阻塞 I/O:绝对不要在
async def函数中使用time.sleep()、requests.get()等同步阻塞函数,它们会阻塞整个事件循环,让你的异步程序失去所有优势,请使用它们的异步替代品(asyncio.sleep(),aiohttp)。 - 用
async with管理资源:对于像aiohttp.ClientSession这样的异步上下文管理器,始终使用async with来确保资源被正确释放。 - 将
asyncio.gather()用于独立的并发任务:当你有一组可以独立运行的任务,并且需要它们全部完成时,gather是最佳选择。 - 合理划分任务粒度:任务太小,调度开销会变大;任务太大,并发度会降低,找到平衡点。
常见陷阱
- 忘记
await:如果你只是调用了另一个协程函数而没有await它,它只会返回一个协程对象,并不会被执行,这是最常见的错误。# 错误示范 async def foo(): print("foo") async def bar(): foo() # 这不会打印 "foo" await foo() # 这样才会 - 在同步代码中混用
asyncio.run():asyncio.run()会创建一个新的事件循环并在运行后关闭它,如果你在已经运行着事件循环的线程中调用它(在另一个异步函数中),会抛出RuntimeError,在这种情况下,应该使用asyncio.get_event_loop().run_until_complete()。 - 过度使用
asyncio:如果你的应用主要是 CPU 密集型的,或者任务数量很少,使用asyncio可能比简单的同步代码更复杂,且性能提升不明显,选择正确的工具。
asyncio 是 Python 中处理高并发 I/O 操作的利器,通过掌握 async/await 语法、事件循环、协程和任务等核心概念,你可以编写出简洁、高效且易于维护的并发代码。
学习路径建议:
- 从
async def和await开始,理解协程的暂停和恢复机制。 - 使用
asyncio.run()运行简单的协程。 - 学习使用
asyncio.create_task()和asyncio.gather()实现并发。 - 尝试使用
asyncio.sleep()模拟真实场景。 - 学习使用
aiohttp进行异步网络请求,这是asyncio最经典的应用场景。 - 探索更高级的对象,如
Queue,Lock,Semaphore等。
希望这份教程能帮助你顺利入门 asyncio!
