0%

流畅的 Python(11):协程

yield 对于 Python 来说,有两个含义:

  • yield item 会产生一个值,提供给 next(…) 的调用方
  • 此外还会做出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用 next()。调用方会从生成器中拉取值

从语法上来看,协程与生成器都是类似的,都是定义体中包含 yield 关键字的函数。但是在协程中,yield 通常出现在表达式的右边,可以产出值,也可以不产生。如果 yield 关键字后面没有表达式,那么生成器产出 None。协程可能会从调用方接收数据,调用方把数据提供给协程使用的是 .send(datum) 方法,而不是 next(…) 函数。通常调用方会把值推送给协程。

yield 关键字甚至可以不接收或传出数据,不管数据如何流动,yield 都是一种流程控制工具,使用它可以实现协作时多任务:协程可以把控制流让步给中心调度程序,从而激活其他的协程。从根本上把 yield 视作控制流程的方式,这样就好理解协程了

生成器如何进化成协程

yield 关键字可以在表达式中使用,而且生成器 API 中增加了 .send(value) 方法,生成器的调用方可以使用 .send(...) 方法发送数据,发送的数据会成为生成器函数中 yield 表达式的值。因此生成器可以作为协程使用,协程是指一个过程,这个过程与调用方协作,产出由调用方提供的值

用作协程的生成器的基本行为

如下展示了一个基本的协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
>>> def simple_coroutine():
... print('-> coroutine started')
... x = yield
... print('-> coroutine received', x)
...
>>> my_cro = simple_coroutine()
>>> my_cro
<generator object simple_coroutine at 0x1070f73c0>
>>> t = next(my_cro)
-> coroutine started
>>> t
>>> my_cro.send(42)
-> coroutine received 42
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
  • 协程使用生成器函数定义:定义体中有 yield 关键字
  • yield 在表达式中使用,如果协程只需要从客户那里接收数据,那么产出的值是 None:该值是隐式的,因为 yield 关键字右边没有表达式
  • 与创建生成器的方式一样,调用函数得到生成器对象
  • 首先需要调用 next() 函数,因为生成器还没有启动,没有在 yield 语句处暂停,所以一开始无法接收数据
  • 在生成器调用 send() 后,协程会恢复,且 yield 表达式会计算出值为 42,协程继续运行直到下一个 yield 表达式,或者终止
  • 最后控制权流动到协程定义体的末尾,导致生成器像往常一样抛出 StopIteration 异常

协程可以处于以下 4 个状态之一:

  • GEN_CREATED:等待开始执行
  • GEN_RUNNING:解释器正在运行
  • GEN_SUSPEND:在 yield 表达处暂停
  • GEN_CLOSED:执行结束

可以使用 inspect.getgeneratorstate() 获取当前状态。因为 send 方法的参数会成为暂停的 yield 表达式的值,因此仅当协程处于暂停状态时才能调用 send 方法。但是如果协程还没有激活,可以调用 next(next_mycro) 激活协程,也可以调用 my_cro.send(None) 激活协程。如果创建协程后,立即把 None 之外的值发送给它,会出现错误。

如下完整地展示了这一过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
>>> def simple_coro2(a):
... print('-> Started: a =', a)
... b = yield a
... print('-> Received: b=', b)
... c = yield a + b
... print('-> Received: c=', c)
...
>>> my_cro2 = simple_coro2(14)
>>> from inspect import getgeneratorstate
>>> getgeneratorstate(my_cro2)
'GEN_CREATED'
>>> next(my_cro2)
-> Started: a = 14
14
>>> getgeneratorstate(my_cro2)
'GEN_SUSPENDED'
>>> my_cro2.send(28)
-> Received: b= 28
42
>>> my_cro2.send(99)
-> Received: c= 99
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
>>> getgeneratorstate(my_cro2)
'GEN_CLOSED'

这里最关键的一点是,协程在 yield 关键字所在位置暂停执行。对于赋值语句而言,= 右边的代码在赋值前执行。对于 b = yield a,需要等到调用方再次激活协程时才会设定 b 的值。

使用协程计算移动平均值

如下代码实现了一个计算移动平均值的协程:

1
2
3
4
5
6
7
8
9
def averager():
total = 0.0
count = 0
average = None
while True:
term = yield average
total += term
count += 1
average = total / count
1
2
3
4
5
6
7
8
9
>>> from coroaverager import averager
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
10.0
>>> coro_avg.send(20)
15.0
>>> coro_avg.send(30)
20.0

这里使用协程的好处是,total 和 count 声明为局部变量即可,无需使用实例属性或者闭包在多次调用之间保持上下文

预激协程的装饰器

