在django中使用celery和RabbitMQ

为什么要使用celery?

Web应用程序适用于请求和响应周期。当用户访问您的应用程序的某个URL时,Web浏览器向您的服务器发送一个请求。Django收到这个请求,并用它做一些事情。通常它涉及在数据库中执行查询,处理数据。当Django做他的事情并处理请求时,用户必须等待。当Django完成处理请求的工作时,它会发送一个响应给最终会看到某些东西的用户。

理想情况下,这个请求和响应周期应该很快,否则我们会让用户等待太久。更糟的是,我们的Web服务器一次只能服务一定数量的用户。所以,如果这个过程很慢,它可以限制您的应用程序一次可以提供的页面数量。

大多数情况下,我们可以使用缓存,优化数据库查询等来解决此问题。但是有些情况下,没有其他的选择:需要做大量的工作。一个报告页面,大量的数据输出,视频/图像处理是你可能想要使用celery的几个例子。

我们在不是在整个项目中使用celery,而只是用于耗费时间的特定任务。这里的想法是尽可能快地响应用户,并将耗时的任务传递给队列,以便在后台执行,并始终保持服务器准备好响应新的请求。

安装

安装celery最简单的方法是使用点:

pip install Celery

现在我们必须安装RabbitMQ。

在Ubuntu 16.04上安装RabbitMQ

要将其安装在较新的Ubuntu版本上非常简单:

apt-get install -y erlangapt-get install rabbitmq-server

然后启用并启动RabbitMQ服务:

systemctlenable rabbitmq-serversystemctl start rabbitmq-server

检查状态以确保一切运行平稳:

systemctl status rabbitmq-server

在Mac上安装RabbitMQ

    brew install rabbitmq

RabbitMQ脚本安装到/usr/local/sbin。你可以把它添加到您的.bash_profile或.profile。

    vim ~/.bash_profile

然后将其添加到文件的底部:

    export PATH=$PATH:/usr/local/sbin

重新启动终端,确保更改已生效。

现在您可以使用以下命令启动RabbitMQ服务器:

    rabbitmq-server

celery基本设置

首先,考虑名为core的应用程序名为mysite的以下Django项目:

将CELERY_BROKER_URL配置添加到settings.py文件中:

settings.py

   CELERY_BROKER_URL='amqp://localhost'

除了settings.pyurls.py文件之外,我们还要创建一个名为celery.py的新文件。

celery.py

import  os

 from  celery import    Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE','mysite.settings')

app=Celery('mysite')app.config_from_object('django.conf:settings',namespace='CELERY')

app.autodiscover_tasks()

现在编辑项目根目录下的__init__.py文件:

__init__.py

from.celeryimportappascelery_app__all__=['celery_app']

这将确保每次Django启动,我们的celery应用程序是重要的。

创建我们的第一个芹菜任务。

我们可以在Django应用程序中创建一个名为tasks.py的文件,并将所有的Celery任务放到这个文件中。我们在项目根目录中创建的Celery应用程序将收集在INSTALLED_APPS 配置中列出的所有Django应用程序中定义的所有任务。

为了测试目的,我们创建一个Celery任务,生成一些随机的用户帐户。

app/ tasks.py

from __future__import absolute_import, unicode_literals

import string

from celeryimport shared_task

from django.contrib.auth.modelsimport User

from django.utils.cryptoimport get_random_string

@shared_task

def create_random_user_accounts(total):

    for i in range(total):

        username = 'user_{}'.format(get_random_string(10, string.ascii_letters))

        email = '{}@example.com'.format(username)

        password = get_random_string(50)

        User.objects.create_user(username=username, email=email, password=password)

    return '{} random users created with success!'.format(total)

这里的重要部分是:

from celery import shared_task

@shared_task

def name_of_your_function(optional_param):

   pass# do something heavy

然后我定义了一个表单和一个视图来处理我的Celery任务:

forms.py

from djangoimport forms

from django.core.validatorsimport MinValueValidator, MaxValueValidator

class GenerateRandomUserForm(forms.Form):

    total = forms.IntegerField(

        validators=[

        MinValueValidator(50),

        MaxValueValidator(500)

        ]

)

这个是需要一个50到500之间的正整数字段。如图:

然后在views中:

from django.contrib.auth.modelsimport User

from django.contribimport messages

from django.views.generic.editimport FormView

from django.shortcutsimport redirect

from .formsimport GenerateRandomUserForm

from .tasksimport create_random_user_accounts

class GenerateRandomUserView(FormView):

template_name ='core/generate_random_users.html'

    form_class = GenerateRandomUserForm

def form_valid(self, form):

    total = form.cleaned_data.get('total')

    create_random_user_accounts.delay(total)  # 重点 就是便是这个任务在cerely后台执行。然后Django继续处理我的视图,GenerateRandomUserView并顺利返回给用户。

    messages.success(self.request,'We are generating your random users! Wait a moment and     refresh this page.')

    return redirect('users_list')

启动工作进程

打开一个新的终端选项卡,然后运行以下命令:

celery -A mysite worker -l info

mysite更改为您的项目名称。结果是这样的:

现在我们可以测试它。我提交了500个表单,创建了500个随机用户。

同时,检查celery worker过程:

[2017-08-20 19:11:17,485: INFO/MainProcess] Received task:mysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0]

然后几秒钟后,如果我们刷新页面,用户在那里:

如果我们再次查看芹菜工作进程,我们可以看到它完成了执行:

[2017-08-20 19:11:45,721: INFO/ForkPoolWorker-2] Taskmysite.core.tasks.create_random_user_accounts[8799cfbd-deae-41aa-afac-95ed4cc859b0] succeededin28.225658523035236s:'500 random users created with success!'

用Supervisord管理生产中的工人流程

如果您将应用程序部署到DigitalOcean等 VPS ,则需要在后台运行辅助进程。在我的教程中,我喜欢使用Supervisord来管理Gunicorn的工作人员,所以它通常与Celery很合适。

首先安装它(在Ubuntu上):

sudo apt-get install supervisor

然后创建一个文件名为mysite的-celery.conf的文件夹中:/etc/supervisor/conf.d/mysite-celery.conf

[program:mysite-celery]

command=/home/mysite/bin/celery worker -A mysite --loglevel=INFO

directory=/home/mysite/mysite

user=nobody

numprocs=1

stdout_logfile=/home/mysite/logs/celery.log

stderr_logfile=/home/mysite/logs/celery.log

autostart=true

autorestart=true

startsecs=10

; Need to waitfor currently executing tasks to finish at shutdown.

; Increase thisif you have verylong running tasks.

stopwaitsecs =600

stopasgroup=true

; Set Celery priority higher than default (999)

; so,if rabbitmqis supervised, it will start first.

priority=1000

在下面的例子中,Django项目是在虚拟环境中。路径为/ home / mysite /

现在重新读取配置并添加新的过程:

sudo supervisorctl reread

sudo supervisorctl update

如果您不熟悉将Django部署到生产服务器并使用Supervisord,可以参考:如何将Django应用程序部署到supervisor。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容