Celery-分布式任务队列学习笔记

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。
它是一个专注于实时处理的任务队列,同时也支持任务调度。
以上是celery自己官网的介绍

celery的应用场景很广泛

  • 处理异步任务
  • 任务调度
  • 处理定时任务
  • 分布式调度

好处也很多,尤其在使用python构建的应用系统中,无缝衔接,使用相当方便。

Celery

安装

安装Celery

推荐使用pip安装,如果你使用的是虚拟环境,请在虚拟环境里安装

$ pip install celery

安装消息中间件

Celery 支持 RabbitMQ、Redis 甚至其他数据库系统作为其消息代理中间件

你希望用什么中间件和后端就请自行安装,一般都使用redis或者RabbitMQ

安装Redis

在Ubuntu系统下使用apt-get命令就可以

$ sudo apt-get install redis-server

如果你使用redis作为中间件,还需要安装redis支持包,同样使用pip安装即可

$ pip install redis

能出现以下结果即为成功

redis 127.0.0.1:6379>

其他的redis知识这里不左介绍,如果有兴趣,可以自行了解

如果你使用RabbitMQ,也请安装RabbitMQ

安装RabbitMQ

$ sudo apt-get install rabbitmq-server

使用Celery

简单直接使用

可以在需要的地方直接引入Celery,直接使用即可。最简单的方式只需要配置一个任务和中间人即可

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/3')

@app.task
def add(x, y):
    return x + y

我这里使用了redis作为中间件,这是可以按自己的习惯替换的

由于默认的配置不是最切合我们的项目实际需要,一般来说我们都需要按我们自己的要求配置一些,
但是由于需要将项目解耦,也好维护,我们最好使用单独的一个文件编写配置。

单独配置配置文件

比上面的稍微复杂一点,我们需要创建两个文件,一个为config.py的celery配置文件,在其中填写适合我们项目的配置,在创建一个tasks.py文件来编写我们的任务。文件的名字可以按你的喜好自己命名。

config.py内容为:

# coding=utf-8
# 配置文件同一配置celery
BROKER_URL = 'redis://localhost:6379/3'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/4'

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

# 把“脏活”路由到专用的队列:
CELERY_ROUTES = {
    'tasks.add': 'low-priority',
}

# 限制任务的速率,这样每分钟只允许处理 10 个该类型的任务:
CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

配置好以后可以用以下命令检查配置文件是否正确(config为配置文件名)

$ python -m config

tasks.py内容为:

# coding=utf-8
from celery import Celery

app = Celery()
# 参数为配置文件的文件名
app.config_from_object('config')

@app.task
def add(x, y):
    return x + y

还有一种同一设置配置的方式,不是很推荐

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

在app使用前先需要用以上方法批量更新配置文件。

在应用上使用

工程目录结构为

proj/
    __init__.py
    # 存放配置和启动celery代码
    celery.py
    # 存放任务
    tasks.py

celery.py为:

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='redis://localhost:6379/3',
             backend='redis://localhost:6379/4',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

tasks.py为:

from __future__ import absolute_import, unicode_literals
from .celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

启动celery只需要在proj同级目录下:

$ celery -A proj worker -l info

在django中使用celery

我们的django的项目的目录结构一般如下

proj/
    manage.py
    myapp/
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

想要在django项目中使用celery,我们首先需要在django中配置celery

我们需要在与工程名同名的子文件夹中添加celery.py文件
在本例中也就是proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
# 第二个参数为工程名.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

# 括号里的参数为工程名
app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
# 配置文件需要写在setting.py中,并且配置项需要使用`CELERY_`作为前缀
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
# 能够自动加载所有在django中注册的app,也就是setting.py中的INSTALLED_APPS
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

然后我们需要在同级目录下的init.py文件中配置如下内容
proj/proj/init.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

然后我们就可以把需要的任务放到需要的app下的tasks.py中,现在项目目录结构如下

proj/
    manage.py
    myapp1/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    myapp2/
        __init__.py
        tasks.py
        views.py
        model.py
        tests.py
    proj/
        __init__py
        settings.py
        urls.py
        wsgi.py

可能的一个tasks.py文件内容如下:
myapp1/tasks.py为:

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time


@shared_task
def add(x, y):
    # 为了测试是否是异步,特意休眠5s,观察是否会卡主主进程
    time.sleep(5)
    print(x+y)
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)

@shared_task修饰器可以让你创建task不需要app实体

在需要的地方调用相关任务即可,例如在myapp1/views.py中调用

from django.shortcuts import render
from .tasks import add


def index(request):
    # 测试celery任务
    add.delay(4,5)
    return render(request,'index.html')

然后就可以启动项目,celery需要单独启动,所以需要开两个终端,分别

启动web应用服务器

$ python manage.py runserver

启动celery

$ celery -A proj worker -l info

然后访问浏览器就可以在启动celery的终端中看到输出


测试结果

扩展

  • 如果你的项目需要在admin中管理调度,请使用django-celery-beat
  1. 使用pip安装django-celery-beat
$ pip install django-celery-beat

不要在使用django-celery,这个项目已经停止更新好好多年。。。。

  1. 在settings.py中添加这个app
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
  1. 同步一下数据库
$ python manage.py migrate
  1. 设置celery beat服务使用django_celery_beat.schedulers:DatabaseScheduler scheduler
$ celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

然后在就可以admin界面看到了。

  • 如果你想使用Django-ORM或者Django Cache作为后端,需要安装django-celery-results扩展(笔者不建议)
  1. 使用pip安装django-celery-results
$ pip install django-celery-results

不要在使用django-celery,这个项目已经停止更新好好多年。。。。

  1. 在settings.py中添加这个app
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
  1. 同步一下数据库
$ python manage.py migrate django_celery_results
  1. 配置后端,在settings.py中配置
# 使用数据库作为结果后端
CELERY_RESULT_BACKEND = 'django-db'

# 使用缓存作为结果后端
CELERY_RESULT_BACKEND = 'django-cache'

基本使用大概就是上述这些,其他具体配置和使用还需自己研读官方文档

注:

  • 上述环境在ubuntu16.04 lts django1.9中搭建测试成功
  • 上述文字皆为个人看法,如有错误或建议请及时联系我
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容