0%

流畅的 Python 第 2 版(21):异步编程

这篇文章将介绍 Python 中的异步编程,这些内容建立在之介绍的可迭代对象和生成器、上下文管理器、以及并发编程一般概念等基础之上。

一些定义

  • 原生协程:使用 async def 定义的协程函数。在原生协程内可以使用 await 关键字委托另一个原生协程,这类似于在经典协程中使用 yield fromasync def 语句定义的始终是原生协程,即使主体中没有使用 await 关键字。await 关键字不能在原生协程外部使用

  • 经典协程:一种生成器函数,在表达式中使用 yield 读取 my_coro.send(data) 调用发送的数据。经典协程可以使用 yield from 委托其他经典协程。经典协程不能由 await 驱动,而且 asyncio 库不再支持

  • 基于生成器的协程:基于生成器的协程一种使用 @types.coroutine(Python3.5 引入)装饰的生成器函数。使用这个装饰器的生成器与新增的 await 关键字兼容

供经典协程和基于生成器的协程使用的 @asyncio.coroutine 装饰器在 Python3.8 中已被弃用

探测域名

如下脚本通过原生协程对象探测 DNS。由于异步操作是交叉执行的,因此检查多个域名所需的时间比依序检查少很多。其实,总用时基本与最慢的 DNS 响应时长相当,而不是所有响应时间之和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python3
import asyncio
import socket
from keyword import kwlist

MAX_KEYWORD_LEN = 4


async def probe(domain: str) -> tuple[str, bool]:
loop = asyncio.get_running_loop()
try:
await loop.getaddrinfo(domain, None)
except socket.gaierror:
return (domain, False)
return (domain, True)


async def main() -> None:
names = (kw for kw in kwlist if len(kw) <= MAX_KEYWORD_LEN)
domains = (f'{name}.dev'.lower() for name in names)
coros = [probe(domain) for domain in domains]
for coro in asyncio.as_completed(coros):
domain, found = await coro
mark = '+' if found else ' '
print(f'{mark} {domain}')


