1.版本信息
celery==4.2.1
Django==1.11.11
django-celery-beat==1.4.0
django-celery-results==1.0.1
官方文档:celery定期任务;django_celery_beat 1.4.0;
2.安装与配置
1.使用pip安装包:
$ pip install django-celery-beat
2.将django_celery_beat模块添加到INSTALLED_APPSDjango项目中settings.py:
#jdango时区配置
TIME_ZONE = 'Asia/Shanghai'
# 如果USE_TZ设置为True时,Django会使用系统默认设置的时区,此时的TIME_ZONE不管有没有设置都不起作用
# 如果USE_TZ 设置为False,TIME_ZONE = 'Asia/Shanghai', 则使用上海的UTC时间。
USE_TZ = False
INSTALLED_APPS = (
...,
'django_celery_beat',
)
# celery beat配置
# CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
3.应用Django数据库迁移,以便创建必要的表:
$ python manage.py migrate
迁移之后生产四个表:
django_celery_beat.models.PeriodicTask 此模型定义要运行的单个周期性任务。
django_celery_beat.models.IntervalSchedule 以特定间隔(例如,每5秒)运行的计划。
django_celery_beat.models.CrontabSchedule
与像在cron项领域的时间表 分钟小时日的一周 DAY_OF_MONTH month_of_year
django_celery_beat.models.PeriodicTasks 此模型仅用作索引以跟踪计划何时更改
4.proj/proj/celery.py 创建celery.py文件:
# _*_ coding:utf-8 _*_
"""
异步
创建celery实例
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import os
from celery import Celery
from django.utils import timezone
from kombu import Exchange
from kombu import Queue
import datetime
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery("proj")
# app.now=datetime.datetime.utcnow
# print app.now(), 'celery---------------->>>>>>'
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
# 在django配置文件中进行配置,以大写CELERY开头
app.config_from_object('django.conf:settings', namespace='CELERY')
# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now
# app.now = datetime.datetime.now
# Load task modules from all registered Django app configs.
# celery自动发现所有django-app下面的任务tasks.py
app.autodiscover_tasks()
# 通过设置x-max-priority参数来配置队列以支持优先级
# app.conf.task_queues = [
# Queue('tasks', Exchange('tasks'), routing_key='tasks',
# queue_arguments={'x-max-priority': 10}),
# ]
# 设置所有队列的默认值
# app.conf.task_queue_max_priority = 10
# 设置了三个Queue绑定到一个direct类型的exchange上,然后consumer监听所有的队列,消息来了后就轮询调用consumer进行处理.
task_exchange = Exchange('tasks', type='direct')
# 异步任务优先级
task_queues = [Queue('hipri', task_exchange, routing_key='hipri'),
Queue('midpri', task_exchange, routing_key='midpri'),
Queue('lopri', task_exchange, routing_key='lopri')]
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
备注:
1.django-celery-beat 依赖celery异步,所以先要配置好异步任务,才能使用定时任务;
2.先版本一直有一个时区问题bug(只能使用utc时区,自定义时区未生效),运行定期任务可能会出现无限循环任务,添加 app.now = timezone.now 这个暂时解决无限循环问题,但是cron周期任务还是有问题
3.运行
首先启动异步任务,然后dug模式运行定期服务:
$ celery -A proj beat -l info
下面是配置上日志命令:
celery -A proj beat -l info -f /home/bl/work/proj/log/celery_beat.log
4.添加任务
使用django-admin后台直接在表里添加定时任务或者在代码中在表PeriodicTask 中直接填任务即可,djang-celery-beat 会自动检测执行任务。
例如:
>>> from django_celery_beat.models import PeriodicTask, IntervalSchedule
# 添加定时间隔周期
#IntervalSchedule.DAYS
#IntervalSchedule.HOURS
#IntervalSchedule.MINUTES
#IntervalSchedule.SECONDS
#IntervalSchedule.MICROSECONDS
>>> schedule, created = IntervalSchedule.objects.get_or_create(
... every=10,
... period=IntervalSchedule.SECONDS,
... )
#添加定时任务
>>> PeriodicTask.objects.create(
... interval=schedule, # we created this above.
... name='Importing contacts', # simply describes this periodic task.
... task='proj.tasks.add', # name of task.
... )
#下面是添加带参数的定时任务
>>> import json
>>> from datetime import datetime, timedelta
>>> PeriodicTask.objects.create(
... interval=schedule, # we created this above.
... name='Importing contacts', # simply describes this periodic task.
... task='proj.tasks.add', # name of task.
... args=json.dumps(['arg1', 'arg2']),
... kwargs=json.dumps({
... 'be_careful': True,
... }),
... expires=datetime.utcnow() + timedelta(seconds=30)
... )