如果不预激,那么协程没有什么用。对协程调用 send 之前一定要使用 next() 来预激协程。为了简化协程的用法,有时会使用一个预激装饰器。如下展示了一个预激协程的装饰器:

1
2
3
4
5
6
7
8
9
from functools import wraps

def coroutine(func):
@wraps(func)
def primer(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return primer
  • 被装饰的生成器函数替换成 primer 函数,而调用 primer 函数则直接返回预激后的生成器
  • 调用被装饰的函数,获取生成器对象

很多框架都提供了处理协程的特殊装饰器,但是不是所有装饰器都用于预激协程,有些会提供其他服务。使用 yield from 语法调用协程时会自动预激

终止协程和异常处理

协程未处理的异常会向上冒泡,传给 next() 函数或 send 方法的调用方。

1
2
3
4
5
6
7
8
9
10
11
12
>>> coro_avr.send(10)
10.0
>>> coro_avr.send("test")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/fuchencong/data/workspace/code/private/fluent_python/coroaverager.py", line 11, in averager
total += term
TypeError: unsupported operand type(s) for +=: 'float' and 'str'
>>> coro_avr.send(60)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

上面的例子展示了由于协程内部没有处理异常,协程会终止。如果重新激活协程,则会抛出 StopIteration 异常。这个例子也暗示了终止协程的一种方式:发送某个哨符值,让协程退出。内置的 None 和 Ellipsis 等常量经常用作哨符值。

客户代码可以在生成器对象上调用两个方法,显式地把异常发送给协程:

  • generator.throw(exec_type[, exc_value[, traceback]]):使生成器在暂停的的 yield 表达式处抛出指定的异常。如果生成器处理了抛出的异常,异常代码会向前执行到下一个 yield 表达式,而产出的值会成为调用 generator.throw 方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中

  • generator.close():使生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常,如果生成器没有处理这个异常,或者抛出了 StopIteration 异常(通常是运行到结尾),调用方不会报错。如果收到了 GeneratorExit 异常,生成器一定不能产出值,否则解释器会抛出 RuntimeError 异常。生成器抛出的其他异常会向上冒泡,传给调用方。

如果不管协程如何结束都想做一些清理工作,要把协程定义体中相关的代码放在 try/finally 块中。

让协程返回值

如下展示了 averager 协程的另一个版本,该版本每次激活协程时不会产出移动平均值,而是在最后返回一个值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python3

# Copyright (C) fuchencong.com

from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return Result(count, average)
1
2
3
4
5
6
7
8
9
10
>>> from coroaverager_v2 import averager
>>> coro_avg = averager()
>>> next(coro_avg)
>>> coro_avg.send(10)
>>> coro_avg.send(20)
>>> coro_avg.send(30)
>>> coro_avg.send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: Result(count=3, average=20.0)

注意,return 表达式的值会偷偷传递给调用方,赋值给 StopIteration 异常的一个属性。这样做有点不合理,但是能够保留生成器对象的常规行为:耗尽时抛出 StopIteration 异常。yield from 结构会在内部自动捕获 StopIteration 异常,同时把 value 属性的值变成 yield from 表达式的值

使用 yield from

yield from 是全新的语言结构,在其他语言中类似的结构使用 await 关键字,它传到了重要的一点:在生成器 gen 中使用 yield from subgen() 时,subgen 会获得控制权,把产出的值传给 gen 的调用方。即调用方可以直接控制 subgen,与此同时 gen 会阻塞,等待 subgen 终止。

之前介绍过,yield from 可以简化 for 循环中的 yield 表达式。yield from x 表达式对 x 对象所做的第一件事,调用 iter(x),从中获取迭代器。因此 x 可以是任何可迭代的对象。但是 yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在中间的协程中添加大量处理异常的样板代码。有了该结构,协程可以通过以前无法实现的方式来委托职责。

  • 委派生成器:包含 yield from <iterable> 表达式的生成器函数
  • 子生成器:从 yield from 表达式中 <iterable> 部分获取的生成器
  • 调用方:指调用委派生成器的客户端代码

委派生成器在 yield from 表达式处暂停,调用方可以直接把数据发送给子生成器,子生成器再把产出的值发送给调用方。子生成器返回后,解释器会抛出 StopIteration 异常,并把附加值加到异常对象上,此时委派生成器会恢复。如下展示了使用 yield from 的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from collections import namedtuple

Result = namedtuple('Result', 'count average')


def averager():
total = 0.0
count = 0
average = None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return Result(count, average)


def grouper(results, key):
while True:
results[key] = yield from averager()


def main(data):
for key, values in data.items():
group = grouper(results, key)
next(group)
for value in values:
group.send(value)
group.send(None)

report(results)


def report(results):
for key, result in sorted(results.items()):
group, unit = key.split(";")
print('{:2}.{:5} averaging {:.2f}{}'.format(
result.count, group, result.average, unit))

data = {
'girs;kg':
[40, 41, 42, 43, 44, 45],
'girls;m':
[1.62, 1.65, 1.66, 1.67, 1.68, 1.69],
'boys;kg':
[50, 51, 52, 53, 55, 55],
'boys;m':
[1.72, 1.75, 1.77, 1.77, 1.78, 1.79],
}


if __name__ == '__main__':
main(data)

grouper 发送的每个值都会经由 yield from 处理,通过管道传送给 averager 实例。grouper 会在 yield from 表达式处暂停,等待 averager 实例处理客户端发来的值。grouper 永远不知道传入的值是什么。当把 None 传入 grouper 后,导致当前的 averager 实例终止,也让 grouper 继续运行,再创建一个 averager 实例,处理下一组值。这里最关键的一点是,如果子生成器不终止,委派生成器会在 yield from 表达式处永远暂停。如果是这样,程序不会向前执行,因为 yield from 把控制权交给客户代码了,此时任务无法完成。

因为委派生成器相当于管道,所以可以把任意数量的个委派生成器连接在一起:

  • 一个委派生成器使用 yield from 调用另一个子生成器,而该子生成器本身也是委派生成器,使用 yield from 调用另一个子生成器
  • 最终该链条以一个只使用 yield 表达式的简单生成器(或者任何可迭代对象)结束
  • 任何 yield from 链条都必须由客户驱动,在最外层委派生成器上调用 next(…) 或 send(…) 方法。可以隐式调用,例如 for 循环

yield from 的意义

这里说明一下 yield from 的行为:

  • 子生成器产出的值都直接传给委派生成器的调用方
  • 使用 send() 方法发送给委派生成器的值都直接传给子生成器。如果发送的值是 None,则调用子生成器的 __next__ 方法,否则调用其 send() 方法。如果调用的方法抛出 StopIteration 异常,则委派生成器恢复运行。任何其他异常都会向上冒泡,传给委派生成器
  • 生成器退出时,生成器(或子生成器)中的 return 表达式会触发 StopIteration 异常
  • yield from 表达式的值是子生成器终止时传给 StopIteration 异常的第一个参数
  • 传给委派生成器的异常,除了 GeneratorExit 之外都传给子生成器的 throw 方法。如果调用 throw 方法抛出 StopIteration 异常,委派生成器恢复运行,StopIteration 之外的异常向上冒泡,传给委派生成器。
  • 如果把 GeneratorExit 异常传给委派生成器,或者在委派生成器上调用 close() 方法,那么则在子生成器上调用 close() 方法(如果存在的话)。如果调用 close() 方法抛出异常,那么异常会向上冒泡,传给委派生成器,否则委派生成器会抛出 GeneratorExit 异常。

对于 yield from EXPR,其含义可以用如下伪代码表示:

1
2
3
4
5
6
7
8
9
10
11
12
13
_i = iter(EXPR)
try:
_y = next(_i)
except StopIteration as _e:
_r = _e.value
else:
while 1:
_s = yield _y
try:
_y = _i.send(s)
except StopIteration as _e:
_r = _e.value
break

现实情况还要复杂一些,因为要处理客户对 .throw().close() 方法的调用,而这两个方法的操作必须传入子生成器。此外,子生成器也可能是普通的迭代器,不支持 .throw().close() 方法。因此 yield from 结构的逻辑必须处理这个情况。

使用案例:使用协程做离散事件仿真

协程能自然地表述很多算法,例如仿真、游戏、异步 I/O,以及其他事件驱动型编程形式或协作式多任务。接下来分析一个使用协程的经典案例,仿真编程,说明如何只使用协程和标准库中的对象实现一个特别简单的仿真系统,该仿真系统是对出租车队运营的仿真:

1
2
3
4
5
6
7
8
Event = collections.namedtuple('Event', 'time proc action')

def taxi_process(ident, trips, start_time=0):
time = yield Event(start_time, ident, 'leave garage')
for i in range(trips):
time = yield Event(time, ident, 'pick up passenger')
time = yield Event(time, ident, 'drop off passenger')
yield Event(time, ident, 'going home')
  • 在 Event 实例中,time 是事件发生时的仿真时间,proc 字段是出租车的编号,action 字段是描述活动的字符串
  • 每辆出租车调用一次 taxi_process 函数,创建一个生成器对象,表示各个出租车的运营情况

在这个仿真系统中,各个出租车协程由 Simulator.run 方法中的主循环驱动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Simulator:
def __init__(self, procs_map):
self.events = queue.PriorityQueue()
self.procs = dict(procs_map)

def run(self, end_time):
for _, proc in sorted(self.procs.items()):
first_event = next(proc)
self.events.put(first_event)

sim_time = 0
while sim_time < end_time:
if self.events.empty():
print("*** end of events ***")
current_event = self.events.get()
sim_time, proc_id, previous_action = current_event
print('taxi:', proc_id, proc_id * ' ', current_event)
active_proc = self.procs[proc_id]
next_time = sim_time + compute_duration(previous_action)
try:
next_event = active_proc.send(next_time)
except StopIteration:
del self.procs[proc_id]
else:
self.events.put(next_event)
else:
msg = '*** end of simulation time: {} events pending ***'
print(msg.format(self.event.qsize()))

该示例主要是为了说明如何在一个主循环中处理事件,以及如何通过发送数据驱动协程。这是 asyncio 包的基本思想

生成器有三种不同的代码编写风格:传统的拉取式(迭代器)、推送式、任务式。这个离散事件仿真系统示例,说明了如何使用生成器替代线程和回调,实现并发。该实例虽然简单,但是说明了事件驱动型框架的运作方式:在单个线程中使用一个主循环驱动协程协程执行并发活动。使用协程做面向事件编程时,协程会不断把控制权让步给主循环,激活并向前运行其他协程,从而执行各个并发活动。这是一种协作式多任务:协程显式自主地把控制权让步给中央调度程序,而多线程实现的是抢占式多任务。调度程序可以在任何时刻暂停线程,把控制权让给其他线程

最后需要说明一下,这里对协程的定义是宽泛、不正式的,即:通过客户调用 .send(...) 方法发送数据或使用 yield from 结构驱动的生成器。

使用期物处理并发

接下来将讨论 Python3.2 引入的 concurrent.futures 模块,然后还会介绍 期物(future)的概念,期物指一种对象,表示异步执行的操作。期物是一种对象,表示异步执行的操作,这个概念的作用很大,是 concurrent.futures 模块和 asyncio 包的基础。

示例:网络下载的三种风格

为了高效处理网络 I/O,需要使用并发,因为网络有很高的延迟,所以为了不浪费 CPU 周期去等待,最好在收到网络响应之前去做其他的事情。

如下示例中,第一个程序依序下载每个国家的国旗:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os
import sys
import time

import requests

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'


def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)


