Python中的异步asyncio

本文主要参考这个

对于以下代码,如果在jupyter notebook/lab中执行,需要将

asyncio.run(main())

替换为:

await main()

因为,jupyter notebook本身就包含了异步的event loop,不能再加一个了,否则会报错:

RuntimeError: asyncio.run() cannot be called from a running event loop

import asyncio
import time

async def count():
    print("one")
    await asyncio.sleep(1)
    print("two")

async def main():
    await asyncio.gather(count(),count(),count())
if __name__ == "__main__":
    s = time.perf_counter()
#     asyncio.run(main())
    await main()
    elapsed = time.perf_counter() - s
    print(f'excuted in {elapsed:0.2f} seconds')
one
one
one
two
two
two
excuted in 1.00 seconds

异步的规则

  • async def 定义了一个协程(coroutine)或者异步生成器。async with async for也是合法的

  • await关键词将控制权转给event loop。例如下面的例子里,遇到await f() 则会在等待f(x)的结果的同时,暂停g(x)的执行,然后执行别的程序。

async def g():
    # Pause here and come back to g() when f() is ready
    r = await f()
    return r
  • 通过async def定义的函数是一个协程,可以使用await, return, yield 也可以都不使用。

    • 当使用了await 和/或 return时,则创建了一个协程函数,要调用一个协程函数,需要使用await
    • 也可以使用yield创建一个异步生成器(最近的python版本中才可以),然后可以使用async for来迭代这个生成器。这个用法比较少见
    • 不能使用yield from 会报SyntaxError
  • 在协程之外使用await会报错

import random
import asyncio

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

async def makerandom(idx: int, threshold: int = 6) -> int:
    print(c[idx + 1] + f"Initiated makerandom({idx}).")
    i = random.randint(0, 10)
    while i <= threshold:
        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx+1)
        i = random.randint(0, 10)
    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i

async def main():
    res  = await asyncio.gather(*(makerandom(i,10 - i - 1) for i in range(3)))
    return res

