celery 定时任务和信号量

Celery简介

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理

应用

异步调用:那些用户不关心的但是又存在在我们API里面的操作 我们就可以用异步调用的方式来优化(发送邮件 或者上传头像)

定时任务:

    定期去统计日志,数据备份,或者其他的统计任务

Celery的相关概念

task

需要执行的任务

worker

负责干活儿的小弟

broker

任务队列(worker拿任务的地方)

backend

干完活儿 结果存放的位置

Celery基本工作流程

Celery的安装与使用

安装

sudo pip install celery
sudo pip install celery-with-redis
sudo pip install django-celery
sudo apt install redis-server

配置

settings.py文件

ALLOWED_HOSTS = ['*']
INSTALLED_APPS = (
      ...
      'djcelery',
‘自己的APP’
    }
import djcelery
djcelery.setup_loader()
BROKER_URL='redis://localhost:6379/1'
CELERY_CONCURRENCY=2(设置worker的并发数量)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'

settings.py的同级目录下新建celery.py

from __future__ import absolute_import #绝对路径导入
from celery import Celery
from django.conf import settings
import os

#设置系统的环境配置用的是Django的
os.environ.setdefault("DJANGO_SETTING_MODULE", "工程名字.settings")

#实例化celery
app = Celery('mycelery')

app.conf.timezone = "Asia/Shanghai"

#指定celery的配置来源 用的是项目的配置文件settings.py
app.config_from_object("django.conf:settings")

#让celery 自动去发现我们的任务(task)
app.autodiscover_tasks(lambda : settings.INSTALLED_APPS) #你需要在app目录下 新建一个叫tasks.py(一定不要写错)文件

settings.py同级目录下的init.py加入

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

使用

1、在需要使用异步任务的APP目录下新建tasks.py

from celery import task
import time

@task
def hello_celery(loop):
    for i in range(loop):
        print 'hello'
        time.sleep(2)

2、views.py内的调用

任务函数名.delay(参数,,,,)

3、python manage.py migrate 建表(不要忘记建表)

4、启动worker

python manage.py celery worker --loglevel=info (或者celery -A 你的工程名 worker -l info)

注意:修改tasks.py的内容后 要重启celery的服务(命令:python manage.py celery worker --loglevel=info)

定时任务

在settings.py文件添加
CELERYBEAT_SCHEDULE = {
    'schedule-test': {
        'task': 'app的名字.tasks.hello_celery',
        'schedule': timedelta(seconds=3),
        'args': (2,)
    },

}

启动: celery -A 你的工程名称 beat -l info(或者python manage.py celery beat --loglevel=info)

计划任务时间

from celery.schedules import crontab
crontab(minute=u'00', hour=u'11',day_of_week='mon,tue,wed,thu,sun')

示例如下:
 'every-week-three-and-four-run-get_data_with_param':{
        'task': 'APP的名字.tasks.get_data_with_param',
        'schedule': crontab(day_of_week="3, 4"),
        'args':(4, )
    }

坑:

我们启动定时任务服务时 也要先开启worker(python manage.py celery worker --loglevel=info)

如果只开启定时服务 没有开启worker服务 那么定时任务会被放入任务队列,但是由于没有干活儿的worker 那么任务是不会被执行,当worker服务被启动后 会立刻去任务队列领任务并执行

注意:你的任务一定要确保是可以正常执行的

信号量

概念

Django 提供一个“信号分发器”,允许解耦的应用在框架的其它地方发生操作时会被通知到。
也就是说在特定事件发生时,可以发送一个信号去通知注册了这个信号的一个或者多个回调,在回调里进行逻辑处理。

如何监听信号

拥有一个Signal实例
信号回调(函数)
将信号回调绑定到Signal实例
在特定事件中Signal发送信号

Django内置信号

django.db.models.signals.pre_save & post_save�在模型 save()方法调用之前或之后发送。
django.db.models.signals.pre_init                        # Django中的model对象执行其构造方法前,自动触发
django.db.models.signals.post_init                       # Django中的model对象执行其构造方法后,自动触发
    django.db.models.signals.pre_delete & post_delete�在模型delete()方法或查询集的delete() 方法调用之前或之后发送。

    django.core.signals.request_started & request_finished�Django建立或关闭HTTP 请求时发送。

使用Django内置信号

例子,创建数据库记录,触发pre_save和post_save信号



项目的__init__.py文件中代码:

from django.db.models.signals import pre_save,post_save

def pre_save_func(sender,**kwargs):

    print("pre_save_func")
    print("pre_save_msg:",sender,kwargs)

def post_save_func(sender,**kwargs):
    print("post_save_func")
    print("post_save_msg:",sender,kwargs)

pre_save.connect(pre_save_func)             # models对象保存前触发callback函数
post_save.connect(post_save_func)           # models对象保存后触发函数

比较打印的结果,可以看到models对象保存后,在打印信息里包含一个"create=True"的键值对.

也可以使用装饰器来触发信号,把上面__init__.py中的代码修改:

from django.core.signals import request_finished
from django.dispatch import receiver

@receiver(request_finished)
def callback(sender, **kwargs):
    print("Request finished!")

自定义信号

1.定义信号

在项目根目录下创建一个singal_test.py的文件,内容为

from django.dispatch import Signal

action=Signal(providing_args=["函数参数名1","函数参数名2", .......])
2.注册信号

项目应用下面的__init__.py文件内容:

from singal_test import action

def pre_save_func(sender,**kwargs):

    print("pre_save_func")
    print("pre_save_msg:",sender,kwargs)
    
action.connect(pre_save_func)
3.触发信号

views视图函数内容:

from singal_test import action

action.send(sender="python",函数参数名1="111",函数参数名2="222")
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容