def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content

def show(text):
print(text, end=' ')
sys.stdout.flush()

def download_many(cc_list):
for cc in sorted(cc_list):
img = get_flag(cc)
show(cc)
save_flag(img, cc.lower() + '.gif')


def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))


if __name__ == '__main__':
main(download_many)

接下来的程序使用 concurrent.futures 模块下载。concurrent.futures 模块的主要特色是 ThreadPoolExecutor 和 ProcessPoolExecutor 类,这两个类实现的接口能分别在不同的线程或进程中执行可调用的对象。这两个类在内部维护者一个同坐线程或进程池,以及要执行的任务队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from concurrent import futures
from flags import save_flag, get_flag, show, main

MAX_WORKERS = 20


def download_one(cc):
img = get_flag(cc)
show(cc)
save_flag(img, cc.lower() + '.gif')
return cc


def download_many(cc_list):
workers = min(MAX_WORKERS, len(cc_list))
with futures.ThreadPoolExecutor(workers) as executor:
res = executor.map(download_one, sorted(cc_list))
return len(list(res))


if __name__ == '__main__':
main(download_many)

这里 download_one 函数其实是 download_many 函数中的 for 循环体,编写并发代码时经常这样重构:把依序执行的 for 循环体改成函数,以便并发调用。

期物是 concurrent.futures 模块和 asyncio 包的重要组件。concurrent.futures.Future 和 asyncio.Future 两个类的作用相同:两个 Future 类的实例都表示可能已经完成或者尚未完成的延迟计算。期物封装待完成的操作,可以放入队列,完成的状态可以查询,得到结果(或抛出异常)后可以获取结果。通常情况下,我们不应该自己创建期物,而只能由并发框架实例化,因为期物表示终将发生的事情,而确定某件事情会发生的唯一方式是执行的时间已经排定。客户端代码不应该改变期物的状态,并发框架在期物表示的延迟计算结束后会改变期物的状态,而我们无法控制计算何时结束。

  • 这两种期物都有 .done() 方法,返回布尔值,表示期物链接的可调用对象是否已经执行
  • .result 方法在期物运行结束后调用的话,返回可调用对象的结果,或者重新抛出执行可调用的对象时抛出的异常
  • 通过 .add_done_callback() 方法可以指定一个可调用对象,期物运行结束后会调用指定的可调用对象

