django+celery异步任务

什么是celery

Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。即插即用,易于使用和维护,单个进程每分钟可以处理数百万个任务。

为什么要用celery

  1. Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间以及用户体验。
    当用户在网站注册时,请求可以立即返回而不用等待发送注册激活邮件之后返回,网站可以将发送邮件这样的耗时不影响主要流程的操作放到消息队列中,Celery 就提供了这样的便捷。
  2. 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
  3. 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。

celery组成架构

image.png

Celery的架构,它采用典型的生产者-消费者模式,主要由三部分组成:broker(即中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列))、workers( Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行)、backend(结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果),task(想要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。)
实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。

Django中使用celery异步任务

安装celery

pip install celery

celery配置

在项目根目录下建立包celery_tasks,目录如下:

image.png

创建配置文件

celery_config.py配置消息中间件和结果存储用redis

# Broker配置,使用Redis作为消息中间件,存放broker消息队列
broker_url = "redis://127.0.0.1:6379/4"

# BACKEND配置,这里使用redis,存放执行结果
result_backend = "redis://127.0.0.1:6379/5"

celery_main.py

import os

from celery import Celery

# 设置django环境,celery 运行时需要读取django中的信息
# if not os.getenv('DJANGO_SETTINGS_MODULE'):
#     os.environ['DJANGO_SETTINGS_MODULE'] = 'QmpythonBlog.settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'QmpythonBlog.settings')

# 创建一个Celery类的实例对象
app = Celery('QmpythonBlog')

#  从单独的配置模块中加载配置,导入celery配置
app.config_from_object('celery_tasks.celery_config')

# 自动搜索任务
app.autodiscover_tasks(['celery_tasks.email', ])

创建任务函数

tasks.py中创建需要执行的异步任务

import logging
import random

from django.conf import settings
from django.core.mail import send_mail

from celery_tasks.celery_main import app

from django.core.mail import send_mail
from django_redis import get_redis_connection
from django.conf import settings

from verification import contants

# 导入日志器
logger = logging.getLogger('qmpython')


def random_str(random_length=8):
    active_code = ''
    chars = 'AaBbCcDdEeFfGgHhIiJjKkLlNnMmOoPpQqRrSsTtUuVvWwXxYyZz012346789'
    length = len(chars) - 1

    # 随机生成激活码
    for i in range(0, random_length):  # 需要生成random_length位随机数,循环对应多少次,每次从chars中读取一个字符,拼接成多少位
        active_code += chars[random.randint(0, length)]  # range:左闭右开;randint:左右皆闭;[]:左闭右开

    return active_code



# 创建任务函数
@app.task
def task_send_email(email, send_type='register'):
    import time
    time.sleep(10)  # 这里休眠10s中,看是否会前面发送的时候被阻塞

    if send_type == 'register':
        code_len = 6
    elif send_type == 'resetpwd':
        code_len = 6
    else:
        code_len = 4

    code = random_str(code_len)
    subject = ''  # 主题
    text_message = ''  # 正文
    html_message = ''  # html格式正文

    if send_type == 'register':
        # print('register')
        subject = u'全民python-注册验证码'
        text_message = u'【全民python】您的注册验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!'.format(code)  # text格式
        html_message = u'【全民python】您的注册验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!'.format(code)  # html格式方便点击链接

    elif send_type == 'resetpwd':
        subject = u'全民python-找回密码验证码'
        text_message = u'【全民python】找回登录密码的验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!'.format(code)
        html_message = u'<p>【全民python】找回登录密码的验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!</p>'.format(code)
    else:
        subject = ''
        text_message = ''
        html_message = ''

    conn_redis = get_redis_connection(alias='verify_code')
    pl = conn_redis.pipeline()

    email_flag_key = "email_flag_key_{}".format(email).encode('utf-8')

    email_code_key = "email_code_key{}".format(email).encode('utf-8')

    # 在此处设置为True会出现bug
    try:
        pl.setex(email_flag_key, contants.SEND_CODE_INTERVAL, 1)  # 是否已发送标志
        pl.setex(email_code_key, contants.EMAIL_CODE_REDIS_EXPIRES, code.lower())  # 保存验证码
        # 让管道通知redis执行命令
        pl.execute()
    except Exception as e:
        logger.debug("redis 执行出现异常:{}".format(e))
        return None

    logger.info("email code:{}".format(code))

    try:
        send_status = send_mail(subject, text_message, settings.EMAIL_FROM, [email],
                                html_message)  # 如果提供了html_message,可以发送带HTML代码的邮件。
    except Exception as e:
        logger.debug("发送邮箱验证码[异常][mobile: %s, message: %s]" % (email, e))

        #return None  # 如果为0或者抛出异常,返回None

    else:  # 如果try里面的语句可以正常执行,那么就执行else里面的语句(相当于程序没有碰到致命性错误)
        if send_status:  # 成功返回1,不成功返回0或者报错
            logger.info("发送验证码邮件[正常][email: %s email_code: %s]" % (email, code))
            #return send_status
        else:
            logger.warning("发送验证码邮件[失败][email: %s]" % email)

            #return None

调用异步任务

加载异步任务,发送邮箱验证码,使用delay()函数进行异步任务调用,delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果即可。


def emailCode(request):
    json_data = request.body
    if not json_data:
        return json_status.params_error(message="参数错误")

    # 将json转化为dict
    dict_data = json.loads(json_data.decode('utf-8'))

    username = dict_data.get('username')
    email = dict_data.get('email')

    # 如果2个都存在
    if Account.objects.filter(username=username, email=email).exists():
        return json_status.params_error(message=u'用户名和邮箱都已使用,请重新输入!')

    # 如果username存在
    if Account.objects.filter(username=username).exists():
        return json_status.params_error(message=u'该用户名已使用,请重新输入!')

    # 如果is_email存在
    if Account.objects.filter(email=email).exists():
        return json_status.params_error(message=u'该邮箱已使用,请重新输入!')

    # 检查是否在60s内有发送记录
    conn_redis = get_redis_connection(alias='verify_code')

    email_flag_key = 'email_flag_key_{}'.format(email).encode('utf-8')

    email_flag = conn_redis.get(email_flag_key)
    if email_flag:
        return json_status.params_error(message='获取邮箱验证码过于频繁!')

    # 发送验证码给邮箱
    #  1. 同步发送邮件
    # result = send_email.send_email_code(email, 'register')
    # if result:
    #     return json_status.result(message='验证码已发送您邮箱,3分钟内有效,请注意查收!')
    # else:
    #     return json_status.params_error(message='验证码发送失败,请重新发送!')
    #  2. celery异步发送邮件,调用delay
    task_send_email.delay(email, 'register')
    return json_status.result(message='验证码已发送成功,请注意查收!')

开启celery异步任务

进入虚拟环境,切换到项目根目录下,执行命令启动worker。

celery -A celery_tasks.celery_main worker -l info

启动 celery 服务,通过它来监听是否有任务要处理。这个命令启动了一个worker,用来执行程序中add这个加法任务(task)
-A 选项指定 celery 实例 app 的位置,本例中celery_tasks.celery_main中自动寻找,当然可以直接指定 celery worker -A task.app -l info
-l 选项指定日志级别, -l 是 --loglevel 的缩略形式
其他更多选项通过 celery worker –-help 查看

image.png

启动项目

image.png

image.png

点击发送验证码,没有因异步任务函数中task_send_emailtime.sleep(10)中而等待10s,而是马上进入倒计时。
执行完会看到redis数据库4和5中有结果

image.png

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容