基本概念
- 并发:一个时间段内,有多个程序在同一个CPU上运行(但任意时刻只有一个在CPU上运行);
- 并行:任意时间点上,有多个程序同时运行在多个CPU上;
- 同步:代码调用I/O操作时,必须等待I/O操作完成才返回;
- 异步:代码调用I/O操作时,不必等I/O完成就返回;
- 阻塞:调用函数时当前线程被挂起;
- 非阻塞:调用函数时当前线程不会被挂起(立即返回)。
UNIX五种I/O模型
- 阻塞式I/O
- 非阻塞式I/O
- I/O多路复用
- 信号驱动式I/O
- 异步I/O
阻塞式I/O
- 应用程序被阻塞,直到数据被复制到应用进程缓冲区才返回(如socket_http中的
client.connect()
,CPU停止运行等待接入); - 程序阻塞的过程中,其他程序仍然可以运行(不是整个操作系统阻塞),因此不会影响CPU利用率。
非阻塞式I/O
- 应用进程执行系统调用后,如数据未就绪则内核会返回一个错误码,应用进程可以继续执行(如socket_http中的
client.setblocking(False)
,请求连接后立即返回); - 返回不代表(如网路请求中的三次握手)已完成,需要CPU循环不断询问连接是否建立(
client.send("...")
,可能会抛出异常,需要异常+循环处理); - 由于使用非阻塞式I/O需要不断请求内核态,CPU(需要处理更多系统调用)开销很大;
- 内核态接收网络请求后退出循环,把数据复制到用户态;
- 对于轮询过程中需要执行其他操作的场景,性能比阻塞式I/O好。
I/O多路复用
- 单个进程(线程)具备处理多个I/O事件的能力,避免高并发场景下多进程/多线程创建和切换的开销,有select,poll,epoll三种;
- 单个进程(线程)同时监听多个句柄的状态,状态发生变化时(内核数据到达)可以马上处理,其中句柄状态变化的回调由程序员完成;
- 本质上是同步I/O,读写时间就绪后自己负责进行读写(这个过程是阻塞的);
- 其中select、poll需要轮询所有句柄,随数量增多性能下降;在高并发场景下优先选用epoll,但在并发性不高、连接很活跃(频繁开启关闭)时select比epoll好。
信号驱动式I/O
- 应用进程使用
sigaction
系统调用,发生I/O时内核立即返回,应用程序可以继续执行(等待数据阶段非阻塞); - 当内核中有数据到达时向应用程序发出
SIGIO
信号,应用程序接收到信号在信号处理程序中调用recvfrom
,将数据从内核复制到应用程序中; - 由于不需要轮询系统调用,信号驱动I/O的CPU利用率比非阻塞式I/O更高。
异步I/O
- 应用程序执行aio_read系统调用后立即返回,可以继续执行,不会阻塞;
- 内核在所有操作完成后向应用进程发出信号(句柄状态变化的回调由系统完成);
- 与信号驱动I/O的区别在于:异步I/O的信号是通知应用进程I/O完成,而信号驱动I/O的信号是通知应用进程可以开始I/O。
Select,回调,事件循环
- 回调函数:提供函数供一定条件满足后调用(回调函数中都是非I/O操作,性能很高);
- 事件循环:不断循环列表请求句柄状态,发现状态变化时执行回调函数;
- 高并发:驱动程序运行的loop是单线程运行(不会有内存消耗和切换问题)、非阻塞的,只会对就绪句柄执行回调函数,不会等待I/O(除非所有句柄都在等待)。
import time
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
selector = DefaultSelector() # select函数更高层次的封装,根据环境可以自动选择select、poll或epoll
urls = []
stop = False
class Fetcher:
def connected(self, key):
'''
回调函数
:param key:
:return:
'''
# 执行回调函数时,首先要对句柄取消注册
selector.unregister(key.fd)
self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
# 注册句柄,监听读状态,执行回调函数readable
selector.register(self.client.fileno(), EVENT_READ, self.readable)
def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd) # 数据读取完成
data = self.data.decode("utf8")
html_data = data.split("\r\n\r\n")[1]
print(html_data)
self.client.close()
urls.remove(self.spider_url)
if not urls:
global stop
stop = True
def get_url(self, url):
self.spider_url = url
url = urlparse(url)
self.host = url.netloc
self.path = url.path
self.data = b""
if self.path == "":
self.path = "/"
# 建立socket连接
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# select需要非阻塞IO
self.client.setblocking(False)
try:
self.client.connect((self.host, 80))
except BlockingIOError as e:
pass
# 注册句柄,当发生写事件时,执行回调函数connected
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
def loop():
# 事件循环,不停请求socket的状态并调用对应的回调函数
# select本身是不支持register模式(selector是对select的封装,提供了register)
# socket状态变化以后的回调由程序员完成
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data # 执行注册时执行的回调函数
call_back(key)
# 异步
fetcher = Fetcher()
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
urls.append(url)
fetcher = Fetcher()
fetcher.get_url(url)
loop()
print(time.time() - start_time)
# 同步(注意self.client.setblocking(True))
start_time = time.time()
for url in range(20):
url = "http://shop.projectsedu.com/goods/{}/".format(url)
get_(url)
print(time.time() - start_time)
协程
以上几种I/O模型存在以下问题:
- 回调:代码可读性差,共享状态管理困难,异常处理困难;
- 多线程:线程间同步、锁并发性能差,线程创建消耗内存大、切换开销大;
- 同步:并发度低。
因此可考虑使用协程:
- 采用同步的方式编写异步(事件循环 + I/O多路复用)代码代替回调,使用单线程切换任务(不再需要锁);
- 自主编写调度函数,并发性能远高于线程间切换;
- 调度函数有多个入口:遇到I/O操作把当前函数暂停、切换到另一个函数执行,在适当时候恢复。
- 使用生成器(见“迭代器,生成器”)结合事件循环可实现协程;
- 协程 + 事件循环的效率不比回调 + 事件循环高,其目的在于简便地解决回调复杂的问题。
async与await
为了将语义变得更加明确,Python 3.5后引入了async和await关键词用于定义原生协程;
import types
async def downloader(url): # 使用原生协程
return "ywh"
@types.coroutine
def downloader(url): # 使用生成器实现协程
yield "ywh"
async def download_url(url): # async和await必须成对使用
html = await downloader(url) # await:执行费时操作(生成器不能直接用于await,要加上装饰器或async)
return html
if __name__ == "__main__":
coro = download_url("http://www.imooc.com")
# next(None)
coro.send(None) # 使用原生协程只能使用send(None)
生成器实现协程
获取生成器的状态:
import inspect
def gen_func():
yield 1
return "ywh"
if __name__ == "__main__":
gen = gen_func()
print(inspect.getgeneratorstate(gen))
next(gen)
print(inspect.getgeneratorstate(gen))
使用生成器实现协程:
- 一般的生成器只能作为生产者,实现为协程则可以消费外部传入的数据;
- 使用
value = yield from xxx
的生成器表示返回值给调用方、且调用方通过send方法传值给生成器函数; - 主函数中不能添加耗时的逻辑,如把I/O操作通过
yield from
做异步处理; - 最终实现通过同步的方式编写异步代码:在适当的时候暂停、恢复启动函数。
def gen_func():
value = yield 1
return "ywh"
import inspect
import socket
def get_socket_data(): # 模拟从socket中获取数据,唤醒downloader
yield "ywh" # 如发生异常,则会抛出给downloader
def downloader(url): # 主方法中不能添加耗时操作
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
try:
client.connect((host, 80)) # 阻塞不会消耗cpu
except BlockingIOError as e:
pass
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
source = yield from get_socket_data() # 暂停,直到socket获取到数据再往下执行
html_data = source.decode("utf8").split("\r\n\r\n")[1]
print(html_data)
def download_html(html):
html = yield from downloader()
if __name__ == "__main__":
# 协程的调度:事件循环 + 协程模式(单线程)
pass