Python 经验 - I/O 多路复用与协程

基本概念

  • 并发:一个时间段内,有多个程序在同一个CPU上运行(但任意时刻只有一个在CPU上运行);
  • 并行:任意时间点上,有多个程序同时运行在多个CPU上;
  • 同步:代码调用I/O操作时,必须等待I/O操作完成才返回;
  • 异步:代码调用I/O操作时,不必等I/O完成就返回;
  • 阻塞:调用函数时当前线程被挂起;
  • 非阻塞:调用函数时当前线程不会被挂起(立即返回)。

UNIX五种I/O模型

五种I/O模型
  • 阻塞式I/O
  • 非阻塞式I/O
  • I/O多路复用
  • 信号驱动式I/O
  • 异步I/O

阻塞式I/O

阻塞式I/O
  • 应用程序被阻塞,直到数据被复制到应用进程缓冲区才返回(如socket_http中的client.connect(),CPU停止运行等待接入);
  • 程序阻塞的过程中,其他程序仍然可以运行(不是整个操作系统阻塞),因此不会影响CPU利用率。

非阻塞式I/O

非阻塞式I/O
  • 应用进程执行系统调用后,如数据未就绪则内核会返回一个错误码,应用进程可以继续执行(如socket_http中的client.setblocking(False),请求连接后立即返回);
  • 返回不代表(如网路请求中的三次握手)已完成,需要CPU循环不断询问连接是否建立(client.send("..."),可能会抛出异常,需要异常+循环处理);
  • 由于使用非阻塞式I/O需要不断请求内核态,CPU(需要处理更多系统调用)开销很大;
  • 内核态接收网络请求后退出循环,把数据复制到用户态;
  • 对于轮询过程中需要执行其他操作的场景,性能比阻塞式I/O好。

I/O多路复用

I/O多路复用
  • 单个进程(线程)具备处理多个I/O事件的能力,避免高并发场景下多进程/多线程创建和切换的开销,有select,poll,epoll三种;
  • 单个进程(线程)同时监听多个句柄的状态,状态发生变化时(内核数据到达)可以马上处理,其中句柄状态变化的回调由程序员完成;
  • 本质上是同步I/O,读写时间就绪后自己负责进行读写(这个过程是阻塞的);
  • 其中select、poll需要轮询所有句柄,随数量增多性能下降;在高并发场景下优先选用epoll,但在并发性不高、连接很活跃(频繁开启关闭)时select比epoll好。

信号驱动式I/O

信号驱动式I/O
  • 应用进程使用sigaction系统调用,发生I/O时内核立即返回,应用程序可以继续执行(等待数据阶段非阻塞);
  • 当内核中有数据到达时向应用程序发出SIGIO信号,应用程序接收到信号在信号处理程序中调用recvfrom,将数据从内核复制到应用程序中;
  • 由于不需要轮询系统调用,信号驱动I/O的CPU利用率比非阻塞式I/O更高。

异步I/O

异步I/O
  • 应用程序执行aio_read系统调用后立即返回,可以继续执行,不会阻塞;
  • 内核在所有操作完成后向应用进程发出信号(句柄状态变化的回调由系统完成);
  • 与信号驱动I/O的区别在于:异步I/O的信号是通知应用进程I/O完成,而信号驱动I/O的信号是通知应用进程可以开始I/O。

Select,回调,事件循环

  • 回调函数:提供函数供一定条件满足后调用(回调函数中都是非I/O操作,性能很高);
  • 事件循环:不断循环列表请求句柄状态,发现状态变化时执行回调函数;
  • 高并发:驱动程序运行的loop是单线程运行(不会有内存消耗和切换问题)、非阻塞的,只会对就绪句柄执行回调函数,不会等待I/O(除非所有句柄都在等待)。
import time
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()        # select函数更高层次的封装,根据环境可以自动选择select、poll或epoll
urls = []
stop = False

class Fetcher:
    def connected(self, key):
        '''
        回调函数
        :param key:
        :return:
        '''
        # 执行回调函数时,首先要对句柄取消注册
        selector.unregister(key.fd)     
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        
        # 注册句柄,监听读状态,执行回调函数readable
        selector.register(self.client.fileno(), EVENT_READ, self.readable)      

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)     # 数据读取完成
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # select需要非阻塞IO
        self.client.setblocking(False)    
        try:
            self.client.connect((self.host, 80))        
        except BlockingIOError as e:
            pass

        # 注册句柄,当发生写事件时,执行回调函数connected
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
    # 事件循环,不停请求socket的状态并调用对应的回调函数
    # select本身是不支持register模式(selector是对select的封装,提供了register)
    # socket状态变化以后的回调由程序员完成
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data        # 执行注册时执行的回调函数
            call_back(key)    

