import json
from collections import Counter

import util
from config.assert_case import AssertCase
from config.load_case import LoadCase
from config.load_config import LoadConfig
from config.oper_db import OperDatabase
from config.send_report import SendTestReport
from util.HTMLTestRunner import reportExcel
from util.db_util import MysqlDb
from util.request_util import RequestUnti
# log = util.log.logger
class NewEnergyTestCase:
    """Run the API test cases of an application and write an Excel report.

    Cases are loaded through ``LoadCase``, executed through ``RequestUnti``,
    checked with ``AssertCase`` and their results stored via ``OperDatabase``.
    """

    # ID of the stored case whose saved response supplies the ``Cookie`` header.
    COOKIE_CASE_ID = 3

    def findCaseById(self, case_id):
        """
        Look up a single test case by its ID.

        :param case_id: value matched against the ``case_id`` column of the
            ``case`` table.
        :return: the result of ``MysqlDb.query(sql, state="one")``.
        """
        print("findCaseById")
        my_db = MysqlDb()
        # NOTE(review): case_id is interpolated directly into the SQL text.
        # Switch to a parameterized query if MysqlDb.query supports bound
        # parameters -- its signature is not visible from this file.
        sql = f"select * from `case` where case_id='{case_id}'"
        return my_db.query(sql, state="one")

    def runAllCase(self, app):
        """
        Entry point: run every enabled case of ``app`` and write the report.

        :param app: application name used to load the host config and the
            cases; also passed to the report writer.
        :return: None
        """
        print("runAllCase")
        # Resolve the API host for this application.
        load_config = LoadConfig()
        api_host_obj = load_config.loadConfigByAppAndKey(app, "host")
        # Load every case of the application.
        load_case = LoadCase()
        results = load_case.loadAllCaseByApp(app)
        for case in results:
            print(case)
            if case['case_run'] != "yes":
                continue
            try:
                # Execute the case.
                response = self.runCase(case, api_host_obj)
                # Check the response against the case's expectations.
                assert_msg = AssertCase().assertResponse(case, response)
                # Persist the outcome.
                rows = OperDatabase().updateResultByCaseId(
                    response, assert_msg['is_pass'], assert_msg['msg'],
                    case['case_id'])
                print(f"更新结果rows={str(rows)}")
            except Exception as e:
                # Best effort: one failing case must not abort the whole run.
                print(f"用例id={case['case_id']},标题:{case['case_name']},执行报错:{e}")

        # Sending the report through SendTestReport is currently disabled.
        report = reportExcel()
        # NOTE(review): ``results`` was loaded before the cases ran and is not
        # reloaded here, so the ``is_pass`` values counted below are the ones
        # loadAllCaseByApp returned -- confirm whether a reload is intended.
        data = self._summarize(app, results)
        # Per-module figures are hard-coded; they are not derived from ``results``.
        datacolumn = {
            'load-exchange-rate': {'serious': 13, 'medium': 15, 'commonly': 12, 'total': 30, 'percent': 30},
            'classify': {'serious': 13, 'medium': 5, 'commonly': 2, 'total': 30, 'percent': 20},
            'stock-suspend': {'serious': 3, 'medium': 5, 'commonly': 12, 'total': 30, 'percent': 20},
            'stock-brief': {'serious': 3, 'medium': 15, 'commonly': 2, 'total': 30, 'percent': 30},
        }
        # 'DataReport.xlsx' is the report file name; data/datacolumn are its content.
        report.writerReport(app, 'DataReport.xlsx', data, datacolumn)

    @staticmethod
    def _summarize(app, results):
        """Count passed / failed / not-run cases and failures per severity level.

        :param app: application name stored under ``test_name``.
        :param results: sized iterable of case mappings with ``is_pass`` and
            (for failed cases) ``level`` keys.
        :return: dict of totals; ``defect_rate`` is 0.0 when there are no cases.
        """
        success = failed = ignored = 0
        serious = medium = common = 0
        for case in results:
            # ``is_pass`` is compared as the strings "True" / "False".
            if case['is_pass'] == "True":
                success += 1
            elif case['is_pass'] == "False":
                failed += 1
                if case['level'] == 1:
                    serious += 1
                elif case['level'] == 2:
                    medium += 1
                else:
                    common += 1
            else:
                ignored += 1
        total = len(results)
        return {
            "test_name": app,
            "test_sum": total,
            "test_success": success,
            "test_failed": failed,
            "test_not": ignored,
            # Guard the empty run: no cases means no defects, not a crash.
            'defect_rate': failed / total if total else 0.0,
            'total_serious': serious,
            'total_medium': medium,
            'total_commonly': common,
        }

    def runCase(self, case, api_host_obj):
        """
        Execute a single case, running its pre-condition case first if any.

        :param case: case mapping (``request_header``, ``request_params``,
            ``case_method``, ``case_url``, ``pre_case_id``, ``pre_fields``).
        :param api_host_obj: mapping whose ``dict_value`` is the API host.
        :return: the response of the request, or the pre-condition's response
            when the pre-condition assertion does not pass.
        :raises LookupError: when the referenced pre-condition case is not found.
        """
        print("runCase")
        # Default to empty mappings so later lookups never hit unbound names.
        headers = json.loads(case['request_header']) if case['request_header'] else {}
        bodys = json.loads(case['request_params']) if case['request_params'] else {}
        method = case['case_method']
        req_url = api_host_obj['dict_value'] + case['case_url']

        # Does this case depend on a pre-condition case?
        if case["pre_case_id"] > -1:
            print("有前置条件")
            pre_case_id = case['pre_case_id']
            pre_case = self.findCaseById(pre_case_id)
            if not pre_case:
                raise LookupError(f"pre-condition case {pre_case_id} not found")
            # Recursive call: the pre-condition may have its own pre-condition.
            pre_response = self.runCase(pre_case, api_host_obj)
            pre_assert_msg = AssertCase().assertResponse(pre_case, pre_response)
            print(pre_assert_msg)
            # A failing pre-condition short-circuits the dependent case.
            if not pre_assert_msg['is_pass']:
                # NOTE(review): assumes the response carries a 'msg' entry -- confirm.
                pre_response['msg'] = '前置条件不通过,' + pre_response['msg']
                return pre_response
            # Copy the fields this case needs from the pre-condition's response.
            pre_fields = json.loads(case['pre_fields']) if case['pre_fields'] else []
            self._apply_pre_fields(pre_fields, pre_response, headers, bodys)

        if headers.get('Cookie'):
            cookie_case = self.findCaseById(self.COOKIE_CASE_ID)
            # NOTE(review): eval() executes whatever expression is stored in the
            # database; prefer json.loads / ast.literal_eval once the stored
            # format is confirmed.
            cookie = eval(cookie_case['response'])
            headers['Cookie'] = cookie['Cookie']
        print(headers)
        print(bodys)
        # Send the request.
        req = RequestUnti()
        response = req.request(req_url, method, headers=headers, param=bodys)
        return response

    @staticmethod
    def _apply_pre_fields(pre_fields, pre_response, headers, bodys):
        """Overwrite same-named header/body fields with pre-condition values.

        Only fields already present in ``headers`` / ``bodys`` are replaced;
        both mappings are modified in place.

        :param pre_fields: iterable of mappings with ``scope`` and ``field`` keys.
        :param pre_response: response of the pre-condition case.
        :param headers: request headers to update.
        :param bodys: request parameters to update.
        """
        for pre_field in pre_fields:
            if pre_field['scope'] == 'header':
                field_name = pre_field['field']
                if field_name in headers:
                    # NOTE(review): the header value is read with the field
                    # name applied twice, unlike the body branch -- confirm
                    # against the response shape.
                    headers[field_name] = pre_response[field_name][field_name]
                print(headers)
            elif pre_field['scope'] == 'body':
                print("替换body")
                print(bodys)
                field_name = pre_field['field']
                if field_name in bodys:
                    bodys[field_name] = pre_response[field_name]
if __name__ == '__main__':
    # Script entry point: run every case of the configured application.
    print("main")
    s = NewEnergyTestCase()
    s.runAllCase("长大学工")