目录:
一、基于生成器的协程
二、协程状态
三、协程预激装饰器
四、终止协程和异常处理
五、协程返回值
六、yield from
七、greenlet协程
八、gevent协程
Python并发之协程
协程(coroutine)可以在执行期间暂停(suspend),这样就可以在等待外部数据处理完成之后(例如,等待网络I/O数据),从之前暂停的地方恢复执行(resume)。
一、基于生成器的协程
- 生成器
2001年,Python 2.2 通过了 PEP 255 -- "Simple Generators" ,引入了 yield 关键字实现了生成器函数。
yield item:yield包含”产出“和”让步“两个含义。生成器中 yield x 这行代码会 产出 一个值,提供给 next(...) 的调用方。此外,还会作出让步,暂停执行生成器,让调用方继续工作,直到需要使用另一个值时再调用 next(...)。
- 协程
2005年,Python 2.5 通过了 PEP 342 -- "Coroutines via Enhanced Generators",给生成器增加了.send()、.throw()和.close()方法,第一次实现了基于生成器的协程函数(generator-based coroutines)。
send(item):生成器的调用方可以使用send()方法发送数据,发送的数据会成为生成器中yield表达式的值。此时称生成器为协程。协程指与调用方协作的过程,产出由调用方提供的值。
示例:
def simple_coroutine():
print("->协程开始")
x = yield
print("->协程收到x:", x)
my_coro = simple_coroutine()
print(my_coro)
print(next(my_coro))
my_coro.send(50)
运行结果解释:
① yield在表达式中的使用:如果协程只需从客户那里接收数据,那么产出的值是None(这个值是隐式指定的,因为yield关键字右边没有表达式)。
② 首先要需要调用next(...)。因为生成器还没启动,未在yield语句处暂停,所以一开始无法发送数据。
③ 调用send()方法后,协程定义体中的yield表达式会计算出50。调用send()方法后协程会恢复,一直运行到下一个yield表达式,或者终止。
二、协程状态
GEN_CREATED:创建状态(等待开始执行)
GEN_SUSPENDED:挂起状态(在yield表达式处暂停)
GEN_RUNNING:运行状态(正在被解释器执行。只有在多线程应用中才能看到这个状态)
GEN_CLOSED:关闭状态(执行结束)
协程激活方法
- 1.调用next():next(generator)
- 2.发送None:generator.send(None)
示例:
In[1]: from inspect import getgeneratorstate
def simple_coroutine():
print("->协程开始")
x = yield
print(getgeneratorstate(my_coro2))
print("->协程收到x:", x)
my_coro = simple_coroutine()
next(my_coro)
print(getgeneratorstate(my_coro))
my_coro.send(50)
In[2]: print(getgeneratorstate(my_coro))
三、协程预激装饰器
1.@wraps装饰器
Python装饰器(decorator)在实现的时候,被装饰后的函数其实已经是另外一个函数了(函数名等函数属性会发生改变)。为了消除这样的影响,Python的functools包中提供了一个叫wraps的decorator来消除这样的副作用。写一个decorator的时候,最好在实现之前加上functools的wrap,它能保留原有函数的名称和docstring。
示例:
from functools import wraps
def my_decorater(func):
@wraps(func)
def wrapper(*args, **kwargs):
"""
wrapper.doc
:param args:
:param kwargs:
:return:
"""
print("calling decorater start")
func(*args, **kwargs)
print("calling decorater end")
return wrapper
@my_decorater
def exam():
"""
exam.doc
:return:
"""
print("example....")
exam()
print("func_name:", exam.__name__)
print("func_doc:", exam.__doc__)
2.协程预激装饰器的实现
from functools import wraps
def coroutine_activator(function):
@wraps(function)
def wrapper(*args, **kwargs):
generator = function(*args, **kwargs)
next(generator)
return generator
return wrapper
示例:
def coroutine_activator(function):
@wraps(function)
def wrapper(*args, **kwargs):
generator = function(*args, **kwargs)
next(generator)
return generator
return wrapper
@coroutine_activator
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
total += term
count += 1
average = total / count
cor_avg = averager()
print(getgeneratorstate(cor_avg))
四、终止协程和异常处理
协程中未处理的异常会向上冒泡,传给next()函数或.send()方法的调用方(即触发协程的对象)。
1.generator.throw(exc_type[, exc_value[, traceback]])
- 1.该方法会使生成器在暂停的yield表达式处抛出指定的异常。
- 2.如果生成器处理了该异常,代码会继续执行到下一个yield表达式,而产出的值会成为generator.throw()方法的返回值。
- 3.如果没有处理这个异常,异常则会向上冒泡。
示例:
class DemoException(Exception):
pass
def demo_exc_handling():
print("-->coroutine started")
while True:
try:
x = yield
# except GeneratorExit:
# print("-->GeneratorExit has been handled. Call close methond...")
except DemoException:
print("-->DemoException has been handled. Continuing...")
else:
print("-->coroutine received:{!r}".format(x))
finally:
print("-->end...")
exc_coro = demo_exc_handling()
next(exc_coro)
print(getgeneratorstate(exc_coro))
exc_coro.throw(DemoException)
print(getgeneratorstate(exc_coro))
exc_coro.throw(ZeroDivisionError)
print(getgeneratorstate(exc_coro))
2.generator.close()
- 1.该方法会使生成器在暂停的yield表达式处抛出GeneratorExit异常。
- 2.如果生成器没有处理这个异常,或者抛出了StopIteration异常(通常是指运行到结尾),调用方不会报错。
- 3.如果收到GeneratorExit异常,生成器一定不能产出值,否则解释器会抛出RuntimeError异常。
- 4.生成器抛出的其他异常会向上冒泡,传给调用方。
coro = demo_exc_handling()
next(coro)
coro.send(10)
coro.close()
coro.send(20)
五、协程return值
有些协程在被激活后,每次驱动(drive)协程时,不会产出值,而是在最后(协程正常终止时)返回一个值(通常是某种累加值)。
from collections import namedtuple
Result = namedtuple('Result', 'count average')
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
# 为了返回值,协程必须正常终止。这里使用条件判断,以便退出累计循环
if term is None:
break
total += term
count += 1
average = total / count
# 返回一个 namedtuple,包含 count 和 average 两个字段。在 Python 3.3 之前,如果生成器返回值,解释器会报句法错误
return Result(count, average)
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)
coro_avg.send(20)
coro_avg.send(30)
#捕获StopIteration异常,获取 averager 返回的值:
try:
coro_avg.send(None)
except StopIteration as exc_data:
r = exc_data.value
print(r)
六、yield from
- 在生成器gen中使用yield from subgen()时,子生成器subgen会获得控制权,把产出的值传给gen的调用方,即调用方可以直接控制subgen。与此同时,gen会阻塞,等待subgen终止。
- yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码。通过这个结构,协程可以把功能委托给子生成器。
- 调用方:调用委派生成器的客户端代码。
- 委派生成器:包含 yield from <iterable> 表达式语句的生成器函数(包含yield from语句的生成器)。
- 子生成器:从 yield from 表达式中 <iterable>部分获取的生成器(yield from后面的表达式)。
简单示例:
def gen():
yield from "AB"
yield from range(1, 3)
print(list(gen()))
示例:
Result = namedtuple('Result', 'count average')
# 子生成器
def averager():
total = 0.0
count = 0
average = 0.0
while True:
term = yield average
if term is None:
break
total += term
count += 1
average = total / count
return Result(count, average)
# 委派生成器
def grouper(results, key):
while True:
results[key] = yield from averager()
# 客户端代码
def main(data):
results = {}
for key, values in data.items():
group = grouper(results, key)
# 激活委派生成器
next(group)
for value in values:
group.send(value)
group.send(None)
print(results)
data = {"girls:kg": [49.9, 39.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
"girls:m": [1.6, 1.5, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
"boys:kg": [43.9, 48.5, 44.3, 44.2, 50.3, 40.5, 40.6, 30.8, 50, 55.5],
"boys:m": [1.6, 1.4, 1.4, 1.3, 1.6, 1.42, 1.55, 1.66, 1.43, 1.60],
}
main(data)
运行结果解释:
委派生成器在yield from 表达式处暂停时,调用方可以直接把数据发给子生成器,子生成器再把产出的值发给调用方。子生成器返回之后,解释器会抛出StopIteration异常,并把返回值附加到异常对象上,此时委派生成器会恢复运行。
注意:
- 如果子生成器不终止,委派生成器会在yield from处永远暂停。
- 因为委派生成器相当于管道,所以可以把任意数量个委派生成器连接在一起:一个委派生成器使用yield from调用一个子生成器,而那个子生成器本身也是委派生成器,使用yield from调用另一个子生成器,以此类推。最终,这个链条要以一个只使用yield表达式的简单生成器结束;不过,也能以任何可迭代的对象结束。
- 任何yield from链条都必须由客户驱动,在最外层委派生成器上调用next(...)函数或.send(...)方法。
yield from的意义
- 1.子生成器产出的值都直接传给委派生成器的调用方(即客户端代码)。
- 2.使用send()方法发给委派生成器的值都直接传给子生成器。如果发送的值是None,那么会调用子生成器的next()方法。如果发送的值不是None,那么会调用子生成器的send()方法。如果调用的方法抛出StopIteration异常,那么委派生成器恢复运行。任何其他异常都会向上冒泡,传给委派生成器。
- 3.生成器退出时,生成器(或子生成器)中的return expr表达式会触发StopIteration(expr)异常抛出。
- 4.yield from表达式的值是子生成器终止时传给StopIteration异常的第一个参数。
- 5.传入委派生成器的异常,除了GeneratorExit之外都传给子生成器的throw()方法。如果调用throw()方法时抛出StopIteration异常,委派生成器恢复运行。StopIteration之外的异常会向上冒泡,传给委派生成器。
- 6.如果把GeneratorExit异常传入委派生成器,或者在委派生成器上调用close()方法,那么在子生成器上调用close()方法,如果它有的话。如果调用close()方法导致异常抛出,那么异常会向上冒泡,传给委派生成器;否则,委派生成器抛出GeneratorExit异常。
七、greenlet协程
greenlet由来
虽然CPython(标准Python)能够通过生成器来实现协程,但使用起来还并不是很方便。 与此同时,Python的一个衍生版 Stackless Python。实现了原生的协程,它更利于使用。 于是,大家开始将 Stackless 中关于协程的代码 。单独拿出来做成了CPython的扩展包。 这就是 greenlet 的由来,因此 greenlet 是底层实现了原生协程的 C扩展库。
import greenlet
# 创建greenlet协程
coroutine = greenlet.greenlet()
# 运行greenlet协程
coroutine.switch()
示例:生产者-消费者模式
import greenlet
import random
def producer():
while True:
item = random.randint(0, 99)
print("生成数字{}".format(item))
c.switch(item) # 将data传给c,并切换到c
def consumer():
while True:
item = p.switch() # 切换到p,等待传入数值
print("消费数字{}".format(item))
if __name__ == "__main__":
c = greenlet.greenlet(consumer)
p = greenlet.greenlet(producer)
c.switch()
greenlet 的优势:
- 1.高性能的原生协程。
- 2语义更加明确的显式切换。
- 3.直接将函数包装成协程,保持原有代码风格。
八、gevent协程
gevent是什么:http://www.gevent.org/
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成。
import gevent
# 创建协程
gevent.spawn(参数)
# gevent协程运行列表
gevent.joinall([协程列表])
示例:生产者-消费者模式
import gevent
from gevent.queue import Queue
import random
def Consumer(queue):
while True:
item = queue.get()
print("消费数字{}".format(item))
def Producer(queue):
while True:
item = random.randint(0, 99)
queue.put(item)
print("生产数字{}".format(item))
queue = Queue(3)
p = gevent.spawn(Producer, queue)
c = gevent.spawn(Consumer, queue)
gevent.joinall([p, c])