对于线程,应用程序开发者遇到的使用场景,99% 的情况下只需知道如何派生一堆独立的线程,然后用队列收集结果。这篇文章讨论实现了 concurrent.futures.Executor 接口的类,这些类对上面这种使用场景进行了封装,使用起来特别容易,不仅能用于线程,而且还能用于进程处理计算密集型任务。同时还会介绍 future,这种对象表示异步执行的操作,类似于 JavaScript 中的 promise。
并发网络下载
为了高效处理网络 I/O,势必需要并发,我们不应漫无目的地等下去,在远程设备发回响应期间,可以让应用程序做些其他的事情。
- 对于网络 I/O 操作,不管使用哪种并发结构——线程或协程——只要代码写得没有问题,吞吐量都比依序执行的代码高很多
- 对于可以控制发起多少请求的 HTTP 客户端,线程与协程之间的性能差异不明显
如下是一个顺序下载的示例:
1 | import time |
- 这里使用了 httpx 库来发送 HTTP 请求,HTTPX 库受符合 Python 习惯用法的 requests 包启发,不过底层建构更符合现代化思想。更重要的是,HTTPX 同时提供了同步和异步 API,Python 标准库中的
urllib.request模块只有同步 API,而且对用户不友好
使用 concurrent.futures 模块下载
concurrent.futures 模块的功能主要由 ThreadPoolExecutor 和 ProcessPoolExecutor 类提供。这两个类实现的 API 能分别在不同的线程和进程中执行可调用对象。这两个类在内部维护着一个工作线程或进程池,以及分配任务和收集结果的队列。
1 | from concurrent import futures |
- 实例化
ThreadPoolExecutor,作为上下文管理器。executor.__exit__方法将调用executor.shutdown(wait=True),在所有线程都执行完毕前阻塞线程 - map 方法的作用与内置函数 map 类似,不过 download_one 函数会在多个线程中并发调用。map 方法返回一个生成器,通过迭代可以获取各个函数调用返回的值
- 如果有线程抛出异常,那么异常在 list 构造方法尝试从
executor.map返回的迭代器中获取相应的返回值时抛出
ThreadPoolExecutor 构造函数接受多个参数:
- 其中,第一个参数最为重要,即 max_workers。该参数设置最多执行多少个工作线程
- max_workers 的值为 None 时(默认值),会自动决定合适的线程数量,而且 ThreadPoolExecutor 不轻易启动新 worker
future 对象在哪里
future 对象是 concurrent.futures 模块和 asyncio 包的核心组件,可是,作为这两个库的用户,我们有时却见不到 future 对象。
从 Python 3.4 起,标准库中有两个名为 Future 的类:concurrent.futures.Future 和asyncio.Future。二者的作用相同:两个 Future 类的实例都表示可能已经完成或者尚未完成的延迟计算,这与现代 JavaScript 中的 Promise 类似。
future 对象封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或异常)后可以获取。但是要记住一点,future 对象不应自己动手创建,只能由并发框架(concurrent.futures 或 asyncio)实例化。原因很简单:future 对象表示终将运行的操作,必须排期运行,而这是框架的工作。具体而言:
-
只有把可调用对象提交给某个
concurrent.futures.Executor子类执行时才创建concurrent.futures.Future实例。例如,Executor.submit()方法接受一个可调用对象,排期执行,返回一个 Future 实例 -
应用程序代码不应改变 future 对象的状态,并发框架在 future 对象表示的延迟计算结束后改变 future 对象的状态,而我们无法掌控计算何时结束
-
两种 future 对象都有
.done()方法。该方法不阻塞,返回一个布尔值,指明 future 对象包装的可调用对象是否已经执行 -
客户代码通常不询问 future 对象是否运行结束,而是等待通知。因此,两个 Future 类都有
.add_done_callback()方法:提供一个可调用对象,在 future 对象执行完毕后调用。注意,回调的这个可调用对象与 future 对象包装的函数在同一个工作线程或进程中运行 -
此外,还有
.result()方法。该方法在两个 Future 类中的作用相同,当 future 对象运行结束后,返回可调用对象的结果,或者重新抛出执行可调用对象时抛出的异常,如果 future 对象没有运行结束:- 对于
concurrency.futures.Future实例,调用f.result()方法将阻塞调用方所在的线程,直到有结果返回。此时,result 方法可以接受可选的 timeout 参数 asyncio.Future.result()则会抛出InvalidStateError异常。对于 asyncio 包,获取 future 对象的结果首选 await
- 对于
这两个库中有几个函数返回 future 对象,其他函数则使用 future 对象,以用户易于理解的方式实现自身。Executor.map 方法属于后者,它返回一个迭代器,迭代器的 __next__ 方法调用各个 future 对象的 result 方法,因此我们得到的是各个 future 对象的结果,而非 future 对象本身
concurrent.futures.as_completed 函数的参数是一个 future 对象构成的可迭代对象,返回值是一个迭代器,在 future 对象运行结束后产出 future 对象。如下使用了 as_completed 函数重写这个例子:
1 | def download_many(cc_list: list[str]) -> int: |
这里又两个循环,一个用于创建并排定 future 对象,另一个用于获取 future 对象的结果。
executor.submit方法排定可调用对象的执行时间,返回一个 future 对象,表示待执行的操作- 存储各个 future 对象,后面传给 as_completed 函数
- as_completed 函数在 future 对象运行结束后产出对应的 future 对象,从而获取 future 对象的结果
- 在这个例子中,调用
future.result()方法绝不会阻塞,因为 future 由 as_completed 函数产出
使用 concurrent.futures 启动进程
ProcessPoolExecutor 和 ThreadPoolExecutor 类都实现了 Executor 接口,因此使用concurrent.futures 模块能特别轻松地把基于线程的方案转成基于进程的方案:
1 | def download_many(cc_list: list[str]) -> int: |
这样在 CPU 密集型作业中使用 concurrent.futures 轻松绕开 GIL。相较线程,进程使用的内存更多,启动时间更长,因此 ProcessPoolExecutor 的价值在 CPU 密集型作业中才能体现出来。
1 | import sys |
executor.map(check, numbers)返回结果的顺序与 numbers 中数的顺序一致- 这就可能导致虽然某些进程已经运行结束了,但是仍然要按照 numbers 所对应的 Future 对象的顺序产出结果
实验 Executor.map 方法
1 | from time import sleep, strftime |
1 | [21:50:48] Script starting. |
- 把 5 个任务提交给 executor。因为只有 3 个线程,所以只有 3 个任务立即开始:
loiter(0)、loiter(1)和loiter(2),这些是非阻塞调用 executor.map方法返回的结果(results)是一个生成器。不管有多少任务,也不管 max_workers 的值是多少,目前都不会阻塞- for 循环中的 enumerate 函数隐式调用
next(results),这个函数又在(内部)表示第一个任务(loiter(0)) 的 future 对象_f上调用_f.result()。result 方法会阻塞,直到 future 对象运行结束,因此这个循环每次迭代都要等待下一个结果做好准备
Executor.map 函数易于使用,不过通常最好等结果准备好之后再获取,不要考虑提交的顺序。为此,要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用:
executor.submit和futures.as_completed这个组合比executor.map更灵活,因为submit方法能处理不同的可调用对象和参数,而executor.map只适用于使用不同参数调用同一个可调用对象- 传给
futures.as_completed函数的一系列 future 对象可以来自多个执行器,例如一些由ThreadPoolExecutor实例创建,另一些由ProcessPoolExecutor实例创建
在使用 futures.as_completed 函数时,有一个惯用法:构建一个字典,把各个 future 对象映射到 future 对象运行结束后可能有用的其他数据上。这样,尽管 future 对象生成的结果顺序已经乱了,依然便于使用结果做后续处理。
Python 线程特别适合 I/O 密集型应用程序,concurrent.futures 包大大简化了某些使用场景下 Python 线程的用法。另外,使用 ProcessPoolExecutor 还可以利用多核解决 CPU 密集型问题(如果是高度并行计算的话)。