django定时器_djcelery+mq的使用

环境

python 3.6
django 2.1.8

下载安装

celery==3.1.15
django-celery==3.3.1
flower==0.9.3

代码步骤

-1、配置 settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    
    'djcelery', # 注册
    ...
]


import djcelery  # django的celery,省去了在celery中配置django环境,并且还能在django后台管理任务

## 下面是djcelery配置

# 当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的
# 所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。
djcelery.setup_loader()

CELERY_ENABLE_UTC = True
# CELERY_ENABLE_UTC = False

# CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = TIME_ZONE

BROKER_URL = 'amqp://guest@localhost//'

CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' # 任务元数据保存到数据库中

CELERY_ACCEPT_CONTENT = ['application/json']

CELERY_TASK_SERIALIZER = 'json'

CELERY_RESULT_SERIALIZER = 'json'

# CELERY_TASK_RESULT_EXPIRES = 86400  # celery任务执行结果的超时时间, 此配置注释后,任务结果不会定时清理

CELERYD_CONCURRENCY = 1 if DEBUG else 10 # celery worker的并发数

CELERYD_MAX_TASKS_PER_CHILD = 100  # 每个worker执行了多少任务就会销毁
  • 2、在settings.py同级目录下创建celery.py
from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'TestPaltForm.settings') # 项目的settings文件

app = Celery('TestPaltForm') # 项目名为入参

app.config_from_object('django.conf:settings') # 读取settings中的celery配置

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
  • 3、创建app:timedtask

    timedtask目录结构

  • 3.1、timedtask/models.py

from djcelery.models import PeriodicTask
from django.db import models


class TaskExtend(models.Model):
    """
    拓展PeriodicTask模型
    """
    create_time = models.DateTimeField('创建时间', auto_now_add=True)
    update_time = models.DateTimeField(auto_now=True, verbose_name='更新时间', help_text='更新时间')
    email_list = models.CharField('邮箱列表', max_length=2048, default='[]')
    author = models.CharField('创建人', max_length=100, default='')
    project = models.IntegerField('任务所选项目', default=0)
    periodic_task = models.OneToOneField(PeriodicTask, on_delete=models.CASCADE, related_name='taskextend')
  • 3.2、自定义过滤器timedtask/filtersets.py
"""
Author: LZL
"""
from django_filters import rest_framework as filters

from djcelery.models import PeriodicTask


class PeriodicTaskFilter(filters.FilterSet):
    name = filters.CharFilter(field_name="name", lookup_expr='contains')
    description = filters.CharFilter(field_name="description", lookup_expr='contains')
    author = filters.CharFilter(field_name="taskextend__author", lookup_expr='contains')
    create_time = filters.DateFromToRangeFilter(field_name='taskextend__create_time')
    update_time = filters.DateFromToRangeFilter(field_name='date_changed')

    class Meta:
        model = PeriodicTask
        fields = '__all__'
  • 3.3、序列化器timedtask/serializers.py
"""
Author: LZL
"""
from rest_framework import serializers
from djcelery.models import PeriodicTask

from apps.timedtask.models import TaskExtend


class TaskExtendSerializer(serializers.ModelSerializer):
    """
    用例信息序列化
    """

    class Meta:
        model = TaskExtend
        fields = '__all__'


class PeriodicTaskSerializer(serializers.ModelSerializer):
    """
    用例信息序列化
    """
    # 需要对taskextend进行序列化校验
    task_extend = TaskExtendSerializer(source='taskextend', read_only=True)
    # crontab_time = CrontabScheduleSerializer(source='crontab', read_only=True)
    # 对crontab的str返回的时间进行序列化校验:
    # return '{0} {1} {2} {3} {4} (m/h/d/dM/MY)'.format(
    #     cronexp(self.minute),
    #     cronexp(self.hour),
    #     cronexp(self.day_of_week),
    #     cronexp(self.day_of_month),
    #     cronexp(self.month_of_year),
    # )
    crontab_time = serializers.ReadOnlyField(source='crontab.__str__')

    class Meta:
        model = PeriodicTask
        fields = '__all__'
  • 3.4、任务类timedtask/tasks.py
"""
Author: LZL
"""
from __future__ import absolute_import

import os
from datetime import datetime

from celery import shared_task
from djcelery.models import PeriodicTask

from TestPaltForm import settings
from ..testcases.models import TestCases
from ..testcases.serializers import TestcaseEnvSerializer
from ..envs.models import Envs
from utils import common


def run_task_by_cases(func, periodic_args=None):
    """
        执行以testcase为维度的定时任务
    :param func: 定时任务时,传入调用的定时任务函数
    :return:
    """
   # 业务代码略过


@shared_task
def periodic_run(task_args):
    """
        task_args = {
        "case_list": [18, 19],
        "env": 4,
        "project": 10,
        "periodic": 10,
        "name": "定时任务1",
        "description": "定时任务1描述",
        "receivers": ["admin@admin.com", "test@qq.com"]
    }
    :param task_args:
    :return:
    """
    return run_task_by_cases(periodic_run, task_args)

  • 3.5、timedtask/views.py
import json
import logging

from rest_framework.response import Response
from rest_framework import generics, status
from djcelery.models import PeriodicTask, CrontabSchedule, PeriodicTasks
from rest_framework.views import APIView

from .serializers import PeriodicTaskSerializer, TaskExtendSerializer
from .filtersets import PeriodicTaskFilter
from .models import TaskExtend
from .tasks import periodic_run


