参考文章: 微信文章
原文章写得很精彩,但有些代码还是可以优化下的。而且这文章一直只有上篇,可惜了。
理论知识可以参考原文。接下来按个人见解,从代码角度进行解析:
- 基础知识
- 编写原生asyncio的使用方法,制定模拟方向
- 编写阻塞代码,了解socket
- 编写非阻塞socket代码,发现问题
- 编写异步回调代码
- 编写异步协程代码
基础知识:
- 计算机资源:常分为CPU资源、内存资源、硬盘资源和网络资源
- 进程阻塞:正在运行的程序,由于自身某个模块需要使用硬盘或网络I/O资源等,而系统又未及时响应,导致进程处于待机状态,直至等待事件作出回应后才会被唤醒。
- 进程非阻塞:同理,在获取某些资源时,不会等待结果响应,而是继续处理其他模块。
我们以socket为例,如下可获取阻塞与非阻塞两种编程
import socket
sock = socket.socket()
socket.setblocking(True) # 默认就是阻塞。即套接字 建立连接/发送请求/接受请求 的时候,是阻塞的。
socket.setblocking(False) # 设置为非阻塞,即上述请求过程不会阻塞,而是继续处理其他模块。
编写原生asyncio的使用方法,制定模拟方向
让我们立即来使用原生asyncio编写异步程序:
在此代码中,我们需要注意几个关键点,使我们想要实现的。
-
loop = asyncio.get_event_loop()
开启事件循环,异步"任务"将在此循环执行 -
asyncio.create_task()
将一个协程包装成一个"任务"排入期程异步执行 -
asyncio.gather()
同步执行"任务"
可以参考python-asyncio文档
import asyncio
import aiohttp
import time
loop = asyncio.get_event_loop()
async def fetch():
async with aiohttp.ClientSession(loop=loop) as session:
async with session.get('http://www.baidu.com') as response:
print(await response.read())
async def multi_fetch():
await asyncio.gather(*[asyncio.create_task(fetch()) for _ in range(10)])
if __name__ == '__main__':
start = time.time()
loop.run_until_complete(fetch()) # 执行一次
# loop.run_until_complete(multi_fetch()) # 执行十次
print(time.time() - start)
可以发现,执行十次的时间大概和执行一次的时间差不多。那么说明异步执行是成功的。
接下来我们就来一步一步的实现上述几个关键点,实现手写自己的异步程序
编写阻塞代码,了解socket
首先得实现一个阻塞程序,以socket为例。此例子比较简单,大致看一下即可。
import socket
import time
def blocking_socket(response=b''):
sock = socket.socket() # 默认为阻塞连接
sock.connect(('www.baidu.com', 80)) # 建立百度TCP连接
sock.send(b'GET / HTTP/1.0\r\n\r\n') # 发送HTTP协议
chunk = sock.recv(1024) # 接收数据
while chunk:
response += chunk
chunk = sock.recv(1024)
return response
def mutil_blocking_socket():
return [blocking_socket() for _ in range(10)].__len__()
if __name__ == '__main__':
start = time.time()
blocking_socket() # 执行一次
# mutil_blocking_socket() # 执行十次
print(time.time() - start)
用时的话,我们可以明显看出blocking_socket() 用时大致0.07s
而multi_blocking_socket() 用时大致0.7s,刚好是10倍左右。
编写非阻塞socket代码,发现问题
接着我们来实现非阻塞程序,仍然使用socket来实现
代码与刚刚有部分差异,主要就是多了一些while + try + except
import socket
import time
def no_blocking_socket(response=b''):
sock = socket.socket()
sock.setblocking(False) # 设置非阻塞连接
try:
sock.connect(('www.baidu.com', 80))
except BlockingIOError: # 非阻塞式建立连接在此处会报错,捕获忽略即可
pass
while True:
try:
sock.send(b'GET / HTTP/1.0\r\n\r\n') # 发送HTTP协议
break
except OSError: # 此处会报错。因为套接字与百度服务器的TCP连接还没有建立,
# 但是套接字却是非阻塞的,故在未建立连接的情况下发送协议会报错,此处捕获此异常
continue
while True:
try:
chunk = sock.recv(1024)
while chunk:
response += chunk
chunk = sock.recv(1024)
return response
except OSError: # 同理,对方服务器未接收HTTP协议,不会返回数据,即使返回,也有时延。故此时接收数据会报异常,此处捕获此异常
pass
def multi_no_blocking_socket():
return [no_blocking_socket() for _ in range(10)].__len__()
if __name__ == '__main__':
start = time.time()
no_blocking_socket() # 执行一次
# multi_no_blocking_socket() # 执行十次
print(time.time() - start)
来比较时间,我们会惊讶的发现,no_blocking_socket() 执行一次大致0.07s
而multi_no_blocking_socket() 执行十次,大致是0.7s....
没错,花费时间和阻塞编程是一样的。我们的非阻塞编程并没有达到实际的效果。
我们来分析一下,为什么会造成这种现象。我们发现这套非阻塞代码与之前的阻塞代码相比,
多了三处 try: ... except: ...
多了两处 while True: ...
代码确实是非阻塞编程,也就是程序不会在网络IO模块处阻塞,但是程序也没有把空闲下来的时间花在"正确"的地方
在非阻塞的情况下,CUP把空闲下来的时间不停的去while + try + except
试错,也就是程序会不停的尝试发送协议,直到发送成功。不停的尝试接收数据,直到接收成功。
这样就导致我们的非阻塞编程,实际上和阻塞编程是一样的,唯一要说不同。就是非阻塞程序中的CPU可能会 "忙一点"。
编写异步回调代码
那么我们可以注意到了问题的关键,就是我们不知道什么时候程序是 "准备就绪了",也就是什么时候可以发送协议(套接字可写),什么时候可以接收数据(套接字可读)。
其实操作系统已经帮我们实现了,它将事件的 I/O 都封装成了事件,包括可读事件/可写事件。
那么我们就可以立即想到,最好的方法就是 当我们的套接字状态发生改变,就能够立即执行接下来的读写步骤。
所以此处,我们就需要 "回调"!
这里说明下python中selectors模块,用于注册事件的回调
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
sock = socket.socket() # 获取套接字
selector = DefaultSelector() # 获取selector对象
selector.register(sock.fileno(), EVENT_WRITE, on_send) # fileno()获取当前socket套接字的文件描述符,并绑定事件EVENT_WRITE,回调函数为on_send
selector.unregister(sock.fileno()) # 注销事件绑定
selector.register(sock.fileno(), EVENT_READ, on_recv) # 同理,绑定事件EVENT_READ,回调函数为on_recv
while True:
events = selector.select() # 此处的events是操作系统返回的事件,也就是我们绑定的事件被触发了,
# 此处是阻塞获取的。也就是用一个事件循环的阻塞,来代替我们的while True: ...
for sock, mask in events:
sock.data() #sock.data为绑定的回调函数,也就是上面的on_send和on_recv
故我们就不需要自己手动while True + try...except
来监控事件状态的改变,转而将这件工作交给selector
的事件循环。此处我们将其命名为loop
。
接下来就是回调编程了
import socket
import time
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
stop_loop = 10
class Crawler:
def __init__(self, flag=10):
self.flag = flag
self.sock = None
self.response = b''
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('www.baidu.com', 80))
except BlockingIOError:
pass
# fileno()获取当前socket套接字的文件描述符,并绑定事件回调
selector.register(self.sock.fileno(), EVENT_WRITE, self.on_send)
def on_send(self):
selector.unregister(self.sock.fileno())
self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
selector.register(self.sock.fileno(), EVENT_READ, self.on_recv)
def on_recv(self):
chunk = self.sock.recv(1024)
if chunk:
self.response += chunk
else:
global stop_loop
stop_loop -= self.flag
selector.unregister(self.sock.fileno())
def loop(): # 事件循环,由操作系统返回那个事件发生了,对应执行那些事件的回调。
while stop_loop:
events = selector.select()
for sock, mask in events:
sock.data()
if __name__ == '__main__':
start = time.time()
Crawler(10).fetch() # 执行一次
# [Crawler(1).fetch() for _ in range(10)] # 执行十次
loop()
print(time.time() - start)
我们来捋一下代码执行的流程:
1、首先实例化一个Crawler对象,然后执行此实例的fetch
方法
2、fetch方法发起了与百度服务器的连接,然后注册了回调函数。
3、此时会走到loop()函数来,执行事件循环。直到我们与对方服务器的连接建立成功,则此时OS会返回事件,我们则执行对应的回调事件 sock.data()
,对应 self.on_send()
4、执行on_send
,首先注销上一个事件,然后发送协议,再接着注册可读事件。继续进入等待
5、此时继续进入loop事件循环,直到触发注册事件,执行回调函数on_recv
,由于一次只接收1024,故可能会接收多次,也就是会触发多次on_recv事件的回调,直接接收完成。
6、接收完成,我们令全局变量stop_loop - flag
,来停止loop事件循环。程序结束。
然后我们来看下时间,我们会发现执行一次的时间和执行十次的时间基本是差不多的。说明我们编写的程序是没有问题的。
回调式异步编程,成功!
编写异步协程代码
至此,我们已经实现了回调式异步编程,但是我们思考下第一个例子,是基于协程的异步编程,故我们现在来调整代码,编写协程。
import socket
import time
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
stop_loop = 10
class Future:
# 用于存放未来可能出现的数据,当出现时执行一次回调函数
# 此中的result仅作为一个中转,实际还是通过回调返回给协程
def __init__(self):
self.result = None
self.callback = None
def set_callback(self, func):
self.callback = func
def set_result(self, result):
self.result = result
self.callback(self) if self.callback else None
class Task:
# 用于启动协程,该类实例初始化时传入为协程对象,执行self.process方法
# 调用协程的send方法,启动协程,并最后绑定回调函数
def __init__(self, co_routine):
self.co_routine = co_routine
future = Future()
self.process(future)
def process(self, future):
try:
next_future = self.co_routine.send(future.result)
except StopIteration:
return
next_future.set_callback(self.process)
class Crawler:
def __init__(self, flag=10):
self.flag = flag
self.sock = None
self.response = b''
def fetch(self):
self.sock = socket.socket()
self.sock.setblocking(False)
try:
self.sock.connect(('www.baidu.com', 80))
except BlockingIOError:
pass
future = Future()
def _on_send():
future.set_result(None)
def _on_recv():
future.set_result(self.sock.recv(1024))
selector.register(self.sock.fileno(), EVENT_WRITE, _on_send)
yield future
selector.unregister(self.sock.fileno())
self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
while True:
chunk = yield future # 在此处轮询EVENT_READ事件,直至所有数据加载完毕
if chunk:
self.response += chunk
else:
global stop_loop
stop_loop -= self.flag
return self.response
def loop():
while stop_loop:
events = selector.select()
for sock, mask in events:
sock.data()
if __name__ == '__main__':
start = time.time()
Task(Crawler(10).fetch()) # 传入协程fetch,使用Task实例化调用协程的send方法来启动协程
# [Task(Crawler(1).fetch()) for _ in range(10)] # 同理,启动十个协程任务
loop()
print(time.time() - start)
我们继续来整理下代码执行的流程:
1、首先实例化一些Crawler对象,调用调用此实例的fetch函数得到一个协程。此时协程是没有执行的。(协程需要send等触发才会执行)
2、将此协程装于Task用来创建任务实例,在任务中会主动触发协程的send函数来启动协程
3、此时协程已触发,注册事件 selector.register(self.sock.fileno(), EVENT_WRITE, _on_send)
,然后返回future
,此时协程到此暂停
4、返回的future会添加任务的回调函数,也就是self.precess()
。而loop也开始了事件轮询,当套接字的文件描述符状态变为可写状态时,触发回调方法_on_send
5、_on_send
方法执行Future中的set_result方法,此时在此方法中会调用一次future注册的回调函数,继续触发任务Task中协程的send方法,回到协程上次暂停的状态
6、回来后,首先注销事件EVENT_WRITE
。发送HTTP协议请求。再注册事件selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
7、loop事件轮询,当套接字的文件描述符变为可读状态时,触发回调方法_on_recv
8、_on_recv
方法执行Future中的set_result
方法,此时在方法中会初始化result为sock.recv(1024)
的值,并执行注册的回调函数,将此结果继续传递至协程上回暂停的地方
9、由于一直没有注销事件EVENT_READ
,故会一直驱动事件轮询直至结束
10、Task、Future、Crawler、loop这四个就这么神奇的串联在一起了,不可思议的说。
不过这样写貌似不太好看,虽然感觉也可以,但是很多模块其实都是可以拆离开的
下面就是一个拆分版本,就不细细的分析流程啦
import socket
import time
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector()
stop_loop = 10
def fetch(sock):
sock.setblocking(False)
try:
sock.connect(('www.baidu.com', 80))
except BlockingIOError:
pass
future = Future()
def _on_send():
future.set_result(None)
selector.register(sock.fileno(), EVENT_WRITE, _on_send)
yield from future
selector.unregister(sock.fileno())
return future
def read(sock, future, flag, response=b''):
def _on_recv():
future.set_result(sock.recv(1024))
selector.register(sock.fileno(), EVENT_READ, _on_recv)
chunk = yield from future
while chunk:
response += chunk
chunk = yield from future
selector.unregister(sock.fileno())
global stop_loop
stop_loop -= flag
return response
def loop():
while stop_loop:
events = selector.select()
for sock, mask in events:
sock.data()
class Future:
def __init__(self):
self.result = None
self.callback = None
def set_callback(self, func):
self.callback = func
def set_result(self, result):
self.result = result
self.callback(self) if self.callback else None
def __iter__(self):
yield self
return self.result
class Task:
def __init__(self, co_routine):
self.co_routine = co_routine
future = Future()
self.process(future)
def process(self, future):
try:
next_future = self.co_routine.send(future.result)
except StopIteration:
return
next_future.set_callback(self.process)
class Crawler:
def __init__(self, flag):
self.flag = flag
def fetch(self):
sock = socket.socket()
future = yield from fetch(sock)
sock.send(b'GET / HTTP/1.0\r\n\r\n')
response = yield from read(sock, future, self.flag)
print(response)
if __name__ == '__main__':
start = time.time()
Task(Crawler(10).fetch())
# [Task(Crawler(1).fetch()) for _ in range(10)]
loop()
print(time.time() - start)
代码放到github上了:
https://github.com/CzaOrz/ioco/tree/master/open_source_project/异步教程学习/异步协程
Life is short. You need Python