Pythton 3.11中如何限制asyncio.gather的并发量?

import asyncio
import time


async def say_hello_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def run_with_limited_number_of_tasks(concurrency):
    semaphore = asyncio.Semaphore(concurrency)

    async def sem_task(task):
        async with semaphore:
            return await task

    print(f"starts at {time.strftime('%X')}")
    await asyncio.gather(*(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)]))
    print(f"ends at {time.strftime('%X')}")

解析

这段代码通过使用Python的asyncio.Semaphore来限制同时运行的协程数量。Semaphore是一种计数信号量,用于控制对共享资源的并发访问。
在代码中,run_with_limited_number_of_tasks函数接受一个concurrency参数,它代表允许同时运行的协程数量。semaphore = asyncio.Semaphore(concurrency)创建了一个信号量对象,并设置了允许的最大计数为concurrency

当调用say_hello_after函数时,它会创建一个asyncio.sleep异步任务,并在延迟后打印一条消息。然后,run_with_limited_number_of_tasks通过调用asyncio.gather来同时运行多个say_hello_after协程。在这里,它通过列表推导式创建了6个say_hello_after协程,然后用sem_task函数将它们包装在信号量上下文管理器中。

现在,让我们详细解释为什么这段代码可以限制在concurrency个协程以内同时运行:

asyncio.gather开始运行时,它会依次调用sem_task来运行协程,并且每次调用都会获得一个信号量资源。由于初始时信号量对象的计数为concurrency,因此最多可以同时有concurrency个协程在运行。

如果有空闲的信号量资源(计数大于零),则sem_task会立即获得一个信号量资源,并立即启动相应的say_hello_after协程。

如果当前正在运行的协程数量已经达到concurrency个,那么当有新的say_hello_after协程要运行时,它将等待,直到有一个信号量资源可用。这就实现了对同时运行协程数量的限制。

一旦某个say_hello_after协程完成(asyncio.sleep延迟结束),它将释放占用的信号量资源,使得另一个等待中的sem_task可以获得该资源,并启动新的say_hello_after协程。

总结来说,asyncio.Semaphore的作用在于控制同时运行的协程数量。这在某些情况下是很有用的,特别是当你需要限制对共享资源或外部服务的并发访问时,以避免资源竞争和提高效率。

提问:(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)])里在调用say_hello_after(1, f"hello from {i}")时不就应该已经执行了这个函数了吗?为什么它还能被传入sem_task并且被await呢?而且此时await tasktask后面并没有加括号,说明它此时已经是一个asyncio.Task了,为什么呢?

在表达式(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)])中,的确会立即调用say_hello_after(1, f"hello from {i}"),这将返回一个coroutine对象而不是执行该协程。这是因为协程函数(函数定义中包含async关键字的函数)返回的是一个协程对象,而不是立即执行协程。

当我们用asyncio.gather调用这些协程时,它实际上会将这些协程对象包装在一个Task对象中。在Python的asyncio库中,Task是一个表示正在进行的协程的对象,它封装了协程对象的执行状态。

所以,即使在asyncio.gather的参数中调用了say_hello_after(1, f"hello from {i}"),实际上是在创建一个Task对象并将其包含在asyncio.gather中。asyncio.gather会同时运行这些Task对象,并在它们完成时返回结果(或抛出异常)。

总结起来,在这段代码中:

  • 列表推导式创建了6个say_hello_after协程对象并以他们为入参传递给了协程函数sem_task
  • asyncio.gather将这些协程对象封装为Task对象,并同时运行它们。
  • 通过await task等待每个协程对象的完成,实现了限制同时运行的协程数量在concurrency以内的目的。
  • 这种使用列表推导式和asyncio.gather来同时运行多个协程并限制并发数的方式在asyncio中是一种常见的技术。

提问:“当我们用asyncio.gather调用这些协程时,它实际上会将这些协程对象包装在一个Task对象中。”那也应该是sem_task的返回值被包装成Task,而不是sem_task函数的入参say_hello_after(1, f"hello from {i}")被转化为Task,为什么还是可以被await呢?

Python3的asyncio库并没有被要求被await的一定要是asyncio.Task对象,协程对象也是可以被await的。不过当一个Task对象被await的时候,事件循环(event loop)并没有被阻塞;而当一个协程对象被await的时候,包围此await语句的所有代码都将暂停执行直到await的协程对象执行完毕(可理解代码段此刻变为串行执行)。所以,当我们基于上面定义的两个函数来运行下面的代码时:

if __name__ == '__main__':
    asyncio.run(run_with_limited_number_of_tasks(3))

我们会得到下面的结果:

starts at 21:38:25
hello from 0
hello from 1
hello from 2
hello from 3
hello from 4
hello from 5
ends at 21:38:27

从上面的结果可以看出,我们的代码是在一定基础上“按顺序”执行的,但是实际执行所花的时间并不是6秒(所有任务各1秒),而是按3个一组将任务分为2组来运行的,因此耗时为2秒。

如果我们不是await的一个协程对象,而是一个Task对象:

if __name__ == '__main__':
    async def await_tasks():
        print(f"starts at {time.strftime('%X')}")
        await asyncio.gather(*[say_hello_after(1, f"hello from {i}") for i in range(6)])
        print(f"ends at {time.strftime('%X')}")
    asyncio.run(await_tasks())

那么它的结果则更为随机,而且6个任务的执行总时间为1秒:

starts at 21:45:11
hello from 0
hello from 2
hello from 5
hello from 4
hello from 1
hello from 3
ends at 21:45:12
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352