background.py & concurrency.py
实际上笔者对于异步的深层原理了解并不透彻,还只停留在勉强会用的水平。日后会对这方面只是进行系统性自下而上的学习。
后台任务
代码十分简单,在此仅贴出来
class BackgroundTask:
def __init__(
self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
) -> None:
self.func = func
self.args = args
self.kwargs = kwargs
self.is_async = asyncio.iscoroutinefunction(func)
async def __call__(self) -> None:
if self.is_async:
await self.func(*self.args, **self.kwargs)
else:
await run_in_threadpool(self.func, *self.args, **self.kwargs)
class BackgroundTasks(BackgroundTask):
def __init__(self, tasks: typing.Sequence[BackgroundTask] = []):
self.tasks = list(tasks)
def add_task(
self, func: typing.Callable, *args: typing.Any, **kwargs: typing.Any
) -> None:
task = BackgroundTask(func, *args, **kwargs)
self.tasks.append(task)
async def __call__(self) -> None:
for task in self.tasks:
await task()
异步
提供了一种异步循环方式,还有一个线程池
T = typing.TypeVar("T")
async def run_until_first_complete(*args: typing.Tuple[typing.Callable, dict]) -> None:
# 直到其中一个完成
# 在StreamingResponse中使用过
# await run_until_first_complete(
# (self.stream_response, {"send": send}),
# (self.listen_for_disconnect, {"receive": receive}),
# )
tasks = [handler(**kwargs) for handler, kwargs in args]
(done, pending) = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
[task.cancel() for task in pending]
[task.result() for task in done]
async def run_in_threadpool(
func: typing.Callable[..., T], *args: typing.Any, **kwargs: typing.Any
) -> T:
# 线程池
loop = asyncio.get_event_loop()
if contextvars is not None: # pragma: no cover
# 确保我们运行在相同的上下文中
child = functools.partial(func, *args, **kwargs)
context = contextvars.copy_context()
func = context.run
args = (child,)
elif kwargs: # pragma: no cover
# run_in_executor不接受“kwargs”,因此将它们绑定到这里
func = functools.partial(func, **kwargs)
return await loop.run_in_executor(None, func, *args)
class _StopIteration(Exception):
pass
def _next(iterator: Iterator) -> Any:
# 我们不能从线程池迭代器内部触发‘StopIteration’,
# 然后在上下文外部捕获它,所以我们强制它们进入不同的异常类型。
try:
return next(iterator)
except StopIteration:
raise _StopIteration
async def iterate_in_threadpool(iterator: Iterator) -> AsyncGenerator:
while True:
try:
yield await run_in_threadpool(_next, iterator)
except _StopIteration:
break