如下程序使用期物中的 as_completed 函数来重新实现 download_many 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def download_many(cc_list):
cc_list = cc_list[:5]
with futures.ThreadPoolExecutor(max_workers=3) as executor:
to_do = []
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do.append(future)
msg = 'Scheduled for {}: {}'
print(msg.format(cc, future))

results = []
for future in futures.as_completed(to_do):
res = future.result()
msg = '{} result: {!r}'
print(msg.format(future, res))
results.append(res)

return len(results)
  • as_completed 函数的参数是一个期物列表,返回值是一个迭代器,在期物运行结束后产出期物
  • 这里 future.result() 方法绝对不会阻塞,因为 future 由 as_completed 函数产出

严格来说,目前测试的并发脚本都不能并发下载。因为目前 concurrent.futures 库实现的示例受 GIL(Global Interpreter Lock,全局解释器锁) 的限制。

阻塞型 I/O 和 GIL

CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只能允许一个线程执行 Python 字节码。因此一个 Python 进程通常不能使用多个 CPU 核心。

编写 Python 代码时无法控制 GIL,但是执行耗时任务时,可以使用一个内置的函数或一个使用 C 语言编写的扩展来释放 GIL。但是标准库所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放 GIL。这意味在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能够从中受益:一个 Python 程序等待网络响应时,阻塞型 I/O 函数(包括 time.sleep() 函数)会释放 GIL,再运行一个线程。

