模拟asyncio,学习异步编程

参考文章: 微信文章
原文章写得很精彩,但有些代码还是可以优化下的。而且这文章一直只有上篇,可惜了。


理论知识可以参考原文。接下来按个人见解,从代码角度进行解析:

  • 基础知识
  • 编写原生asyncio的使用方法,制定模拟方向
  • 编写阻塞代码,了解socket
  • 编写非阻塞socket代码,发现问题
  • 编写异步回调代码
  • 编写异步协程代码

基础知识:

  • 计算机资源:常分为CPU资源、内存资源、硬盘资源和网络资源
  • 进程阻塞:正在运行的程序,由于自身某个模块需要使用硬盘或网络I/O资源等,而系统又未及时响应,导致进程处于待机状态,直至等待事件作出回应后才会被唤醒。
  • 进程非阻塞:同理,在获取某些资源时,不会等待结果响应,而是继续处理其他模块。
    我们以socket为例,如下可获取阻塞与非阻塞两种编程
import socket
sock = socket.socket()

socket.setblocking(True)  # 默认就是阻塞。即套接字 建立连接/发送请求/接受请求 的时候,是阻塞的。
socket.setblocking(False)  # 设置为非阻塞,即上述请求过程不会阻塞,而是继续处理其他模块。

编写原生asyncio的使用方法,制定模拟方向

让我们立即来使用原生asyncio编写异步程序:

在此代码中,我们需要注意几个关键点,使我们想要实现的。

  • loop = asyncio.get_event_loop() 开启事件循环,异步"任务"将在此循环执行
  • asyncio.create_task() 将一个协程包装成一个"任务"排入期程异步执行
  • asyncio.gather() 同步执行"任务"
    可以参考python-asyncio文档
import asyncio
import aiohttp
import time

loop = asyncio.get_event_loop()

async def fetch():
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get('http://www.baidu.com') as response:
            print(await response.read())

async def multi_fetch():
    await asyncio.gather(*[asyncio.create_task(fetch()) for _ in range(10)])

if __name__ == '__main__':
    start = time.time()
    loop.run_until_complete(fetch())  # 执行一次
    # loop.run_until_complete(multi_fetch())  # 执行十次
    print(time.time() - start)

可以发现,执行十次的时间大概和执行一次的时间差不多。那么说明异步执行是成功的。
接下来我们就来一步一步的实现上述几个关键点,实现手写自己的异步程序


编写阻塞代码,了解socket

首先得实现一个阻塞程序,以socket为例。此例子比较简单,大致看一下即可。

import socket
import time

def blocking_socket(response=b''):
    sock = socket.socket()  # 默认为阻塞连接
    sock.connect(('www.baidu.com', 80))  # 建立百度TCP连接
    sock.send(b'GET / HTTP/1.0\r\n\r\n')  # 发送HTTP协议
    chunk = sock.recv(1024)  # 接收数据
    while chunk:
        response += chunk
        chunk = sock.recv(1024)
    return response

def mutil_blocking_socket():
    return [blocking_socket() for _ in range(10)].__len__()

if __name__ == '__main__':
    start = time.time()
    blocking_socket()  # 执行一次
    # mutil_blocking_socket()  # 执行十次
    print(time.time() - start)

用时的话,我们可以明显看出blocking_socket() 用时大致0.07s
而multi_blocking_socket() 用时大致0.7s,刚好是10倍左右。


编写非阻塞socket代码,发现问题

接着我们来实现非阻塞程序,仍然使用socket来实现
代码与刚刚有部分差异,主要就是多了一些while + try + except

import socket
import time

def no_blocking_socket(response=b''):
    sock = socket.socket()
    sock.setblocking(False)  # 设置非阻塞连接
    try:
        sock.connect(('www.baidu.com', 80))
    except BlockingIOError:  # 非阻塞式建立连接在此处会报错,捕获忽略即可
        pass

    while True:
        try:
            sock.send(b'GET / HTTP/1.0\r\n\r\n')  # 发送HTTP协议
            break
        except OSError:  # 此处会报错。因为套接字与百度服务器的TCP连接还没有建立,
                         # 但是套接字却是非阻塞的,故在未建立连接的情况下发送协议会报错,此处捕获此异常
            continue

    while True:
        try:
            chunk = sock.recv(1024)
            while chunk:
                response += chunk
                chunk = sock.recv(1024)
            return response
        except OSError:  # 同理,对方服务器未接收HTTP协议,不会返回数据,即使返回,也有时延。故此时接收数据会报异常,此处捕获此异常
            pass

