在Flask中使用Celery(上)

为了在后台运行任务,我们可以使用线程(或者进程)。
使用线程(或者进程)的好处是保持处理逻辑简洁。但是,在需要可扩展的生产环境中,我们也可以考虑使用Celery代替线程。

Celery是什么?
Celery是个异步分布式任务队列。
通过Celery在后台跑任务并不像用线程那么的简单,但是用Celery的话,能够使应用有较好的可扩展性,因为Celery是个分布式架构。下面介绍Celery的三个核心组件。

  • 生产者(Celery client)。生产者(Celery client)发送消息。在Flask上工作时,生产者(Celery client)在Flask应用内运行。

  • 消费者(Celery workers)。消费者用于处理后台任务。消费者(Celery client)可以是本地的也可以是远程的。我们可以在运行Flask的server上运行一个单一的消费者(Celery workers),当业务量上涨之后再去添加更多消费者(Celery workers)。

  • 消息传递者(message broker)。生产者(Celery client)和消费者(Celery workers)的信息的交互使用的是消息队列(message queue)。Celery支持若干方式的消息队列,其中最常用的是RabbitMQ和Redis。

话不多说上代码先。代码中包含两个例子:异步发送邮件;开始一或多个异步工作,然后在网页上更新其进度。

from flask import Flask
from celery import Celery
 
app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
 
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

其中的URL参数告诉了Celery,消息传递的服务的位置。如果消息传递者用的不是Redis,或者Redis部署在其他机器,那么需要做适当的改变。
而通过调用 celery.conf.update()方法,我们能够为Celery同步Flask上的配置。仅当需要Celery存储状态即存储结果时,CELERY_RESULT_BACKEND 选项才会被用到。
下文第一个例子不需要存储状态以及存储结果,但是第二个例子是需要的,所以一次配置好。

任何想要在后台运行的任务,都需要使用装饰者celery.task
进行包装,如下。

@celery.task
def my_background_task(arg1, arg2):
    # some long running task here 
    return result 

现在Flask 应用就能够发起“在后台执行任务”的请求了,如下。
task = my_background_task.delay(10, 20)
其中delay() 方法是 apply_async() 的快捷调用。

此处用apply_async()同样奏效,如下。
task = my_background_task.apply_async(args=[10, 20])

相比于 delay() 方法,当使用 apply_async() 方法时,我们能够对后台任务的执行方式有更多的控制。例如任务在何时执行等。

举例来说,下面的代码可以让任务在一分钟之后开始运行。
task = my_background_task.apply_async(args=[10, 20], countdown=60)

delay()apply_async() 的返回值是一个 AsyncResult 的对象。通过该对象,能够获得任务的状态。

例一:异步发邮件
第一个例子的需求比较广泛:发电子邮件的时候无需阻塞主应用线程。本例使用了扩展Flask-Mail。

网页包含了一个Text类型的域的表单。用户需要在其中输入邮箱地址,点击提交,然后服务器向该地址发送一封测试邮件。该表单包含两个提交按钮,其中一个会立即发送邮件,而另一个会在点击后延迟一分钟后再发送。html代码如下。

<html>
  <head>
    <title>Flask + Celery Examples</title>
  </head>
  <body>
    <h1>Flask + Celery Examples</h1>
    <h2>Example 1: Send Asynchronous Email</h2>
    {% for message in get_flashed_messages() %}
    <p style="color: red;">{{ message }}</p>
    {% endfor %}
    <form method="POST">
      <p>Send test email to: <input type="text" name="email" value="{{ email }}"></p>
      <input type="submit" name="submit" value="Send">
      <input type="submit" name="submit" value="Send in 1 minute">
    </form>
  </body></html>

用于发送邮件的Flask-Mail需要一些配置,主要与发送邮件的邮件服务器、发送邮件时间相关。考虑到用户名密码安全性,作者将其放到了环境变量中。