if __name__ == '__main__':
asyncio.run(main())
  • 通过 asyncio.get_running_loop() 获取当前事件循环的引用,供后面使用
  • 协程方法 loop.getaddrinfo(...) 返回一个五元组,使用套接字连接指定的地址
  • [probe(domain) for domain in domains] 通过调用 probe 协程函数,并用列表推导构建一个协程对象的列表
  • asyncio.as_completed 是一个生成器,产出协程,按照传入协程的完成的顺序(不是协程的提交顺序)返回结果(类似于之前看到过的 futures.as_completed
  • 由于 as_completed 产出的协程对象已经完成,因此直接 await 这个协程并不会阻塞
  • 通过 await coro 获取协程的运行结果,若 coro 抛出的异常未被处理,会在这里重新抛出
  • asyncio.run 启动事件循环,仅当事件循环退出后返回。将 main 作为协程并通过 asyncio.run 运行,是启动异步程序的常用方式

需要注意,asyncio.get_running_loop 函数在 Python3.7 中新增,供协程内部使用:

  • 如果没有运行中的循环,那么 asyncio.get_running_loop 抛出 RuntimeError
  • asyncio.get_running_loop 的实现比 asyncio.get_event_loop 更简单,速度也更快
  • asyncio.get_event_loop 在必要时会启动事件循环。从 Python3.10 开始, asyncio.get_event_loop 已被弃用,最终将变成 asyncio.get_running_loop 的别名

await loop.getaddrinfo(...) 句法能避免阻塞,因为 await 中止当前协程对象。loop.getaddrinfo(...) 创建一个新的协程对象,等待该协程对象启动底层 addrinfo 查询,控制权交还事件循环,而不是被中止的 probe 协程,这样事件循环就可以驱动其他待完成的协程对象

阅读异步代码一个技巧是,对 await 关键字视而不见,假装它不存在。如此一来,协程就像是依序运行的常规函数。

可异步调用对象

for 关键字处理可迭代对象,await 关键字处理可异步调用对象。作为 asyncio 库的终端用户,日常可见到以下两种可异步调用对象:

  • 原生协程对象:通过调用原生协程函数得到
  • asyncio.Task:通常由把协程对象传给 asyncio.create_task() 得到

终端用户编写的代码不一定要使用 await 处理 Task,还可以使用 asyncio.create_task(one_coro()) 调度 one_coro 自动执行,不等待它返回。如果不打算取消或等待任务,则无须保存 create_task 返回的 Task 对象。仅仅创建任务后,事件循环就能在合适的时机自动调度协程运行。相比之下,使用 await other_coro() 立即运行 other_coro等待协程运行完毕,因为继续向下执行之前需要协程返回的结果

总结一下:

  • asyncio.create_task(one_coro()) 创建得到一个 Task 对象,事件循环会自动在合适的时机调度协程执行
  • await other_coro() 立即运行协程,并等待它返回结果才会继续向下执行
  • 也能对 task 对象进行 await,以等待 task 对象完成

如下展示了两个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio
from time import time

async def one_coro():
print("coro started", time())
await asyncio.sleep(2)
print("coro ended", time())
return 10

async def main():
task = asyncio.create_task(one_coro())
print("task created", time())
# 事件循环会自动调度 one_coro 协程执行
await asyncio.sleep(5)
print("main ended", time())


asyncio.run(main())
1
2
3
4
task created 1765594501.602654
coro started 1765594501.602734
coro ended 1765594503.603949
main ended 1765594506.604118
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def one_coro():
print("coro started")
await asyncio.sleep(2)
print("coro ended")
return 10

async def main():
task = asyncio.create_task(one_coro())
print("await task started")
# 手动 await task 对象,等待它完成
t = await task
print(t)
print("await task ended")


asyncio.run(main())
1
2
3
4
5
await task started
coro started
coro ended
10
await task ended

实现异步库,或者为 asyncio 库做贡献时,可能还要处理以下底层的可异步调用对象:

  • 提供 __await__ 方法、返回一个迭代器的对象;例如,asyncio.Future 实例(asyncio.Taskasyncio.Future 的子类
  • 以其他语言编写的对象,使用 Python/C API,提供 tp_as_async.am_await 函数(类似于 __await__ 方法)​,返回一个迭代器

现有代码库里可能还有一种可异步调用对象:基于生成器的协程对象,但是正在走弃用流程。

PEP 492 指出,await 表达式 使用 yield from 实现,外加验证参数步骤​,而且 await 只接受一个可异步调用对象。

使用 asyncio 和 HTTPX 下载

自 Python3.10 起,asyncio 库仅直接支持 TCP 和 UDP,而且标准库中没有异步 HTTP 客户端和服务器包。在所有 HTTP 客户端示例中,将使用 HTTPX。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio

from httpx import AsyncClient

from flags import BASE_URL, save_flag, main

async def download_one(client: AsyncClient, cc: str):
image = await get_flag(client, cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
return cc

async def get_flag(client: AsyncClient, cc: str) -> bytes:
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
resp = await client.get(url, timeout=6.1,
follow_redirects=True)
return resp.read()

def download_many(cc_list: list[str]) -> int:
return asyncio.run(supervisor(cc_list))

async def supervisor(cc_list: list[str]) -> int:
async with AsyncClient() as client:
to_do = [download_one(client, cc)
for cc in sorted(cc_list)]
res = await asyncio.gather(*to_do)

return len(res)

if __name__ == '__main__':
main(download_many)
  • httpx 中的异步 HTTP 客户端是 AsyncClient。AsyncClient 还是异步上下文管理器,即提供了异步设置和清理方法的上下文管理器
  • asyncio.gather 接受的参数是一个或多个可异步调用对象,等待全部执行完毕,以可异步调用对象的提交顺序返回结果列表
  • 为了提升性能,download_one 中的 save_flag 调用应该异步执行,以免阻塞事件循环。但是,目前 asyncio 没有提供(类似 Node.js 那种)异步文件系统 API

原生协程的秘密:默默无闻的生成器

经典协程示例和原生协程的关键区别在于,后者没有一目了然的 .send() 调用或 yield 表达式。你的代码位于 asyncio 库和你使用的异步库(例如 HTTPX)之间。asyncio 事件循环在背后调用 .send 驱动你的协程,而你的协程使用 await 等待其他协程,包括库提供的协程。await 的实现大量借鉴 yield from,也调用 .send 驱动协程。

await 链最终到达一个底层可异步调用对象,返回一个生成器,由事件循环驱动,对计时器或网络 I/O 等事件做出响应。位于 await 链末端的底层可异步调用对象深埋在库的实现中,不作为 API 开放,有可能是 Python/C 扩展。

使用 asyncio.gatherasyncio.create_task 等函数可以启动多个并发 await 通道,在单个线程内由单个事件循环驱动多个 I/O 操作并发执行。

不成功便成仁 问题

为了使用 HTTPX 提供的异步 API,必须将其重写为协程,因此 get_flag 函数不能重用其同步版本。为了充分发挥 asyncio 的性能,必须把执行 I/ O操作的每个函数替换为异步版本,使用 awaitasyncio.create_task 激活,这样在函数等待 I/O 期间才能把控制权交还给事件循环。如果无法把导致阻塞的函数重写为协程,那就应该在单独的线程或进程中运行那个函数

这就异步编程所说的 你孤注一掷重写代码,要么彻底避免阻塞,要么纯属浪费时间

异步上下文管理器

之前介绍过,在 with 块主体的前后可以使用一个对象运行代码,前提是那个对象所属的类提供了 __enter____exit__ 方法。

看如下 asyncpg 代码,数据库事务特别适合使用上下文管理器协议:事务务必启动,数据由 connection.execute 改动,然后根据改动的结果,必须回滚或提交:

1
2
3
4
5
6
7
8
9
tr = connection.transaction()
await tr.start()
try:
await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
except:
await tr.rollback()
raise
else:
await tr.commit()

这种异步驱动中,设置和清理需要由协程执行,好让其他操作有机会并发执行然而,传统的 with 语句采用的实现方式不支持协程使用 __enter____exit__ 执行相关操作。PEP 492—Coroutines with async and await syntax 引入了 async with 语句,用于实现异步上下文管理器,即一种以协程实现 __aenter____aexit__ 方法的对象

使用 async with,如下代码简化为:

1
2
async with connection.transaction():
await connection.execute("INSERT INTO mytable VALUES (1, 2, 3)")
  • asyncpg.Transaction 类中,协程方法 __aenter__ 使用 await self.start()
  • 协程方法 __aexit__ 根据有没有异常发生,异步 await 私有协程方法 __rollback__commit
  • 使用协程把 Transaction 实现为异步上下文管理器,asyncpg 就能并发处理多个事务

注意,只要 async with 要求,__aenter____aexit__ 是协程,但 with 主体的代码是否使用协程则无关紧要

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MyAsyncContextManager:
def __init__(self, name: str):
self.name = name

async def __aenter__(self):
print(f"[ASYNC] Entering context: {self.name}")
await asyncio.sleep(0.1)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"[ASYNC] Exiting context: {self.name}")
await asyncio.sleep(0.05)
return False


async def main():
async with MyAsyncContextManager("MyResource") as ctx:
x = 42
message = f"Hello from {ctx.name}"
result = x * 2
print(message)
print(f"Result: {result}")

asyncio.run(main())

或者 with 块内使用了协程,但是上下文管理器器对象不是异步上下文管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time

class MyAsyncContextManager:
def __init__(self, name: str):
self.name = name

def __enter__(self):
print(f"[SYNC] Entering context: {self.name}")
time.sleep(1)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
print(f"[SYNC] Exiting context: {self.name}")
time.sleep(1)
return False


async def main():
with MyAsyncContextManager("MyResource") as ctx:
print(f"in with block")
x = 42
message = f"Hello from {ctx.name}"
await asyncio.sleep(1)
print(message)

asyncio.run(main())

增强 asyncio 版下载脚本的功能

在之前的下载代码中,我们把几个协程传给 asyncio.gather,按照协程的提交顺序返回协程的结果构成的列表。这意味着,只有所有可异步调用对象都执行完毕后,asyncio.gather 才返回。我们可以使用 as_compeleted 替代 gather,以便在某个协程完成时立即得到结果。

如下增强 asyncio 版下载脚本的功能(不分代码省略):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import asyncio
from collections import Counter
from http import HTTPStatus
from pathlib import Path

import httpx
import tqdm # type: ignore

from flags2_common import main, DownloadStatus, save_flag

DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

async def get_flag(client: httpx.AsyncClient,
base_url: str,
cc: str) -> bytes:
url = f'{base_url}/{cc}/{cc}.gif'.lower()
resp = await client.get(url, timeout=3.1, follow_redirects=True)
resp.raise_for_status()
return resp.content

async def download_one(client: httpx.AsyncClient,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> DownloadStatus:
try:
async with semaphore:
image = await get_flag(client, base_url, cc)
except httpx.HTTPStatusError as exc:
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND
msg = f'not found: {res.url}'
else:
raise
else:
await asyncio.to_thread(save_flag, image, f'{cc}.gif')
status = DownloadStatus.OK
msg = 'OK'
if verbose and msg:
print(cc, msg)
return status

async def supervisor(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]:
counter: Counter[DownloadStatus] = Counter()
semaphore = asyncio.Semaphore(concur_req)
async with httpx.AsyncClient() as client:
to_do = [download_one(client, cc, base_url, semaphore, verbose)
for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
error: httpx.HTTPError | None = None
for coro in to_do_iter:
try:
status = await coro
except httpx.HTTPStatusError as exc:
error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
error_msg = error_msg.format(resp=exc.response)
error = exc
except httpx.RequestError as exc:
error_msg = f'{exc} {type(exc)}'.strip()
error = exc
except KeyboardInterrupt:
break

if error:
status = DownloadStatus.ERROR
if verbose:
url = str(error.request.url)
cc = Path(url).stem.upper()
print(f'{cc} error: {error_msg}')
counter[status] += 1

return counter

def download_many(cc_list: list[str],
base_url: str,
verbose: bool,
concur_req: int) -> Counter[DownloadStatus]:
coro = supervisor(cc_list, base_url, verbose, concur_req)
counts = asyncio.run(coro)

return counts

if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
  • 文件 I/O 也是阻塞操作,但是没有对应的异步处理版本。从 Python3.9 开始,asyncio.to_thread 协程可以轻松地把文件 I/O 委托给 asyncio 提供的一个线程池
  • 通过使用信号量,可以对网络客户端发起的请求进行限流。信号量可以配置最大数量,而且一个信号量可由多个协程持有,因此特别适合用于限制活动的并发协程数量
  • 在这个代码中,将信号量当做异步上下文管理器使用。asyncio.Semaphore 有一个内部计数器:
    • 每次使用 await 处理协程方法 .acquire() 时,计数器递减
    • 每次调用 .release() 方法(不是协程,因为永不阻塞)​,计数器递增
    • 计数器的初始值在实例化 Semaphore 时设定
    • 若计数器大于零,则使用 await 处理 .acquire() 方法没有延迟
    • 若计时器为零,则 .acquire() 中止待处理的协程,直到其他协程在同一个 Semaphore 实例上调用 .release(),递增计数器
    • 一般不直接调用这些方法,把 semaphore 当作异步上下文管理器使用更安全
    • 协程方法 Semaphore.__aenter__ 异步等待 .acquire(),协程方法 __aexit__ 调用 .release()
    • 标准库中的各个 Semaphore 类均有一个 BoundedSemaphore 子类,额外施加一个约束:如果 .release() 操作数超过 .acquire() 操作数,那么内部计时器的值不能比初始值大
  • 调用 as_completed 获取一个迭代器,返回处理完毕的协程对象
  • download_many 实例化 supervisor 协程对象,通过 asyncio.run 传给事件循环
  • Python 语言不使用块级作用域,循环和 try/except 等语句不在它们管理的块中创建局部作用域。但是,如果 except 子句把异常绑定到变量上(例如 exc 变量)​,绑定的变量只存在于 except 子句所在的块内。因此使用 error 变量记录异常

每次下载发起多个请求

在线程版脚本中很容易让同一个任务发起多个请求:先发起一个请求,再发起另一个请求,阻塞线程两次,把两部分数据(国家代码和名称)存储在局部变量中。如果在异步脚本中使用回调实现同样的操作,那就需要嵌套函数,在闭包内存储国家代码和名称变量,因为各个回调在不同的局部作用域内运行。await关键字的出现拯救了我们:一个异步请求结束后驱动另一个请求,共用驱动协程的局部作用域

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
async def get_country(client: httpx.AsyncClient,
base_url: str,
cc: str) -> str:
url = f'{base_url}/{cc}/metadata.json'.lower()
resp = await client.get(url, timeout=3.1, follow_redirects=True)
resp.raise_for_status()
metadata = resp.json()
return metadata['country']

async def download_one(client: httpx.AsyncClient,
cc: str,
base_url: str,
semaphore: asyncio.Semaphore,
verbose: bool) -> DownloadStatus:
try:
async with semaphore:
image = await get_flag(client, base_url, cc)
async with semaphore:
country = await get_country(client, base_url, cc)
except httpx.HTTPStatusError as exc:
res = exc.response
if res.status_code == HTTPStatus.NOT_FOUND:
status = DownloadStatus.NOT_FOUND
msg = f'not found: {res.url}'
else:
raise
else:
filename = country.replace(' ', '_')
await asyncio.to_thread(save_flag, image, f'{filename}.gif')
status = DownloadStatus.OK
msg = 'OK'
if verbose and msg:
print(cc, msg)
return status

get_flagget_country 也可以使用 asyncio.gather 并行调度,但若 get_flag 抛出异常,则没有图像需要保存,此时运行 get_country 没有意义。不过,有些时候应该使用 asyncio.gather 同时请求多个 API,而不是等一个请求得到响应之后再发起下一个请求。

使用现代 Python 做异步应用程序编程,如果大量使用回调,那就说明你使用的可能是旧的编程模式,已不适应现代 Python 环境。

把任务委托给执行器

与 Python 相比,Node.js 在异步编程上的显著优势体现在 Node.js 标准库上:为所有 I/O 提供了异步 API,而不仅限于网络 I/O。在 Python 中,如果你不够小心,那么文件 I/O 可以明显降低异步应用程序的性能水平,因为在主线程中读写存储器会阻塞事件循环

之前介绍过 asyncio.to_threadasyncio.to_thread 可以将阻塞操作提交到线程池执行,从而避免阻塞事件循环,它返回一个协程对象:

1
async asyncio.to_thread(func, /, *args, **kwargs)

之前已经用过了 asyncio.to_thread 来执行保存操作:

1
await asyncio.to_thread(save_flag, image, f'{cc}.gif')

asyncio.to_thread 在 Python 3.9 中增加。如果想支持 Python 3.7 或 3.8,则需要把那一行替换成如下代码:

1
2
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, save_flag, image, f'{cc}.gif')
  • 首先获取事件循环的引用
  • loop.run_in_executor` 第一个参数是要使用的执行器。这里传入 None,使用在 asyncio事件循环中始终可用的默认执行器ThreadPoolExecutor。
  • 可以通过位置参数将参数传入要运行的函数,但是如果传入关键字参数,则需要利用 functool.partial,详见 run_in_executor 文档

新增的 asyncio.to_thread 函数用起来更简单,也更灵活,因为它也接受关键字参数。asyncio 自身的实现在背后多次使用 run_in_executor,例如 loop.getaddrinfo(...) 协程通过调用 socket 模块中的 getaddrinfo 函数实现,而这个函数要阻塞几秒钟才返回。

异步 API 经常在内部使用 run_in_executor 把实现细节中的阻塞调用包装成协程。如此一来,协程的接口保持了一致,都使用 await 驱动,而且用到的线程被隐藏起来了,简单纯粹

把一个特定的执行器显式传给 loop.run_in_executor主要是为了使用 ProcessPoolExecutor 在不同的 Python 进程中运行 CPU 密集型函数,以避免争用 GIL。由于启动进程的开销较大,因此最好在 supervisor 中启动 ProcessPoolExecutor,再把它传给需要用到的协程。

使用 asyncio 编写服务器

FastAPI 是一个 Python ASGI Web 框架。FastAPI 旨在为单页应用程序和移动应用程序实现后端,基本上都是返回 JSON 响应的 Web API 端点,不在服务器端渲染 HTML。借助装饰器、类型提示和代码内省,FastAPI 可以削减 Web API 的大量样板代码。另外,FastAPI 还能为我们创建的 API 自动发布交互式 OpenAPI(即 Swagger)文档。

使用 uvicorn 在开发模式下运行代码的例子如下:

1
$ uvicorn web_mojifinder:app --reload
  • web_mojifinder:app:包名、一个冒号和包内定义的 ASGI 应用程序的名称
  • --reload 让 uvicorn 监控应用程序源文件的变化,自动重新加载。只在开发过程中有用

如下是示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from pathlib import Path
from unicodedata import name

from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

from charindex import InvertedIndex

STATIC_PATH = Path(__file__).parent.absolute() / 'static'

app = FastAPI(
title='Mojifinder Web',
description='Search for Unicode characters by name.',
)

class CharName(BaseModel):
char: str
name: str

def init(app):
app.state.index = InvertedIndex()
app.state.form = (STATIC_PATH / 'form.html').read_text()

init(app)

@app.get('/search', response_model=list[CharName])
async def search(q: str):
chars = sorted(app.state.index.search(q))
return ({'char': c, 'name': name(c)} for c in chars)

@app.get('/', response_class=HTMLResponse, include_in_schema=False)
def form():
return app.state.form

# 没有主函数
  • app = FastAPI() 定义 ASGI 应用程序,提供的参数是自动生成的文档的元数据
  • 定义 /search 端点的路由。response_model 使用前面定义的 pydantic 模型 CharName 描述响应格式
  • FastAPI 假定函数或协程签名中不在路由路径内的参数都是 HTTP 查询字符串
  • search() 返回由字典构成的可迭代对象,与 response_model 模式兼容,因此 FastAPI 根据 @app.get 装饰器中的 response_model 构建 JSON 响应
  • 响应模型在 response_model 这个参数中声明,而不通过函数的返回值类型注解,因为处理路径的函数可能根本不返回响应模型,而是返回一个字典、数据库对象或其他模型,然后使用 response_model 限定字段和序列化
  • 在search中,返回一个生成器,产出的项是字典,而不是 CharName 对象构成的列表,但是 FastAPI 和 pydantic 能正确验证数据,构建与 response_model=list[CharName] 相容的 JSON 响应
  • 常规函数(即非异步运行)form 也可用于生成响应
  • 这个模块没有主函数,由 ASGI 服务器(本例中是 uvicorn)加载和驱动

这个例子没有直接调用 asyncio。FastAPI 构建在 ASGI 工具包 Starlette 之上,后者使用了 asyncio。另外要注意,search 的主体中没有 await、async with 或 async for,因此也可以定义为普通函数。在真实的应用中,多数端点要查询数据库或请求远程服务器,因此 FastAPI(以及其他 ASGI 框架)务必支持协程,以便利用异步库处理网络 I/O。

接下来再来看一个 TCP 服务器示例,无须借助外部依赖,使用 asyncio 就能写出来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
async def supervisor(index: InvertedIndex, host: str, port: int) -> None:
server = await asyncio.start_server(
functools.partial(finder, index),
host, port)

socket_list = cast(tuple[TransportSocket, ...], server.sockets)
addr = socket_list[0].getsockname()
print(f'Serving on {addr}. Hit CTRL-C to stop.')
await server.serve_forever()

def main(host: str = '127.0.0.1', port_arg: str = '2323'):
port = int(port_arg)
print('Building index.')
index = InvertedIndex()
try:
asyncio.run(supervisor(index, host, port))
except KeyboardInterrupt:
print('\nServer shut down.')

if __name__ == '__main__':
main(*sys.argv[1:])
  • start_server 的第一个参数 client_connected_cb 是一个回调,在客户端发起新连接时运行,这个回调可以是普通函数,也可以是协程。这个回调函数必须接受两个参数,一个是 asyncio.StreamReader 对象,另一个是 asyncio.StreamWriter 对象
  • 这里实现的 finder 协程还需要获取 index,因此使用 functools.partial 绑定该参数,得到一个接受 asyncio.StreamReaderasyncio.StreamWriter 对象的回调
  • 在事件循环存续期间,针对连接服务器的每一个客户端启动一个 finder 协程实例,从而让这个简单的服务器可以并发处理多个客户端,这个过程一直持续,直到服务器抛出 KeyboardInterrupt,或者服务器进程被操作系统终止
  • 虽然 start_server 是以并发任务启动服务器的,但这里还是要使用 await 处理 server_forever 方法,目的是让实现的 supervisor 协程在这里中止。倘若没有这一行,supervisor 将立即返回,终止 asyncio.run(supervisor(...)) 启动的循环,导致程序退出

接下来再看其中 finder 协程的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
import functools
import sys
from asyncio.trsock import TransportSocket
from typing import cast

from charindex import InvertedIndex, format_results ➊

CRLF = b'\r\n'
PROMPT = b'?> '

async def finder(index: InvertedIndex,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter) -> None:
client = writer.get_extra_info('peername')
while True:
writer.write(PROMPT) # 不能使用await
await writer.drain() # 必须使用await
data = await reader.readline()
if not data:
break
try:
query = data.decode().strip()
except UnicodeDecodeError:
query = '\x00'
print(f' From {client}: {query!r}')
if query:
if ord(query[:1]) < 32:
break
results = await search(query, index, writer)
print(f' To {client}: {results} results.')

writer.close()
await writer.wait_closed()
print(f'Close {client}.')

async def search(query: str,
index: InvertedIndex,
writer: asyncio.StreamWriter) -> int:
chars = index.search(query)
lines = (line.encode() + CRLF for line
in format_results(chars))
writer.writelines(lines)
await writer.drain()
status_line = f'{"─" * 66} {len(chars)} found'
writer.write(status_line.encode() + CRLF)
await writer.drain()
return len(chars)
  • 这个循环处理一个对话,直到从客户端收到一个控制字符
  • StreamWriter.write 不是协程方法,只是普通函数
  • StreamWriter.drain 刷新 writer 缓冲。这是一个协程,必须由 await 驱动
  • StreamWriter.readline 是一个协程,返回 bytes
  • search 也是一个协程,因为需要写入 StreamWriter,而且必须使用 StreamWriter 的协程方法 .drain()

从这里例子中,可以看到,有些 I/O 方法是协程,必须使用 await 驱动,而另一些是普通函数。利用 asyncio 提供的高层 Streams API,我们只需实现一个处理函数(可以是普通回调或协程)就能得到一个可用的服务器。

异步迭代和异步可迭代对象

之前介绍过,async with 可以处理实现了 __aenter____aexit__ 方法的对象,二者返回可异步调用对象,通常是协程对象。类似地,async for 处理异步可迭代对象,即实现了 __aiter__ 的对象。然而,__aiter__ 必须是常规方法(不是协程方法)​,而且必须返回一个异步迭代器。异步迭代器提供 __anext__ 协程方法,返回一个可异步调用对象,通常是一个协程对象。异步迭代器也应实现 __aiter__,往往返回 self。

PostgreSQL 异步驱动 aiopg 的文档中有一个示例,演示了如何使用 async for 迭代一个数据库游标的各行:

1
2
3
4
5
6
7
8
9
async def go():
pool = await aiopg.create_pool(dsn)
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
ret = []
async for row in cur:
ret.append(row)
assert ret == [(1,)]

异步生成器函数

若想实现异步迭代器,可以编写一个类,实现 __anext____aiter__。不过,还有更简单的方法:以 async def 声明一个函数,在主体中使用 yield。这与利用经典迭代器模式的生成器函数是一样的。

从 Python3.8 开始,使用命令行选项 -m asyncio 运行解释器可以得到一个 异步 REPL​,这个 Python 控制台导入 asyncio,提供一个运行中的事件循环,支持在顶层提示符下使用 awaitasync forasync with(这些在原生协程外部使用原本会导致句法错误)。

1
2
3
4
5
6
7
8
$ python3 -m asyncio
asyncio REPL 3.13.3 (main, Apr 8 2025, 13:54:08) [Clang 16.0.0 (clang-1600.0.26.6)] on darwin
Use "await" directly instead of "asyncio.run()".
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio

>>> await asyncio.sleep(2, "rise and shine")
'rise and shine'

异步生成器由 async for 驱动,可能导致阻塞​。另外,异步推导式也使用 async for。如下实现了一个异步生成器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
import socket
from collections.abc import Iterable, AsyncIterator
from typing import NamedTuple, Optional

class Result(NamedTuple):
domain: str
found: bool


OptionalLoop = Optional[asyncio.AbstractEventLoop]


async def probe(domain: str, loop: OptionalLoop = None) -> Result:
if loop is None:
loop = asyncio.get_running_loop()
try:
await loop.getaddrinfo(domain, None)
except socket.gaierror:
return Result(domain, False)
return Result(domain, True)


async def multi_probe(domains: Iterable[str]) -> AsyncIterator[Result]:
loop = asyncio.get_running_loop()
coros = [probe(domain, loop) for domain in domains]
for coro in asyncio.as_completed(coros):
result = await coro
yield result
  • 异步生成器函数产生一个异步生成器对象,可以注解为 AsyncIterator[SomeType]​
  • multi_probe yield result,有这一行的存在,multi_probe 才是异步生成器

更简化的写法可以为:

1
2
for coro in asyncio.as_completed(coros):
yield await coro

而使用这个异步生成器的代码也很简单:

1
2
3
async for domain, found in multi_probe(domains):
indent = '' if found else '\t\t'
print(f'{indent}{domain}')

之前介绍过,生成器还有一个与迭代无关的用途:上下文管理器。对于异步上下文也是如此。自己编写异步上下文管理器并不常见,万一需要,可以考虑使用 Python3.7 在 contextlib 模块中增加的 @asynccontextmanager 装饰器。这与之前介绍的 @contextmanager 装饰器非常相似:

1
2
3
4
5
6
7
8
9
10
11
12
from contextlib import asynccontextmanager

@asynccontextmanager
async def web_page(url):
loop = asyncio.get_running_loop()
data = await loop.run_in_executor(
None, download_webpage, url)
yield data
await loop.run_in_executor(None, update_stats, url)

async with web_page('google.com') as data:
process(data)
  • 被装饰的函数必须是异步生成器
  • 这个 yield 表达式前面的所有行将变成装饰器构建的异步上下文管理器的 __aenter__ 协程方法
  • yield 表达式后面的行将变成 __aexit__ 协程方法。这里,我们把另一个阻塞调用委托给线程执行器
  • 使用 async with 结构调用 web_page

最后,对比一下异步生成器函数与原生协程,异步生成器与原生协程异步生成器函数与原生协程之间的主要同异点如下:

  • 都使用 async def 声明
  • 异步生成器的主体中肯定有一个 yield 表达式,这才是生成器,原生协程绝对不含 yield
  • 原生协程可能返回 None 之外的值,异步生成器只能使用空 return 语句
  • 原生协程是可异步调用对象,可由 await 表达式驱动,也可以传给 asyncio 库中众多接受可异步调用对象的函数,例如 create_task。异步生成器不是可异步调用对象,而是异步可迭代对象,由 async for异步推导式 驱动

异步生成器表达式和异步推导式

PEP 530—Asynchronous Comprehensions 为推导式和生成器表达式引入 async forawait 句法。如下针对异步生成器 multi_probe,我们可以再编写一个异步生成器,只返回找到的域名:

1
2
3
4
5
6
7
8
9
10
11
>>> from domainlib import multi_probe
>>> names = 'python.org rust-lang.org golang.org no-lang.invalid'.split()
>>> gen_found = (name async for name, found in multi_probe(names) if found)
>>> gen_found
<async_generator object <genexpr> at 0x10a8f9700>
>>> async for name in gen_found:
... print(name)
...
golang.org
python.org
rust-lang.org
  • 使用 async for,表明这是一个异步生成器表达式
  • 异步生成器表达式构建一个 async_generator 对象,与异步生成器函数(例如 multi_probe)返回的对象是一种类型
  • 异步生成器对象由 async for 语句驱动,而该语句只能出现在 async def 主体内,或者在异步控制台中使用

异步生成器表达式可在程序的任何位置定义,但是只能在原生协程或异步生成器函数内使用

对于如下代码:

1
2
3
4
result = []
async for i in aiter():
if i % 2:
result.append(i)

可以使用异步推导式进行改写:

1
result = [i async for i in aiter() if i % 2]

另外,对于原生协程 fun,我们可以这样编写代码:

1
result = [await fun() for fun in funcs]

在列表推导式中使用 await,作用类似于 asyncio.gather。不过,gather 接受一个可选的 return_exceptions 参数,可以进一步处理异常。建议始终设置return_exceptions=True(默认值为False)​。

PEP 530 允许在列表推导式中使用 async forawait,在词典和集合推导式中也可以:

1
2
3
>>> {name: found async for name, found in multi_probe(names)}
{'golang.org': True, 'python.org': True, 'no-lang.invalid': False,
'rust-lang.org': True}

forasync for 子句前面,以及 if 子句后面,可以使用 await 关键字。下面在异步控制台中使用集合推导式收集找到的域名。

1
2
>>> {name for name in names if (await probe(name)).found}
{'rust-lang.org', 'python.org', 'golang.org'}

再次强调,这些推导式只能出现在 async def 主体内,或者在施了魔法的异步控制台中使用

asyncio 之外的异步世界:Curio

Python 语言的 async/await 结构不限于特定的事件循环或库。特殊方法提供的 API 具有扩展性,任何有足够动机的人都可以自己编写异步运行时环境和框架,驱动原生协程、异步生成器等。David Beazley 开发的 Curio 项目就是这样做的。

异步对象的类型提示

原生协程返回的类型是原生协程函数主体内出现在 return 语句中的对象类型,指明使用 await 处理协程时得到的是什么

注解接受一个协程对象的参数,使用以下泛型:

1
2
class typing.Coroutine(Awaitable[V_co], Generic[T_co, T_contra, V_co]):
...

如下类型用于注解注解异步对象:

1
2
3
4
5
6
7
8
9
10
class typing.AsyncContextManager(Generic[T_co]):
...
class typing.AsyncIterable(Generic[T_co]):
...
class typing.AsyncIterator(AsyncIterable[T_co]):
...
class typing.AsyncGenerator(AsyncIterator[T_co], Generic[T_co, T_contra]):
...
class typing.Awaitable(Generic[T_co]):
...

关于其中的泛型:

  • 第一个类型参数,即异步对象产出的项的类型,均是协变的:之前说过的,如果一个形式类型参数定义的是从对象中获取的数据类型,该形式类型参数可能是协变的
  • AsyncGeneratorCoroutine 的第二个到最后一个参数是逆变的。这些是底层 .send() 方法的参数类型,事件循环调用该方法驱动异步生成器和协程。因此,属于 输入 类型。所以,根据 型变经验法则 第二条,可以是逆变的
  • 最后,与 typing.Generator 不同​,AsyncGenerator 没有返回值类型:
    • 通过抛出 StopIteration(value) 返回一个值是一种特殊处理方式,为的是把生成器用作协程,以及支持 yield from
    • 异步对象之间没有这种重叠:AsyncGenerator 对象不返回值,与原生协程对象(使用 typing.Coroutine 注解)完全不沾边

异步原理与陷阱

接下来我们将抛开所用的语言和库,讨论一下异步编程相关的总体思想。规范的异步编程方法可以提升服务器的性能。而我们面对的挑战是如何实现规范方法。第一步是认清一个事实,即 I/O密集型系统 只是一个幻觉。

  • 你遇到的可能是 I/O 密集型函数。系统中的函数,大多数算是 I/O 密集型的:与处理数据相比,用在等待 I/O 上的时间更多。在等待的过程中,函数把控制权交给事件循环,驱动其他挂起的任务
  • 任何非平凡系统都有一些部分是 CPU 密集型的,这是不可避免的

大规模使用 Python,应该备有自动化测试,专门检测性能衰退,尽早发现性能问题。这对异步代码至关重要,而且因为有 GIL,这对 Python 多线程代码也不可或缺。发现 CPU 占用出现瓶颈后,可以采取以下措施:

  • 把任务委托给 Python 进程池
  • 把任务委托给外部任务队列
  • 使用 Cython、C、Rust,或者可编译成机器码、能与 Python/C API 交互的其他语言(最好能释放GIL)重写相关代码
  • 项目启动伊始就应该选定并集成外部任务队列,需要时拿来即用,不给团队成员添麻烦

小结

准备开发一个异步编程项目时,首要任务之一是决定使用哪些工具来完成 CPU 密集型任务。不要阻塞事件循环,速度慢的任务应委托其他处理单元。一旦编写第一个 async def,毫无疑问,程序中将有越来越多的 async defawaitasync withasync for,再想使用非异步库,难度可想而知。