django-celery-beat使用
一、引入django-celery-beat包:
# Register django_celery_beat as an installed Django app. The "..." lines
# are placeholders for the project's other entries, not literal code.
INSTALLED_APPS = [
...
'django_celery_beat',
...
]
二、定义celery app:
celery.py 文件:
from __future__ import absolute_import
import os
from celery import Celery
# Point Django at the project's settings module. setdefault() only writes the
# variable when DJANGO_SETTINGS_MODULE is not already set in the environment.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'controller.settings')
# Create the Celery application object, named 'slb'.
app = Celery('slb')
# Load configuration from the Django settings object. With namespace='CELERY'
# the Celery options are the settings whose names start with CELERY_.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Let Celery look up task modules automatically instead of listing them here.
app.autodiscover_tasks()
三、引入celery app:
__init__.py 文件:
# Import the Celery app from the sibling celery.py module when this package
# is imported, and expose it as the package's only public name, celery_app.
from .celery import app as celery_app
__all__ = ('celery_app',)
四、定义配置文件:
# Celery-related settings (read by the Celery app through the CELERY_ prefix).
# Task results are stored in the local Redis server, database 1.
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# Task messages are sent through the local Redis server, database 2.
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/2'
# Only JSON-serialized content is accepted, and tasks are serialized as JSON.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
# Use the django_celery_beat scheduler class so beat reads its schedule from
# the database.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
五、定义celery task:
from controller.celery import app
@app.task
def monitor_device_task(device_ip):
    """Log a monitoring message for one device.

    The function is registered as a Celery task through ``app.task``.

    :param device_ip: value written into the log message; presumably the
        device's IP address as a string (confirm against the caller).
    """
    # Hand the value to the logger as an argument instead of formatting it
    # eagerly with ``%``: a tuple argument no longer breaks the formatting,
    # and the string is only built when the INFO level is enabled.
    logger.info('monitor device task: %s', device_ip)
六、定义周期性任务:
class MonitorDeviceTask(object):
    """Add and control the periodic monitoring task of one device.

    Building an instance looks up the ``PeriodicTask`` row named after the
    device's IP and creates it when it does not exist yet. ``start`` and
    ``stop`` then toggle that row's ``enabled`` flag.
    """

    def __init__(self, device_obj):
        """Fetch or create the periodic task for ``device_obj``.

        :param device_obj: object exposing an ``ip`` attribute; the IP is
            used both in the task name and as the task's only argument.
        """
        self.device_obj = device_obj
        # PeriodicTask names are unique, so a plain create() raises
        # IntegrityError the second time the same device is handled (for
        # example when the device is saved again). get_or_create() reuses
        # the existing row instead.
        self.periodic_task, _ = PeriodicTask.objects.get_or_create(
            name='add device %s' % device_obj.ip,
            defaults={
                'interval': get_10_seconds_schedule(),
                'task': 'device.tasks.monitor_device_task',
                # Task arguments are stored as a JSON-encoded list.
                'args': json.dumps([device_obj.ip]),
            },
        )

    def start(self):
        """Start the task by enabling it and saving the change."""
        self.periodic_task.enabled = True
        self.periodic_task.save()

    def stop(self):
        """Stop the task by disabling it and saving the change."""
        self.periodic_task.enabled = False
        self.periodic_task.save()
七、启动redis:
redis-server --protected-mode no
八、启动celery worker(-B 参数同时启动内嵌的 beat 调度器):
python3 -m celery -A controller worker -l info -B
九、添加周期性任务到数据库:
# Add the periodic task for the device and enable it.
# NOTE(review): kwargs.get('instance') suggests this line lives inside a
# Django signal handler whose 'instance' is the device object; confirm.
MonitorDeviceTask(kwargs.get('instance')).start()