把生成器当成协程
- 有了上一篇文章生成器的介绍,理解协程就容易多了。Python 2.2 引入了 yield 关键字实现了生成器函数。Python 2.5为生成器对象添加了额外的方法和功能,其中最值得关注的是
.send()
方法。 - 与
.__next__()
方法一样,.send() 方法致使生成器前进到下一个yield 语句。不过,.send() 方法还允许使用生成器的客户把数据发给自己,即不管传给 .send() 方法什么参数,那个参数都会成为生成器函数定义体中对应的 yield 表达式的值。也就是说,.send() 方法允许在客户代码和生成器之间双向交换数据。而 .__next__() 方法只允许客户从生成器中获取数据。
这是一项重要的“改进”,甚至改变了生成器的本性:像这样使用的话,生成器就变身为协程
。 - 生成器用于生成供迭代的数据, 协程是数据的消费者,虽然在协程中会使用 yield 产出值,但这与迭代无关。
- yield作用:
产出和让步
yield item 这行代码会产出一个值,提供给 next(...) 的调用方;
此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用next()。调用方会从生成器中拉取值。
- 协程与生成器类似,都是定义体中包含 yield 关键字的函数。但是,在协程中,yield 通常出现在表达式的右边(例如,result = yield),
如果 yield关键字后面没有表达式,那么生成器产出 None
。 - 不管数据如何流动,yield 都是一种
流程控制工具
,使用它可以实现协作式多任务:协程可以把控制器让步给中心调度程序,从而激活其他的协程。
协程的行为
- 先来看个简单示例:
def coroutine_test():
print("start!")
x = yield # yield 在表达式中使用
print("end", x)
if __name__ == '__main__':
c = coroutine_test() # 调用函数得到生成器对象
print(c)
next(c) # 调用 next(...) 函数启动生成器改变生成器状态
c.send(42) # yield 表达式会计算出42,协程会恢复,一直运行到下一个 yield 表达式,或者终止
- output
<generator object coroutine_test at 0x010084E0>
start!
end 42
Traceback (most recent call last):
File "D:xxx.py", line 16, in <module>
c.send(42)
StopIteration
- 协程的四个状态:
- GEN_CREATED:等待开始执行
- GEN_RUNNING:解释器正在执行(多线程)
- GEN_SUSPENDED:在 yield 表达式处暂停
- GEN_CLOSED:执行结束
- 为什么一开始要调用next()方法呢?因为最开始协程没有运行,为
等待执行
状态,只有先让协程开始执行到暂停
状态,而send 方法的参数会成为暂停的 yield 表达式的值
,暂停后才能顺利传入参数,不过如果参数传入None(coroutine_test.send(None)
)也是可以启动协程的,为了不混淆建议使用next启动协程。 - 为了更好的理解协程,再来看个例子:
def coroutine_test(a):
print('-> Started: a =', a)
b = yield a
print('-> Received: b =', b)
print(a + b)
c = yield a + b
print(a + b)
print('-> Received: c =', c)
if __name__ == '__main__':
cor = coroutine_test(14)
print(getgeneratorstate(cor))
next(cor)
print(getgeneratorstate(cor))
cor.send(28)
try:
cor.send(30)
except StopIteration:
print("协程终止")
print(getgeneratorstate(cor))
- output
GEN_CREATED
-> Started: a = 14
GEN_SUSPENDED
-> Received: b = 28
42
42
-> Received: c = 30
协程终止
GEN_CLOSED
- 现在来详细说下这个例子的过程:
- 我们先创建了生成器对象cor(协程),这时cor状态为
GEN_CREATED
等待执行,此时a=14 - 通过next启动协程,程序运行到第一个yield终止,此时打印
Started: a = 14
,协程(cor)为GEN_SUSPENDED
(暂停)状态 - 协程暂停了这样就可以使用send传参:
cor.send(28)
,这时传入参数为28,需要注意的是此时b = yield a
中yield接收到了值是28而不是第一次传入的14,因为上一步到yield终止了,并没有计算表达式。此时程序运行到下个yield:c = yield a + b
,但是并没有计算表达式,此时a=14,b=28 - 接着运行
cor.send(30)
, 先计算表达式c = yield a + b
, c=30,由于程序直到结束也没有找到yield,所以报错(StopIteration) - 最后协程(cor)终止后其状态为
GEN_CLOSED
(关闭)
- 我们先创建了生成器对象cor(协程),这时cor状态为
-
图形化过程如下
- 通过上面的例子应该能比较清楚的理解协程的行为,下面再来看个例子加深理解:
def averager():
"""使用协程计算移动平均值"""
total, count, average = 0.0, 0, None
while True:
term = yield average
total += term
count += 1
average = total / count
- 这个协程中使用了无限循环,
只要调用方不断把值发给这个协程,它就会一直接收值,然后生成结果。
仅当调用方在协程上调用.close()
方法,或者没有对协程的引用而被垃圾回收程序回收时,这个协程才会终止。其中的yield 表达式用于暂停执行协程,把结果发给调用方;还用接收调用方后面发给协程的值,恢复无限循环。 - 这里使用协程的好处是函数并没有定义参数,但是内部能接收到传过来的参数值,这样就避免了使用外部参数,只需要定义局部变量即可,极大简化了代码。如果不这样做可能需要使用闭包,还需要定义外部的变量。下面来看看此例的使用:
if __name__ == '__main__':
avg = averager()
next(avg)
print(avg.send(10))
print(avg.send(20))
print(avg.send(30))
- output
10.0
15.0
20.0
- 虽说使用了无限循环,但是每次yield产出值后暂停了,所以这样做是合理的。有了之前的分析,得到的这个结果也不难理解。每次到计算平均值,到yield终止,并返回yield计算出的值。
- 可能有人会想:难道我每次都要使用next来启动协程吗?万一我忘记启动了那怎么办?当然,肯定有解决方法:
预激
,我们只需要一个简单的装饰器即可。 - 下面来看看这个简单的装饰器怎样实现:
def coroutine(func):
@wraps(func)
def wrapper(*args, **kwargs):
gen = func(*args, **kwargs)
next(gen)
return gen
return wrapper
- 这个装饰器就其实只做了一件有用的事:
next(gen)
,启动协程函数,然后返回生成器对象。给生成器函数加上该装饰器:
@coroutine
def averager():
"""使用协程计算移动平均值"""
total, count, average = 0.0, 0, None
while True:
term = yield average
total += term
count += 1
average = total / count
- 这样当运行
avg = averager()
时:先把生成器函数替换成装饰器中闭包函数,启动gen协程,返回gen生成器对象,这样就避免了每次都是需要新起next启动协程,这样做本质上并没有减少程序执行的流程,但是简化了代码,有了统一的预激装饰器。这样做的好处是显而易见的,所以很多异步框架都是使用了类似的装饰器,其作用也不仅限于启动协程。 - python3 出现 yield from,其优化了这一步,会自启动协程,这个后面再说。
- 到目前为止,我们看的例子有两个明显的漏洞。一是协程终止后会报错,二是在无限循环中协程只是暂停,并没有终止。显然开发者考虑到了这点,协程使用
close
方法终止,如下所示:
if __name__ == '__main__':
avg = averager()
print(avg.send(10))
print(getgeneratorstate(avg))
avg.close() # 终止协程
print(getgeneratorstate(avg))
print(avg.send(15))
- output
10.0
GEN_SUSPENDED
GEN_CLOSED
Traceback (most recent call last):
File "D:/xxx.py", line 47, in <module>
print(avg.send(15))
StopIteration
- 根据测试可知:使用close方法会终止协程,使协程切换为
GEN_CLOSED
(终止)状态。协程终止后无法再次使用,除非重新生成新的生成器(协程)。 - generator.close() 官方解释:
致使生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常。
如果生成器没有处理这个异常,或者抛出了 StopIteration 异常(通常是指运行到结尾),调用方不会报错。
如果收到GeneratorExit 异常,生成器一定不能产出值,否则解释器会抛出RuntimeError 异常。
生成器抛出的其他异常会向上冒泡,传给调用方。
让协程返回值
- python3.3引入了yield from,其作用可不是仅仅预启动协程那么简单。
其主要原因之一与把异常传入嵌套的 协程有关。另一个原因是让协程更方便地返回值
。下面说说如何让协程使用return返回值。还是使用前面求平均值的例子,稍微做下改动如下:
def averager():
"""使用协程计算移动平均值"""
total, count, average = 0.0, 0, None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return Result(count, average)
if __name__ == '__main__':
avg = averager()
next(avg)
print(avg.send(10))
print(avg.send(20))
print(avg.send(None))
- output
None
None
Traceback (most recent call last):
File "D:/xxx.py", line 52, in <module>
print(avg.send(None))
StopIteration: Result(count=2, average=15.0)
- 可以看出,当yield后面没有表达式时,返回为空(None),这里的yield也就可以理解为return,只不过协程只是暂停了,当执行
avg.send(None)
时,term=None,循环终止,协程直到结束没有找到yield而报错(StopIteration),唯一疑惑的点是此时的错误中加上了返回信息:StopIteration: Result(count=2, average=15.0)
,其实这是协程处理异常的一个行为:return 表达式的值会传给调用方,赋值给 StopIteration 异常的 value 属性。
- 既然返回信息传给了StopIteration 的value属性,那么下面测试验证一下:
if __name__ == '__main__':
avg = averager()
next(avg)
print(avg.send(10)) # None
print(avg.send(20)) # None
try:
avg.send(None)
except StopIteration as e:
print(e.value) # Result(count=2, average=15.0)
- 可见返回值确实赋值给了StopIteration 的value属性,这样得到返回值是不是感觉有点麻烦,显然通过这样的方式获取协程的返回值不太合理,而
yield from
正好解决了这个问题:yield from 结构会在内部自动捕获 StopIteration 异常,并且会把 value 属性的值变成 yield from 表达式的值。
- yield from x 表达式对 x 对象所做的第一件事是,
调用 iter(x),从中获取迭代器。因此,x 可以是任何可迭代的对象。
但是,这不是yield from的主要功能,yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。
- 为了使用yield from,先来引入几个概念:
委派生成器
包含 yield from <iterable> 表达式的生成器函数
子生成器
从 yield from 表达式中 <iterable> 部分获取的生成器。
调用方
“调用方”这个术语指代调用委派生成器的客户端代码。
- 如图,
委派生成器在 yield from 表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出的值发给调用方。子生成器返回之后,解释器会抛出 StopIteration 异常,并把返回值附加到异常对象上,此时委派生成器会恢复。
完整代码如下所示:
from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
"""子生成器"""
total, count, average = 0.0, 0, None
while True:
term = yield
if term is None:
break
total += term
count += 1
average = total / count
return Result(count, average)
def main(data):
"""调用方"""
results = {}
for key, values in data.items():
group = grouper(results, key)
next(group)
for v in values:
group.send(v)
group.send(None)
report(results)
def grouper(results, key):
"""委派生成器"""
while True:
results[key] = yield from averager()
print(results[key])
def report(results):
"""格式化输出"""
for key, result in sorted(results.items()):
group, unit = key.split(';')
print('{:2} {:5} averaging {:.2f}{}'.format(result.count, group, result.average, unit))
data = {
'girls;kg':
[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
'girls;m':
[1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
'boys;kg':
[39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
'boys;m':
[1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}
if __name__ == '__main__':
main(data)
- output
Result(count=10, average=42.040000000000006)
Result(count=10, average=1.4279999999999997)
Result(count=9, average=40.422222222222224)
Result(count=9, average=1.3888888888888888)
9 boys averaging 40.42kg
9 boys averaging 1.39m
10 girls averaging 42.04kg
10 girls averaging 1.43m
- 可能到这里你还是对yield from 一知半解,下面来详细说下这个流程,启动调用函数(main),第一次循环
key="girls;kg", values=[40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5]
,通过next(group)
启动委派生成器(group),委派生成器中通过yield from averager()
启动子生成器(averager),此时子生成器和委派生成器都是暂停状态。 - 然后循环values并发送数据,这里先先进行
group.send(40.9)
,group恢复运行,参数(40.9)传给yield from后面的averager,averager中yield接收到40.9,此时term=40.9,均值average = 40.9
, 计算完毕协程找到yield暂停;特别需要注意的是,只有当字生成器有返回值时,才会把这个返回值赋值给results[key]
,也就是说,只有当group.send(None)
,子生成器循环结束,其返回值(Result(count=10, average=42.040000000000006)
)才赋值给了results[key], 也就是一轮values一个返回值。这样结果也就好理解了。 - 可知,如果外层 for 循环的末尾没有 group.send(None),那么averager 子生成器永远不会终止,委派生成器 group 永远不会再次激活,因此永远不会为 results[key] 赋值。
- 外层 for 循环重新迭代时会新建一个 grouper 实例,然后绑定到 group 变量上。前一个 grouper 实例(以及它创建的尚未终止的averager 子生成器实例)被垃圾回收程序回收。
- 说了这么多,其实是要说明关键的一点:
如果子生成器不终止,委派生成器会在 yield from 表达式处永远暂停。
协程应用举例
- 说了这么多,现在来看下比较复杂的例子:
离散事件仿真(DES)
。DES是一种把系统建模成一系列事件的仿真类型。在离散事件仿真中,仿真“钟”向前推进的量不是固定的,而是直接推进到下一个事件模型的模拟时间。
- 我们抽象
模拟出租车的运营过程
,其中一个事件是乘客上车,下一个事件则是乘客下车。不管乘客坐了 多长时间,一旦乘客下车,仿真钟就会更新,指向此次运营的结束时间。这与连续仿真不同,连续仿真的仿真钟以固定的量(通常很小)不断向前推进。 - 仿真程序会创建几辆出租车,出租车首先驶离车库,四处徘徊,寻找乘客;拉到乘客后,行程开始;乘客下车后,继续四处徘徊。四处徘徊和行程所用的时间使用指数分布生成。为了让显示的信息更加整洁,时间使用取整的分钟数。
- 直接写这段程序可能比较困难,但是下手之前不妨先拆分不同部分,先理清出租车一次出车有哪些事件,如何建立一个完整的车队,指数分布该如何实现,最难的部分就是怎样模拟调度出租车运行。这里维护了一个优先队列,启动协程获取下个事件不断去运行。完整代码如下所示:
# -*- coding: utf-8 -*-
import argparse
import random
import collections
import queue
DEFAULT_NUMBER_OF_TAXIS = 3 # 默认出租车数量
DEFAULT_END_TIME = 180 # 一次出车事件时间消耗(minute)
SEARCH_DURATION = 5
TRIP_DURATION = 20 # 旅途消耗的时间
DEPARTURE_INTERVAL = 5 # 平均时间间隔
# 事件参数:时间,车辆编号,发现的情况
Event = collections.namedtuple("Event", "time proc action")
def taxi_process(ident, trips, start_time=0):
"""
单个车辆轨迹:每次状态变化时向仿真程序产出一个事件
:param ident: 车辆编号
:param trips: 车辆一次出车生意单数
:param start_time: 开始时间
"""
time = yield Event(start_time, ident, "leave garage") # 离开车库
for i in range(trips):
time = yield Event(time, ident, "pick up passenger") # 载客
time = yield Event(time, ident, "drop off passenger") # 乘客下车
yield Event(time, ident, "going home") # 出车结束回家
def computer_duration(previous_action):
"""指数分布计算操作耗时"""
if previous_action in ["leave garage", "drop off passenger"]:
interval = SEARCH_DURATION
elif previous_action == "pick up passenger":
interval = TRIP_DURATION
elif previous_action == "going home":
interval = 1
else:
raise ValueError("unknown previous_action: {}".format(previous_action))
return int(random.expovariate(1 / interval) + 1)
class Simulator:
"""模拟器"""
def __init__(self, proc_dict):
self.events = queue.PriorityQueue() # 事件优先队列
self.proc_dict = proc_dict
def run(self, end_time):
"""事件调度"""
print("----------------------------------启动协程------------------------------------")
for _, proc in sorted(self.proc_dict.items()):
first_event = next(proc)
print("first_event: {}".format(first_event))
self.events.put(first_event)
print("----------------------------------开始出车------------------------------------")
sim_time = 0
while sim_time < end_time:
if self.events.empty():
print("events is None, end")
break
current_event = self.events.get()
# 当前事件
sim_time, proc_id, previous_action = current_event
# 打印不同出租车事件随着时间轴的分布,同一车次的横坐标一致
print('taxi:', proc_id * ' ', current_event)
# 获取正在运行的事件生成器
active_proc = self.proc_dict[proc_id]
# 获取指数分布下一次事件的时间
next_time = sim_time + computer_duration(previous_action)
try:
# 启动协程,车辆进行到下一次事件并返回
next_event = active_proc.send(next_time)
except StopIteration:
# 单个车次事件运行结束删除车辆
del self.proc_dict[proc_id]
else:
self.events.put(next_event)
else:
msg = '*** end of simulation time: {} events pending ***'
print(msg.format(self.events.qsize())) # 打印没有完成的任务车辆。
def main(end_time=DEFAULT_END_TIME, taxis_num=DEFAULT_NUMBER_OF_TAXIS, seed=None):
if seed is not None:
# 设置随机数的规则定位
random.seed(seed)
# 建立车队
taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL) for i in range(taxis_num)}
# 车队放入模拟器中
sim = Simulator(taxis)
# 运行模拟器,车队开始运行
sim.run(end_time)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="Taxi Fleet Simulator")
parser.add_argument("-e", "--end_time", type=int, default=DEFAULT_END_TIME,
help="simulation end time; default = {}".format(DEFAULT_END_TIME))
parser.add_argument("-t", "--taxis", type=int, default=DEFAULT_NUMBER_OF_TAXIS,
help=f'number of taxis running; default 'f'= {DEFAULT_NUMBER_OF_TAXIS}')
parser.add_argument('-s', '--seed', type=int, default=None, help="random generator seed (for testing)")
args = parser.parse_args()
main(args.end_time, args.taxis, args.seed)
- 这段代码建议好好写一写,有很多值得借鉴的地方,如果你有更好的方法,也不妨多去尝试。来看看某次运行结果:
----------------------------------启动协程------------------------------------
first_event: Event(time=0, proc=0, action='leave garage')
first_event: Event(time=5, proc=1, action='leave garage')
first_event: Event(time=10, proc=2, action='leave garage')
----------------------------------开始出车------------------------------------
taxi: Event(time=0, proc=0, action='leave garage')
taxi: Event(time=2, proc=0, action='pick up passenger')
taxi: Event(time=5, proc=1, action='leave garage')
taxi: Event(time=10, proc=2, action='leave garage')
taxi: Event(time=11, proc=1, action='pick up passenger')
taxi: Event(time=12, proc=2, action='pick up passenger')
taxi: Event(time=15, proc=2, action='drop off passenger')
taxi: Event(time=19, proc=2, action='pick up passenger')
taxi: Event(time=20, proc=1, action='drop off passenger')
taxi: Event(time=23, proc=1, action='pick up passenger')
taxi: Event(time=28, proc=0, action='drop off passenger')
taxi: Event(time=29, proc=0, action='pick up passenger')
taxi: Event(time=30, proc=2, action='drop off passenger')
taxi: Event(time=31, proc=2, action='pick up passenger')
taxi: Event(time=34, proc=2, action='drop off passenger')
taxi: Event(time=36, proc=2, action='pick up passenger')
taxi: Event(time=38, proc=2, action='drop off passenger')
taxi: Event(time=41, proc=2, action='pick up passenger')
taxi: Event(time=44, proc=0, action='drop off passenger')
taxi: Event(time=46, proc=0, action='going home')
taxi: Event(time=53, proc=2, action='drop off passenger')
taxi: Event(time=59, proc=2, action='pick up passenger')
taxi: Event(time=72, proc=2, action='drop off passenger')
taxi: Event(time=80, proc=2, action='going home')
taxi: Event(time=101, proc=1, action='drop off passenger')
taxi: Event(time=102, proc=1, action='pick up passenger')
taxi: Event(time=146, proc=1, action='drop off passenger')
taxi: Event(time=152, proc=1, action='pick up passenger')
taxi: Event(time=178, proc=1, action='drop off passenger')
taxi: Event(time=181, proc=1, action='going home')
*** end of simulation time: 0 events pending ***
- 从输出结果可以很清晰看到三辆出租车的模拟出车过程,出租车0第0分钟开始从车库出发,2分钟时接到了第一位乘客,28乘客下车,29分钟又接到了一位乘客,44分钟下车,46分钟回家。
- 为什么要用这个指数分布呢?我们不妨做下试验,以20作为指数因子,比较不同次数指数的平均值
print(sum([(random.expovariate(1 / 20) + 1) for i in range(10)]) / 10)
print(sum([(random.expovariate(1 / 20) + 1) for i in range(100)]) / 100)
print(sum([(random.expovariate(1 / 20) + 1) for i in range(1000)]) / 1000)
print(sum([(random.expovariate(1 / 20) + 1) for i in range(10000)]) / 10000)
print(sum([(random.expovariate(1 / 20) + 1) for i in range(100000)]) / 100000)
- 多次运行,你会发现,当运行次数越多,其平均值在概率上来说越准确,但是并不是运行次数越多就一定越准确。某一次结果如下:
27.2499797539986
19.265314895088892
20.31108815752231
21.094091857969136
20.961473189012956
- 至于该指数分布,有兴趣的也可以去尝试,去看看散点图分布。协程讲到这里也就差不多,这个示例的要旨是说明
如何在一个主循环中处理事件,以及如何通过发送数据驱动协程
,这是 asyncio 包底层的基本思想。在 asyncio 库中,协程(通常)使用@asyncio.coroutine
装饰器装饰而且始终使用yield from 结构驱动,而不通过直接在协程上调用 .send(...) 方法驱动。当然,在 asyncio 库的底层,协程使用 next(...) 函数和.send(...) 方法驱动,不过在用户代码中只使用 yield from 结构驱动协程运行。 - 归根结底,还是要弄懂协程的原理,你会发现新出的python框架对于异步都是尤其钟爱,例如fastapi。后续的框架支持异步已经成了基本要求。如果单单只看异步,就会发现go在异步这块做的更好,但是其底层原理还是差不多的,只不过更加严谨。