Celery和Django的分布式自动化测试

为了实现快速高效使用计算集群解决大量测试用例管理和执行的问题,基于Celery和Django的分布式自动化测试,其由API服务器层、用例管理层、任务调度层和任务执行层组成四层架构,实现了定时调度测试、分布式执行、失败重试等功能。能够快速部署和配置测试执行节点,实现了充分利用计算集群资源、提高测试效率的目的。

自动化测试Celery工作原理:

Celery.png

Django Celery部署

  • 1. 安装celery

首先,我们必须拥有一个broker消息队列用于发送和接收消息。Celery官网给出了多个broker的备选方案:RabbitMQRedisDatabase以及其他的消息中间件。我这边使用的是Redis作为消息中间人。
django-celery-beat 定时任务
django-celery-results 存储Celery任务结果第三方插件,我这边是根据业务逻辑重新设计了数据结构

pip install celery==5.0.5
pip install redis==3.5.3
pip install django-celery-beat==2.2.0
pip install django-celery-results==2.0.1
  • 2. 注册APP
INSTALLED_APPS = [
    ....   
    'django_celery_beat',
    'django_celery_results',
]
  • 3. 配置settings.py
# 设置代理人broker
broker_url = f'redis://{HOST}:6379'
# 使用django orm 作为结果存储
result_backend = 'django-db'
# celery 的启动工作数量设置
worker_concurrency = 5
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
worker_prefetch_multiplier = 5
# celery 的 worker 执行多少个任务后进行重启操作
worker_max_tasks_per_child = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
worker_disable_rate_limits = True
# 指定任务接受的序列化类型
accept_content = ['json']
# 指定任务序列化方式
task_serializer = 'json'
# 指定结果序列化的方式
result_serializer = 'json'
# celery beat配置(周期性任务设置)
timezone = 'Asia/Shanghai'
enable_utc = False
beat_sync_every = 1
# settings USE_TZ=False时添加该选项,否启动 django celery beat 的时候,出现这个错误TypeError: can't compare offset-naive and offset-aware datetimes
DJANGO_CELERY_BEAT_TZ_AWARE = False 
# 休眠最大秒数
beat_max_loop_interval = 300
beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
  • 4. 新增celery_tasks文件
"""目录结构"""
├── celery_tasks
│ ├── init.py
│ ├── celery.py
# celery.py
# 将Celery连接到应用程序
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# 为celery设置环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ServerDjango.settings')
app = Celery('celery_tasks')
# 加载配置
app.config_from_envvar('DJANGO_SETTINGS_MODULE')
# 设置app自动加载任务
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


# init.py
from celery_tasks.celery import app as celery_app
__all__ = ['celery_app']

celery.py中设定了对settings.pyINSTALLED_APPSautodiscover_tasksCelery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task需要注意的是,与一般的.py中实现celery不同,tasks.py必须建在各app的根目录下,且不能随意命名。

  • 例: tasks.py
from celery import shared_task

@shared_task
def tailf_log(channel_name, file_path):
    """跟踪日志"""
    channel_layer = get_channel_layer()
    try:
        with open(file_path, encoding='utf-8') as f:
            while True:
                line = f.readline()
                if line:
                    async_to_sync(channel_layer.send)(
                        channel_name,
                        {
                            "type": "send.message",
                            "message": str(line)
                        }
                    )
                else:
                    time.sleep(0.5)
    except Exception as e:
        f.close()
        print(e)

  • 5. 分别启动wokerbeat
celery -A celery_tasks worker -l info   # 启动woker
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers.DatabaseScheduler #启动beat 调度器使用数据库
  • 依据现有业务逻辑增加了任务失败重试机制、任务返回后计算下次任务执行时间以及当前任务消耗时间功能。
import celery
from celery.schedules import crontab
from django_celery_beat.models import PeriodicTask
from ManageApps.my_tasks.models import UserTasks

class CeleryTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        task = UserTasks.objects.get(task_id=kwargs['task_id'])
        if kwargs["task_id"] and task.retry:
            # 失败重试,默认300s
            self.retry(exc=exc, countdown=300, max_retries=1)
        return super(CeleryTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if kwargs["task_id"]:  # 关键字参数task_id, 判断是否为定时任务
            task = _next_run_time(kwargs['task_id'])
            elapsed_time = (datetime.datetime.now() - task.last_run_at).total_seconds()
            UserTaskResult.objects.create(**{"task_id": kwargs['task_id'], "result_id": task_id,
                                             "elapsed": round(elapsed_time, 2), "status": status})
        return super(CeleryTask, self).after_return(status, retval, task_id, args, kwargs, einfo)

def _next_run_time(task_id):
    """计算任务下次运行时间"""
    per_task = PeriodicTask.objects.get(id=task_id)
    my_task = UserTasks.objects.get(task_id=task_id)
    if per_task.crontab_id and my_task.start_time:
        # 周期任务
        cron_obj = CrontabSchedule.objects.get(id=per_task.crontab_id)
        cron = crontab(minute=cron_obj.minute, hour=cron_obj.hour, day_of_week=cron_obj.day_of_week,
                       day_of_month=cron_obj.day_of_month, month_of_year=cron_obj.month_of_year)
        now = cron.now()  # 当前运行时间
        result = cron.remaining_delta(last_run_at=now)
        ends_in = (result[0] + result[1]).replace(tzinfo=None)
        my_task.start_time = ends_in
    elif per_task.interval_id and my_task.start_time:
        # 间隔任务
        interval = IntervalSchedule.objects.get(id=per_task.interval_id)
        offset = datetime.timedelta(minutes=+0)
        if interval.period == 'minutes':
            offset = datetime.timedelta(minutes=+interval.every)
        elif interval.period == 'days':
            offset = datetime.timedelta(days=+interval.every)
        elif interval.period == 'hours':
            offset = datetime.timedelta(hours=+interval.every)
        elif interval.period == 'seconds':
            offset = datetime.timedelta(seconds=+interval.every)
        elif interval.period == 'microseconds':
            offset = datetime.timedelta(microseconds=+interval.every)
        my_task.start_time = datetime.datetime.now() + offset
    else:
        # 第一次运行写入当前时间
        my_task.start_time = datetime.datetime.now()
    my_task.save()
    return my_task
  • 参考django-celery-beat、django-celery-result二次设计任务模型
from django.db import models
from django_celery_beat.models import PeriodicTask
from django_celery_results.models import TaskResult, TASK_STATE_CHOICES

class UserTasks(models.Model):
    user = models.ForeignKey('user.User', on_delete=models.CASCADE, verbose_name='所属用户', help_text='所属用户',
                             null=True, blank=True)
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所属任务', help_text='所属任务',
                             null=True, blank=True)
    task_tags = models.CharField(max_length=255, null=True, blank=True, verbose_name='任务标签', help_text='任务标签')
    notice = models.SmallIntegerField(verbose_name='任务通知', help_text='任务通知')
    failfast = models.BooleanField(default=False, blank=True, verbose_name='错误停止测试机制', help_text='错误停止测试机制')
    retry = models.BooleanField(default=False, blank=True, verbose_name='重试机制', help_text='重试机制')
    task_type = models.BooleanField(default=False, blank=True, verbose_name='任务类型', help_text='任务类型')
    last_run_at = models.DateTimeField(blank=True, null=True, verbose_name='Last Run Datetime',
                                       help_text='计划上次触发任务运行的日期时间')
    start_time = models.DateTimeField(blank=True, null=True, verbose_name='Start Datetime',
                                      help_text='Datetime when the schedule should begin triggering the task to run',)

    class Meta:
        db_table = 'tb_user_tasks'
        verbose_name = '用户任务'
        verbose_name_plural = verbose_name


class UserTaskResult(models.Model):
    result_id = models.CharField(max_length=255, null=True, blank=True, verbose_name='Result ID', help_text='结果ID')
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所属任务', help_text='所属任务',
                             null=True, blank=True)
    create_time = models.BigIntegerField(verbose_name="创建时间", help_text="创建时间")
    elapsed = models.FloatField(verbose_name="耗时/s", help_text="耗时/s", null=True, blank=True, default=0.00)
    status = models.CharField(max_length=50, default='PENDING', choices=TASK_STATE_CHOICES,
                              verbose_name='任务状态',
                              help_text='Current state of the task being run')

    class Meta:
        db_table = 'tb_user_task_result'
        verbose_name = '用户任务结果'
        verbose_name_plural = verbose_name

前端页面

  • 任务管理
    image.png

    image.png
  • 任务统计
    image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容