1 协程
1.1 简介
协程
,又称微线程
,纤程。英文名Coroutine
。
协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。
子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。
注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:
def A():
print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'
假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:
1
2
x
y
3
z
但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。
看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?
1.2 协程优势&分类
1.2.1 优势
- 最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,
没有线程切换的开销
,和多线程比,线程数量越多,协程的性能优势就越明显。 - 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
- 因为协程是一个线程执行,那怎么利用多核CPU呢,最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
1.2.2 分类
协程有两种,一种无栈协程
,python中以 asyncio
为代表, 一种有栈协程,python 中 以 gevent
为代表
有栈线程 | 无栈线程 | 备注 | |
---|---|---|---|
示例 | lua thread python gevent |
C# yield return C# async\await python asyncio |
无 |
是否拥有单独的上下文 | 是 | 否 | 上下文包括寄存器、栈帧 |
局部变量保存位置 | 栈 | 堆 | 无栈协程的局部变量保存在堆上,比如generator的数据成员 |
优点 | 1. 每个协程有单独的上下文,可以在任意的嵌套函数中任何地方挂起此协程。 2. 不需要编译器做语法支持,通过汇编指令即可实现 |
1. 不需要为每个协程保存单独的上下文,内存占用低。 2. 切换成本低,性能更高 |
无 |
缺点 | 1. 需要提前分配一定大小的堆内存保存每个协程上下文,所以会出现内存浪费或者栈溢出。 2. 上下文拷贝和切换成本高,性能低于无栈协程 |
1. 需要编译器提供语义支持,比如C# yield return语法糖。 2. 只能在这个生成器内挂起此协程,无法在嵌套函数中挂起此协程。 3. 关键字有一定传染性,异步代码必须都有对应的关键字。作为对比,有栈协程只需要做对应的函数调用 |
无栈协程无法在嵌套函数中挂起此协程,有栈协程由于是通过保存和切换上下文包括寄存器和执行栈实现,可以在协程函数的嵌套函数内部yield这个协程并唤醒。 |
1.3 generator协程
Python对协程的支持还非常有限,用在generator
中的yield
可以一定程度上实现协程。虽然支持不完全,但已经可以发挥相当大的威力了。
来看例子:
传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:
import time
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
next(c)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)
执行结果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:
- 首先调用c.next()启动生成器;
- 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
- consumer通过yield拿到消息,处理,又通过yield把结果传回;
- produce拿到consumer处理的结果,继续生产下一条消息;
- produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
整个流程无锁,由一个线程执行,produce和consumer协
作完成任务,所以称为协程
,而非线程的抢占式多任务。
1.4 gevent协程
Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent
为Python提供了比较完善的协程支持。
gevent
是第三方库,通过greenlet
实现协程,其基本思想是:
当一个
greenlet
遇到IO
操作时,比如访问网络,就自动切换到其他的greenlet
,等到IO
操作完成,再在适当的时候切换回来继续执行。由于IO
操作非常耗时,经常使程序处于等待状态,有了gevent
为我们自动切换协程,就保证总有greenlet
在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent
需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch
完成:
from gevent import monkey; spawn
monkey.patch_all() # 替换标准库以使用 gevent 兼容的版本
def f(n):
for i in range(n):
print(gevent.getcurrent(), i)
g1 = spawn(f, 5)
g2 = spawn(f, 5)
g3 = spawn(f, 5)
g1.join()
g2.join()
g3.join()
运行结果:
<Greenlet at 0x10e49f550: f(5)> 0
<Greenlet at 0x10e49f550: f(5)> 1
<Greenlet at 0x10e49f550: f(5)> 2
<Greenlet at 0x10e49f550: f(5)> 3
<Greenlet at 0x10e49f550: f(5)> 4
<Greenlet at 0x10e49f910: f(5)> 0
<Greenlet at 0x10e49f910: f(5)> 1
<Greenlet at 0x10e49f910: f(5)> 2
<Greenlet at 0x10e49f910: f(5)> 3
<Greenlet at 0x10e49f910: f(5)> 4
<Greenlet at 0x10e49f4b0: f(5)> 0
<Greenlet at 0x10e49f4b0: f(5)> 1
<Greenlet at 0x10e49f4b0: f(5)> 2
<Greenlet at 0x10e49f4b0: f(5)> 3
<Greenlet at 0x10e49f4b0: f(5)> 4
可以看到,3个greenlet是依次运行而不是交替运行。
要让greenlet交替运行,可以通过gevent.sleep()交出控制权:
def f(n):
for i in range(n):
print (gevent.getcurrent(), i)
gevent.sleep(0)
执行结果:
<Greenlet at 0x10cd58550: f(5)> 0
<Greenlet at 0x10cd58910: f(5)> 0
<Greenlet at 0x10cd584b0: f(5)> 0
<Greenlet at 0x10cd58550: f(5)> 1
<Greenlet at 0x10cd584b0: f(5)> 1
<Greenlet at 0x10cd58910: f(5)> 1
<Greenlet at 0x10cd58550: f(5)> 2
<Greenlet at 0x10cd58910: f(5)> 2
<Greenlet at 0x10cd584b0: f(5)> 2
<Greenlet at 0x10cd58550: f(5)> 3
<Greenlet at 0x10cd584b0: f(5)> 3
<Greenlet at 0x10cd58910: f(5)> 3
<Greenlet at 0x10cd58550: f(5)> 4
<Greenlet at 0x10cd58910: f(5)> 4
<Greenlet at 0x10cd584b0: f(5)> 4
3个greenlet交替运行,把循环次数改为500000,让它们的运行时间长一点,然后在操作系统的进程管理器中看,线程数只有1个。
当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:
from gevent import monkey
monkey.patch_all()
import gevent
import urllib.request
def f(url):
print('GET: {}'.format(url))
resp = urllib.request.urlopen(url)
data = resp.read()
print('{} bytes received from {}.'.format(len(data), url))
gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])
运行结果:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
45661 bytes received from https://www.python.org/.
14823 bytes received from https://github.com/.
304034 bytes received from https://www.yahoo.com/.
从结果看,3个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。
注意
:gevent
只能在Unix/Linux
下运行,在Windows
下不保证正常安装和运行。
1.5 asyncio
1.5.1 简介
asyncio
是用来编写并发代码的库,使用 async/await
语法。
asyncio
被用作多个提供高性能 Python
异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio
往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
1.5.2 asyncio函数
使用协程中的一般概念:
-
event_loop
:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行 -
coroutine
:协程对象,我们可以将协程对象注册到事件循环中,它会被事件循环调用。可以使用async
关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象 -
task
:任务,它是对协程对象的进一步封装,包含了任务的各个状态 -
future
:代表将来执行或还没有执行的任务,实际上和task
没有本质区别 -
async
:定义一个协程,不会立即执行
-
await
:用来挂起阻塞方法的执行
事件循环函数(包括循环的创建、运行和停止)
-
asyncio.get_running_loop()
:函数返回当前 OS 线程中正在运行的事件循环。 -
asyncio.get_event_loop()
:函数获取当前事件循环。 -
asyncio.set_event_loop(loop)
:函数将loop
设置为当前 OS 线程的当前事件循环。 -
asyncio.new_event_loop()
:函数创建一个新的事件循环。 -
loop.run_until_complete(future)
:函数运行直到future (Future 的实例)
被完成。 -
loop.run_forever()
:函数运行事件循环直到stop()
被调用。 -
loop.stop()
:函数停止事件循环。 -
loop.is_running()
:函数返回True
如果事件循环当前正在运行。 -
loop.is_closed()
:函数如果事件循环已经被关闭,返回 True 。 -
loop.close()
:函数关闭事件循环。 -
loop.create_future()
:函数创建一个附加到事件循环中的asyncio.Future
对象。 -
loop.create_task(coro, *, name=None)
:函数安排一个协程
的执行。返回一个 Task 对象。 -
loop.set_task_factory(factory)
:函数设置一个 task 工厂 , 被用于loop.create_task()
。 -
loop.get_task_factory()
:函数返回一个任务工厂,或者如果是使用默认值则返回 None。
1.5.3 async\await
async
关键字:
-
async
用于声明一个函数为异步函数
。这意味着该函数在执行过程中可能会遇到需要等待的操作(如I/O操作、网络请求、等待用户输入等),而不会阻塞整个程序的执行。 - 异步函数内部可以包含
await
表达式,用于等待其他异步操作完成。
await
关键字:
-
await
用于等待一个异步操作(即一个Future
、Task
、或其他可等待对象,如另一个异步函数)的完成。它只能在异步函数内部使用。 - 当
await
表达式被调用时,它会暂停当前异步函数的执行,直到等待的异步操作完成。在等待期间,事件循环会继续运行,并可能执行其他异步任务。 - 一旦等待的异步操作完成,
await
表达式会获取其结果(或捕获其抛出的异常),然后异步函数会从await
表达式之后继续执行。
挂起与恢复:
- 当
await
一个异步操作时,当前协程(即异步函数)会被挂起,控制权交还给事件循环。事件循环可以调度其他协程运行,包括正在等待的异步操作本身。 - 当异步操作完成时,事件循环会恢复挂起的协程,并从
await
表达式之后继续执行。
关于await
后面跟的函数的性质:
-
await
后面必须跟一个可等待对象,这通常是一个异步函数(用async def
定义)的调用结果,因为它会返回一个Future或Task
对象,这些对象是可等待的。 - 如果
await
后面跟的不是异步函数或可等待对象,那么Python解释器会抛出一个TypeError
,因为非异步函数或不可等待对象不能被await。 - 调用非异步函数时,如果该函数执行时间较长,那么它会阻塞当前线程(在
asyncio
中,这通常意味着阻塞当前协程的事件循环),直到函数执行完成。因此,在异步编程中,应该尽量避免在异步函数内部调用非异步的、可能阻塞的函数。
1.5.4 asyncio基本操作
1.5.4.1 asyncio协程对象
使用async
定义一个协程对象,并创建一个事件循环对象
import asyncio
#定义协程对象
async def get_request(url):
print("正在请求的url是:",url)
print('请求成功的url:',url)
return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')
#创建一个事件循环对象
loop=asyncio.get_event_loop()
#将协程对象注册到loop中,并启动loop
loop.run_until_complete(coroutine_obj)
loop.close()
1.5.4.2 task对象
task对象需要loop对象基础上建立起来
import asyncio
#定义协程对象
async def get_request(url):
print("正在请求的url是:",url)
print('请求成功的url:',url)
return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')
#创建一个事件循环对象
loop=asyncio.get_event_loop()
#基于loop创建了一个task对象
task=loop.create_task(coroutine_obj)
print(task)
#基于loop注册任务
loop.run_until_complete(task)
print(task)
loop.close()
1.5.4.3 future对象
主要函数:
-
asyncio.Future(*, loop=None)
:函数是一个 Future 代表一个异步运算的最终结果。线程不安全。 -
asyncio.isfuture(obj)
:函数用来判断如果 obj 为一个asyncio.Future
类的示例、 asyncio.Task 类的实例或者一个具有_asyncio_future_blocking
属性的对象,返回 True。 -
asyncio.ensure_future(obj, *, loop=None)
:函数创建新任务。 -
asyncio.wrap_future(future, *, loop=None)
:函数将一个concurrent.futures.Future
对象封装到asyncio.Future
对象中。
Future
对象相关函数:
-
fut.result()
:函数返回 Future 的结果。 -
fut.set_result(result)
:函数将 Future 标记为 完成 并设置结果。 -
fut.set_exception(exception)
:函数将 Future 标记为 完成 并设置一个异常。 -
fut.done()
:函数如果 Future 为已 完成 则返回 True 。 -
fut.cancelled()
:函数是如果 Future 已取消则返回 True -
fut.add_done_callback(callback, *, context=None)
:函数添加一个在 Future 完成 时运行的回调函数。 -
fut.remove_done_callback(callback)
:函数从回调列表中移除 callback 。 -
fut.cancel()
:函数取消 Future 并调度回调函数。 -
fut.exception()
:函数返回 Future 已设置的异常。 -
fut.get_loop()
:函数返回 Future 对象已绑定的事件循环。
future对象与task对象不同的是创建基于asyncio空间来创建的
import asyncio
#定义协程对象
async def get_request(url):
print("正在请求的url是:",url)
print('请求成功的url:',url)
return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')
#创建一个事件循环对象
loop=asyncio.get_event_loop()
#基于loop创建了一个task对象
future=asyncio.ensure_future(coroutine_obj)
print(future)
loop.run_until_complete(future)
print(future)
loop.close()
或者示例:
import sys
import asyncio
import time
# 一个对future进行赋值的函数
async def slow_operation(future, num):
await asyncio.sleep(1)
# 给future赋值
future.set_result('Future'+ str(num) +' is done!')
def main():
loop = asyncio.get_event_loop()
# 创建一个future
future1 = loop.create_future()
# 使用ensure_future 创建Task
asyncio.ensure_future(slow_operation(future1, 1))
future2 = loop.create_future()
asyncio.ensure_future(slow_operation(future2, 2))
# gather Tasks,并通过run_uniti_complete来启动、终止loop
loop.run_until_complete(asyncio.gather(future1, future2))
print(future1.result())
print(future2.result())
loop.close()
if __name__ == "__main__":
main()
1.5.4.4 绑定回调
在使用task
或者future
绑定回调时,需要先定义回调函数
回调函数中返回的result
方法就是任务对象
中封装的协程对象
对应的函数返回值
注意:
回调函数必须有返回值,不然result
方法就没有值
def callback_func(task):
print(task.result())
在使用task
或者future
绑定回调时,都可以使用方法绑定task.add_done_callback(callback_func)
import asyncio
#定义协程对象
async def get_request(url):
print("正在请求的url是:",url)
print('请求成功的url:',url)
return url
#得到协程对象
coroutine_obj=get_request('www.baidu.com')
loop=asyncio.get_event_loop()
future=asyncio.ensure_future(coroutine_obj)
#把回调函数绑定到任务对象中
future.add_done_callback(callback_func)
loop.run_until_complete(future)
loop.close()
1.5.4.5 异步多任务
在一个异步函数中,可以不止一次挂起,也就是可以用多个await
多任务时,对于run_until_complete
方法需要这样用asyncio.wait()
方法处理:loop.run_until_complete(asyncio.wait(task_list))
代码示例:
import time
import asyncio
async def get_request(url):
print("正在请求的url是:",url)
#在异步协程中如果出现了同步模块相关代码,那么就无法实现异步
# time.sleep(2)
#当在asyncio中遇到阻塞操作就必须进行手动挂起
await asyncio.sleep(2)
print('请求成功的url:',url)
start_time=time.time()
urls=['www.baidu.com','www.sogou.com','www.goubanjia.com']
#任务列表
task_list=[]
for url in urls:
coroutine_obj=get_request(url)
future=asyncio.ensure_future(coroutine_obj)
task_list.append(future)
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
loop.close()
print(time.time()-start_time)
1.5.4.6 asyncio.gather和asyncio.wait 区别
asyncio.gather
和asyncio.wait
都是asyncio
库中用于处理异步任务(协程)的重要函数,但它们之间存在一些关键的区别。以下是这两个函数之间的主要差异:
- 返回值
-
asyncio.gather
:该函数并发运行多个协程,并等待它们全部完成。它返回一个Future
对象,当所有协程都完成时,这个Future
对象的结果是一个包含所有协程返回值的列表(列表的顺序与传入的协程列表相同)。如果任何一个协程抛出异常,则gather
会立即取消所有其他仍在运行的协程,并重新抛出那个异常。 -
asyncio.wait
:该函数也是用于并发运行多个协程,但它返回的是一个包含两个集合的元组:done和pending
。done是一个已完成的协程(Future或Task)列表,而pending
是一个尚未完成的协程列表。这意味着需要从done
列表中的每个协程中调用result()
方法来获取结果,而pending
列表中的协程则需要你进一步处理或等待。
-
- 使用场景
-
asyncio.gather
:当需要同时运行多个协程并立即获取它们的结果时,gather
是一个很好的选择。它简化了结果收集的过程,并允许同步的方式处理异步操作的结果。 -
asyncio.wait
:当需要更细粒度的控制时,比如在某个超时后取消未完成的协程,或者要分别处理已完成和未完成的协程时,wait
是一个更合适的选择。然而,使用wait
需要更多的手动操作来收集结果或处理未完成的协程。
-
- 额外功能
-
asyncio.gather
:除了基本的并发执行和结果收集功能外,gather
还支持一些额外的参数,如return_exceptions
(默认为False),当设置为True
时,如果协程抛出异常,异常会被捕获并作为结果列表中的一项返回,而不是中断整个gather操作。 -
asyncio.wait
:wait
函数也接受一些参数,如timeout(指定等待时间,默认为None,表示无限期等待)和return_when
(指定在什么条件下返回,如FIRST_COMPLETED、ALL_COMPLETED等)。这些参数提供了对协程执行过程的更多控制。
-
以下是使用asyncio.gather和asyncio.wait的示例代码:
import asyncio
async def task(name, delay):
print(f"{name} started")
await asyncio.sleep(delay)
return f"{name} finished"
async def main():
# 使用 asyncio.gather
tasks = [task("A", 1), task("B", 2), task("C", 3)]
results = await asyncio.gather(*tasks)
print(results) # 输出: ['A finished', 'B finished', 'C finished']
# 使用 asyncio.wait
tasks = [task("D", 1), task("E", 2), task("F", 3)]
done, pending = await asyncio.wait(tasks)
for d in done:
print(d.result()) # 分别打印每个已完成协程的结果
# 处理 pending 列表中的协程(这里只是打印出来)
for p in pending:
print(f"Pending: {p}")
asyncio.run(main())