python异步IO和协程

@[toc]

IO模型

同步IO
  • 在IO过程中当前线程被挂起,当前线程其他需要CPU计算的代码无法执行
    • 一般的io是同步的
    • 多线程可解决该问题
  • 计算和IO任务可以由不同的线程负责
  • 但会带来线程创建、切换的成本,而且线程数不能无上限地增加
异步IO

当前线程只发出IO指令,但不等待其执行结束,而是先执行其他代码,避免线程因IO操作而阻塞

事件驱动模型
  • 一种编程范式,程序执行流由外部事件决定
  • 包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理
  • 可能的实现机制
    • 每收到一个请求,创建一个新的进程来处理该请求;
    • 每收到一个请求,创建一个新的线程来处理该请求;
    • 每收到一个请求,放入一个事件列表让主进程通过非阻塞IO方式来处理请求
  • 一般场景
    当程序中有许多任务,任务之间高度独立(不需要互相通信或等待彼此等),并且在等待事件到来时,某些任务会阻塞
事件列表模型
  • 主线程不断重复“读取请求-处理请求”这一过程– 进行IO操作时相关代码只发出IO请求,不等待IO结果,然后直接结束本轮事件处理,进入下一轮事件处理
  • 当IO操作完成后,将收到IO完成消息,在处理该消息时再获取IO操作结果
  • 在发出IO请求到收到IO完成消息期间,主线程并不阻塞,而是在循环中继续处理其他消息
  • 对于大多数<font color='red'>IO密集型</font>的应用程序,使用<font color='red'>异步IO</font>将大大提升系统的多任务处理能力

协程

  • Coroutine,peusdo-thread,micro-thread
  • “微线程”
  • 在一个线程中会有很多函数,一般将这些函数称为子程序,在子程序执行过程中可以中断去执行别的子程序,而别的子程序也可以中断回来继续执行之前的子程序,这个过程就称为协程
    • 执行函数A时,可以随时中断,进而执行函数B,然后中断B并继续执行A,且上述切换是自主可控的
    • 但上述过程并非函数调用(没有调用语句)
  • 表象上类似多线程,但协程本质上只有一个线程在运行
Event Loop
  • The event loop is running in a thread
  • It gets tasks from the queue
  • Each task calls the next step of a coroutine
  • If coroutine calls another coroutine (await
    <coroutine_name>), the current coroutine gets suspended and context switch occurs. Context of the current coroutine (variables, state) is saved and context of a called coroutine is loaded
  • If coroutine comes across a blocking code (I/O, sleep), the current coroutine gets suspended and control is passed back to the event loop
  • Event loop gets next tasks from the queue 2, …n
  • Then the event loop goes back to task 1 from where it left off


协程的优点
  • 无需线程上下文切换的开销,协程避免了无意义的调度,由此可以提高性能
  • 无需原子操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
    • 线程由操作系统调度,而协程则是在程序级别由程
    序员自己调度
  • 高并发+高扩展性+低成本
    • 一个CPU可以支持上万协程
    • 在高并发场景下的差异会更突出
协程的缺点
  • 程序员必须自己承担调度的责任
  • 协程仅能提高IO密集型程序的效率,但对于CPU密集型程序无能为力
  • Python2和Python3中实现有一定差别
    • 所用模块有区别
    • 相关生态还在不断成熟
  • 在CPU密集型程序中要充分发挥CPU利用率需要结合多进程和协程
协程的实现
  • 生成器的send()函数
    • 与next()作用类似,但可以发送值给对应的yield表达式
    • 支持外部程序与生成器的交互
  • next(g)就相当于g.send(None)
  • 注意第一次调用next()或send(None)相当于启动生成器,不能使用send()发送一个非None的值
    • 利用装饰器来解决该问题
    • 在装饰器中先调用一次next
def gtest():
    print('step-1')
    x=yield 1
    print(x)
    print('step-2')
    y=yield 2
    print(y)
    print('step-3')
    x=yield 3

g=gtest()
#print(next(g))
#print(next(g))
#print(next(g))

print(g.send(None))
print(g.send('x=test'))
print(g.send('y=test'))

第一次启动生成器,必须send(None)
之后按序输出

step-1
1
x=test
step-2
2
y=test
step-3
3
import functools

def next_deco(func):
    @functools.wraps(func)
    def wrapper(*args,**kwargs):
        resulted_g=func(*args,**kwargs)
        next(resulted_g)  #在装饰器中先调用一次next
        return resulted_g
    return wrapper

@next_deco
def food_factory():
    food_list = []
    while True:
        food = yield food_list
        food_list.append(food)
        print("We have ",food_list)

