Flask:在Flask中使用Celery实现异步任务

摘要:FlaskCelery

Celery简介

Celery是一个专注于实时处理任务调度的分布式任务队列,使用Celery的常见场景:

  • Web应用:当用户触发一个操作需要较长时间才能执行完成,可以把这个任务交给Celery异步执行,这段时间不需要用户等待,提高网站吞吐量和降低响应时间
  • 定时任务:Celery可以快速在不同机器设定不同的定时任务
  • 需要异步执行任务的其他场景:所有不需要必须同步完成的附加工作都可以异步完成,比如发送短信/邮件,推送消息,清理缓存等

Celery组件

(1)Celery包含的组件如下
  • Celery Beat:任务调度器,负责周期性的将需要执行的任务发送给任务队列
  • Celery Worker:消费者,负责执行任务,通常部署在多个服务器起多个消费者提升执行效率
  • Broker:消息代理(消息中间件),接受生产者发来的任务消息,再分发给消费者
  • Producer:生产者,负责调用Celery的API,函数或者装饰器产生任务,发送给消息中间件
  • Result Backend:任务处理完后保存的状态信息和结果,支持Redis,MongoDB,RabbitMQ存储等方式

Celery的各组件的工作流程如下


(2)消息代理的选择

Celery推荐使用RabbitMQ,Redis,如果选择Redis则存在断电停机数据丢失的问题

(3)数据序列化

数据在消息队列的传输需要序列化和反序列化,Celery支持json,yaml,msgpack,默认是json,其中msgpack是一个二进制类似json的序列化方案,比json数据结构更小更快


快速开始:Flask + Celery异步发送邮件

(1)安装准备

安装准备,以redis作为消息代理和结果存储,在安装celery,flask

pip install celery
pip install redis

版本如下

celery --version
5.1.2 (sun-harmonics)

flask --version
Python 3.7.6
Flask 1.1.1
Werkzeug 1.0.0
(2)Flask应用脚本

构建一个Web应用实现给输入的邮箱发送邮件,发送邮件采用Python自带模块smtplibemail,前端使用HTML渲染,Flask后台完成POST表单获取邮箱数据发送邮件,分别对比同步执行和使用Celery异步执行的响应的时间

import os
import sys
import traceback
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from flask import Flask, render_template, request, redirect, url_for, session
from celery import Celery

from settings import Config

app = Flask(__name__)
app.config.from_object(Config)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'  # 消息代理
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'  # 任务结果写入redis
app.config['CELERY_TASK_SERIALIZER'] = 'msgpack'  # 序列化方式
app.config['CELERY_ACCEPT_CONTENT'] = ['msgpack', 'json']  # 如果指定了msgpack序列化方式,需要增加msgpack为可接受

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)  # 使用flask app的config

@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "GET":
        return render_template("index.html")
    email = request.form.get("email")
    if request.form.get("submit") == "Send":
        # send_async_email.delay("直接发送", email)
        import time
        t = time.time()
        # send_async_email("直接发送", email)
        send_async_email.delay("直接发送", email)
        # TODO somethine
        print(time.time() - t)
    elif request.form.get("submit") == "Send in 1 minute":
        send_async_email.apply_async(["延迟1分分钟发送", email], countdown=60)
    return redirect(url_for('index'))


@celery.task(name='main.send_async_email')
def send_async_email(message, to_email):
    send_mail(message, to_email)


def send_mail(message, to_email):
    conn = None
    try:
        conn = smtplib.SMTP_SSL(app.config["SMTP_HOST"], app.config["SMTP_PORT"])
        conn.login(app.config["FROM_EMAIL_ACCOUNT"], app.config["FROM_EMAIL_PASSWORD"])
        msg = MIMEMultipart()
        subject = Header('测试邮件', 'utf-8').encode()
        msg['Subject'] = subject
        msg['From'] = app.config["FROM_EMAIL_ACCOUNT"]
        msg['To'] = to_email
        text = MIMEText(message, 'plain', 'utf-8')
        msg.attach(text)
        conn.sendmail(app.config["FROM_EMAIL_ACCOUNT"], to_email, msg.as_string())
    except Exception as e:
        traceback.print_exc()
    finally:
        if conn:
            conn.quit()


if __name__ == '__main__':
    app.run("0.0.0.0", "5010")

脚本主要有2个地方加入和调用了Celery生成的任务

  • 创建Celery异步任务:脚本中使用@celery.task(name='main.send_async_email')装饰器构建了一个Celery任务,他的作用是把普通的Python函数包装成Celery任务给队列异步执行,其中需要指定name为模块名.函数名,否则Celery会找不到这个任务(没有注册成功)
  • 执行Celery异步任务:调用了send_async_email.delay()方法,如果不加delay就是调用普通的Python函数

下一步启动Flask,最终的Web界面显示效果如下


(3)启动Celery消费者

只有启动了消费者,脚本中需要被传输给中间件的任务才会被执行,下一步启动Celery的消费者,在项目根目录下命令行中输入

root@ubuntu:~/myproject/CELERY_TEST# celery -A celery_test.main:celery worker -l info
/opt/anaconda3/lib/python3.7/site-packages/celery/platforms.py:835: SecurityWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,
 
 -------------- celery@ubuntu (sun-harmonics)
