- aysnc_run.py
# -*- coding: utf-8 -*-
import asyncio
import functools
import os
import signal
from collections import deque
from ctypes import *
q = deque()
task_num = 5
run_num = 0
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
def done_callback(loop, futu):
loop.stop()
async def handle_task(task):
global run_num
print(task, "start")
await asyncio.sleep(task)
print(task, "end")
run_num -= 1
async def handle_tasks(loop):
global run_num
exit_flag = False
tasks = []
while not exit_flag:
while run_num < 20:
run_num += 1
try:
task = q.popleft()
except IndexError:
exit_flag = True
break
tasks.append(loop.create_task(handle_task(task)))
await asyncio.sleep(2)
await asyncio.wait(tasks)
loop.stop()
def main():
loop = asyncio.get_event_loop()
# 注释中只对linux系统生效,用于接收信号做相应的处理
# for signame in {"SIGINT", "SIGTERM", "SIGQUIT"}:
# loop.add_signal_handler(
# getattr(signal, signame),
# functools.partial(ask_exit, signame, loop))
loop.create_task(handle_tasks(loop))
loop.run_forever()
if __name__ == "__main__":
print("Event loop running, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
for i in range(500):
q.append(i)
main()
-
实现原理
- 全文维护 task_num(最大任务数量)、run_num(正在执行的任务数量)、队列q(可抽离当生产者)
-
while run_num < 20
:当最大任务数量小于20,则从队列取出任务加入协程并将run_num+1 - 当某一协程任务结束后,run_num-1
- 当任务数量大于20,则
await asyncio.sleep(2)
等待2S后再次检查正在执行的任务数量 - 当队列为空,则等待所有任务结束后主动退出
await asyncio.wait(tasks)
、loop.stop()
-
不足点
- 当队列为空时,则不会再添加新的任务(正在执行的任务结束后,直接退出程序)
-
知识点
- python的协程是安全的,则无需对队列、全局变量加锁