# 2021-05-07 遍历用例,执行用例 (iterate over the test cases and execute them)

import json
from collections import Counter

import util
from config.assert_case import AssertCase
from config.load_case import LoadCase
from config.load_config import LoadConfig
from config.oper_db import OperDatabase
from config.send_report import SendTestReport
from util.db_util import MysqlDb
from util.HTMLTestRunner import reportExcel
from util.request_util import RequestUnti

# log = util.log.logger

class NewEnergyTestCase:
    """Run the stored API test cases of one application and write a summary report.

    Cases are rows accessed by key (``case['case_id']``, ``case['case_url']`` ...).
    A case may name a pre-condition case; that case is executed first and selected
    fields of its response are copied into the dependent request.
    """

    # Id of the stored case whose saved ``response`` supplies the Cookie header.
    COOKIE_CASE_ID = 3

    def findCaseById(self, case_id):
        """Look up a single test case by its id.

        :param case_id: value matched against the ``case_id`` column of the ``case`` table.
        :return: whatever ``MysqlDb.query(sql, state="one")`` returns for that query.
        """
        print("findCaseById")
        # NOTE(review): the id is interpolated straight into the SQL text. Inside this
        # class it only ever comes from the case table or COOKIE_CASE_ID; switch to a
        # parameterized query if MysqlDb.query supports one -- TODO confirm.
        sql = f"select * from `case` where case_id='{case_id}'"
        return MysqlDb().query(sql, state="one")

    def runAllCase(self, app):
        """Run every enabled case of ``app``, store each outcome and write the report.

        :param app: application name used to load the host configuration and the cases.
        :return: None
        """
        print("runAllCase")
        # Host configuration whose ``dict_value`` is prefixed to every case URL.
        api_host_obj = LoadConfig().loadConfigByAppAndKey(app, "host")
        load_case = LoadCase()
        results = load_case.loadAllCaseByApp(app)

        for case in results:
            print(case)
            if case['case_run'] != "yes":
                continue
            try:
                response = self.runCase(case, api_host_obj)
                assert_msg = AssertCase().assertResponse(case, response)
                # Persist the outcome of this case.
                rows = OperDatabase().updateResultByCaseId(
                    response, assert_msg['is_pass'], assert_msg['msg'], case['case_id'])
                print(f"更新结果rows={str(rows)}")
            except Exception as e:
                # One broken case must not abort the whole run: report it and go on.
                print(f"用例id={case['case_id']},标题:{case['case_name']},执行报错:{e}")

        # NOTE: sending the report through SendTestReport was commented out in the
        # original code and is still not done here.

        # ``results`` was fetched before the cases ran and the loop above does not write
        # to those rows, so their ``is_pass`` values predate this run. Fetch the rows
        # again so the report reflects the outcomes that were just stored.
        results = load_case.loadAllCaseByApp(app)
        data = self._buildSummary(app, results)

        # Hard-coded per-module figures, passed to the report unchanged.
        datacolumn = {
            'load-exchange-rate': {'serious': 13, 'medium': 15, 'commonly': 12, 'total': 30, 'percent': 30},
            'classify': {'serious': 13, 'medium': 5, 'commonly': 2, 'total': 30, 'percent': 20},
            'stock-suspend': {'serious': 3, 'medium': 5, 'commonly': 12, 'total': 30, 'percent': 20},
            'stock-brief': {'serious': 3, 'medium': 15, 'commonly': 2, 'total': 30, 'percent': 30},
        }
        # 'DataReport.xlsx' is the report name; data and datacolumn are the report data.
        reportExcel().writerReport(app, 'DataReport.xlsx', data, datacolumn)

    @staticmethod
    def _buildSummary(app, results):
        """Count passed, failed and not-run cases, and failed cases per severity level.

        :param app: application name, stored as ``test_name``.
        :param results: sized iterable of case rows providing ``is_pass`` and ``level``.
        :return: dict with the totals passed on to the report writer.
        """
        success = failed = ignored = 0
        serious = medium = common = 0
        for case in results:
            # str() lets a boolean outcome count the same as its "True"/"False" text.
            outcome = str(case['is_pass'])
            if outcome == "True":
                success += 1
            elif outcome == "False":
                failed += 1
                if case['level'] == 1:
                    serious += 1
                elif case['level'] == 2:
                    medium += 1
                else:
                    common += 1
            else:
                ignored += 1

        total = len(results)
        return {
            "test_name": app,
            "test_sum": total,
            "test_success": success,
            "test_failed": failed,
            "test_not": ignored,
            # An application without cases has no defects; avoid dividing by zero.
            'defect_rate': failed / total if total else 0,
            'total_serious': serious,
            'total_medium': medium,
            'total_commonly': common,
        }

    def runCase(self, case, api_host_obj):
        """Send the request described by ``case`` and return the response.

        When ``case['pre_case_id']`` is greater than -1 the referenced case is run
        first (recursively) and asserted. If that assertion fails, the pre-condition's
        response is returned with its ``msg`` prefixed; otherwise the fields listed
        in ``case['pre_fields']`` are copied from it into this request.

        :param case: case row; reads ``request_header``, ``request_params``,
            ``case_method``, ``case_url``, ``pre_case_id`` and ``pre_fields``.
        :param api_host_obj: host configuration; ``dict_value`` is prefixed to the URL.
        :return: the object returned by ``RequestUnti.request``, or the
            pre-condition's response when the pre-condition did not pass.
        """
        print("runCase")
        # Fall back to empty mappings: previously these names stayed unbound (and the
        # method crashed further down) whenever the stored column was empty.
        headers = json.loads(case['request_header']) if case['request_header'] else {}
        bodys = json.loads(case['request_params']) if case['request_params'] else {}
        method = case['case_method']
        req_url = api_host_obj['dict_value'] + case['case_url']

        # Is there a pre-condition case?
        if case["pre_case_id"] > -1:
            print("有前置条件")
            pre_case = self.findCaseById(case['pre_case_id'])
            # Recursive call: the pre-condition may itself have a pre-condition.
            pre_response = self.runCase(pre_case, api_host_obj)
            pre_assert_msg = AssertCase().assertResponse(pre_case, pre_response)
            print(pre_assert_msg)
            # A failed pre-condition short-circuits: its response is the result.
            if not pre_assert_msg['is_pass']:
                pre_response['msg'] = '前置条件不通过,' + pre_response['msg']
                return pre_response
            self._applyPreFields(json.loads(case['pre_fields']), pre_response, headers, bodys)

        # .get(): a request without a Cookie header is valid and needs no refresh.
        if headers.get('Cookie'):
            # Separate name so the ``case`` argument is not overwritten.
            cookie_case = self.findCaseById(self.COOKIE_CASE_ID)
            # SECURITY: eval() executes whatever text is stored in the ``response``
            # column. Kept as in the original; replace with ast.literal_eval or
            # json.loads once the stored format is confirmed.
            cookie = eval(cookie_case['response'])
            headers['Cookie'] = cookie['Cookie']

        print(headers)
        print(bodys)
        # Fire the request.
        return RequestUnti().request(req_url, method, headers=headers, param=bodys)

    @staticmethod
    def _applyPreFields(pre_fields, pre_response, headers, bodys):
        """Overwrite same-named header/body fields with pre-condition response values.

        :param pre_fields: iterable of dicts with ``scope`` ('header' or 'body') and ``field``.
        :param pre_response: response of the pre-condition case, indexed by field name.
        :param headers: request headers, updated in place.
        :param bodys: request parameters, updated in place.
        """
        for pre_field in pre_fields:
            field_name = pre_field['field']
            if pre_field['scope'] == 'header':
                # Replace only an existing header of the same name. Assigning a new
                # key while looping over the headers, as before, raised RuntimeError.
                if field_name in headers:
                    # NOTE(review): the double lookup is kept from the original; the
                    # body branch reads pre_response[field_name] only -- confirm
                    # which response shape is intended.
                    headers[field_name] = pre_response[field_name][field_name]
                print(headers)
            elif pre_field['scope'] == 'body':
                print("替换body")
                print(bodys)
                for body in bodys:
                    print(body)
                    if body == field_name:
                        bodys[field_name] = pre_response[field_name]

if __name__ == '__main__':
    # Script entry point: run every enabled case of one application and write
    # its report. runAllCase returns nothing, so its result is not kept.
    print("main")
    s = NewEnergyTestCase()
    s.runAllCase("长大学工")

# ©著作权归作者所有,转载或内容合作请联系作者
# 平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
#
# 推荐阅读更多精彩内容