class PeriodicTaskView(generics.ListCreateAPIView):
    """
    提供查询,创建
    """
    queryset = PeriodicTask.objects.all().order_by('-date_changed')
    serializer_class = PeriodicTaskSerializer
    filterset_class = PeriodicTaskFilter

    def post(self, request, *args, **kwargs):
        """
        1、先创建或获取Crontab实例
        2、保存任务实例
        data 不要带日期类数据
        :param request:
        :param args:
        :param kwargs:
        :return:
        """
        data = json.loads(json.dumps(request.data))
        # 获取或创建crontab
        # crontab_time = {
        #     'minute': '1',
        #     'hour':'1',
        #     'day_of_week':'*',
        #     'day_of_month':'*',
        #     'month_of_year':'*'
        # }
        project = int(data.pop('project'))
        # 前端传递来的crontab时间dict
        crontab_time = data.get('crontab')
        if crontab_time:
            # 如果不带crontab的任务,则是手动运行的
            # 创建定时策略,并获取到实例对象
            crontab, _ = CrontabSchedule.objects.get_or_create(**crontab_time)
            data['crontab'] = crontab.id  # 获取定时策略的id
        data['task'] = 'apps.timedtask.tasks.periodic_run'  # 后期可改为动态获取
        # 保存任务实例
        email_list = data.pop('email_list')
        author = data.pop('author')
        # 序列化定时任务的参数

        serializer = PeriodicTaskSerializer(data=data)
        if serializer.is_valid():
            try:
                serializer.save()  # 保存定时任务
                obj = PeriodicTask.objects.get(pk=serializer.data['id'])
                PeriodicTasks.changed(obj)  # 必须执行此更新,触发celery beat刷新
                # 保存拓展信息
                TaskExtend.objects.create(**{
                    'email_list': email_list,
                    'author': author,
                    'periodic_task_id': serializer.data['id'],
                    'project': project
                })
                return Response(serializer.data)
            except Exception as e:
                return Response({'detail': serializer.errors}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
        else:
            return Response({'message': '创建失败,已有同名任务', 'detail': serializer.errors},
                            status=status.HTTP_400_BAD_REQUEST)


class PeriodicTaskDetailView(generics.RetrieveUpdateDestroyAPIView):
    """
    debugtalk信息单查询、修改、删除
    """
    queryset = PeriodicTask.objects.all().order_by('-date_changed')
    serializer_class = PeriodicTaskSerializer

    def put(self, request, *args, **kwargs):
        """
        针对编辑功能
        :param request:
        :param args:
        :param kwargs:
        :return:
        """
        data = json.loads(json.dumps(request.data))
        # 获取或创建crontab
        project = int(data.pop('project'))
        crontab_time = data.get('crontab')
        if crontab_time:
            # 如果不带crontab的任务,则是手动运行的
            crontab, _ = CrontabSchedule.objects.get_or_create(**crontab_time)
            data['crontab'] = crontab.id
        # 保存任务实例
        task_extend = data.pop('task_extend')
        task_extend['project'] = project  # 更新所属项目
        task_extend['email_list'] = data.pop('email_list')
        try:
            periodic_id = data['id']
            PeriodicTask.objects.filter(id=periodic_id).update(**data)
            obj = PeriodicTask.objects.filter(id=periodic_id).first()
            if obj:
                PeriodicTasks.changed(obj)  # 必须执行此更新,触发celery beat刷新
                # 保存拓展信息
                TaskExtend.objects.filter(pk=task_extend.get('id')).update(**task_extend)
            else:
                return Response({'message': '定时任务不存在'}, status=status.HTTP_400_BAD_REQUEST)
            return Response({'message': '任务修改成功'}, status=status.HTTP_200_OK)
        except Exception as es:
            return Response({'message': '修改失败,已有同名任务'}, status=status.HTTP_400_BAD_REQUEST)

    def patch(self, request, *args, **kwargs):
        """
        局部修改,只修改enabled
        :param request:
        :param args:
        :param kwargs:
        :return:
        """
        try:
            obj = PeriodicTask.objects.get(pk=kwargs.get('pk'))
            # enabled_data = json.loads(request.data.get('enabled'))
            obj.enabled = request.data.get('enabled')
            obj.save()
            PeriodicTasks.changed(obj)  # 必须执行此更新,触发celery beat刷新
            return Response({'message': '{}成功'.format('启用' if obj.enabled else '禁用')}, status=status.HTTP_200_OK)
            # return Response({'enabled': obj.enabled}, status=status.HTTP_200_OK)
        except Exception as es:
            return Response({'msg': '任务状态修改失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)


class RunPeriodicTaskView(APIView):
    """
    立刻执行一次定时周期任务的任务
    """

    def post(self, request, *args, **kwargs):
        # 任务记录的id
        task_id = kwargs.get('pk')
        try:
            # args列表,第一个元素为task_info
            args = eval(PeriodicTask.objects.get(pk=task_id).args)
            periodic_run.delay(*args)
            return Response({'message': '任务开始运行,请稍后查询结果...'})
        except PeriodicTask.DoesNotExist:
            resp = {
                'message': '所运行任务不存在,id:{}'.format(task_id)
            }
            return Response(resp)
  • 3.6、timedtask/urls.py
"""
Author: LZL
"""
from django.urls import path

from .views import PeriodicTaskView, PeriodicTaskDetailView, RunPeriodicTaskView

urlpatterns = [
    # 定时任务
    path('periodic/', PeriodicTaskView.as_view()),
    path('periodic/<int:pk>/', PeriodicTaskDetailView.as_view()),
    path('run_periodic/<int:pk>/', RunPeriodicTaskView.as_view()),  # 手动运行(定时)任务
]

生成的数据表

数据表

启动

启动worker:celery -A 项目名 worker -l info -P eventlet
启动beat :celery -A 项目名beat -l info
启动celery后台(需要查看才启动):celery flower
启动mq:自行百度
django后台也可以查看定时任务

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容