# Flask-Mail configuration
app.config['MAIL_SERVER'] = 'smtp.googlemail.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USERNAME'] = os.environ.get('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = 'flask@example.com'

异步发送代码如下。

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html', email=session.get('email', ''))
    email = request.form['email']
    session['email'] = email

    # send the email
    msg = Message('Hello from Flask',
                  recipients=[request.form['email']])
    msg.body = 'This is a test email sent from a background Celery task.'
    if request.form['submit'] == 'Send':
        # send right away
        send_async_email.delay(msg)
        flash('Sending email to {0}'.format(email))
    else:
        # send in one minute
        send_async_email.apply_async(args=[msg], countdown=60)
        flash('An email will be sent to {0} in one minute'.format(email))

    return redirect(url_for('index'))

用 session 将用户键入的信息保存,以便页面刷新时能够使用该信息。
朋友们发现了,重点在发送邮件的代码,使用的是Celery 的任务send_async_email,通过调用它的 delay() 方法或apply_async() 进行异步发送。

最后来看异步任务代码。

@celery.task
def send_async_email(msg):
     """Background task to send an email with Flask-Mail.""" 
    with app.app_context():
    mail.send(msg)

使用装饰者 celery.task 包装 send_async_email , 使其成为后台运行的任务。因为Flask-Mail需要应用的context,所以需要在调用send方法前先创建应用的context环境。
另一点很重要,从异步调用的返回值是不会保存的,所以应用本身无法知道是否异步调用是否成功。在这个例子之中需要看Celery的消费者的输出才能确定发送邮件过程是否有问题。
第一个例子比较简单,我们起了后台任务然后就不必再去管它了。很多应用的需求与例子一相仿。

然而也会有一些应用,需要监控后台任务的运行,获得任务的结果。下面来看第二个例子。

例二:显示状态更新进度
用户可以点击按钮以启动一个或者多个长时间任务,此时在网页使用ajax技术不断轮询服务器以更新所有的这些长时间任务们的状态。
而对于每一个长时间任务,网页上会有一个窗台条、一个进度百分比、一个状态消息与之对应,当完成时会显示相应结果。

状态更新时后台任务代码。

@celery.task(bind=True)
def long_task(self):
    """Background task that runs a long function with progress reports."""
    verb = ['Starting up', 'Booting', 'Repairing', 'Loading', 'Checking']
    adjective = ['master', 'radiant', 'silent', 'harmonic', 'fast']
    noun = ['solar array', 'particle reshaper', 'cosmic ray', 'orbiter', 'bit']
    message = ''
    total = random.randint(10, 50)
    for i in range(total):
        if not message or random.random() < 0.25:
            message = '{0} {1} {2}...'.format(random.choice(verb),
                                              random.choice(adjective),
                                              random.choice(noun))
        self.update_state(state='PROGRESS',
                          meta={'current': i, 'total': total,
                                'status': message})
        time.sleep(1)
    return {'current': 100, 'total': 100, 'status': 'Task completed!',
            'result': 42}

代码中作者在Celery 装饰者中加入了 bind=True 参数,这使得Celery向函数中传入了self参数,因此在函数中能够记录状态更新。
本例中随机挑选了一些单词作为状态的更新,同时,选取随机数作为每个后台任务运行时间。
self.update_state()方法用于指明 Celery如何接收任务更新。
Celery有很多内建状态比如STARTED , SUCCESS 等等,当然Celery也允许程序员自定义状态。本例子中使用的是自定义状态,PROGRESS 。与PROGRESS 一起的还有metadatametadata 是一个字典,包含当
前进度,任务大小,以及消息。
当循环跳出时,返回字典,字典中包含任务的执行结果。

long_task() 函数在 Celery消费者进程中运行。下面看一下Flask应用如何启动该后台任务。

@app.route('/longtask', methods=['POST'])
def longtask():
    task = long_task.apply_async() 
    return jsonify({}), 202, {'Location': url_for('taskstatus', task_id=task.id)}

用户需要向/longtask 发送 POST 请求以触发后台任务执行。服务器启动任务并存储返回值。作者使用了状态码202,在REST API中有“请求正在处理中”的意思,而加入了Location头则是为了生产者能够获取任务执行时的状态信息。url_for用于生成路由到taskstatus函数的url,并且该url包含task id,task id的值是task.id .

taskstatus 函数用于获取后台任务的更新状态。

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PENDING':
        // job did not start yet
        response = {
            'state': task.state,
            'current': 0,
            'total': 1,
            'status': 'Pending...'
        }
    elif task.state != 'FAILURE':
        response = {
            'state': task.state,
            'current': task.info.get('current', 0),
            'total': task.info.get('total', 1),
            'status': task.info.get('status', '')
        }
        if 'result' in task.info:
            response['result'] = task.info['result']
    else:
        # something went wrong in the background job
        response = {
            'state': task.state,
            'current': 1,
            'total': 1,
            'status': str(task.info),  # this is the exception raised
        }
    return jsonify(response)

为了得到后台任务产生的数据,使用了task id作为参数创建了一个task 对象。
本函数产生了JSON响应,JSON响应中的内容与update_state()更新的一致。
我们使用task.state区分后台任务的状态:本例有未运行、未发生错误、发生错误三种状态。
我们使用 task.info 访问任务相关信息。而发生错误时, task.state 的状态是 FAILURE 时,异常会包含在 task.info 之中。

前端JS代码
作者用的是nanobar.js实现进度条,用了jQuery的ajax。

<script src="//cdnjs.cloudflare.com/ajax/libs/nanobar/0.2.1/nanobar.min.js"></script>
<script src="//cdnjs.cloudflare.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>

启动后台任务的按钮的JS代码如下。

function start_long_task() {
        // add task status elements 
        div = $('<div class="progress"><div></div><div>0%</div><div>...</div><div> </div></div><hr>');
        $('#progress').append(div);

        // create a progress bar
        var nanobar = new Nanobar({
            bg: '#44f',
            target: div[0].childNodes[0]
        });

        // send ajax POST request to start background job
        $.ajax({
            type: 'POST',
            url: '/longtask',
            success: function(data, status, request) {
                status_url = request.getResponseHeader('Location');
                update_progress(status_url, nanobar, div[0]);
            },
            error: function() {
                alert('Unexpected error');
            }
        });
    }

其中被加入的HTML元素与任务的信息的对应关系如下。

<div class="progress">
    <div></div>          <-- Progress bar 
    <div>0%</div>        <-- Percentage 
    <div>...</div>       <-- Status message 
    <div> </div>    <-- Result
</div><hr>

start_long_task() 函数通过ajax向 /longtask发送POST请求,使得后台任务开始运行。
当ajax的POST请求返回时,回调函数获得响应,响应中包含形如 /status/<task_id>的url, 其他函数(如update_progress )用此url从 taskstatus 函数获取数据。
调用函数 update_progress() ,向函数传入start_url 以及 nanoba r变量,用于生成进度条。

function update_progress(status_url, nanobar, status_div) {
        // send GET request to status URL
        $.getJSON(status_url, function(data) {
            // update UI
            percent = parseInt(data['current'] * 100 / data['total']);
            nanobar.go(percent);
            $(status_div.childNodes[1]).text(percent + '%');
            $(status_div.childNodes[2]).text(data['status']);
            if (data['state'] != 'PENDING' && data['state'] != 'PROGRESS') {
                if ('result' in data) {
                    // show result
                    $(status_div.childNodes[3]).text('Result: ' + data['result']);
                }
                else {
                    // something unexpected happened
                    $(status_div.childNodes[3]).text('Result: ' + data['state']);
                }
            }
            else {
                // rerun in 2 seconds
                setTimeout(function() {
                    update_progress(status_url, nanobar, status_div);
                }, 2000);
            }
        });
    }

update_progress函数向/status/<task_id>发送GET请求,获得json数据然后更新相应的页面元素。
当后台任务完成时,result会加载到页面之中。如果没有result的话,这就意味着任务的执行以失败告终,此时任务的状态是 FAILURE 。
任当后台任务运行时,为了能够持续获得任务状态并更新页面,作者使用了定时器,定时器每个两秒一更新直到后台任务完成。

运行例子
读者先安装好virtualenv(强烈推荐!但是virtualenv非必需安装)。
下载代码,安装相应库,如下。

1 $ git clone https://github.com/miguelgrinberg/flask-celery-example.git
2 $ cd flask-celery-example
3 $ virtualenv venv
4 $ source venv/bin/activate
5 (venv) $ pip install -r requirements.txt

未安装virtualenv的话直接跳过第三行第四行命令。

redis server端读者自行安装。安装后运行启动。
Celery 消费者也需要读者运行,使用 celery命令。
邮件用户名密码自行设置。

$ export MAIL_USERNAME=<your-mail-username>
$ export MAIL_PASSWORD=<your-mail-password>
$ source venv/bin/activate(venv) 
$ celery worker -A app.celery --loglevel=info

Celery的 -A选项是应用中的celer对象,与文章最开头的代码对应。
--loglevel=info 则是让日志内容更为详细。

最后启动应用。
$ source venv/bin/activate(venv) $ python app.py

访问http://localhost:5000/ 即可。

原文链接 : http://blog.miguelgrinberg.com/post/using-celery-with-flask
译文链接 : http://www.cnblogs.com/ifkite/p/4257721.html
姊妹篇链接:http://blog.miguelgrinberg.com/post/celery-and-the-flask-application-factory-pattern

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容