fg=food_factory()
#fg.send(None)
fg.send('apple')
fg.send('banana')
fg.send('pear')
fg.send('orange')

yield food_list,所以会输出food_list的值,同时send的消息会返回到food中,并再次添加给food_list

We have  ['apple']
We have  ['apple', 'banana']
We have  ['apple', 'banana', 'pear']
We have  ['apple', 'banana', 'pear', 'orange']
通过gevent实现协程
  • 基于greenlet
  • spawn构建新协程
  • monkey.pach_all将第三方库标记为IO非阻塞
  • 通过协程池控制协程数目
import gevent

def foo():
    print('running in foo')
    gevent.sleep(2)#模拟io
    print('com back from bar in to foo')
    return 'foo'

def bar():
    print('running in bar')
    gevent.sleep(1)#模拟io
    print('com back from foo in to bar')
    return 'bar'

def func():
    print('in func of no io')
    return 'func'

def fund():
    print('in fund of no io')
    return 'fund'

jobs=[gevent.spawn(foo),gevent.spawn(bar),gevent.spawn(func),gevent.spawn(fund)]
gevent.joinall(jobs)
for job in jobs:
    print(job.value) #能够保证返回的顺序

首先按foo,bar,func,fund的顺序执行
在foo中遇到两秒阻塞,迅速执行bar,遇到一秒阻塞,迅速执行func和fund。
结束之后bar的一秒阻塞首先结束,执行之后语句,最后执行foo的剩余语句。
最后的返回结果gevent可以保证返回顺序。

running in foo
running in bar
in func of no io
in fund of no io
com back from foo in to bar
com back from bar in to foo
foo
bar
func
fund
import gevent
from gevent import socket   #asyncio

urls=['www.apple.com.cn','www.buaa.edu.cn','www.google.com','www.baidu.com']
jobs=[gevent.spawn(socket.gethostbyname,url) for url in urls]
gevent.joinall(jobs,timeout=10)
for url,ip in zip(urls,[job.value for job in jobs]):
    print('{}\t{}'.format(url,ip))

可以顺序输出结果,获取网址的ip

www.apple.com.cn        210.192.117.229
www.buaa.edu.cn 10.212.30.215 
www.google.com  31.13.72.1    
www.baidu.com   220.181.38.149
通过asyncio实现协程
  • python3.4引入 – 用asyncio提供的@asyncio.coroutine将任务标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作
  • Python3.5开始引入了asyncawait进一步
    简化语法
    • 把@asyncio.coroutine替换为async
    • 把yield from替换为await
  • Python3.7进一步变化…
import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main_1():
    print(f"started at {time.strftime('%X')}")

    await say_after(2, 'hello')
    await say_after(1, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main_1())

async def main_2():
    task1 = asyncio.create_task(
        say_after(2, 'hello'))

    task2 = asyncio.create_task(
        say_after(1, 'world'))

    print(f"started at {time.strftime('%X')}")

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main_2())

函数1保证输出顺序,函数2不保证输出顺序。

started at 14:41:00
hello
world
finished at 14:41:03
started at 14:41:03
world
hello
finished at 14:41:05
import asyncio
import random

async def get_page(url,i):
    #print("start visit {}".format(url))
    await asyncio.sleep(random.randint(1,10))#nio
    #print("get the html page")
    return i

def print_status(future):#指定回调函数,运行结束后马上处理
    print("%s" % future.result(),end=' ')

if __name__=='__main__':
    loop=asyncio.get_event_loop()
    tasks=[]
    for i in range(100):
        tasks.append(loop.create_task(get_page('www.baidu.com/',i)))
    for task in tasks:
        task.add_done_callback(print_status)#注意与执行顺序的不同,等所有任务执行结束后再获取结果
    loop.run_until_complete(asyncio.wait(tasks))
    
    print()
    
    for task in tasks:
        print(task.result(),end=' ')

    print()

指定运行结束之后马上处理的回调函数,输出按照实际运行顺序输出。
未指定的按照顺序输出loop

8 25 27 57 52 98 99 17 43 89 84 36 59 14 23 46 41 71 13 12 97 44 87 21 39 83 76 78 7 35 33 62 54 5 91 90 42 82 1 68 29 95 28 
50 93 10 40 80 3 69 32 64 60 56 4 45 85 18 75 15 20 88 81 38 74 37 34 65 86 48 51 24 72 96 19 63 31 30 16 6 49 61 26 22 9 58 
79 92 70 55 73 47 67 94 11 66 2 77 0 53 

0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
通过aiofiles实现文件的异步读写
pip install aiofiles
async with aiofiles.open(path,mode='r') as f: contents = await f.read()
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容