概念
协程又称微线程,是一种轻量级线程,其拥有自己的寄存器上下文和栈,所以在调度切换时,协程能保存上一次上一次调用的状态。因为协程是跑在线程里的,所以和线程一样也无法利用多核资源,但是又因为他小,所以可以开很多的协程,其在IO操作密集的地方有着很大的优势,所以这在爬虫、并发处理时的效率很高
如果把线程比作一个要执行很多函数的python程序,那么协程就像是给程序里的函数赋予一旦遇到阻塞就可以跳转到别的任务中运行的能力,并且将来能够回到原来函数里上次离开的地方继续执行(从这里可以看出协程的实现离不开生成器),从而在单个程序里减少了不少时间的浪费,而且因为是在单个线程里,所以函数之间的切换是没有开销的,这也就大大提高了单个线程的使用效率
协程也可以理解为有多个入口的函数、可以暂停的函数,并且可以向暂停的地方传入值(可以暂停,这可以联想到生成器),例如我们可以通过生成器实现切换调用函数,举例:
def aaa():
print(1)
yield
print(2)
yield
def bbb():
print(3)
yield
print(4)
yield
li = [aaa(), bbb()]
while li:
task = li.pop(0)
try:
next(task)
li.append(task)
except:
pass
# 1
# 3
# 2
# 4
协程特点
- 操作系统不可见
- 本质是单线程,多个任务在一个线程上来回切换
- 通过规避IO操作,达到一条线程中的IO操作降到最低的目的
协程框架
协程的编码比多线程、多进程的编码要复杂些,因为其不像线程和进程是由系统来调度的,协程由程序员来调度,因此才需要一些协程框架来帮我们在实现协程的同时,代码也不会十分复杂
greenlet
封装了调度切换的方法,使我们能够手动自定义地进行任务调度
gevent
基于greenlet,能够自动帮我们规避IO操作,但该框架有个很大的问题就是使用了猴子补丁,改写了很多内置模块的方法,可能导致原有的一些功能受影响
asyncio
python3.4以后新增的模块,基于生成器语法完成切换,并且能够自动规避IO操作,并且其在实现自动帮我们调度的同时,也能让我们像写同步代码那样写异步代码,是目前异步IO实现最常用的模块,是实现像go、node那样高并发的基础
greenlet
可以通过greenlet
下的greenlet
对象来管理协程的调度切换,其中我们可以使用其提供switch
方法来进行协程的手动切换,举例:
import greenlet
def run1():
print("12")
t2.switch() #切换到t2协程中执行
print("34")
t2.switch()
def run2():
print("56")
t1.switch() #切换到t1协程中执行
print("78")
t1 = greenlet.greenlet(run1) #生成协程对象
t2 = greenlet.greenlet(run2)
t1.switch() #切换到t1协程中执行
结果:
12
56
34
78
从结果可以看出协程的来回切换效果
gevent
gevent
在greenlet
模块的基础上帮我们实现了协程的自动调度,通过spawn
来定义协程,并将所有协程加入到joinall
里(只在这几个协程间切换),而添加到协程任务中的函数中如果遇到IO操作时,就会自动帮我们切换调度其他任务,举例:
import time
import gevent
def run1():
print("run1 start")
gevent.sleep(2) # gevent实现的异步sleep方法
print("run1 end")
def run2():
print("run2 start")
gevent.sleep(2)
print("run2 end")
def run3():
print("run3 start")
gevent.sleep(2)
print("run3 end")
start = time.time()
gevent.joinall(
[
gevent.spawn(run1), #spawn里还有第二个参数是传给函数的参数
gevent.spawn(run2),
gevent.spawn(run3)
]
)
print("cost time:", time.time() - start)
# run1 start
# run2 start
# run3 start
# run1 end
# run3 end
# run2 end
# cost time: 2.00173282623291
可以看出总体只花费了2s就完成了任务,并且是在单线程的情况下
IO切换场景协程
前面的例子中通过gevent
里定义的IO处理方法来实现协程的调度,但IO操作的情况有很多,如果我们的代码中存在很多IO操作,并且希望在不改变原本代码的情况下,可以在程序开头调用monkey.patch_all()
,举例:
from gevent import monkey
monkey.patch_all()
该方法通过猴子补丁,帮我们内置的一些IO相关方法都改写成支持协程的类型,从而实现自动调度
猴子补丁
gevent
实现自动协程调度,往往需要我们在程序中加上monkey.patch_all()
,该方法实际上就是使用了猴子补丁(通过重写某些原本的代码来实现一些对应的功能)来替换了一些相关的内置模块方法,从而能够以协程方式运行代码,通过下面代码就可以看出在使用猴子补丁以后,相关的socket
方法就被改写了:
from gevent import monkey
import socket
print(socket.socket)
monkey.patch_all()
print(socket.socket)
# 结果:
# <class 'socket.socket'>
# <class 'gevent._socket3.socket'>
由于猴子补丁会修改原本的方法,因此在一些其他使用到相关方法的地方也可能会受到影响,这点需要注意
注:
patch_all
是对几乎所有IO操作添加补丁(可以查看源码只对指定的操作添加补丁),而线程的补丁会导致多线程的阻塞,因此建议使用monkey.patch_all(thread=False)
,即不对线程添加猴子补丁
gevent实现并发爬虫
import time,requests
import gevent
from gevent import monkey
def run(url):
headers = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"
}
res = requests.get(url,headers)
print("获取内容长度:",len(res.text))
# print(res.text)
monkey.patch_all() #相当于给程序里所有IO操作打上标记,让其自动切换
start_time = time.time()
gevent.joinall( #协程爬虫
[
gevent.spawn(run,"https://www.baidu.com"),
gevent.spawn(run,"https://www.baidu.com"),
gevent.spawn(run,"https://www.baidu.com"),
gevent.spawn(run,"https://www.baidu.com"),
gevent.spawn(run,"https://www.baidu.com"),
gevent.spawn(run,"https://www.baidu.com"),
]
)
# url = [
# "https://www.baidu.com",
# "https://www.baidu.com",
# "https://www.baidu.com",
# "https://www.baidu.com",
# "https://www.baidu.com",
# "https://www.baidu.com"
# ]
# for each in url: #可以对比协程爬虫和同步爬虫之间速度上的差距
# run(each)
print(time.time() - start_time)
gevent实现并发Socket
- 服务端:
import socket, gevent
import os, time
from gevent import monkey
def run(conn):
try:
while True:
data = conn.recv(1024)
print(data)
conn.send(data)
except Exception as ex:
print(ex)
finally:
conn.close()
monkey.patch_all() #实现自动判断IO切换
server = socket.socket()
server.bind(('localhost',6969))
server.listen(5)
print("start...")
while True:
conn, addr = server.accept()
gevent.spawn(run, conn) #每当有新连接就创建一个新的协程
- 客户端:
import socket
client = socket.socket()
client.connect(('localhost',6969))
while True:
msg = input()
if len(msg) == 0:continue
client.send(msg.encode('utf-8'))
while True:
data = client.recv(1024)
if len(data) < 1024:
print(data.decode())
break
client.close()
async/await
async
/await
是python3.4提供的新的关键字,用于实现原生协程语法,协程本质是利用了生成器来完成,而该语法本质上也是利用了生成器,只是为了更好地分辨函数的功能为协程还是生成器而设计的,从而使语义更加明确,语法举例:
async def cor_fun(n):
return n
async def get_n(n):
res = await cor_fun(n)
print(res)
return res
cor = get_n(10)
print(cor.send(None))
-
async
用于声明一个实现协程的函数 -
await
相当于yield from
语法,但后面必须跟一个awaitable
对象
注:
之所以引入该语法是因为生成器可以作为生成器,也可以实现协程,从而导致语法看起来十分混乱,因此引入该语法将协程和生成器区分开。此时async
函数就不能是生成器,比如在async
函数里写yield
就会报错:
async def cor_fun(n):
yield n
# async函数必须为协程,不能使用生成器的yield语法
async def get_n(n):
res = await cor_fun(n)
return res
cor = get_n(10)
print(cor.send(None))
# TypeError: object async_generator can't be used in 'await' expression
而await
也只能在声明async
的函数里才能够使用
awaitable对象
await
后面接收的必须是awaitable
对象,因此必须实现__await__
魔法方法才行,该对象接口如下:
class Awaitable(metaclass=ABCMeta):
__slots__ = ()
@abstractmethod
def __await__(self):
yield
@classmethod
def __subclasshook__(cls, C):
if cls is Awaitable:
return _check_methods(C, "__await__")
return NotImplemented
生成器不能本身不是awaitable
对象,所以不能加到await
语法后面,但可以使用types
模块下的coroutine
函数将生成器包装成awaitable
对象,举例:
import types
# 将生成器包装成协程
@types.coroutine
def cor_fun(n):
yield n
async def get_n(n):
res = await cor_fun(n)
return res
cor = get_n(10)
print(cor.send(None))
async/await参考
asyncio
python3.4提供的新模块,其实现了异步IO编程最核心的部分,其内部重写了大部分IO操作,使其能够支持协程,简单示例:
import asyncio
import time
async def do():
await asyncio.sleep(2)
# 需要使用asyncio实现的支持协程的sleep方法
start = time.time()
loop = asyncio.get_event_loop()
# 事件循环
loop.run_until_complete(do())
# 监听任务
print(time.time() - start)
# 2.0026493072509766
注:
协程三要点:事件循环+回调(驱动生成器)+epoll(IO多路复用)
asyncio相关API
wait
将一个任务队列转成future
对象,该对象可以传入事件循环当中进行监听,举例:
import asyncio
import time
async def do():
await asyncio.sleep(2)
start = time.time()
tasks = [do() for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
# 传入一批任务队列
print(time.time() - start)
# 2.0036449432373047
gather
和wait
方法类似,也是将一批任务转成future
对象,举例:
import asyncio
import time
async def do():
await asyncio.sleep(2)
start = time.time()
loop = asyncio.get_event_loop()
task = loop.create_task(do())
loop.run_until_complete(asyncio.gather(*[task]))
# 使用gather等待
print(time.time() - start)
# 2.003610372543335
gather
/wait
区别
- 传入格式:
wait
传入一个可迭代对象,gather
依次传入多个任务 - 功能:
gather
相比于wait
,功能更加丰富,其能够进行任务分组,举例:
from functools import partial
import asyncio
import time
async def do(i):
await asyncio.sleep(2)
start = time.time()
loop = asyncio.get_event_loop()
g1 = [do(i) for i in range(2)]
g2 = [do(i) for i in range(2, 4)]
g1 = asyncio.gather(*g1)
g2 = asyncio.gather(*g2)
loop.run_until_complete(asyncio.gather(g1, g2))
# 将任务分为g1、g2两组
print(time.time() - start)
# 2.002645492553711
事件循环对象相关API
create_task
返回一个Task
对象,该对象能够之间添加到事件循环当中监听,举例:
import asyncio
import time
async def do():
await asyncio.sleep(2)
start = time.time()
loop = asyncio.get_event_loop()
task = loop.create_task(do())
# 返回一个Task对象,继承于Future类
print(type(task), type(task).__bases__)
loop.run_until_complete(task)
print(time.time() - start)
# <class '_asyncio.Task'> (<class '_asyncio.Future'>,)
# 2.002613067626953
run_forever
启动协程,并不停地执行协程
run_until_complete
启动协程,并在运行完指定任务后停止
call_soon
立即执行任务,举例:
import asyncio
def sleep(n):
print("sleep {}s done".format(n))
def stop(loop):
loop.stop()
loop = asyncio.get_event_loop()
loop.call_soon(sleep, 2)
loop.call_soon(stop, loop)
loop.run_forever()
# sleep 2s done
call_later
等待一定时间后执行任务,举例:
import asyncio
def sleep(n):
print("sleep {}s done".format(n))
def stop(loop):
loop.stop()
loop = asyncio.get_event_loop()
loop.call_later(2, sleep, 2)
# 2s后执行
loop.call_later(3, stop, loop)
# 3s后执行
loop.run_forever()
# sleep 2s done
call_at
在指定时间执行任务(以当前时间戳为基准,call_later
底层就是调用该方法):
import asyncio
def sleep(n):
print("sleep {}s done".format(n))
def stop(loop):
loop.stop()
loop = asyncio.get_event_loop()
loop.call_at(loop.time() + 2, sleep, 2)
# 2s后执行
loop.call_at(loop.time() + 3, stop, loop)
# 3s后执行
loop.run_forever()
# sleep 2s done
call_soon_threadsafe
在保证线程安全的前提下执行任务
Task对象相关API
result
获取任务执行结果
add_done_callback
任务执行完毕的回调函数,举例:
import asyncio
import time
async def do():
await asyncio.sleep(2)
def callback(future):
print(future, "callback")
start = time.time()
loop = asyncio.get_event_loop()
task = loop.create_task(do())
task.add_done_callback(callback)
# 设置任务完成回调
loop.run_until_complete(task)
print(time.time() - start)
# <Task finished coro=<do() done, defined at xxx.py:xxx> result=None> callback
# 2.002718210220337
由于回调默认只传入一个future
参数,如果我们想要传入更多参数,可以使用functools
模块下的partial
方法,举例:
from functools import partial
import asyncio
import time
async def do():
await asyncio.sleep(2)
def callback(param, future):
print(param)
start = time.time()
loop = asyncio.get_event_loop()
task = loop.create_task(do())
task.add_done_callback(partial(callback, "param"))
# 多传入一个param参数
loop.run_until_complete(task)
print(time.time() - start)
# param
# 2.0026185512542725
done
判断任务是否完成
cancel
取消某个任务
all_tasks
Task
类提供的静态方法,可以获取所有任务,举例:
import asyncio
import time
async def do(i):
print("{} start...".format(i))
await asyncio.sleep(i)
print("{} end...".format(i))
start = time.time()
loop = asyncio.get_event_loop()
g1 = [do(i) for i in range(2)]
g2 = [do(i) for i in range(2, 4)]
g1 = asyncio.gather(*g1)
g2 = asyncio.gather(*g2)
try:
loop.run_until_complete(asyncio.gather(g1, g2))
except KeyboardInterrupt:
# 键盘强制中断时,获取所有任务,并取消
tasks = asyncio.Task.all_tasks()
for task in tasks:
print(task.cancel())
loop.stop()
loop.run_forever()
# 停止协程以后需要再启动协程,否则关闭的时候会报错
finally:
loop.close()
# 使用gather等待
print(time.time() - start)
# 0 start...
# 1 start...
# 2 start...
# 3 start...
# 0 end...
# 1 end...
# 2 end...
# False
# False
# False
# True
# 3.011915445327759
协程锁
asyncio
下提供了协程锁,举例:
import asyncio
lock = asyncio.Lock()
# 协程锁
async def get(url):
async with lock:
# with await lock:
# 两种写法或者自己await lock.acquire()都行
path = "/"
if "/" in url:
url, path = url.split("/")
if not path:
path = "/"
host, port = url.split(":")
reader, writer = await asyncio.open_connection(host, int(port))
# 使用asyncio提供的connection
writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf-8"))
data = b""
async for raw_line in reader:
data += raw_line
data = data.decode("utf-8")
return data
其他如条件变量、信号量等都有提供的协程的接口,因为是单线程,所以Rlock
没有提供
协程通信
可以使用asyncio
提供的队列,举例:
import asyncio
async def get(queue):
res = await queue.get()
print(res)
async def set(queue, n):
await asyncio.sleep(n/10)
await queue.put(n)
queue = asyncio.Queue(10)
loop = asyncio.get_event_loop()
tasks = []
g1 = asyncio.gather(*[set(queue, i) for i in range(5)])
g2 = asyncio.gather(*[get(queue) for i in range(5)])
loop.run_until_complete(asyncio.gather(g1, g2))
# 0
# 1
# 2
# 3
# 4
协程中提供的队列和普通list、队列区别就是他可以限制协程里的最大长度,如果不需要协程限流,直接用普通的list就行
线程池中执行协程
通过run_in_executor
方法,可以将线程池的任务添加到协程当中,举例:
import asyncio
from concurrent.futures import ThreadPoolExecutor
def sleep(n):
print("sleep {}s done".format(n))
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor()
tasks = []
for i in range(5):
task = loop.run_in_executor(executor, sleep, i)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
loop.run_forever()
# sleep 0s done
# sleep 1s done
# sleep 2s done
# sleep 3s done
# sleep 4s done
asyncio模拟http请求
import asyncio
import socket
async def get(url):
path = "/"
if "/" in url:
url, path = url.split("/")
if not path:
path = "/"
host, port = url.split(":")
reader, writer = await asyncio.open_connection(host, int(port))
# 使用asyncio提供的connection
writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf-8"))
data = b""
async for raw_line in reader:
data += raw_line
data = data.decode("utf-8")
return data
async def main():
tasks = []
for i in range(5):
url = "127.0.0.1:5000/"
tasks.append(get(url))
for task in asyncio.as_completed(tasks):
# 一旦完成就输出
res = await task
print(res)
import time
start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
print(time.time() - start)
包装成Task
对象:
async def main():
tasks = []
for i in range(5):
url = "127.0.0.1:5000/"
tasks.append(asyncio.ensure_future(get(url)))
# 包装成task对象
for task in asyncio.as_completed(tasks):
res = await task
print(res)
协程实现原理
协程并不是让本来阻塞的方法不阻塞,而是重新实现了和会阻塞的方法功能差不多,且支持协程的方法,例如time.sleep
本身是阻塞的,但是我们可以结合生成器实现不阻塞的sleep
:
import time
def async_sleep(n):
yield time.time() + n
# 返回结束时间
def sleep(n):
# 外面包一层sleep函数是为了可以执行其他内容,当然在async_sleep里也可以执行
print(f"{n} start...")
yield from async_sleep(n)
print(f"{n} end...")
def sleep_loop(*tasks):
# 通过事件循环监听所有异步睡眠任务
async_di = {}
# 存放异步睡眠任务
for task in tasks:
ret = next(task)
# 获取结束时间
async_di[ret] = task
while async_di:
min_time = min(async_di)
# 监听获取最短的睡眠时间
time.sleep(max(min_time - time.time(), 0))
# 睡最短的时间,然后继续执行该任务
try:
next(async_di[min_time])
except StopIteration:
pass
del async_di[min_time]
main_start = time.time()
tasks = [sleep(i) for i in range(1, 3)]
sleep_loop(*tasks)
# 循环监听所有的睡眠任务
print(f"main time:{time.time() - main_start}")
# 1 start...
# 2 start...
# 1 end...
# 2 end...
# main time:2.0016887187957764
可以看出协程并不是让原本阻塞的方法不阻塞,而是程序员对可能阻塞的代码进行修改调度来规避阻塞,例如上面的sleep
方法,在真正sleep
的时候还是阻塞的,只是我们一旦遇到sleep
的阻塞代码,就存入异步队列,并且每次只执行最短的sleep
,且在执行完以后立刻执行其他的内容,然后再去继续找最短的sleep
代码执行
asyncio参考:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017970488768640
https://www.cnblogs.com/wupeiqi/p/12834355.html
进程/线程/协程对比
- 进程:数据隔离,数据不安全,操作系统级别,开销很大,能利用多核
- 线程:数据共享,数据不安全,操作系统级别,开销小,不能利用多核,一些文件相关的IO操作只有操作系统能感知到
- 协程:数据共享,数据安全,用户级别,开销很小,不能利用多核,协程的切换全部基于用户,只有用户级别能够感受到的IO操作才会用协程来切换规避(如sleep、网页请求等)
用户级别的协程优势:
- 减轻操作系统的负担
- 一个线程如果开了多个协程,给操作系统的印象就是线程很忙,因此也会多分配一些时间片执行,从而提高效率
异步web框架
aiohttp
asyncio
没有实现http请求的协程处理,而aiohttp
就是在此基础上实现了http
的协程
sanic
基于asyncio
实现的高并发web服务器
tornado
由于兼容python2/3,所以基于生成器实现的高并发处理