Python实现协程(六)

本节介绍 asyncio 剩余的一些常用操作:事件循环实现无限循环任务,在事件循环中执行普通函数以及协程锁。

一. 无限循环任务

事件循环的 run_until_complete 方法运行事件循环时,当其中的全部任务完成后,会自动停止循环。若想无限运行事件循环,可使用 asyncio 提供的 run_forever 方法:

import asyncio
import time
from datetime import datetime


async def work(loop, t):
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[work] start')
    await asyncio.sleep(t)  # 模拟IO操作
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[work] finished')
    loop.stop()  # 停止事件循环,stop后仍可重新运行


if __name__ == '__main__':
    loop = asyncio.get_event_loop()  # 创建任务,该任务会自动加入事件循环
    task = asyncio.ensure_future(work(loop, 1))
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[main]', task._state)
    loop.run_forever()  # 无限运行事件循环,直至loop.stop停止
    print(datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'), '[main]', task._state)
    loop.close()  # 关闭事件循环,只有loop处于停止状态才会执行

运行结果:使用 loop.run_forever() 启动无限循环时,task 实例会自动加入事件循环。如果注释掉 loop.stop() 方法,则 loop.run_forever() 之后的代码永远不会被执行,因为 loop.run_forever() 是个无限循环。

以上是单任务事件循环,将 loop 作为参数传入协程函数创建协程,在协程内部执行 loop.stop 方法停止事件循环。

下面的例子是多任务事件循环,使用回调函数执行 loop.stop() 停止事件循环:

import time
import asyncio
import functools
from datetime import datetime


def loop_stop(loop, future):  # 最后一个参数必须为future或task
    print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[callback] stop loop by callback.')
    loop.stop()


async def work(t):
    print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[work] coroutine start.')
    await asyncio.sleep(t)
    print(datetime.strftime(datetime.now(), '%H:%M:%S'), '[work] coroutine end.')


def main():
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(work(3), work(1))
    tasks.add_done_callback(functools.partial(loop_stop, loop))
    loop.run_forever()
    loop.close()


if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print(f'耗时:{end - start}')

运行结果asyncio.gather 创建的搜集器,参数为任意数量的协程,任务搜集器本身也是 task / future 对象。

任务搜集器的 add_done_callback 方法用来添加回调函数,该函数只在事件循环中所有的任务都完成后运行一次。

注意:add_done_callback 的参数为回调函数,当回调函数定义了除 future 参数之外的任何参数后,必须使用偏函数。此处,使用 functools.partial 方法创建偏函数以便将 loop 作为参数加入回调函数。

loop.run_until_complete 方法本身也是调用 loop.run_forever 方法,然后通过回调函数调用 loop.stop 来终止事件循环。

二. 在事件循环中加入普函数

2.1 加入普通函数,并立即排定执行顺序

事件循环的 call_soon 方法可以将普通函数作为任务加入到事件循环,并立即排定任务的执行顺序:

import asyncio


def func(name):  # 普通函数
    print(f'[func] hello, {name}')


async def work(t, name):  # 协程函数
    print(f'[work] {name} start.')
    await asyncio.sleep(t)
    print(f'[work] {name} finished.')


def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(work(3, 'A'))
    loop.call_soon(func, 'word')
    loop.create_task(work(2, 'B'))
    loop.run_until_complete(work(3, 'C'))


if __name__ == '__main__':
    main()

运行结果loop.call_soon 将普通函数当作 task 加入到事件循环并排定执行顺序,该方法的第一个参数为普通函数的名字,普通函数的参数写在后面。

loop.run_until_complete(work(3, 'C')) 阻塞启动事件循环,而且又添加了一个任务。

2.2 加入普通函数,并在稍后执行

loop.call_later 方法同 loop.call_soon 一样,可将普通函数作为任务放到事件循环里。不同之处在于,call_later 可设置延迟执行,第一个参数为延迟时间:

import asyncio
 
def func(name):  # 普通函数
    print(f'[func] hello, {name}')
 
async def work(t, name):  # 协程函数
    print(f'[work] {name} start.')
    await asyncio.sleep(t)
    print(f'[work] {name} finished.')
 
def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(work(4, 'A'))
    loop.call_later(1, func, 'word1')
    loop.call_soon(func, 'word2')
    loop.create_task(work(2, 'B'))
    loop.call_later(3, func, 'word3')
    loop.run_until_complete(work(2, 'C'))
 
if __name__ == '__main__':
    main()

运行结果:事件循环运行到 work(2, 'C') 完成时终止循环,此时输出 hello, word3 的普通函数尚未执行,协程任务 A 仍处于暂停状态。

2.3 其它常用方法

  • call_soon 立即执行
  • call_later 延迟执行
  • call_at 在某时刻执行
  • loop.time 是事件循环内部的一个即时方法,返回值是时刻,数据类型为 float

将上例中的 call_later 使用 loop.time + call_at 实现:

def main():
    loop = asyncio.get_event_loop()
    start = loop.time()  # 时间循环内部时刻
    asyncio.ensure_future(work(4, 'A'))
    # loop.call_later(1, func, 'word1')
    # 上面注释这行等同于下面这行
    loop.call_at(start+1, func, 'word1')
    loop.call_soon(func, 'word2')
    loop.create_task(work(2, 'B'))
    # loop.call_later(3, func, 'word3')
    loop.call_at(start+3, func, 'word3')
    loop.run_until_complete(work(2, 'C'))

运行结果与 2.2 中的示例一致,不再赘述。这三个 call_xxx 方法的作用都是将函数作为任务排定到事件循环中,返回值都是 asyncio.events.TimerHandle 实例,注意它们不是协程任务,不能作为 loop.run_until_complete 的参数。

三. 协程锁

asyncio.lock 从字面意思来讲,应该被称为异步 IO 锁,之所以叫协程锁,是因为它通常写在子协程中,用来将协程内部的一段代码锁住,直到这段代码运行完毕解锁。

协程锁的固定用法是使用 async with 创建协程上下文环境,把需要加锁的代码写入其中。

注:with 是普通上下文管理器关键字,async with 是异步上下文管理器关键字。能够使用 with 关键字的对象须有 __enter____exit__ 方法,而能够使用 async with 关键字的对象须有
__aenter____aexit__ 方法。

async with 会自动运行 lock__aenter__ 方法,该方法会调用
acquire 方法上锁;在语句块结束时自动运行 __aexit__ 方法,该方法会调用 release 方法解锁。这和 with 一样,都是简化 try ... finally 语句。

import asyncio

l = []
lock = asyncio.Lock()  # 协程锁


async def coro_work(name):
    print(f'coroutine {name} start.')
    async with lock:
        print(f'{name} run with lock start.')
        if 'hi' in l:
            return name
        await asyncio.sleep(2)
        l.append('hi')
        print(f'{name} release lock.')
        return name


async def one():
    name = await coro_work('ONE')
    print(f'{name} finished.')


async def two():
    name = await coro_work('TWO')
    print(f'{name} finished')


def main():
    loop = asyncio.get_event_loop()
    tasks = asyncio.wait([one(), two()])
    loop.run_until_complete(tasks)
    print(l)


if __name__ == '__main__':
    main()

运行结果: 当协程 ONE 运行到 await asyncio.sleep(2) 处时,将让步 CPU 的使用权,协程 TWO 开始执行,但执行到 async with lock 时,会阻塞,因为 ONE 还没有释放协程锁,此刻线程进入阻塞状态,开始等待 ONE 释放协程锁。

锁被释放后,协程 ONE 结束运行,返回值作为 await coro_work('ONE') 表达式的值,赋值给 one 函数中的局部变量 name

至此协程 TWO 开始上锁执行,由于此时 if 条件判断返回 True ,将直接 return ,因此终端不会输出锁的释放提示。

至此关于 yieldyield from(await)@asyncio.coroutine(async)asyncio 的介绍已经完毕!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容