一、项目结构说明
项目目录.png
- 目录结构
interface-automation
├── base(公共方法的封装)
├──common(通用的方法)
├──model(包含字典转为类的model,非必要)
├── cases(测试用例)
├── config(配置文件的读取方法)
├── data(测试的参数)
├── data_provider(获取接口的方法)
├── report(测试报告生成目录、log信息生成目录)
├── requirements.txt(项目所需要的依赖)
├── config.ini(各类通用配置)
├── run_case.py(运行测试的主方法)
二、需要的依赖
- requirements.txt
requests==2.28.2
paho-mqtt==1.6.1
PyYaml==6.0
allure-pytest==2.9.45
pytest==7.1.2
pytest-repeat==0.9.1
pytest-rerunfailures==10.2
pytest-html==3.1.1
colorlog==6.6.0
pytest-assume==2.4.3
click
- 安装allure
- 打开链接:https://github.com/allure-framework/allure2/releases
- 找到任意一个版本(推荐最新版本),Asserts底下,选择.zip格式,下载到本机自定义目录;
- 右键点击此电脑(我的电脑)-> 属性 -> 高级 -> 环境变量 -> 用户变量(或系统变量)-> 找到名为Path的变量 -> 点击编辑 -> 点击新建 -> 将 allure 解压之后的 allure 路径的 bin 目录路径放到环境变量当中。
三、详细的代码说明
3.1 通用方法说明
- http方法的封装:common/http/base_request.py
import time
import requests
from requests import Response
from base.common.log.test_log_kit import TestLogKit
from base.common.universal.var_repo import VarRepo
from config.yaml_tool import YmlTool
logger = TestLogKit().logger
class BaseRequest(object):
_req_param = {}
_req_method = ''
_req_url = ''
# 重新登录次数
_retry_times = 0
@staticmethod
def __get_host(url):
"""从配置文件中获取url"""
host = YmlTool().get_data('server').base_url
return host + url
@classmethod
def request(cls, method, url, headers=None, params=None, json=None, read_timeout=180, judge_token=True) -> dict:
"""
:param method:请求的方法类型,支持GET/POST/DELETE/PATCH/PUT
:param url:请求的url,不需要baseUrl(定义在配置文件中)
:param headers:标头,字典,所有的标头值都必须是string,字节字符串或unicode(建议避免传递unicode类型值)
:param params:参数,字典或字节序列,作为参数加到url中,使用这个参数可以把一些键值对以?key1=value1&key2=value2的模式增加到url中
:param json:数据,字典,字节序或文件对象,重点作为向服务器提供或提交资源,作为request的内容与params不同的是,data提交的数据并不
放在url链接里, 而是放在url链接对应位置的地方作为数据来存储,它也可以接受一个字符串对象。
:param read_timeout: 读取超时
:param judge_token: 是否需要检验token过期的情况,重新进行登陆
:return:(API返回的状态码,接口返回的数据,请求的时间:单位秒)
"""
logger.info(f"[http请求]:请求的方法:{method},请求的url:{url}")
logger.info(f'[http请求参数]:params:{params},json:{json}')
start_timestamp = time.time() * 1000
res_json = {'res_code': None, 'res_data': None, "res_time": 0}
try:
session = requests.Session()
full_url = cls.__get_host(url)
cls._req_method = method
cls._req_url = url
cls._req_param = dict(headers=headers, params=params, json=json)
res = session.request(method=cls._req_method, url=full_url, timeout=(10, read_timeout), **cls._req_param)
except requests.exceptions.ConnectionError:
logger.warning('[http]网络连接失败,请检查网络和域名。当前请求的url为:{0}'.format(url))
return res_json
except Exception as e:
logger.warning(f"[http]未知错误:{e.__str__()}")
return res_json
res_json = cls._response_handle(res, start_timestamp, judge_token)
logger.info(f"[http请求]:返回的状态码:{res_json.get('res_code')},响应时间:{res_json.get('res_time')},"
f"响应结果:{res_json.get('res_data')}")
return res_json
@classmethod
def _response_handle(cls, res, start_timestamp: float, judge_token=True):
"""返回请求响应内容"""
end_timestamp = time.time() * 1000
response = cls._get_res_data(res)
res_json = {'res_code': res.status_code, 'res_data': response,
"res_time": round((end_timestamp - start_timestamp) / 1000, 2)}
if judge_token and res.status_code == 400 and response.get('code') == 21026 and cls._retry_times <= 3:
cls._retry_times += 1
logger.warning("验证码过期,重新登陆中...")
cls._get_auth()
return cls.request(cls._req_method, url=cls._req_url, **cls._req_param)
else:
cls._retry_times = 0
return res_json
@staticmethod
def _get_res_data(res: Response):
try:
if res.headers.get('Content-Type', '') == 'application/zip':
# 响应为zip压缩包时直接返回原对象
return res
else:
return res.json()
except Exception as error:
logger.info(f'[http]获取返回的res错误:{error.__str__()}')
return res.text
@classmethod
def _get_auth(cls):
"""token过期时:再次登陆,重新获取token"""
full_url = cls.__get_host('/users/loginWithEmailPassword')
if VarRepo.username is None or VarRepo.password is None or VarRepo.app_id is None:
logger.warning('[token过期]:配置文件未填登陆的账号密码,不能登陆')
cls._retry_times = 3
return
header = {'Accept': "*/*", 'Content-Type': "application/json", 'terminal': "app", "appId": VarRepo.app_id}
res = requests.post(full_url, headers=header,
json={"username": VarRepo.username, "password": VarRepo.password})
if 200 <= res.status_code < 300:
cls._req_param.get('headers')['token'] = res.json().get('accessToken')
res_data = res.json()
VarRepo.set_user_info(res_data.get('accessToken'), res_data.get('refreshToken'), res_data.get('id'))
else:
raise ValueError('登陆失败,不能继续测试')
- 断言封装方法:/common/universal/assume
from typing import Union
import pytest
class Assume:
"""
断言,assert是强断言
"""
@classmethod
def equal(cls, actual: any, expected: any):
assert actual == expected, f'实际结果[{actual}]不等于预期结果[{expected}]。'
@classmethod
def assume_equal(cls, actual_value: any, expect_value: any, tip=None):
msg = f'期望值:{cls._get_type(expect_value)}({expect_value}) ' \
f'不等于实际值:{cls._get_type(actual_value)}({actual_value})'
if tip is not None:
msg = tip + msg
return pytest.assume(actual_value == expect_value, msg)
@classmethod
def assert_equal(cls, actual: any, expected: any):
assert actual == expected, f'实际结果[{actual}不等于{expected}],不符合预期结果[{actual}等于{expected}]。'
@classmethod
def _get_type(cls, value):
try:
type_str = str(type(value))[8:-2]
return type_str
except Exception as error:
print(f"获取类型错误:{error.__str__()}")
return type(value)
- 存储全局变量的类:/common/universal/var_repo.py
class VarRepo:
"""
变量仓库,保存被多方调用的变量
"""
_ini_read = IniRead()
# allure配置的路径(未避免配置文件中的路径被代码解析成错误路径,所以将配置文件中的路径以变量形式保存)
_allure_config = _ini_read.read_section('Allure')
allure_report = os.path.join(os.getcwd(), _allure_config.report)
allure_result = os.path.join(os.getcwd(), _allure_config.result)
- 测试完的报告处理类:/common/universal/allure_file_clean.py
import json
import os.path
from datetime import datetime
from base.common.log.test_log_kit import TestLogKit
from base.common.universal.universal import Universal
from base.common.universal.var_repo import VarRepo
from config.ini_read import IniRead
from config.yaml_tool import YmlTool
class AllureFileClean:
"""
allure 报告数据清洗,提取业务需要得数据
"""
_logger = TestLogKit().logger
_ini_read = IniRead()
@classmethod
def get_testcases(cls) -> list:
""" 获取所有 allure 报告中执行用例的情况"""
# 将所有数据都收集到files中
files = []
for i in Universal.get_all_files(os.path.join(VarRepo.allure_report, 'data', 'test-cases')):
with open(i, 'r', encoding='utf-8') as file:
date = json.load(file)
files.append(date)
return files
@classmethod
def get_failed_case(cls):
"""
获取到所有失败的用例标题和用例代码路径
:return:
"""
error_epic = []
error_case = []
for i in cls.get_testcases():
status = i.get('status')
if status not in ['failed', 'broken']:
continue
full_name = i.get('fullName', '').split('#')[1]
error_case.append((f"{full_name},{i.get('name')}", status.capitalize()))
# 获取失败的epic
for item in i.get('labels', []):
if item['name'] == 'epic':
error_epic.append(item['value'])
break
error_epic = list(set(error_epic))
return error_case, error_epic
@classmethod
def get_failed_cases_detail(cls) -> str:
"""
返回所有失败的测试用例相关内容
:return:
"""
date, error_epic = cls.get_failed_case()
values = ''
# 判断有失败用例,则返回内容
if len(date) >= 1:
values = '失败用例:\n **********************************\n'
for i in date:
values += ' ' + i[0] + ':' + i[1] + '\n'
return values
@classmethod
def get_case_count(cls) -> any:
"""
统计用例数量
:return:
"""
try:
filename = os.path.join(VarRepo.allure_report, 'widgets', 'summary.json')
with open(filename, 'r', encoding='utf-8') as file:
data = json.load(file)
_case_count = data.get('statistic')
_time = data.get('time').get('duration')
total_count = _case_count['total'] - _case_count['skipped']
pass_count = _case_count['passed']
# 判断运行用例总数大于0
if _case_count['total'] > 0:
# 计算用例成功率
pass_rate = round(pass_count / total_count, 2)
else:
# 如果未运行用例,则成功率为 0.0
pass_rate = 0.0
pass_rate *= 100
# 收集用例运行时长
time_format = "%Y-%m-%d %H:%M:%S"
time_ = data.get('time')
result_text = (f'【开始时间】:'
f'{datetime.fromtimestamp(time_.get("start", 0) / 1000.0).strftime(time_format)}\n'
f'【结束时间】:'
f'{datetime.fromtimestamp(time_.get("stop", 0) / 1000.0).strftime(time_format)}\n'
f'【测试耗时】:{round(_time / 1000 / 60 / 24, 2)}小时\n'
f'【执行用例数】:{total_count}\n'
f'【测试结果】:通过率:{pass_rate}%,失败用例数:{total_count - pass_count},'
f'成功用例数:{pass_count}\n'
f'【测试报告地址】:{cls._ini_read.read_section("Webhook").allure_url}\n'
f'【失败用例列表】: \n'
f'{"用例名称":<40} 错误类型 \n')
return result_text
except Exception as e:
cls._logger.warning(f'统计执行结果时发生意外错误:{e.__str__()}。')
return ''
def get_report_info(self):
"""
从report/allure_results获取报告名称
:return:
"""
@classmethod
def get_step(cls) -> str:
"""
计算用例中某一步骤(@allure.step)的平均执行时间
:return:
"""
step_names = YmlTool().get_data('allure').step
for step_name in step_names:
text = '测试步骤[{}]-成功轮次平均响应时间:{} 秒,总轮次(包括失败轮次)平均响应时间:{} 秒。'
none_text = text.format(step_name, 0, 0)
if not step_name:
return none_text
total_steps_durations, passed_steps_durations = cls._next_content_in_allure(step_name)
ave_time_for_total = sum(total_steps_durations) / len(total_steps_durations) if total_steps_durations else 0
ave_time_for_passed = sum(passed_steps_durations) / len(
passed_steps_durations) if passed_steps_durations else 0
text = text.format(step_name,
round((float(ave_time_for_passed)), 3),
round((float(ave_time_for_total)), 3))
cls._logger.info("测试步骤[{}]-成功次数:{},总次数:{}".format(step_name, str(len(passed_steps_durations)),
str(len(total_steps_durations))))
cls._logger.info(text)
cls._logger.info("成功轮次响应时间:[" + ",".join(list(map(str, passed_steps_durations))) + "]")
cls._logger.info("所有轮次响应时间:[" + ",".join(list(map(str, total_steps_durations))) + "]")
passed_steps_durations.sort()
cls._logger.info("成功轮次响应时间排序:[" + ",".join(list(map(str, passed_steps_durations))) + "]")
@classmethod
def _next_content_in_allure(cls, step_name: str):
"""
遍历/report/allure_results/data/test-cases目录底下所有文件的内容
:return:
"""
total_steps_durations = []
passed_steps_durations = []
times_map_list = []
for content in cls.get_testcases():
test_stage = content.get('testStage')
if not test_stage:
continue
for step in test_stage.get('steps', []):
if step.get('name') != step_name:
continue
duration = step.get('time').get('duration')
timestamp = step.get('time').get('start') / 1000
total_steps_durations.append(round((float(duration) / 1000), 3))
times_map = {datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S.%f'):
round((float(duration) / 1000), 3)}
if step.get('status') != 'passed':
times_map_list.append(str(times_map))
continue
passed_steps_durations.append(round((float(duration) / 1000), 3))
cls._logger.info("失败轮次时间戳:")
cls._logger.info(times_map_list)
return total_steps_durations, passed_steps_durations
@classmethod
def get_epic(cls):
"""获取epic:"""
filename = os.path.join(VarRepo.allure_report, 'widgets', 'behaviors.json')
if not os.path.exists(filename):
return []
with open(filename, 'r', encoding='utf-8') as file:
data = json.load(file)
epic_list = []
for item in data.get('items'):
if item.get('name') in epic_list:
continue
epic_list.append(item.get('name'))
return epic_list
3.2 日志使用:/common/log
- ini.py
import datetime
import logging
import os
import colorlog
from colorlog import ColoredFormatter
# 日志文件的保存路径
log_path = os.path.join(os.getcwd(), 'report', 'logs')
# 判断目录是否存在,不存在则创建新的目录
if not os.path.isdir(log_path):
os.makedirs(log_path)
class LogKit:
def __init__(self, filename: str, logger, log_level, log_format: str = '%(asctime)s | %(levelname)s | %(message)s',
set_stream: bool = True, set_file: bool = True):
"""
日志记录器通用套件
:param filename: 待保存的日志文件名
:param logger: 日志记录器
:param log_level: 最低日志等级
:param log_format: 日志记录格式
"""
self.filename = filename
self.logger = logger
self.log_level = log_level
self.log_format = log_format
self.formatter_file = logging.Formatter(self.log_format)
self.formatter_console = ColoredFormatter(f'%(log_color)s{self.log_format}',
datefmt=None,
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'bold_yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red',
},
secondary_log_colors={},
style='%',
force_color=True)
if not logger.handlers:
if set_stream:
self.set_stream()
if set_file:
self.set_file()
def set_stream(self):
stream_handler = colorlog.StreamHandler()
stream_handler.setLevel(self.log_level)
self.logger.addHandler(stream_handler)
stream_handler.setFormatter(self.formatter_console)
stream_handler.close()
def set_file(self):
file_handler = logging.FileHandler(
f'{log_path}/{self.filename}{datetime.datetime.now().strftime("_%Y-%m-%d")}.log',
'a',
encoding='UTF-8')
file_handler.setLevel(self.log_level)
self.logger.addHandler(file_handler)
file_handler.setFormatter(self.formatter_file)
file_handler.close()
- test_log_kit.py
import logging
import logging.config
import logging.handlers
import threading
from base.common.log import LogKit
from config.ini_read import IniRead
class TestLogKit:
_instance_lock = threading.Lock()
def __init__(self, log_file_name='test_log'):
"""
日志记录
"""
ini_read = IniRead()
self._log_config = ini_read.read_section('Log')
self.logger = logging.getLogger(__file__)
self.logger.setLevel(self._log_config.level)
LogKit(log_file_name, self.logger, self._log_config.level)
def __new__(cls, *args, **kwargs):
if not hasattr(cls, '_instance'):
with TestLogKit._instance_lock:
if not hasattr(cls, '_instance'):
TestLogKit._instance = super().__new__(cls)
return TestLogKit._instance
- 使用方法
logger = TestLogKit().logger
logger.info()
3.3 用例的编写:/case/
import allure
from base.common.log.test_log_kit import TestLogKit
from base.common.universal.assume import Assume
from base.common.universal.var_repo import VarRepo
from config.yaml_tool import YmlTool
@allure.epic('Personal')
@allure.feature('Personal接口延时测试')
class TestPersonal:
logger = TestLogKit().logger
_yml_tool = YmlTool()
api_config = _yml_tool.get_data('api_config')
_test_param = {'houseId': None, 'deviceId': []}
@allure.story('测试一')
@allure.title('test_001:启动测试')
@allure.description('获取houses id。预期结果<状态码:200>')
def test_get_houses_id(self):
Assume.assume_equal(1, 1)
def _assume_http(self, res, expected_code=200):
"""http断言"""
Assume.assume_equal(res.get('res_code'), expected_code, '[http状态码]:')
Assume.assume_lte(res.get('res_time'), self.api_config.http_timeout, '[http响应时间]:')
res_data = res.get('res_data')
Assume.assert_not_none(res_data)
return res_data
@classmethod
def get_header(cls):
"""【请求的方法头】固定有token、terminal、app,固定定义在这,避免重复写多次
如果token为空,则不执行用例[用于确保有token]
"""
return {"token": VarRepo.access_token, 'terminal': VarRepo.terminal, 'appId': VarRepo.app_id}
3.4 接口的编写/data_provider
- 登陆接口
import allure
from base.common.http.base_request import BaseRequest
from base.common.log.test_log_kit import TestLogKit
from base.common.universal.universal import Universal
from base.common.universal.var_repo import VarRepo
from data_provider.param_validate import ParamValidate
class UsersApi:
logger = TestLogKit().logger
@classmethod
@allure.step('用户登录')
def login(cls, headers, data, save_token=True, **kwargs) -> dict:
"""用户登录
:param headers: 请求头
:param data: 请求参数,需要:email、password
:param save_token: 是否将token保存在全局变量中
"""
res = BaseRequest.request('POST', '/users/login', headers=headers, json=data, **kwargs)
if save_token:
cls.__reset_token(res.get('res_data'))
Universal.wait(2)
return res
```
+ 登陆mqtt
```
import json
import time
import allure
from base.common.log.test_log_kit import TestLogKit
from base.common.mqtt.mqtt_client import MQTTClient
from base.common.universal.universal import Universal
from base.common.universal.var_repo import VarRepo
from data_provider.param_validate import ParamValidate
class MqttRequests:
"""MQTT相关的操作。方法分类按照mqtt的请求方法名区分
"""
logger = TestLogKit().logger
@classmethod
@allure.step('MQTT登陆')
def login(cls, user_id: str, mq_auth: dict):
"""
MQTT的登陆
:param user_id: 用户is
:param mq_auth:需要的参数:client_Id、mqtt_username、mqtt_host、mqtt_password、transport
:return:
"""
try:
if VarRepo.mq_client is not None:
return VarRepo.mq_client.is_connect
param = ParamValidate.validate_param(['host', 'password', 'heartbeat', 'clientId'], mq_auth)
VarRepo.mq_client = MQTTClient(user_id, param.get('host'), param.get('password'),
param.get('heartbeat', 45), param.get('clientId'))
VarRepo.mq_client.run_mq_client()
Universal.wait(2)
return VarRepo.mq_client.is_connect
except Exception as error:
cls.logger.warning(f'[MQTT]连接失败:{error.__str__()}')
return False
```
#### 3.5 conftest.py
```
import os
from base.common.log.test_log_kit import TestLogKit
from base.common.mail.regular_mail import send_lcp
from base.common.universal.allure_file_clean import AllureFileClean
from base.common.universal.var_repo import VarRepo
from config.ini_read import IniRead
from config.yaml_tool import YmlTool
from data_provider.commons_api import CommonsApi
from data_provider.mqtt_requests import MqttRequests
from data_provider.users_api import UsersApi
logger = TestLogKit().logger
ini_read = IniRead()
yml_tool = YmlTool()
def auth_token():
if not VarRepo.is_login:
logger.info(f"配置文件中设置,不需要先登陆一次获取token")
return
logger.info('[登陆获取token中...]')
data = {"username": VarRepo.username, "password": VarRepo.password, 'terminalId': VarRepo.terminal_id,
'terminalName': "ZhangSan's iPhone XS"}
res = UsersApi.login({"appId": VarRepo.app_id, 'terminal': VarRepo.terminal}, data)
logger.info(f"登陆的结果:{res}")
if res is None:
raise ValueError('登陆失败,不能继续测试')
login_mqtt()
def login_mqtt():
"""登陆MQTT,后续用例,不需要再次登陆"""
header = {"token": VarRepo.access_token, 'terminal': VarRepo.terminal, 'appId': VarRepo.app_id}
CommonsApi.get_mqtt_config(header)
if VarRepo.mq_auth is None:
logger.warning('[mqtt登录], 未获取MQTT的配置,不进行登陆')
return
result = MqttRequests.login(VarRepo.user_id, VarRepo.mq_auth)
if not result:
logger.warning('[mqtt登录]登陆失败')
def make_allure_results(session):
"""
生成allure报告HTML文件
:return:
"""
allure_reports_path = session.config.getoption('allure_report_dir')
if allure_reports_path:
try:
allure_path = os.getenv('allure', '')
if allure_path == '':
cmd = f'allure generate reports -c "{allure_reports_path}" -o "{VarRepo.allure_report}"'
else:
cmd = f'{allure_path}/allure generate reports -c "{allure_reports_path}" -o "{VarRepo.allure_report}"'
os.system(cmd)
except Exception as e:
logger.warning(f'生成allure报告时出现意外问题(可根据实际情况忽略):{e.__str__()}')
def pytest_sessionfinish(session):
"""
测试完成后的动作
1.测试结束后生成并打开allure报告
:param session:
:return:
"""
make_allure_results(session)
allure = AllureFileClean()
result = allure.get_case_count()
```
## 四、主方法
```
import pytest
from config.yaml_tool import YmlTool
from conftest import auth_token
if __name__ == '__main__':
"""
-v: 输出用例执行详细信息
-s: 输出调试信息,包括print打印信息
--count: 执行轮次
--reruns: 失败用例重新执行
--reruns_delay: 下一次重新执行的间隔时间
--clean-alluredir:每次执行前清空数据,这样在生成的报告中就不会追加,只显示当前执行的用例
python_files:测试用例文件名(test_*.py)
python_classes:测试用例文件名中的类(Test*)
python_functions:测试用例类中的某个测试用例方法(test_*)
"""
# 登陆获取token,存在全局变量中,如果测试首次登陆类似需求,需要退出登陆
auth_token()
pytest_param = ['-v',
'-s',
'--count=1',
'--reruns=0',
'--reruns-delay=5',
'--alluredir=./report/allure_results',
'--clean-alluredir',
'--html=./report/html-report/report.html',
# 用例方法所在文件::用例方法所在的类::用例方法
'cases/temp.py::TestFavorites::test_get_houses_id']
_yml_tool = YmlTool()
# 根据配置的测试环境的url,如果是pre和prod,不测试一部分用例。增加了mark
# 使用的时候在用例上面添加装饰器:@pytest.mark.env_skip(这个自定义的mark)
base_url = _yml_tool.get_data('server').get('base_url')
is_skip = base_url.find('prod') != -1 or base_url.find('pre') != -1
if is_skip:
pytest_param.append('-m not env_skip')
pytest.main(pytest_param)
```
+ 自定义的mark,在pytest.ini中
```
[pytest]
markers =
env_skip: The tag indicates that pre and prod are not tested
```