0%

流畅的 Python 第 2 版(20):并发执行器

对于线程,应用程序开发者遇到的使用场景,99% 的情况下只需知道如何派生一堆独立的线程,然后用队列收集结果。这篇文章讨论实现了 concurrent.futures.Executor 接口的类,这些类对上面这种使用场景进行了封装,使用起来特别容易,不仅能用于线程,而且还能用于进程处理计算密集型任务。同时还会介绍 future,这种对象表示异步执行的操作,类似于 JavaScript 中的 promise。

并发网络下载

为了高效处理网络 I/O,势必需要并发,我们不应漫无目的地等下去,在远程设备发回响应期间,可以让应用程序做些其他的事情

  • 对于网络 I/O 操作,不管使用哪种并发结构——线程或协程——只要代码写得没有问题,吞吐量都比依序执行的代码高很多
  • 对于可以控制发起多少请求的 HTTP 客户端,线程与协程之间的性能差异不明显

如下是一个顺序下载的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import time
from pathlib import Path
from typing import Callable

import httpx
POP20_CC = ('CH IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://mp.ituring.com.cn/files'
DEST_DIR = Path('downloaded')

def save_flag(img: bytes, filename: str) -> None:
(DEST_DIR / filename).write_bytes(img)

def get_flag(cc: str) -> bytes:
url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
resp = httpx.get(url, timeout=6.1,
follow_redirects=True)
resp.raise_for_status()
return resp.content

def download_many(cc_list: list[str]) -> int:
for cc in sorted(cc_list):
image = get_flag(cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
return len(cc_list)

def main(downloader: Callable[[list[str]], int]) -> None:
DEST_DIR.mkdir(exist_ok=True)
t0 = time.perf_counter()
count = downloader(POP20_CC)
elapsed = time.perf_counter() - t0
print(f'\n{count} downloads in {elapsed:.2f}s')

if __name__ == '__main__':
main(download_many)
  • 这里使用了 httpx 库来发送 HTTP 请求,HTTPX 库受符合 Python 习惯用法的 requests 包启发,不过底层建构更符合现代化思想。更重要的是,HTTPX 同时提供了同步和异步 API,Python 标准库中的urllib.request 模块只有同步 API,而且对用户不友好

使用 concurrent.futures 模块下载

concurrent.futures 模块的功能主要由 ThreadPoolExecutor 和 ProcessPoolExecutor 类提供。这两个类实现的 API 能分别在不同的线程和进程中执行可调用对象。这两个类在内部维护着一个工作线程或进程池,以及分配任务和收集结果的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from concurrent import futures

from flags import save_flag, get_flag, main

def download_one(cc: str):
image = get_flag(cc)
save_flag(image, f'{cc}.gif')
print(cc, end=' ', flush=True)
return cc

def download_many(cc_list: list[str]) -> int:
with futures.ThreadPoolExecutor() as executor:
res = executor.map(download_one, sorted(cc_list))

return len(list(res))

if __name__ == '__main__':
main(download_many)
  • 实例化 ThreadPoolExecutor,作为上下文管理器。executor.__exit__ 方法将调用 executor.shutdown(wait=True),在所有线程都执行完毕前阻塞线程
  • map 方法的作用与内置函数 map 类似,不过 download_one 函数会在多个线程中并发调用。map 方法返回一个生成器,通过迭代可以获取各个函数调用返回的值
  • 如果有线程抛出异常,那么异常在 list 构造方法尝试从 executor.map 返回的迭代器中获取相应的返回值时抛出

ThreadPoolExecutor 构造函数接受多个参数:

  • 其中,第一个参数最为重要,即 max_workers。该参数设置最多执行多少个工作线程
  • max_workers 的值为 None 时(默认值)​,会自动决定合适的线程数量,而且 ThreadPoolExecutor 不轻易启动新 worker

future 对象在哪里

future 对象是 concurrent.futures 模块和 asyncio 包的核心组件,可是,作为这两个库的用户,我们有时却见不到 future 对象。

从 Python 3.4 起,标准库中有两个名为 Future 的类:concurrent.futures.Futureasyncio.Future。二者的作用相同:两个 Future 类的实例都表示可能已经完成或者尚未完成的延迟计算,这与现代 JavaScript 中的 Promise 类似。

future 对象封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或异常)后可以获取。但是要记住一点,future 对象不应自己动手创建,只能由并发框架(concurrent.futuresasyncio)实例化。原因很简单:future 对象表示终将运行的操作,必须排期运行,而这是框架的工作。具体而言:

  • 只有把可调用对象提交给某个 concurrent.futures.Executor 子类执行时才创建 concurrent.futures.Future 实例。例如,Executor.submit() 方法接受一个可调用对象,排期执行,返回一个 Future 实例

  • 应用程序代码不应改变 future 对象的状态,并发框架在 future 对象表示的延迟计算结束后改变 future 对象的状态,而我们无法掌控计算何时结束

  • 两种 future 对象都有 .done() 方法。该方法不阻塞,返回一个布尔值,指明 future 对象包装的可调用对象是否已经执行

  • 客户代码通常不询问 future 对象是否运行结束,而是等待通知。因此,两个 Future 类都有 .add_done_callback() 方法:提供一个可调用对象,在 future 对象执行完毕后调用。注意,回调的这个可调用对象与 future 对象包装的函数在同一个工作线程或进程中运行

  • 此外,还有 .result() 方法。该方法在两个 Future 类中的作用相同,当 future 对象运行结束后,返回可调用对象的结果,或者重新抛出执行可调用对象时抛出的异常,如果 future 对象没有运行结束:

    • 对于 concurrency.futures.Future 实例,调用 f.result() 方法将阻塞调用方所在的线程,直到有结果返回。此时,result 方法可以接受可选的 timeout 参数
    • asyncio.Future.result() 则会抛出 InvalidStateError 异常。对于 asyncio 包,获取 future 对象的结果首选 await

这两个库中有几个函数返回 future 对象,其他函数则使用 future 对象,以用户易于理解的方式实现自身。Executor.map 方法属于后者,它返回一个迭代器,迭代器的 __next__ 方法调用各个 future 对象的 result 方法,因此我们得到的是各个 future 对象的结果,而非 future 对象本身

concurrent.futures.as_completed 函数的参数是一个 future 对象构成的可迭代对象,返回值是一个迭代器,在 future 对象运行结束后产出 future 对象。如下使用了 as_completed 函数重写这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def download_many(cc_list: list[str]) -> int:
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(max_workers=3) as executor:
to_do: list[futures.Future] = []
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do.append(future)
print(f'Scheduled for {cc}: {future}')

for count, future in enumerate(futures.as_completed(to_do), 1):
res: str = future.result()
print(f'{future} result: {res!r}')

return count

这里又两个循环,一个用于创建并排定 future 对象,另一个用于获取 future 对象的结果。

  • executor.submit 方法排定可调用对象的执行时间,返回一个 future 对象,表示待执行的操作
  • 存储各个 future 对象,后面传给 as_completed 函数
  • as_completed 函数在 future 对象运行结束后产出对应的 future 对象,从而获取 future 对象的结果
  • 在这个例子中,调用 future.result() 方法绝不会阻塞,因为 future 由 as_completed 函数产出

使用 concurrent.futures 启动进程

ProcessPoolExecutorThreadPoolExecutor 类都实现了 Executor 接口,因此使用concurrent.futures 模块能特别轻松地把基于线程的方案转成基于进程的方案:

1
2
def download_many(cc_list: list[str]) -> int:
with futures.ProcessPoolExecutor() as executor:

这样在 CPU 密集型作业中使用 concurrent.futures 轻松绕开 GIL。相较线程,进程使用的内存更多,启动时间更长,因此 ProcessPoolExecutor 的价值在 CPU 密集型作业中才能体现出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import sys
from concurrent import futures
from time import perf_counter
from typing import NamedTuple

from primes import is_prime, NUMBERS

class PrimeResult(NamedTuple):
n: int
flag: bool
elapsed: float

def check(n: int) -> PrimeResult:
t0 = perf_counter()
res = is_prime(n)
return PrimeResult(n, res, perf_counter() - t0)

def main() -> None:
if len(sys.argv) < 2:
workers = None
else:
workers = int(sys.argv[1])

executor = futures.ProcessPoolExecutor(workers)
actual_workers = executor._max_workers # type: ignore

print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')

t0 = perf_counter()

numbers = sorted(NUMBERS, reverse=True)
with executor:
for n, prime, elapsed in executor.map(check, numbers):
label = 'P' if prime else ' '
print(f'{n:16} {label} {elapsed:9.6f}s')

time = perf_counter() - t0
print(f'Total time: {time:.2f}s')

if __name__ == '__main__':
main()
  • executor.map(check, numbers) 返回结果的顺序与 numbers 中数的顺序一致
  • 这就可能导致虽然某些进程已经运行结束了,但是仍然要按照 numbers 所对应的 Future 对象的顺序产出结果

实验 Executor.map 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from time import sleep, strftime
from concurrent import futures

def display(*args):
print(strftime('[%H:%M:%S]'), end=' ')
print(*args)

def loiter(n):
msg = '{}loiter({}): doing nothing for {}s...'
display(msg.format('\t'*n, n, n))
sleep(n)
msg = '{}loiter({}): done.'
display(msg.format('\t'*n, n))
return n * 10

def main():
display('Script starting.')
executor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(loiter, range(5))
display('results:', results)
display('Waiting for individual results:')
for i, result in enumerate(results):
display(f'result {i}: {result}')

if __name__ == '__main__':
main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[21:50:48] Script starting.
[21:50:48] loiter(0): doing nothing for 0s...
[21:50:48] loiter(0): done.
[21:50:48] loiter(1): doing nothing for 1s...
[21:50:48] loiter(3): doing nothing for 3s...
[21:50:48] loiter(2): doing nothing for 2s...
[21:50:48] results: <generator object Executor.map.<locals>.result_iterator at 0x1025457b0>
[21:50:48] Waiting for individual results:
[21:50:48] result 0: 0
[21:50:49] loiter(1): done.
[21:50:49] loiter(4): doing nothing for 4s...
[21:50:49] result 1: 10
[21:50:50] loiter(2): done.
[21:50:50] result 2: 20
[21:50:51] loiter(3): done.
[21:50:51] result 3: 30
[21:50:53] loiter(4): done.
[21:50:53] result 4: 40
  • 把 5 个任务提交给 executor。因为只有 3 个线程,所以只有 3 个任务立即开始:loiter(0)loiter(1)loiter(2),这些是非阻塞调用
  • executor.map 方法返回的结果(results)是一个生成器。不管有多少任务,也不管 max_workers 的值是多少,目前都不会阻塞
  • for 循环中的 enumerate 函数隐式调用 next(results),这个函数又在(内部)表示第一个任务(loiter(0)) 的 future 对象 _f 上调用 _f.result()。result 方法会阻塞,直到 future 对象运行结束,因此这个循环每次迭代都要等待下一个结果做好准备

Executor.map 函数易于使用,不过通常最好等结果准备好之后再获取,不要考虑提交的顺序。为此,要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用:

  • executor.submitfutures.as_completed 这个组合比 executor.map 更灵活,因为submit 方法能处理不同的可调用对象和参数,而 executor.map 只适用于使用不同参数调用同一个可调用对象
  • 传给 futures.as_completed 函数的一系列 future 对象可以来自多个执行器,例如一些由 ThreadPoolExecutor 实例创建,另一些由 ProcessPoolExecutor 实例创建

在使用 futures.as_completed 函数时,有一个惯用法:构建一个字典,把各个 future 对象映射到 future 对象运行结束后可能有用的其他数据上。这样,尽管 future 对象生成的结果顺序已经乱了,依然便于使用结果做后续处理。

Python 线程特别适合 I/O 密集型应用程序,concurrent.futures 包大大简化了某些使用场景下 Python 线程的用法。另外,使用 ProcessPoolExecutor 还可以利用多核解决 CPU 密集型问题(如果是高度并行计算的话)。