# 异步
fetcher = Fetcher()
start_time = time.time()
for url in range(20):
    url = "http://shop.projectsedu.com/goods/{}/".format(url)
    urls.append(url)
    fetcher = Fetcher()
    fetcher.get_url(url)
loop()
print(time.time() - start_time)

# 同步(注意self.client.setblocking(True))
start_time = time.time()
for url in range(20):
    url = "http://shop.projectsedu.com/goods/{}/".format(url)
    get_(url)
print(time.time() - start_time)

协程

以上几种I/O模型存在以下问题:

  • 回调:代码可读性差,共享状态管理困难,异常处理困难;
  • 多线程:线程间同步、锁并发性能差,线程创建消耗内存大、切换开销大;
  • 同步:并发度低。

因此可考虑使用协程:

  • 采用同步的方式编写异步(事件循环 + I/O多路复用)代码代替回调,使用单线程切换任务(不再需要锁);
  • 自主编写调度函数,并发性能远高于线程间切换;
  • 调度函数有多个入口:遇到I/O操作把当前函数暂停、切换到另一个函数执行,在适当时候恢复。
  • 使用生成器(见“迭代器,生成器”)结合事件循环可实现协程;
  • 协程 + 事件循环的效率不比回调 + 事件循环高,其目的在于简便地解决回调复杂的问题。

async与await

为了将语义变得更加明确,Python 3.5后引入了async和await关键词用于定义原生协程;

import types

async def downloader(url):              # 使用原生协程
    return "ywh"

@types.coroutine
def downloader(url):                    # 使用生成器实现协程
    yield "ywh"     

async def download_url(url):            # async和await必须成对使用
    html = await downloader(url)        # await:执行费时操作(生成器不能直接用于await,要加上装饰器或async)
    return html


if __name__ == "__main__":
    coro = download_url("http://www.imooc.com")
    # next(None)
    coro.send(None)     # 使用原生协程只能使用send(None)

生成器实现协程

获取生成器的状态:

import inspect
def gen_func():
    yield 1
    return "ywh"

if __name__ == "__main__":
    gen = gen_func()
    print(inspect.getgeneratorstate(gen))
    next(gen)
    print(inspect.getgeneratorstate(gen))

使用生成器实现协程:

  • 一般的生成器只能作为生产者,实现为协程则可以消费外部传入的数据;
  • 使用value = yield from xxx的生成器表示返回值给调用方、且调用方通过send方法传值给生成器函数;
  • 主函数中不能添加耗时的逻辑,如把I/O操作通过yield from做异步处理;
  • 最终实现通过同步的方式编写异步代码:在适当的时候暂停、恢复启动函数。
def gen_func():
    value = yield 1
    return "ywh"
import inspect
import socket

def get_socket_data():      # 模拟从socket中获取数据,唤醒downloader
    yield "ywh"             # 如发生异常,则会抛出给downloader

def downloader(url):        # 主方法中不能添加耗时操作
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)

    try:
        client.connect((host, 80))      # 阻塞不会消耗cpu
    except BlockingIOError as e:
        pass

    selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    source = yield from get_socket_data()       # 暂停,直到socket获取到数据再往下执行
    html_data = source.decode("utf8").split("\r\n\r\n")[1]
    print(html_data)

def download_html(html):
    html = yield from downloader()

if __name__ == "__main__":
    # 协程的调度:事件循环 + 协程模式(单线程)
    pass
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容

  • 一. 操作系统概念 操作系统位于底层硬件与应用软件之间的一层.工作方式: 向下管理硬件,向上提供接口.操作系统进行...
    月亮是我踢弯得阅读 5,965评论 3 28
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,535评论 0 5
  • 轻量级线程:协程 在常用的并发模型中,多进程、多线程、分布式是最普遍的,不过近些年来逐渐有一些语言以first-c...
    Tenderness4阅读 6,360评论 2 10
  • 前言 很多朋友对异步编程都处于“听说很强大”的认知状态。鲜有在生产项目中使用它。而使用它的同学,则大多数都停留在知...
    星星在线阅读 2,856评论 2 39
  • 转一篇驹神的关于异步编程和Asyncio的文章。这是上篇,共三篇。原文地址:http://aju.space/20...
    SeanCheney阅读 9,793评论 2 99