Flask框架——基于Celery的后台任务

上篇文章我们学习了Flask框架——MongoEngine使用MongoDB数据库,这篇文章我们学习Flask框架——基于Celery的后台任务。

Celery

在Web开发中,我们经常会遇到一些耗时的操作,例如:上传/下载数据、发送邮件/短信,执行各种任务等等。这时我们可以使用分布式异步消息任务队列去执行这些任务。

Celery是一款非常简单、灵活、可靠的分布式异步消息队列工具,可以用于处理大量消息、实时数据以及任务调度。

Celery通过消息机制进行通信,一般使用中间人(Broker)作为客户端和职程(Worker)调节。

其工作流程如下图所示:



客户端发送消息任务给中间人(Broker),任务执行单元(Celery Worker)监控中间人中的任务队列,当中间人有消息任务时就分配任务给任务执行单元,任务执行单元在后台运行任务并返回请求。

注意:Celery可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。

Celery优点

  • 简单:上手比较简单,不需要配置文件就可以直接运行;
  • 高可用:如果出现丢失连接或连接失败,职程(Worker)和客户端会自动重试,并且中间人通过 主/主 主/从 的方式来进行提高可用性;
  • 快速:单个 Celery 进行每分钟可以处理数以百万的任务,而且延迟仅为亚毫秒(使用 RabbitMQ、 librabbitmq 在优化过后);
  • 灵活:Celery 的每个部分几乎都可以自定义扩展和单独使用,例如自定义连接池、序列化方式、压缩方式、日志记录方式、任务调度、生产者、消费者、中间人(Broker)等。

安装

Celery安装方式很简单,执行如下命令即可:

pip install celery

这里我们使用redis作为中间人,执行如下代码安装redis:

pip install redis

创建Celery程序

对比说明

(1)不使用Celery执行耗时任务,创建一个名为test.py文件,其示例代码如下:

import time

def add(a,b):                   
    time.sleep(5)               #休眠5秒
    return a+b      

if __name__ == '__main__':
    print('开始执行')
    result=add(2,3)             #调用add函数
    print('执行结束')
    print(result)

运行test.py文件,运行结果如下图:


(2)使用Celery执行耗时任务,创建一个名为tasks.py文件,示例代码如下:

import time
from celery import Celery

celery = Celery(                          #实例化Celery对象
    'tasks',                            #当前模块名
    broker='redis://localhost:6379/1',      #使用redis为中间人
    backend='redis://localhost:6379/2'      #结果存储
)

@celery.task()              #使用异步任务装饰器task
def add(a,b):
    time.sleep(5)           #休眠5秒
    return a+b

if __name__ == '__main__':
    print('开始执行')
    result=add.delay(2,3)           #调用add方法并使用delay延时函数    
    print('执行结束')
    print(result)

实例化Celery对象,其中第一个参数为当前模块名,第二个参数为中间人(Broker)的URL链接,第三个参数为中间人结果放回的存储URL链接,再调用add()方法时,需要使用delay延时函数。

运行tasks.py文件,运行结果如下图所示:


当我们运行tasks.py文件时,发现程序一下子就运行结束并返回任务id,

在终端执行如下代码运行Celery职程(Worker)服务:

celery -A tasks worker -l info

如下图所示:



虽然职程已经收到任务并且在分配到子进程运行了,但是发现该任务没有运行结束,这时因为Celery不支持在windows下运行任务,需要借助eventlet来完成,执行如下安装eventlet:

pip install eventlet 

安装成功后,执行如下代码运行Celery职程(Worker)服务:

celery -A tasks worker -l info -P eventlet  -c 10

运行结果如下:


Celery配置

大多数情况下,使用默认的配置即可满足我们的开发,不需要修改配置,当我们需要修改配置时,可以通过update进行配置,在上面的tasks.py添加如下代码:

celery.conf.update(
    task_serializer='json',
    accept_content=['json'],   
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

其中:

  • accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;

  • task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;

  • result_serializer:结果序列化格式,默认值为json;

  • timezone:配置Celery以使用自定义时区;

  • enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。

除了上面的配置参数,Celery还提供了很多很多配置参数,大家可以在官方配置文档中查看

Celery的配置信息比较多,通常情况下,我们会在tasks.py同级目录下为创建Celery的配置文件, 这里命名为celeryconfig.py,示例代码如下:

broker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

执行如下代码加载配置:

import celeryconfig
app.config_from_object('celeryconfig')

在Flask项目中使用Celery

首先创建一个名为mycelery.py文件,该文件用来实例化Celery对象,示例代码如下:

from celery import Celery

def make_celery(app):
    celery = Celery(                        #实例化Celery
        'tasks',
        broker='redis://localhost:6379/1',      #使用redis为中间人
        backend='redis://localhost:6379/2'      #结果存储
    )
    class ContextTask(celery.Task):             #创建ContextTask类并继承Celery.Task子类
        def __call__(self, *args, **kwargs):    
            with app.app_context():                 #和Flask中的app建立关系
                return self.run(*args, **kwargs)    #返回任务
    celery.Task = ContextTask                   #异步任务实例化ContextTask
    return celery                               #返回celery对象

首先自定义一个名为make_celery()方法,该方法传入Flask程序中的app,在方法中实例化Celery,并创建一个名为ContextTask类用来和Flask中的app建立关系,最后返回celery。

创建名为tasks.py文件,该文件用来存放我们的耗时任务,示例代码如下:

import time
from app import celery
@celery.task            #使用异步任务装饰器task
def add(x, y):
    time.sleep(5)       #休眠5秒
    return x + y

这里我们通过休眠的方式来模拟耗时的下载任务。

Flask程序app.py文件示例代码如下:

from flask import Flask
import tasks
from mycelery import make_celery

app = Flask(__name__)
celery = make_celery(app)               #调用make_celery方法并传入app使celery和app进行关联

@app.route('/')
def hello():
    tasks.add.delay(1,2)                #调用tasks文件中的add()异步任务方法
    return '请求正在后台处理中,您可以去处理其他事情'

if __name__ == '__main__':
    app.run(debug=True)

app.py文件很简单,就调用make_celery方法使celery和app进行关联,并在视图函数中使用tasks中的异步任务方法。

在终端执行如下代码运行Celery职程(Worker)服务:

celery -A tasks worker -l info -P eventlet  -c 10

启动Flask程序,访问http://127.0.0.1:5000/后在终端查Worker服务,如下图所示:


这样就成功使用Celery把耗时任务交给后台来处理,避免了不必要的耗时等待(如下载数据任务)。

当我们不使用Celery时,用户在执行耗时任务时,用户可能要等耗时任务完成后,才能进行其他操作。

好了,Flask框架——基于Celery的后台任务就讲到这里了,感谢观看,下篇文章继续学习Flask框架的其他知识。
公众号:白巧克力LIN

该公众号发布Python、数据库、Linux、Flask、自动化测试、Git等相关文章!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容