def multi_no_blocking_socket():
    return [no_blocking_socket() for _ in range(10)].__len__()

if __name__ == '__main__':
    start = time.time()
    no_blocking_socket()  # 执行一次
    # multi_no_blocking_socket()  # 执行十次
    print(time.time() - start)

来比较时间,我们会惊讶的发现,no_blocking_socket() 执行一次大致0.07s
而multi_no_blocking_socket() 执行十次,大致是0.7s....
没错,花费时间和阻塞编程是一样的。我们的非阻塞编程并没有达到实际的效果。

我们来分析一下,为什么会造成这种现象。我们发现这套非阻塞代码与之前的阻塞代码相比,

多了三处 try: ... except: ...
多了两处 while True: ...

代码确实是非阻塞编程,也就是程序不会在网络IO模块处阻塞,但是程序也没有把空闲下来的时间花在"正确"的地方

在非阻塞的情况下,CUP把空闲下来的时间不停的去while + try + except试错,也就是程序会不停的尝试发送协议,直到发送成功。不停的尝试接收数据,直到接收成功。

这样就导致我们的非阻塞编程,实际上和阻塞编程是一样的,唯一要说不同。就是非阻塞程序中的CPU可能会 "忙一点"。


编写异步回调代码

那么我们可以注意到了问题的关键,就是我们不知道什么时候程序是 "准备就绪了",也就是什么时候可以发送协议(套接字可写),什么时候可以接收数据(套接字可读)

其实操作系统已经帮我们实现了,它将事件的 I/O 都封装成了事件,包括可读事件/可写事件
那么我们就可以立即想到,最好的方法就是 当我们的套接字状态发生改变,就能够立即执行接下来的读写步骤。
所以此处,我们就需要 "回调"!

这里说明下python中selectors模块,用于注册事件的回调

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

sock = socket.socket()  # 获取套接字

selector = DefaultSelector()  # 获取selector对象

selector.register(sock.fileno(), EVENT_WRITE, on_send)  # fileno()获取当前socket套接字的文件描述符,并绑定事件EVENT_WRITE,回调函数为on_send
selector.unregister(sock.fileno())  # 注销事件绑定
selector.register(sock.fileno(), EVENT_READ, on_recv)  # 同理,绑定事件EVENT_READ,回调函数为on_recv

while True:
  events = selector.select()  # 此处的events是操作系统返回的事件,也就是我们绑定的事件被触发了,
                               # 此处是阻塞获取的。也就是用一个事件循环的阻塞,来代替我们的while True: ...
  for sock, mask in events:
    sock.data()  #sock.data为绑定的回调函数,也就是上面的on_send和on_recv

故我们就不需要自己手动while True + try...except来监控事件状态的改变,转而将这件工作交给selector的事件循环。此处我们将其命名为loop

接下来就是回调编程了

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.baidu.com', 80))
        except BlockingIOError:
            pass
    # fileno()获取当前socket套接字的文件描述符,并绑定事件回调 
    selector.register(self.sock.fileno(), EVENT_WRITE, self.on_send) 
    def on_send(self):
        selector.unregister(self.sock.fileno())
        self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
        selector.register(self.sock.fileno(), EVENT_READ, self.on_recv)

    def on_recv(self):
        chunk = self.sock.recv(1024)
        if chunk:
            self.response += chunk
        else:
            global stop_loop
            stop_loop -= self.flag
            selector.unregister(self.sock.fileno())

def loop():  # 事件循环,由操作系统返回那个事件发生了,对应执行那些事件的回调。
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == '__main__':
    start = time.time()
    Crawler(10).fetch()  # 执行一次
    # [Crawler(1).fetch() for _ in range(10)]  # 执行十次
    loop()
    print(time.time() - start)

我们来捋一下代码执行的流程:

1、首先实例化一个Crawler对象,然后执行此实例的fetch方法
2、fetch方法发起了与百度服务器的连接,然后注册了回调函数。
3、此时会走到loop()函数来,执行事件循环。直到我们与对方服务器的连接建立成功,则此时OS会返回事件,我们则执行对应的回调事件 sock.data() ,对应 self.on_send()
4、执行on_send,首先注销上一个事件,然后发送协议,再接着注册可读事件。继续进入等待
5、此时继续进入loop事件循环,直到触发注册事件,执行回调函数on_recv,由于一次只接收1024,故可能会接收多次,也就是会触发多次on_recv事件的回调,直接接收完成。
6、接收完成,我们令全局变量stop_loop - flag,来停止loop事件循环。程序结束。

然后我们来看下时间,我们会发现执行一次的时间和执行十次的时间基本是差不多的。说明我们编写的程序是没有问题的。

