Python: asyncio与celery的结合

目前python asyncio的生态已经可以支撑大部分业务开发了,但异步任务这块,celery一是还没支持asyncio(记得5.0版本是计划支持的)二是还没看到好用的代替品,所以依赖异步任务的场景还是需要集成celery。

event loop

在考虑celery前,asyncio是自带异步任务的,一些小的异步任务可以通过ensure_feature丢到event loop里,比如starlette就支持在response上挂backgroud task

celery

因为celery不支持asyncio,所以worker在调用task需要使用loop.run_until_complete来运行asyncio代码,这样实际上worker在跟broker交互时是syncio,实际执行任务时是asyncio,性能上不会比同步代码差,而且执行任务过程可以不用再拆分大task到小task来实现并发(直接利用loop运行异步的小task),一段简单的包装代码:

from celery import shared_task


def task(
    *task_args, **task_kwargs
) -> typing.Callable[[typing.Callable], typing.Callable]:
    def decorator(func: typing.Callable):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if asyncio.iscoroutinefunction(func):
                return asyncio.get_event_loop().run_until_complete(
                    func(*args, **kwargs)
                )
            else:
                return func(*args, **kwargs)

        # setattr(wrapper, "origion_func", func)
        return shared_task(*task_args, **task_kwargs)(wrapper)

    return decorator


@task()
async def hello():
    await asyncio.sleep(1)

hello.delay()

坑一

由于celery worker是fork出来的子进程,然而event_loop是不支持fork的,如果在全局代码里生成了event_loop则子进程无法使用fork的event_loop,这时可以去掉全局代码的event_loop,也可以通过celery loader的hook重新生成event_loop:

class CeleryLoader(AppLoader):
    def on_worker_process_init(self):
        # uvloop.install()
        asyncio.set_event_loop(asyncio.new_event_loop())
        async_run(init_db())

celery = Celery("celery", loader=CeleryLoader)

坑二

如果不通过delay而是直接调用task函数的话(比如单元测试),由于是无法在一个running状态的loop里再调用run_until_complete的,最直接的解决方案是拿到注册的task的原始函数:

async def run_task(t, *args, **kwargs):
    f = t._get_current_object().run.origion_func  # 在task装饰器里注入origin_func变量,见上文
    if asyncio.iscoroutinefunction(f):
        return await f(*args, **kwargs)
    else:
        return f(*args, **kwargs)

总结

通过一点点的妥协,我们是可以在asyncio的生态享受到celery的便捷的,包括celery-beater等等都没问题。希望celery早日正式加入asyncio生态。

注:复制自个人博客:https://strongbugman.github.io/2022/01/14/asyncio-celery/ 非盗文

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容