# -*- coding: utf-8 -*-
import asyncio
import functools
import os
import signal
batch_task_num = 10
aio_task_q = asyncio.Queue(maxsize=batch_task_num)
aio_running_q = asyncio.Queue(maxsize=batch_task_num)
produce_finish_event = asyncio.Event()
async def produce_tasks():
for i in range(30):
await aio_task_q.put(i)
print("push", i)
produce_finish_event.set()
def ask_exit(signame, loop):
print("got signal %s: exit" % signame)
loop.stop()
async def handle_task(task):
print("running", task)
await asyncio.sleep(task)
print("end", task)
await aio_running_q.get()
async def consume_tasks(loop):
tasks = []
while not aio_task_q.empty() or not produce_finish_event.is_set():
await aio_running_q.put(True)
task = await aio_task_q.get()
tasks.append(loop.create_task(handle_task(task)))
await asyncio.wait(tasks)
loop.stop()
def main():
loop = asyncio.get_event_loop()
# for signame in {"SIGINT", "SIGTERM", "SIGQUIT"}:
# loop.add_signal_handler(
# getattr(signal, signame),
# functools.partial(ask_exit, signame, loop))
loop.create_task(produce_tasks())
loop.create_task(consume_tasks(loop))
loop.run_forever()
if __name__ == "__main__":
print("Event loop running, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")
main()
- 实现原理
- 该函数利用
asycio模块中的队列以及事件
实现N个任务同时执行(抽水式)
- 不同之处:该版本利用了队列维持N个任务同时执行
- 通过asycio事件确认是否完成生产者是否生产完毕