回调式异步编程,成功!


编写异步协程代码

至此,我们已经实现了回调式异步编程,但是我们思考下第一个例子,是基于协程的异步编程,故我们现在来调整代码,编写协程。

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Future:
    # 用于存放未来可能出现的数据,当出现时执行一次回调函数
    # 此中的result仅作为一个中转,实际还是通过回调返回给协程

    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

class Task:
    # 用于启动协程,该类实例初始化时传入为协程对象,执行self.process方法
    # 调用协程的send方法,启动协程,并最后绑定回调函数

    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b''

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('www.baidu.com', 80))
        except BlockingIOError:
            pass

        future = Future()

        def _on_send():
            future.set_result(None)

        def _on_recv():
            future.set_result(self.sock.recv(1024))

        selector.register(self.sock.fileno(), EVENT_WRITE, _on_send)
        yield future
        selector.unregister(self.sock.fileno())
        self.sock.send(b'GET / HTTP/1.0\r\n\r\n')
        selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
        while True:
            chunk = yield future  # 在此处轮询EVENT_READ事件,直至所有数据加载完毕
            if chunk:
                self.response += chunk
            else:
                global stop_loop
                stop_loop -= self.flag
                return self.response

def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == '__main__':
    start = time.time()
    Task(Crawler(10).fetch())  # 传入协程fetch,使用Task实例化调用协程的send方法来启动协程
    # [Task(Crawler(1).fetch()) for _ in range(10)]  # 同理,启动十个协程任务
    loop()
    print(time.time() - start)

我们继续来整理下代码执行的流程:

1、首先实例化一些Crawler对象,调用调用此实例的fetch函数得到一个协程。此时协程是没有执行的。(协程需要send等触发才会执行)

2、将此协程装于Task用来创建任务实例,在任务中会主动触发协程的send函数来启动协程

3、此时协程已触发,注册事件 selector.register(self.sock.fileno(), EVENT_WRITE, _on_send),然后返回future,此时协程到此暂停

4、返回的future会添加任务的回调函数,也就是self.precess()。而loop也开始了事件轮询,当套接字的文件描述符状态变为可写状态时,触发回调方法_on_send

5、_on_send方法执行Future中的set_result方法,此时在此方法中会调用一次future注册的回调函数,继续触发任务Task中协程的send方法,回到协程上次暂停的状态

6、回来后,首先注销事件EVENT_WRITE。发送HTTP协议请求。再注册事件selector.register(self.sock.fileno(), EVENT_READ, _on_recv)

7、loop事件轮询,当套接字的文件描述符变为可读状态时,触发回调方法_on_recv

8、_on_recv方法执行Future中的set_result方法,此时在方法中会初始化result为sock.recv(1024)的值,并执行注册的回调函数,将此结果继续传递至协程上回暂停的地方

9、由于一直没有注销事件EVENT_READ,故会一直驱动事件轮询直至结束

10、Task、Future、Crawler、loop这四个就这么神奇的串联在一起了,不可思议的说。

不过这样写貌似不太好看,虽然感觉也可以,但是很多模块其实都是可以拆离开的

下面就是一个拆分版本,就不细细的分析流程啦

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

def fetch(sock):
    sock.setblocking(False)
    try:
        sock.connect(('www.baidu.com', 80))
    except BlockingIOError:
        pass

    future = Future()

    def _on_send():
        future.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, _on_send)
    yield from future
    selector.unregister(sock.fileno())
    return future

def read(sock, future, flag, response=b''):
    def _on_recv():
        future.set_result(sock.recv(1024))

    selector.register(sock.fileno(), EVENT_READ, _on_recv)
    chunk = yield from future
    while chunk:
        response += chunk
        chunk = yield from future
    selector.unregister(sock.fileno())
    global stop_loop
    stop_loop -= flag
    return response


def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

class Future:
    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

    def __iter__(self):
        yield self
        return self.result

class Task:
    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag):
        self.flag = flag

    def fetch(self):
        sock = socket.socket()
        future = yield from fetch(sock)
        sock.send(b'GET / HTTP/1.0\r\n\r\n')
        response = yield from read(sock, future, self.flag)
        print(response)

if __name__ == '__main__':
    start = time.time()
    Task(Crawler(10).fetch())
    # [Task(Crawler(1).fetch()) for _ in range(10)]
    loop()
    print(time.time() - start)

代码放到github上了:
https://github.com/CzaOrz/ioco/tree/master/open_source_project/异步教程学习/异步协程

Life is short. You need Python

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容