什么是celery
Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。即插即用,易于使用和维护,单个进程每分钟可以处理数百万个任务。
为什么要用celery
- Web应用。当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间以及用户体验。
当用户在网站注册时,请求可以立即返回而不用等待发送注册激活邮件之后返回,网站可以将发送邮件这样的耗时不影响主要流程的操作放到消息队列中,Celery 就提供了这样的便捷。 - 定时任务。生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
- 同步完成的附加工作都可以异步完成。比如发送短信/邮件、推送消息、清理/设置缓存等。
celery组成架构
Celery的架构,它采用典型的生产者-消费者模式,主要由三部分组成:broker(即中间人,在这里就是指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/拿取产品的地方(队列))、workers( Celery 中的工作者,类似与生产/消费模型中的消费者,其从队列中取出任务并执行)、backend(结果储存的地方,队列中的任务运行完后的结果或者状态需要被任务发送者知道,那么就需要一个地方储存这些结果),task(想要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。)
实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。
Django中使用celery异步任务
安装celery
pip install celery
celery配置
在项目根目录下建立包celery_tasks
,目录如下:
创建配置文件
celery_config.py
配置消息中间件和结果存储用redis
# Broker配置,使用Redis作为消息中间件,存放broker消息队列
broker_url = "redis://127.0.0.1:6379/4"
# BACKEND配置,这里使用redis,存放执行结果
result_backend = "redis://127.0.0.1:6379/5"
celery_main.py
import os
from celery import Celery
# 设置django环境,celery 运行时需要读取django中的信息
# if not os.getenv('DJANGO_SETTINGS_MODULE'):
# os.environ['DJANGO_SETTINGS_MODULE'] = 'QmpythonBlog.settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'QmpythonBlog.settings')
# 创建一个Celery类的实例对象
app = Celery('QmpythonBlog')
# 从单独的配置模块中加载配置,导入celery配置
app.config_from_object('celery_tasks.celery_config')
# 自动搜索任务
app.autodiscover_tasks(['celery_tasks.email', ])
创建任务函数
在tasks.py
中创建需要执行的异步任务
import logging
import random
from django.conf import settings
from django.core.mail import send_mail
from celery_tasks.celery_main import app
from django.core.mail import send_mail
from django_redis import get_redis_connection
from django.conf import settings
from verification import contants
# 导入日志器
logger = logging.getLogger('qmpython')
def random_str(random_length=8):
active_code = ''
chars = 'AaBbCcDdEeFfGgHhIiJjKkLlNnMmOoPpQqRrSsTtUuVvWwXxYyZz012346789'
length = len(chars) - 1
# 随机生成激活码
for i in range(0, random_length): # 需要生成random_length位随机数,循环对应多少次,每次从chars中读取一个字符,拼接成多少位
active_code += chars[random.randint(0, length)] # range:左闭右开;randint:左右皆闭;[]:左闭右开
return active_code
# 创建任务函数
@app.task
def task_send_email(email, send_type='register'):
import time
time.sleep(10) # 这里休眠10s中,看是否会前面发送的时候被阻塞
if send_type == 'register':
code_len = 6
elif send_type == 'resetpwd':
code_len = 6
else:
code_len = 4
code = random_str(code_len)
subject = '' # 主题
text_message = '' # 正文
html_message = '' # html格式正文
if send_type == 'register':
# print('register')
subject = u'全民python-注册验证码'
text_message = u'【全民python】您的注册验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!'.format(code) # text格式
html_message = u'【全民python】您的注册验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!'.format(code) # html格式方便点击链接
elif send_type == 'resetpwd':
subject = u'全民python-找回密码验证码'
text_message = u'【全民python】找回登录密码的验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!'.format(code)
html_message = u'<p>【全民python】找回登录密码的验证码:{0},请勿将此验证码告知他人,若非本人操作,请联系或者忽略,3分钟内有效!</p>'.format(code)
else:
subject = ''
text_message = ''
html_message = ''
conn_redis = get_redis_connection(alias='verify_code')
pl = conn_redis.pipeline()
email_flag_key = "email_flag_key_{}".format(email).encode('utf-8')
email_code_key = "email_code_key{}".format(email).encode('utf-8')
# 在此处设置为True会出现bug
try:
pl.setex(email_flag_key, contants.SEND_CODE_INTERVAL, 1) # 是否已发送标志
pl.setex(email_code_key, contants.EMAIL_CODE_REDIS_EXPIRES, code.lower()) # 保存验证码
# 让管道通知redis执行命令
pl.execute()
except Exception as e:
logger.debug("redis 执行出现异常:{}".format(e))
return None
logger.info("email code:{}".format(code))
try:
send_status = send_mail(subject, text_message, settings.EMAIL_FROM, [email],
html_message) # 如果提供了html_message,可以发送带HTML代码的邮件。
except Exception as e:
logger.debug("发送邮箱验证码[异常][mobile: %s, message: %s]" % (email, e))
#return None # 如果为0或者抛出异常,返回None
else: # 如果try里面的语句可以正常执行,那么就执行else里面的语句(相当于程序没有碰到致命性错误)
if send_status: # 成功返回1,不成功返回0或者报错
logger.info("发送验证码邮件[正常][email: %s email_code: %s]" % (email, code))
#return send_status
else:
logger.warning("发送验证码邮件[失败][email: %s]" % email)
#return None
调用异步任务
加载异步任务,发送邮箱验证码,使用delay()
函数进行异步任务调用,delay 返回的是一个 AsyncResult 对象,里面存的就是一个异步的结果,当任务完成时result.ready() 为 true,然后用 result.get() 取结果即可。
def emailCode(request):
json_data = request.body
if not json_data:
return json_status.params_error(message="参数错误")
# 将json转化为dict
dict_data = json.loads(json_data.decode('utf-8'))
username = dict_data.get('username')
email = dict_data.get('email')
# 如果2个都存在
if Account.objects.filter(username=username, email=email).exists():
return json_status.params_error(message=u'用户名和邮箱都已使用,请重新输入!')
# 如果username存在
if Account.objects.filter(username=username).exists():
return json_status.params_error(message=u'该用户名已使用,请重新输入!')
# 如果is_email存在
if Account.objects.filter(email=email).exists():
return json_status.params_error(message=u'该邮箱已使用,请重新输入!')
# 检查是否在60s内有发送记录
conn_redis = get_redis_connection(alias='verify_code')
email_flag_key = 'email_flag_key_{}'.format(email).encode('utf-8')
email_flag = conn_redis.get(email_flag_key)
if email_flag:
return json_status.params_error(message='获取邮箱验证码过于频繁!')
# 发送验证码给邮箱
# 1. 同步发送邮件
# result = send_email.send_email_code(email, 'register')
# if result:
# return json_status.result(message='验证码已发送您邮箱,3分钟内有效,请注意查收!')
# else:
# return json_status.params_error(message='验证码发送失败,请重新发送!')
# 2. celery异步发送邮件,调用delay
task_send_email.delay(email, 'register')
return json_status.result(message='验证码已发送成功,请注意查收!')
开启celery异步任务
进入虚拟环境,切换到项目根目录下,执行命令启动worker。
celery -A celery_tasks.celery_main worker -l info
启动 celery 服务,通过它来监听是否有任务要处理。这个命令启动了一个worker,用来执行程序中add这个加法任务(task)
-A 选项指定 celery 实例 app 的位置,本例中celery_tasks.celery_main中自动寻找,当然可以直接指定 celery worker -A task.app -l info
-l 选项指定日志级别, -l 是 --loglevel 的缩略形式
其他更多选项通过 celery worker –-help 查看
启动项目
点击发送验证码,没有因异步任务函数中task_send_email
中time.sleep(10)
中而等待10s,而是马上进入倒计时。
执行完会看到redis数据库4和5中有结果