使用 concurrent.futures 模块启动进程

concurrent.futures 模块其实也可以实现真正的并行计算(注意并发与并行的区别),因为它可以使用 ProcessPoolExecutor 类把工作分配给多个 Python 进程处理。因此如果需要做 CPU 密集型处理,使用这个模块可以绕过 GIL,利用所有可用的 CPU 核心。 ProcessPoolExecutor 和 ThreadPoolExecutor 类都实现了通用的 Executor 接口,因此使用 concurrent.futures 模块可以轻松地把基于线程的方案转换成基于进程的方案。

在 ProcessPoolExecutor 类中,指定进程池中进程数量的参数是可选的,默认值是 os.cpu_count() 函数返回的 CPU 数量。ProcessPoolExecutor 的价值体现在 CPU 密集型作业上。另外,如果使用 Python 处理 CPU 密集型工作,可以尝试一下 PyPy。

实验 Executor.map 方法

如果想并发运行多个可调用对象,最简单的方法是通过 Executor.map 方法。如下展示了一个 Executor.map 使用案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from time import sleep, strftime
from concurrent import futures


def display(*args):
print(strftime('[%H:%M:%S]'), end=' ')
print(*args)


def loiter(n):
msg = "{}loiter({}): doing nothing for {}s..."
display(msg.format('\t'*n, n, n))
sleep(n)
msg = '{}loiter({}): done'
display(msg.format('\t'*n, n))
return n * 10


def main():
display('Script starting.')
executor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(loiter, range(5))
display('results:', results)
display('Waiting for individual results:')
for i, result in enumerate(results):
display('result{}: {}'.format(i, result))

main()

脚本的运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
$ python3 executor_map.py
[18:17:36] Script starting.
[18:17:36] loiter(0): doing nothing for 0s...
[18:17:36] loiter(1): doing nothing for 1s...
[18:17:36] loiter(0): done
[18:17:36] loiter(2): doing nothing for 2s...
[18:17:36] results: <generator object Executor.map.<locals>.result_iterator at 0x10e9652e0>
[18:17:36] Waiting for individual results:
[18:17:36] result0: 0
[18:17:36] loiter(3): doing nothing for 3s...
[18:17:37] loiter(1): done
[18:17:37] loiter(4): doing nothing for 4s...
[18:17:37] result1: 10
[18:17:38] loiter(2): done
[18:17:38] result2: 20
[18:17:39] loiter(3): done
[18:17:39] result3: 30
[18:17:41] loiter(4): done
[18:17:41] result4: 40
  • 这里把五个任务提交给 executor(因为只有三个线程,所以只有 3 个任务会立即开始),它是非阻塞调用
  • for 循环中的 enumerate 函数会调用 next(results),该函数会在表示第一个任务的期物上调用 results 方法,该方法会阻塞,直到期物运行结束。因此这个循环每次迭代时都要等待下一个结果做好准备

需要注意,Executor 函数返回结果的顺序与调用开始的顺序一致。如果希望:不管提交的顺序,只要有结果就获取。要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用。

线程和多进程的替代方案

