测试平台系列(73) 设计测试计划功能

大家好~我是米洛

我正在从0到1打造一个开源的接口测试平台, 也在编写一套与之对应的完整教程,希望大家多多支持。

欢迎关注我的龚仲耗测试开发坑货,获取最新文章教程!

回顾

上一节我们简单介绍了下APScheduler,这一节我们来编写测试计划相关内容。

设计测试计划表

测试计划,其实也可以叫测试集合,它是我们一组用例的集合。并且有着对应的特征:

  • 定时执行

  • 执行完毕后通知方式

  • 通过率低于多少发送邮件/钉钉等通知

  • 优先级

  • 涵盖多少case

  • case失败重试间隔 等等

    这里可能比较的就是把测试计划+测试集合给耦合到一起了。

    基于上面的思路,我们就可以来设计测试计划表了, 具体字段的含义可以参照注释。

    (数据表的设计不一定完美,后续一般会根据业务需求扩展)

from sqlalchemy import Column, String, TEXT, UniqueConstraint, BOOLEAN, SMALLINT, INT
from sqlalchemy.dialects.mysql import TINYTEXT

from app.models.basic import PityBase


class PityTestPlan(PityBase):
    project_id = Column(INT, nullable=False)
    # 测试计划执行环境, 可以多选
    env = Column(String(64), nullable=False)
    # 测试计划名称
    name = Column(String(32), nullable=False)
    # 测试计划优先级
    priority = Column(String(3), nullable=False)
    # cron表达式
    cron = Column(String(12), nullable=False)
    # 用例列表
    case_list = Column(TEXT, nullable=False)
    # 并行/串行(是否顺序执行)
    ordered = Column(BOOLEAN, default=False)
    # 通过率低于这个数会自动发通知
    pass_rate = Column(SMALLINT, default=80)
    # 通知用户,目前只有邮箱,后续用户表可能要完善手机号字段,为了通知
    receiver = Column(TEXT)
    # 通知方式 0: 邮件 1: 钉钉 2: 企业微信 3: 飞书 支持多选
    msg_type = Column(TINYTEXT)
    # 单次case失败重试间隔,默认2分钟
    retry_minutes = Column(SMALLINT, default=2)
    # 测试计划是否正在执行中
    state = Column(SMALLINT, default=0, comment="0: 未开始 1: 运行中")

    __table_args__ = (
        UniqueConstraint('project_id', 'name', 'deleted_at'),
    )

    __tablename__ = "pity_test_plan"

    def __init__(self, project_id, env, case_list, name, priority, cron, ordered, pass_rate, receiver, msg_type,
                 retry_minutes, user, state=0, id=None):
        super().__init__(user, id)
        self.env = ",".join(map(str, env))
        self.case_list = ",".join(map(str, case_list))
        self.name = name
        self.project_id = project_id
        self.priority = priority
        self.ordered = ordered
        self.cron = cron
        self.pass_rate = pass_rate
        self.receiver = ",".join(map(str, receiver))
        self.msg_type = ",".join(map(str, msg_type))
        self.retry_minutes = retry_minutes
        self.state = state

这里值得注意的是,我们定义的FORM数据,环境列表env、接收人receiver、用例列表case_list都是数组,我们需要进行一波转换

编写CRUD方法

  • 抽离异步分页方法和where方法
DatabaseHelper

在DatabaseHelper类中添加pagination方法,接受page和size,session和sql参数,先读出sql匹配到的总数,如果为0则直接return,否则通过offset和limit获取到对应分页的数据。

where方法是用于改进我们平时的多条件查询,类似这种:

image
  • 编写新增测试计划方法
image

可以看到改造之后,我们只需要调用where方法,不需要写if name != "":这样的语句了。

因为我们不允许同一个项目里面出现同名的测试计划,所以条件是项目id+name不能重复。

  • 编写增改删方法
from sqlalchemy import select

from app.models import async_session, DatabaseHelper
from app.models.schema.test_plan import PityTestPlanForm
from app.models.test_plan import PityTestPlan
from app.utils.logger import Log