--- ***** ----- 
-- ******* ---- Linux-4.15.0-54-generic-x86_64-with-debian-buster-sid 2021-07-22 19:57:25
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_test.main:0x7f6430716690
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . main.send_async_email

[2021-07-22 19:57:25,863: WARNING/MainProcess] Please run `celery upgrade settings path/to/settings.py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.
[2021-07-22 19:57:26,144: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-07-22 19:57:26,169: INFO/MainProcess] mingle: searching for neighbors
[2021-07-22 19:57:27,192: INFO/MainProcess] mingle: all alone
[2021-07-22 19:57:27,231: INFO/MainProcess] celery@ubuntu ready.

出现最后一行ready表示消费者启动成功,同时可以看到celery配置的相关参数:

  • app:应用在celery_test.main模块下
  • transport:消息队列使用redis存储,地址是localhost:6379/0
  • results:完成结果存储在redis,地址是localhost:6379/0
  • concurrency:线程4
  • [tasks]:执行的任务是main模块下的send_async_email方法

Celery可以采用后台运行的方式,并且将日志写入指定目录,方法是使用celery multi来管理任务的启动和停止等,指定logfile日志目录和pidfile的pid目录,以一个自定义的命名来定义任务名,比如web

root@ubuntu# celery multi start web -A celery_test.main:celery -l info --logfile=logs/celery_%n.log --pidfile=celery_%n.pid
celery multi v5.1.2 (sun-harmonics)
> Starting nodes...
    > web@ubuntu: OK(sun-harmonics)
> Starting nodes...
    > web@ubuntu: OK
root@ubuntu:~/myproject
root@ubuntu:~/myproject/CELERY_TEST# ls
celery_test  celery_web.pid  logs  __pycache__  settings.py
root@ubuntu:~/myproject/CELERY_TEST/logs# ls
celery_web.log

任务在后台启动,并在在目录下生成了pid文件也log目录

(4)异步任务测试

在界面输入邮箱地址发送邮件,点击send发送,此时是采用delay方式执行函数,采用异步发送邮件,视图函数中使用time进行计时,假设在发送邮件下面还有其他任务,记录这些总共花费的时间
异步方式执行完发送邮件和其他任务总计花费8ms

127.0.0.1 - - [23/Jul/2021 14:32:44] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:32:44] "GET / HTTP/1.1" 200 -
0.009320974349975586
127.0.0.1 - - [23/Jul/2021 14:32:46] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:32:46] "GET / HTTP/1.1" 200 -
0.008226156234741211
127.0.0.1 - - [23/Jul/2021 14:32:48] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:32:48] "GET / HTTP/1.1" 200 -
0.007288694381713867

然后使用原始的Python函数不调用delay(send_async_email("直接发送", email)),需要800ms左右,并且网页端明显感觉有卡顿

0.8022255897521973
127.0.0.1 - - [23/Jul/2021 14:35:00] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:35:00] "GET / HTTP/1.1" 200 -
0.7504754066467285
127.0.0.1 - - [23/Jul/2021 14:35:02] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:35:02] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [23/Jul/2021 14:35:04] "POST / HTTP/1.1" 302 -
127.0.0.1 - - [23/Jul/2021 14:35:04] "GET / HTTP/1.1" 200 -
0.8088760375976562
0.8098959922790527

进一步修改html加入计算机的功能,将执行函数改为直接sleep 10秒,可以在Web界面发现提交之后等10秒才能得到计算结果

@celery.task(name='main.send_async_email')
def send_async_email(message, to_email):
    import time
    time.sleep(10)
@app.route("/", methods=["GET", "POST"])
def index():
    if request.method == "GET":
        return render_template("index.html")
    email = request.form.get("email")
    val1 = request.form.get("value1")
    val2 = request.form.get("value2")
    if request.form.get("submit") == "Send":
        # send_async_email.delay("直接发送", email)
        import time
        t = time.time()
        send_async_email("直接发送", email)
        # send_async_email.delay("异步发送", email)
        value3 = float(val1) + float(val2)
    return render_template("index.html", **locals())
(5)观察Redis中存储的队列和处理结果

修改消息队列的broker地址为redis库1,结果处理地址为redis库2,查看redis的数据

root@ubuntu:~# redis-cli 
127.0.0.1:6379> select 1;
(error) ERR invalid DB index
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
1) "_kombu.binding.celery.pidbox"
2) "_kombu.binding.celeryev"
3) "_kombu.binding.celery"
127.0.0.1:6379[1]> select 2
OK
127.0.0.1:6379[2]> keys *
1) "celery-task-meta-4b64592e-bcc5-4dcf-b838-e0960b7c8e4b"
2) "celery-task-meta-80cb2a66-23f0-4e22-80af-7faa39f12094"
127.0.0.1:6379[2]> get celery-task-meta-80cb2a66-23f0-4e22-80af-7faa39f12094
"{\"status\": \"SUCCESS\", \"result\": null, \"traceback\": null, \"children\": [], \"date_done\": \"2021-07-23T07:19:06.224197\", \"task_id\": \"80cb2a66-23f0-4e22-80af-7faa39f12094\"}"

其中kombu是Celery使用kombu 维护消息队列存储的相关数据,celery-task-meta相关的数据是存储结果数据,json类型,包含处理状态,时间,处理结果,任务id等

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351