Python 线程特别适合于 I/O 密集型应用,concurrent.futures 模块大大简化了某些场景下 Python 线程的用法。如果 futures.ThreadPoolExecutor 类对某个作业来说不够灵活,可能要使用 threading 模块中的组件(如 Thread、Lock、Seamaphore 等)自行定制方案。

而对 CPU 密集型工作来说,要启动多个进程,来规避 GIL。创建多个进程最简单的方式是,使用 futures.ProcessPoolExecutor 类。如果使用的场景较为复杂,则需要更高级的工具。multiprocessing 模块的 API 与 threading 模块相仿,但是作业是交给多个进程处理。

使用 asyncio 包处理并发

并发是指一次处理多件事,并行是指一次做多件事。二者不同,但是是由联系。一个关于结构,一个关于执行。并发用于指定方案,用来解决可能(但未必)并行的问题。真正的并行需要多个核心。

这篇文章介绍 asyncio 包,这个包使用事件循环驱动的协程实现并发。

线程与协程对比

首先看一个示例程序,它显式一个动画效果,然后 3 s 后停止显式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import threading
import itertools
import time
import sys


class Signal:
go = True


def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
time.sleep(1)
if not signal.go:
break

write(' ' * len(status) + '\x08' * len(status))

def slow_function():
time.sleep(3)
return 42


def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin, args=('thinking!', signal))
print('spiner object:', spinner)
spinner.start()
result = slow_function()
signal.go = False
spinner.join()
return result


def main():
result = supervisor()
print('Answer:', result)


if __name__ == '__main__':
main()

这里解释一下:

  • 使用退格符 \x08 把光标移动回来,这也是显示文本式动画的诀窍
  • 主线程通过 sleep 函数被阻塞,此时会释放 GIL,从而从属线程有创建并运行的时机
  • 通过改变 signal 的状态,来使主线程控制从属线程的退出。Python 没有提供终止线程的 API,这是有意为之。若想关闭线程,必须给线程发送消息。

接下来使用 @asyncio.coroutine 装饰器替代线程,实现相同的行为。之前说过,asyncio 包实现的协程是较为严格的定义,适合 asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用,或者把协程传给 asyncio 包中的某个函数(例如 asncio.async(…)) 从而驱动协程。最后 @asyncio.coroutine 装饰器应该应用在协程上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import asyncio
import itertools
import sys


@asyncio.coroutine
def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\'):
status = char + ' ' + msg
write(status)
flush()
write('\x08' * len(status))
try:
yield from asyncio.sleep(.1)
except asyncio.CancelledError:
break
write(' ' * len(status) + '\x08' * len(status))


@asyncio.coroutine
def slow_function():
yield from asyncio.sleep(3)
return 42

@asyncio.coroutine
def supervisor():
spinner = asyncio.async(spin('thinking!'))
print('spinner object:', spinner)
result = yield from slow_function()
spinner.cancel()
return result


def main():
loop = asyncio.get_event_loop()
result = loop.run_until_complete(supervisor)
loop.close()
print('Answer:', result)


if __name__ == '__main__':
main()
  • 打算交给 asyncio 处理的协程都应该使用 @asyncio.coroutine 装饰
  • 使用 yield from asyncio.sleep(.1) 替代 time.sleep(.1),这样的休眠不会阻塞事件循环

asyncio.Task 对象差不多与 threading.Thread 对象等效。Task 对象是实现协作式多任务的库中的绿色线程。Task 对象用于驱动协程,Thread 对象用于调用可调用的对象。Task 对象不是由自己动手实例化,而是通过把协程传给 asyncio.asyncloop.create_task() 方法获取

线程与协程之间的对比,还有重要一点需要说明。由于调度程序能够在任何时候中断线程,因此必须记住保留锁,去保护程序中的重要数据。而协程默认会做好全方位保护,以防止中断。必须显式产出才能让程序的余下部分运行。对于协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只能有一个协程运行。如果想交出控制权时,可以使用 yieldyield from 把控制权交还给调度程序。

在 asyncio 包中,BaseEventLoop.create_task() 方法接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例,即 asyncio.Future 类的实例。asyncio.Future 类的 .result() 方法没有参数,不能指定超时时间,调用该方法时如果期物还没有运行完毕,那么 .result() 方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError 异常。其实,获取 asyncio.Future 对象的结果通常使用 yield from。使用 yield from 处理期物,等待期物运行完毕这一步无需我们关心,而且不会阻塞事件循环。在 asyncio 包中,yield from 的作用是把控制权还给事件循环。

asyncio.Future 类的目的是与 yield from 一起使用,所以通常不需要使用以下方法:

  • 无需调用 my_future.add_done_callback(…),因为可以直接把想在期物运行结束后执行的操作放在协程的 yield from my_future 表达式后面。协程是可以暂停和恢复的函数。
  • 无需调用 my_future.result(),因为 yield from 从期物中产出的值就是结果。