class PityTestPlanDao(object):
    log = Log("PityTestPlanDao")

    @staticmethod
    async def list_test_plan(page: int, size: int, project_id: int = None, name: str = ''):
        try:
            async with async_session() as session:
                conditions = [PityTestPlan.deleted_at == 0]
                DatabaseHelper.where(project_id, PityTestPlan.project_id == project_id, conditions) \
                    .where(name, PityTestPlan.name.like(f"%{name}%"), conditions)
                sql = select(PityTestPlan).where(conditions)
                result, total = await DatabaseHelper.pagination(page, size, session, sql)
                return result, total
        except Exception as e:
            PityTestPlanDao.log.error(f"获取测试计划失败: {str(e)}")
            raise Exception(f"获取测试计划失败: {str(e)}")

    @staticmethod
    async def insert_test_plan(plan: PityTestPlanForm, user: int):
        try:
            async with async_session() as session:
                async with session.begin():
                    query = await session.execute(select(PityTestPlan).where(PityTestPlan.project_id == plan.project_id,
                                                                             PityTestPlan.name == plan.name,
                                                                             PityTestPlan.deleted_at == 0))
                    if query.scalars().first() is not None:
                        raise Exception("测试计划已存在")
                    plan = PityTestPlan(**plan.dict(), user=user)
                    await session.add(plan)
        except Exception as e:
            PityTestPlanDao.log.error(f"新增测试计划失败: {str(e)}")
            raise Exception(f"添加失败: {str(e)}")

    @staticmethod
    async def update_test_plan(plan: PityTestPlanForm, user: int):
        try:
            async with async_session() as session:
                async with session.begin():
                    query = await session.execute(
                        select(PityTestPlan).where(PityTestPlan.id == plan.id, PityTestPlan.deleted_at == 0))
                    data = query.scalars().first()
                    if data is None:
                        raise Exception("测试计划不存在")
                    DatabaseHelper.update_model(data, plan, user)
        except Exception as e:
            PityTestPlanDao.log.error(f"编辑测试计划失败: {str(e)}")
            raise Exception(f"编辑失败: {str(e)}")

    @staticmethod
    async def delete_test_plan(id: int, user: int):
        try:
            async with async_session() as session:
                async with session.begin():
                    query = await session.execute(
                        select(PityTestPlan).where(PityTestPlan.id == plan.id, PityTestPlan.deleted_at == 0))
                    data = query.scalars().first()
                    if data is None:
                        raise Exception("测试计划不存在")
                    DatabaseHelper.delete_model(data, user)
        except Exception as e:
            PityTestPlanDao.log.error(f"删除测试计划失败: {str(e)}")
            raise Exception(f"删除失败: {str(e)}")
            
    @staticmethod
    async def query_test_plan(id: int) -> PityTestPlan:
        try:
            async with async_session() as session:
                sql = select(PityTestPlan).where(PityTestPlan.deleted_at == 0, PityTestPlan.id == id)
                data = await session.execute(sql)
                return data.scalars().first()
        except Exception as e:
            PityTestPlanDao.log.error(f"获取测试计划失败: {str(e)}")
            raise Exception(f"获取测试计划失败: {str(e)}")

基本思路差不多,老CRUD了!这里就不多说了,对sqlalchemy的async内容不了解的可以去官网看看demo。

这个query方法,是给定时任务查询测试计划数据使用。由于做了软删除,会导致经常忘记带上deleted_at==0的条件

编写相关接口(app/routers/testcase/testplan.py)

from fastapi import Depends

from app.dao.test_case.TestPlan import PityTestPlanDao
from app.handler.fatcory import PityResponse
from app.models.schema.test_plan import PityTestPlanForm
from app.routers import Permission
from app.routers.testcase.testcase import router
from config import Config


@router.get("/plan/list")
async def list_test_plan(page: int, size: int, project_id: int = None, name: str = "", user_info=Depends(Permission())):
    try:
        data, total = await PityTestPlanDao.list_test_plan(page, size, project_id, name)
        return PityResponse.success_with_size(PityResponse.model_to_list(data), total=total)
    except Exception as e:
        return PityResponse.failed(str(e))


@router.get("/plan/insert")
async def insert_test_plan(form: PityTestPlanForm, user_info=Depends(Permission(Config.MANAGER))):
    try:
        await PityTestPlanDao.insert_test_plan(form, user_info['id'])
        return PityResponse.success()
    except Exception as e:
        return PityResponse.failed(str(e))


@router.get("/plan/update")
async def update_test_plan(form: PityTestPlanForm, user_info=Depends(Permission(Config.MANAGER))):
    try:
        await PityTestPlanDao.update_test_plan(form, user_info['id'])
        return PityResponse.success()
    except Exception as e:
        return PityResponse.failed(str(e))


@router.get("/plan/delete")
async def delete_test_plan(id: int, user_info=Depends(Permission(Config.MANAGER))):
    try:
        await PityTestPlanDao.delete_test_plan(id, user_info['id'])
        return PityResponse.success()
    except Exception as e:
        return PityResponse.failed(str(e))

这边我们把测试计划的增删改权限赋予MANAGER,但其实最好是能给对应的项目经理,不过那样会稍微复杂点。我们暂时先偷个懒,或许再完善

今天的内容就分享到这里,下节会介绍测试计划如何和APScheduler结合起来,之后便是编写测试计划的前端页面部分,完成定时任务功能。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容

  • 16宿命:用概率思维提高你的胜算 以前的我是风险厌恶者,不喜欢去冒险,但是人生放弃了冒险,也就放弃了无数的可能。 ...
    yichen大刀阅读 6,044评论 0 4
  • 公元:2019年11月28日19时42分农历:二零一九年 十一月 初三日 戌时干支:己亥乙亥己巳甲戌当月节气:立冬...
    石放阅读 6,877评论 0 2
  • 今天上午陪老妈看病,下午健身房跑步,晚上想想今天还没有断舍离,马上做,衣架和旁边的的布衣架,一看乱乱,又想想自己是...
    影子3623253阅读 2,910评论 1 8