random.seed(444)
r1,r2,r3 = await main()
print(f"r1: {r1}, r2: {r2}, r3: {r3}")
�[36mInitiated makerandom(0).
�[36mmakerandom(0) == 4 too low; retrying.
�[91mInitiated makerandom(1).
�[91mmakerandom(1) == 4 too low; retrying.
�[35mInitiated makerandom(2).
�[35mmakerandom(2) == 0 too low; retrying.
�[36mmakerandom(0) == 4 too low; retrying.
�[91mmakerandom(1) == 7 too low; retrying.
�[36mmakerandom(0) == 4 too low; retrying.
�[35mmakerandom(2) == 4 too low; retrying.
�[36mmakerandom(0) == 8 too low; retrying.
�[91m---> Finished: makerandom(1) == 10�[0m
�[36mmakerandom(0) == 7 too low; retrying.
�[36mmakerandom(0) == 8 too low; retrying.
�[35mmakerandom(2) == 4 too low; retrying.
�[36mmakerandom(0) == 7 too low; retrying.
�[36mmakerandom(0) == 1 too low; retrying.
�[36mmakerandom(0) == 6 too low; retrying.
�[35m---> Finished: makerandom(2) == 9�[0m
�[36mmakerandom(0) == 3 too low; retrying.
�[36mmakerandom(0) == 9 too low; retrying.
�[36mmakerandom(0) == 7 too low; retrying.
�[36m---> Finished: makerandom(0) == 10�[0m
r1: 10, r2: 10, r3: 9

设计模式

链式协程

协程的一个特点是可以链接起来,因为协程是awaitable的,另一个协程可以await它。这就使得程序能够分解为小的、可循环的协程。看例子:在这个例子中,一个任务是由一系列的协程完成的,每个协程都对结果有所贡献。

import asyncio
import random
import time

async def part1(n: int) -> str:
    i = random.randint(0, 10)
    print(f"part1({n}) sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-1"
    print(f"Returning part1({n}) == {result}.")
    return result

async def part2(n: int, arg: str) -> str:
    i = random.randint(0, 10)
    print(f"part2{n, arg} sleeping for {i} seconds.")
    await asyncio.sleep(i)
    result = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {result}.")
    return result

async def chain(n: int) -> None:
    start = time.perf_counter()
    p1 = await part1(n)
    p2 = await part2(n, p1)
    end = time.perf_counter() - start
    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")

async def main(*args):
    await asyncio.gather(*(chain(n) for n in args))

if __name__ == "__main__":
    random.seed(444)
    args = [1, 2, 3]
    start = time.perf_counter()
    await (main(*args))
    end = time.perf_counter() - start
    print(f"Program finished in {end:0.2f} seconds.")
part1(1) sleeping for 4 seconds.
part1(2) sleeping for 4 seconds.
part1(3) sleeping for 0 seconds.
Returning part1(3) == result3-1.
part2(3, 'result3-1') sleeping for 4 seconds.
Returning part1(1) == result1-1.
part2(1, 'result1-1') sleeping for 7 seconds.
Returning part1(2) == result2-1.
part2(2, 'result2-1') sleeping for 4 seconds.
Returning part2(3, 'result3-1') == result3-2 derived from result3-1.
-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).
Returning part2(2, 'result2-1') == result2-2 derived from result2-1.
-->Chained result2 => result2-2 derived from result2-1 (took 8.00 seconds).
Returning part2(1, 'result1-1') == result1-2 derived from result1-1.
-->Chained result1 => result1-2 derived from result1-1 (took 11.00 seconds).
Program finished in 11.00 seconds.

使用队列

asyncio提供了queue类,与queue模块的相关类很像。假设有很多生产者都向队列中生产物品,他们相互不关联。每个生产者随机地向队列里生产的物品,且不会提前通知。一群消费者从队列里贪婪地,不等待信号就从队列里取东西。
在这个问题里,消费者和生产者没有关联。消费者不知道有多少生产者,也不知道队列里有多少东西。
每个物品的生产和消耗都需要花费时间。这种场景下就需要队列。

注意:由于queue.Queue()的线程安全性,队列一般用在多线程编程中。一般而言,在异步编程中,你不需要关系线程安全性,除非你将多线程和异步结合在一起。

程序如下.
关于q.join的讨论参见StackOverflow

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

if __name__ == "__main__":

    random.seed(444)
    ns = {'nprod':5,'ncon':10}
    start = time.perf_counter()
#     asyncio.run(main(**ns.__dict__))
    await main(**ns)
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")
Producer 0 sleeping for 4 seconds.
Producer 2 sleeping for 7 seconds.
Producer 3 sleeping for 4 seconds.
Producer 4 sleeping for 10 seconds.
Consumer 0 sleeping for 7 seconds.
Consumer 1 sleeping for 8 seconds.
Consumer 2 sleeping for 4 seconds.
Consumer 3 sleeping for 7 seconds.
Consumer 4 sleeping for 1 seconds.
Consumer 5 sleeping for 6 seconds.
Consumer 6 sleeping for 9 seconds.
Consumer 7 sleeping for 3 seconds.
Consumer 8 sleeping for 9 seconds.
Consumer 9 sleeping for 7 seconds.
Producer 3 added <edbbf43c8f> to queue.
Producer 3 sleeping for 10 seconds.
Producer 0 added <c3c534841d> to queue.
Producer 0 sleeping for 0 seconds.
Consumer 4 got element <edbbf43c8f> in 0.00209 seconds.
Consumer 4 sleeping for 1 seconds.
Consumer 7 got element <c3c534841d> in 0.00034 seconds.
Consumer 7 sleeping for 0 seconds.
Producer 0 added <18a095afe9> to queue.
Producer 0 sleeping for 1 seconds.
Consumer 7 got element <18a095afe9> in 0.00081 seconds.
Consumer 7 sleeping for 9 seconds.
Producer 0 added <974eeee56f> to queue.
Producer 0 sleeping for 0 seconds.
Consumer 2 got element <974eeee56f> in 0.00042 seconds.
Consumer 2 sleeping for 5 seconds.
Producer 0 added <d9ec40973c> to queue.
Consumer 4 got element <d9ec40973c> in 0.00012 seconds.
Consumer 4 sleeping for 10 seconds.
Producer 2 added <562a1af9c6> to queue.
Producer 2 sleeping for 5 seconds.
Consumer 3 got element <562a1af9c6> in 0.00023 seconds.
Consumer 3 sleeping for 8 seconds.
Producer 4 added <a2081ac293> to queue.
Producer 4 sleeping for 2 seconds.
Consumer 9 got element <a2081ac293> in 0.00076 seconds.
Consumer 9 sleeping for 5 seconds.
Producer 4 added <34f8bf21e5> to queue.
Producer 4 sleeping for 5 seconds.
Producer 2 added <c405429c67> to queue.
Producer 2 sleeping for 0 seconds.
Consumer 0 got element <34f8bf21e5> in 0.00043 seconds.
Consumer 0 sleeping for 3 seconds.
Consumer 5 got element <c405429c67> in 0.00024 seconds.
Consumer 5 sleeping for 1 seconds.
Producer 2 added <6311405957> to queue.
Producer 2 sleeping for 5 seconds.
Consumer 1 got element <6311405957> in 0.00013 seconds.
Consumer 1 sleeping for 6 seconds.
Producer 3 added <1d41a6d6ac> to queue.
Producer 3 sleeping for 10 seconds.
Consumer 8 got element <1d41a6d6ac> in 0.00024 seconds.
Consumer 8 sleeping for 5 seconds.
Producer 4 added <c190a596b1> to queue.
Producer 4 sleeping for 6 seconds.
Producer 2 added <de35cb5819> to queue.
Consumer 6 got element <c190a596b1> in 0.00025 seconds.
Consumer 6 sleeping for 4 seconds.
Consumer 2 got element <de35cb5819> in 0.00078 seconds.
Consumer 2 sleeping for 10 seconds.
Producer 4 added <a6d8a2323c> to queue.
Producer 4 sleeping for 8 seconds.
Consumer 5 got element <a6d8a2323c> in 0.00048 seconds.
Consumer 5 sleeping for 10 seconds.
Producer 3 added <a181582882> to queue.
Producer 3 sleeping for 10 seconds.
Consumer 7 got element <a181582882> in 0.00040 seconds.
Consumer 7 sleeping for 7 seconds.
Producer 4 added <50ecadf350> to queue.
Producer 4 sleeping for 6 seconds.
Consumer 3 got element <50ecadf350> in 0.00046 seconds.
Consumer 3 sleeping for 8 seconds.
Producer 3 added <dfe2ec3bdd> to queue.
Consumer 0 got element <dfe2ec3bdd> in 0.00019 seconds.
Consumer 0 sleeping for 3 seconds.
Producer 4 added <61abd6c15b> to queue.
Producer 4 sleeping for 6 seconds.
Consumer 9 got element <61abd6c15b> in 0.00050 seconds.
Consumer 9 sleeping for 9 seconds.
Producer 4 added <5773f1cc4f> to queue.
Producer 4 sleeping for 3 seconds.
Consumer 4 got element <5773f1cc4f> in 0.00083 seconds.
Consumer 4 sleeping for 2 seconds.
Producer 4 added <49b57af2c2> to queue.
Consumer 9 got element <49b57af2c2> in 0.00026 seconds.
Consumer 9 sleeping for 10 seconds.
Program completed in 45.99805 seconds.

异步的本质

异步的本质就是加强版的生成器。await的功能和yield类似。都是在程序执行时跳出,但是同时保留上下文,等待返回时继续执行。await更像是yield from。不过yield from x()也只是 for i in x():yield i的语法糖。

另外一个生成器的特征时,可以通过.send()方法向生成器中传递数据,也就允许生成器相互之间非阻塞式的调用。(协程也就可以了)不过这一点一般不用担心。如果要深究,可以看PEP 342.

event loop

event loop可以看作是一个while True的循环,不停的查看协程状态,如果协程等待的东西完成了就唤醒它。实现起来就是:
asyncio.run(main())
这个函数在Python3.7中引入。当所有协程完成后,自动关闭。
在之前的版本中会有这样的代码:

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

现在除非你要精细操作这个loop,否则是不需要的了。

关于event loop有几点需要注意

  1. 协程要绑定到event loop才有用,一般都是放到一个函数中,然后这个函数放到asyncio.run()
  2. 一般在单核上跑单线程的event loop已经够了,如果你想要多核运行,参照这个
  3. 你可以自己实现event loop

异步爬虫

使用异步模块aiohttp来进行

什么时候用异步

异步和多进程可以共存,一起使用。参照这里, 这里

异步和多线程,推荐使用异步。多线程调试困难,并且难以大规模使用。因为线程是系统资源

当你的任务有多个IO限制的子任务时,你可以考虑异步。
限制使用异步的主要原因是有些库不支持异步,因为你要找到能够awaitable的函数。
这个列表给出了支持异步的库。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容