从期物、任务和协程中产出

在 asyncio 包中,期物和协程关系紧密,因为可以使用 yield from 从 asyncio.Future 对象中产出结果。如果 foo 是协程函数(调用后返回协程对象),亦或是返回 Future 或 Task 实例的普通函数,那么可以这样写:res = yield from foo(),这也是 asyncio 包的 API 中很多地方可以互换协程与期物的原因之一。

对于协程来说,获取 Task 对象有两种主要的方式:

  • asyncio.async:这个函数统一了协程和期物。如果第一个参数是 Future 或 Task 对象,那么就原封不动的返回。如果是协程,它会调用 loop.create_task() 创建 Task 对象。
  • BaseEventLoop.create_task 方法排定协程的执行时间,返回一个 asyncio.Task 对象。

asyncio 包中有多个函数会自动把参数指定的协程包装在 asyncio.Task 对象中。

asyncio 的事件循环在背后维护着一个 ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。

使用 asyncio 和 aiohttp 包下载

asyncio 包只直接支持 TCP 和 UDP,如果想要使用 HTTP 或其他协议,那么要借助第三方包(如 aiohttp 包)。在 asyncio 中,基本的流程是一样的:在一个单线程程序中使用主循环依次激活队列里的协程。各个协程向前执行几步,然后把控制权让给主循环,主循环再激活队列里的下一个协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
import aiohttp

from flags import BASE_URL, save_flag, show, main


@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request("GET", url)
image = yield from resp.read()
return image


@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc


