import asyncio
import time
async def say_hello_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def run_with_limited_number_of_tasks(concurrency):
semaphore = asyncio.Semaphore(concurrency)
async def sem_task(task):
async with semaphore:
return await task
print(f"starts at {time.strftime('%X')}")
await asyncio.gather(*(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)]))
print(f"ends at {time.strftime('%X')}")
解析
这段代码通过使用Python的asyncio.Semaphore
来限制同时运行的协程数量。Semaphore是一种计数信号量,用于控制对共享资源的并发访问。
在代码中,run_with_limited_number_of_tasks
函数接受一个concurrency
参数,它代表允许同时运行的协程数量。semaphore = asyncio.Semaphore(concurrency)
创建了一个信号量对象,并设置了允许的最大计数为concurrency
。
当调用say_hello_after
函数时,它会创建一个asyncio.sleep
异步任务,并在延迟后打印一条消息。然后,run_with_limited_number_of_tasks
通过调用asyncio.gather
来同时运行多个say_hello_after
协程。在这里,它通过列表推导式创建了6个say_hello_after
协程,然后用sem_task
函数将它们包装在信号量上下文管理器中。
现在,让我们详细解释为什么这段代码可以限制在concurrency个协程以内同时运行:
当asyncio.gather
开始运行时,它会依次调用sem_task
来运行协程,并且每次调用都会获得一个信号量资源。由于初始时信号量对象的计数为concurrency
,因此最多可以同时有concurrency
个协程在运行。
如果有空闲的信号量资源(计数大于零),则sem_task
会立即获得一个信号量资源,并立即启动相应的say_hello_after
协程。
如果当前正在运行的协程数量已经达到concurrency
个,那么当有新的say_hello_after
协程要运行时,它将等待,直到有一个信号量资源可用。这就实现了对同时运行协程数量的限制。
一旦某个say_hello_after
协程完成(asyncio.sleep
延迟结束),它将释放占用的信号量资源,使得另一个等待中的sem_task
可以获得该资源,并启动新的say_hello_after
协程。
总结来说,asyncio.Semaphore
的作用在于控制同时运行的协程数量。这在某些情况下是很有用的,特别是当你需要限制对共享资源或外部服务的并发访问时,以避免资源竞争和提高效率。
提问:(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)])
里在调用say_hello_after(1, f"hello from {i}")
时不就应该已经执行了这个函数了吗?为什么它还能被传入sem_task
并且被await
呢?而且此时await task
的task
后面并没有加括号,说明它此时已经是一个asyncio.Task
了,为什么呢?
在表达式(sem_task(task) for task in [say_hello_after(1, f"hello from {i}") for i in range(6)])
中,的确会立即调用say_hello_after(1, f"hello from {i}")
,这将返回一个coroutine对象而不是执行该协程。这是因为协程函数(函数定义中包含async关键字的函数)返回的是一个协程对象,而不是立即执行协程。
当我们用asyncio.gather
调用这些协程时,它实际上会将这些协程对象包装在一个Task
对象中。在Python的asyncio
库中,Task
是一个表示正在进行的协程的对象,它封装了协程对象的执行状态。
所以,即使在asyncio.gather
的参数中调用了say_hello_after(1, f"hello from {i}")
,实际上是在创建一个Task对象并将其包含在asyncio.gather
中。asyncio.gather
会同时运行这些Task
对象,并在它们完成时返回结果(或抛出异常)。
总结起来,在这段代码中:
- 列表推导式创建了6个
say_hello_after
协程对象并以他们为入参传递给了协程函数sem_task
。 -
asyncio.gather
将这些协程对象封装为Task
对象,并同时运行它们。 - 通过
await task
等待每个协程对象的完成,实现了限制同时运行的协程数量在concurrency
以内的目的。 - 这种使用列表推导式和
asyncio.gather
来同时运行多个协程并限制并发数的方式在asyncio
中是一种常见的技术。
提问:“当我们用asyncio.gather
调用这些协程时,它实际上会将这些协程对象包装在一个Task
对象中。”那也应该是sem_task
的返回值被包装成Task
,而不是sem_task
函数的入参say_hello_after(1, f"hello from {i}")
被转化为Task
,为什么还是可以被await
呢?
Python3的asyncio
库并没有被要求被await
的一定要是asyncio.Task
对象,协程对象也是可以被await
的。不过当一个Task
对象被await
的时候,事件循环(event loop)并没有被阻塞;而当一个协程对象被await
的时候,包围此await
语句的所有代码都将暂停执行直到await
的协程对象执行完毕(可理解代码段此刻变为串行执行)。所以,当我们基于上面定义的两个函数来运行下面的代码时:
if __name__ == '__main__':
asyncio.run(run_with_limited_number_of_tasks(3))
我们会得到下面的结果:
starts at 21:38:25
hello from 0
hello from 1
hello from 2
hello from 3
hello from 4
hello from 5
ends at 21:38:27
从上面的结果可以看出,我们的代码是在一定基础上“按顺序”执行的,但是实际执行所花的时间并不是6秒(所有任务各1秒),而是按3个一组将任务分为2组来运行的,因此耗时为2秒。
如果我们不是await
的一个协程对象,而是一个Task
对象:
if __name__ == '__main__':
async def await_tasks():
print(f"starts at {time.strftime('%X')}")
await asyncio.gather(*[say_hello_after(1, f"hello from {i}") for i in range(6)])
print(f"ends at {time.strftime('%X')}")
asyncio.run(await_tasks())
那么它的结果则更为随机,而且6个任务的执行总时间为1秒:
starts at 21:45:11
hello from 0
hello from 2
hello from 5
hello from 4
hello from 1
hello from 3
ends at 21:45:12