def download_many(cc_list):
loop = asyncio.get_event_loop()
to_do = [download_one(cc) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(to_do)
res, _ = loop.run_until_complete(wait_coro)
loop.close()

return len(res)


if __name__ == '__main__':
main(download_many)

这里阻塞的操作通过协程实现。为了使用 asyncio 包,必须把每个访问网络的函数改成异步版本,使用 yield from 处理网络操作,这样才能把控制权交还给事件循环。asyncio.wait(...) 协程的参数是一个由期物或协程构成的可迭代的对象。wait 会分别把各个协程包装进一个 task 对象。最终的结果是,wait 处理的所有对象都通过某种方式变成 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象。为了驱动协程,把该对象传给 loop.run_until_complete(…) 方法。

yield from foo 语法能够防止阻塞,是因为当前协程(即包含 yield from 代码的委派生成器)暂停后,控制权回到事件循环手中,再驱动其他协程。foo 期物或者协程运行完毕后,把结果返回给暂停的协程,将其恢复。关于 yield from 的用法,之前介绍过:

  • 使用 yield from 链接的多个协程最终必须由不是协程的调用方驱动,调用方显式或隐式在最外层委派生成器上调用 next(…) 函数或者 .send(…) 方法
  • 链条中最内层的子生成器必须是简单生成器(只使用 yield)或可迭代对象

在 asyncio 包的 API 中使用 yield from 时,这两点都成立。但是需要注意以下细节:

  • 我们编写的协程链条始终通过把最外层委派生成器传给 asyncio 包 API 中的某个函数(例如 loop.run_until_complete(…))驱动
  • 我们编写的协程链条最终是通过 yield from 把职责委托给 asyncio 包中的某个协程函数或协程方法,或者其他库中实现高层协议的协程

概括起来就是:使用 asyncio 包时,我们编写的异步代码中包含让 asyncio 本身驱动的协程(即委派生成器),而生成器最终把职责委托给 asyncio 包或第三方库(如 aiohttp)中的协程。这种处理方式相当于架起了管道,让 asyncio 事件循环通过我们编写的协程驱动执行底层异步 IO 操作的库函数

Semaphore

Semaphore 对象维护着一个内部计数器,若在该对象上调用 .acquire() 方法,计数器则递减,如果在对象上调用 .release() 协程方法,计数器则递增。计数器的初始值在实例化 Semaphore 时设定。如果计数器的值大于 0,那么调用 .acquire() 方法不会阻塞,可是如果计数器为 0,那么 .acquire() 方法会阻塞调用这个方法的协程。

避免阻塞型调用

有两种方法能够避免阻塞型调用阻塞整个应用程序的进程:

  • 在单独的线程中执行各个阻塞型操作
  • 把每个阻塞型操作转换成非阻塞的异步调用使用

多个线程是可以的,但是多个线程是有开销的。为了降低内存的消耗,通常使用回调来实现异步调用。使用回调时,我们不等待响应,而是注册一个函数,在发生某件事时调用。这样,所有调用都是非阻塞的。当然,只有异步应用程序底层的事件循环能够依靠基础设置的中断、线程、轮询和后台进程等,确保多个并发请求能取得进展并最终完成,这样才能使用回调。

把生成器当做协程使用是异步编程的另一种方式。对事件循环来说,调用回调与在暂停的协程上调用 .send() 方法效果差不多。因为异步操作是交叉执行的,所以并发下载多张图像所需的总时间比依序下载少得多。

从回调到期物和协程

使用协程做面向事件编程,需要下一番功夫才能掌握。我们需要了解与经典的回调式编程相比,协程有哪些改进。回调式面向事件编程需要面对 回调地狱 这个问题,也就是说,如果一个操作需要依赖之前操作的结果,那么就得嵌套回调。在这种编程模式中,每个函数做一部分工作,设置下一个回调,然后返回,让事件循环继续执行。这样所有的本地上下文都会丢失。如果需要知道这个上下文,那么就必须依靠闭包,或者把它存储在外部数据结构中,以便在处理过程的不同阶段使用。

在这个问题上,协程能发挥很大的作用。在协程中,如果要连续执行 3 个异步操作,只需要使用 yield 3 次。让事件循环继续执行。准备好结果后,调用 .send() 方法激活协程。对于事件循环来说,这种做法与调用回调类似。但是对于使用协程式异步 API 的用户来说,情况就大为不同了:3 次操作都是在同一个函数定义体中,像是顺序代码,能够在处理过程中使用局部变量保留整个任务的上下文。

如下展示了这种实例:

1
2
3
4
5
6
7
8
9
10
11
12
@asyncio.coroutine
def three_stages(request1):
response1 = yield from api_call1(request1)

request2 = step1(response1)
response2 = yield from api_call2(request2)

request3 = step2(response2)
response3 = yield from api_call3(request3)
step3(response3)

loop.create_task(three_stages(request1))

而且对于回调式编程,为了处理错误,通常需要注册两个回调。一个用于处理操作成功时返回的结果,一个用于处理错误。一旦涉及错误处理,回调地域的危害程度就会迅速增大。而在协程编程模式中,如果异步调用会抛出异常,可以把响应的 yield from 表达式放在 try/except 块中处理。

但是这也有代价,因为我们不能使用常规的函数,必须使用协程,而且要习惯 yield from。只要函数中有 yield from,`函数就会变成协程,而协程不能直接调用,必须使用事件循环来显式排定协程的执行时间(或者在其他排定了执行时间的协程中使用 yield from 表达式把它激活)。

问题关键是必须知道何时应该使用 yield from,何时不应该使用。基本原则很简单,yield from 只能用于协程和 asyncio.Future 实例。

使用 asyncio 包编写服务器

asyncio 包提供了高层流 API,有现成的服务器可用,我们只需要实现一个处理程序(普通的回调或协程)。在使用 asyncio 编写 tcp 服务器时,一般的范式是:

  • 编写协程处理业务逻辑,该业务处理协程之后会被传递给 asyncio.start_server 函数。业务处理协程函数接收的两个参数: asyncio.StreamReader 对象和 asyncio.StreamWriter 对象。
  • 使用 asyncio.get_event_loop() 创建事件循环
  • 使用 asyncio.start_server 函数创建服务端协程,它接收业务处理的协程、监听的地址和端口、以及事件循环作为参数
  • 通过事件处理循环的 run_until_complete 驱动服务端协程,
  • loop.run_forever 运行事件循环

需要注意,run_until_complete 方法的参数是一个协程(start_server 方法返回的结果)或者是一个 Future 对象(server.wait_closed 方法返回的结果)。如果传给 run_until_complete 方法的参数是协程,会把协程包装在 Task 对象中。

另外需要注意,有些 I/O 方法是协程,必须由 yield from 驱动,而另一些则是普通的函数。

只有驱动协程,协程才能做事。而驱动 asyncio.coroutine 装饰的协程有两种方法:要么使用 yield from,要么传给 asyncio 包中某个参数为协程或期物的函数(例如 run_until_complete)。

除了防止阻塞调用之外,高并发的系统还必须把复杂的工作分成多步,以保持敏捷。

小结

这里总结了在 Python 中做并发编程的一种全新方式,这种方式使用 yield from、协程、期物和 asyncio 事件循环。其实异步库依赖于低层线程,但是这些库的用户无需创建线程,也无需知道用到了基础设施中的底层线程。异步系统能避免用户级线程开销,这是它比多线程系统管理更多并发连接的主要原因。

asyncio 包添加到标准库之后,协程和期物被确定为符合 Python 风格的异步代码编写方式。此外,asyncio 包为异步期物和事件循环定义了标准接口,为二者提供了实现参考。