思路:1.根据yapi提供的开放接口,获取接口数据,生成yaml文件
2.根据yaml文件内容生成pytest文件
效果图
···
以下是主要代码
# 测试代码中,判断期望值用的查询json值的方法
#递归查询多级json直到找出所需要的key值
def josnFind(jsonb, key):
    """Recursively search a nested dict for ``key`` and return its value.

    Keys are visited in iteration order; when a value is itself a dict it is
    searched depth-first before moving on to the next sibling key.

    Args:
        jsonb: The (possibly nested) dict to search, e.g. a decoded JSON body.
        key: The key to look for at any nesting level.

    Returns:
        The value stored under the first match, or ``None`` when the key does
        not occur anywhere in the nested dicts.
    """
    found, value = _josn_lookup(jsonb, key)
    return value if found else None


def _josn_lookup(node, key):
    """Depth-first lookup returning ``(found, value)``.

    The explicit flag lets a stored ``None`` be told apart from a miss, so a
    subtree without the key no longer ends the whole search early.
    """
    for k in node:
        if k == key:
            return True, node[k]
        child = node[k]
        if isinstance(child, dict):
            found, value = _josn_lookup(child, key)
            if found:
                return True, value
            # Not in this subtree: keep scanning the remaining sibling keys.
    return False, None
# 测试代码和生成代码均需使用的方法
# yamlkey传入值时,返回yaml文件中该key的值
# yamlkey不传入时,默认返回yaml 文件所有值
def get_test_data(filepath, yamlkey=0):
    """Load a YAML file and return either all of it or one top-level entry.

    Args:
        filepath: Path of the YAML file; it is opened as UTF-8 text.
        yamlkey: Top-level key to look up. When left at the default ``0`` the
            whole parsed document is returned instead.

    Returns:
        The parsed document when ``yamlkey == 0``; otherwise the result of
        ``.get(yamlkey)`` on the parsed document (``None`` if the key is absent).
    """
    with open(filepath, encoding='utf-8') as stream:
        # The safe loader only builds plain Python objects from the YAML text.
        content = yaml.safe_load(stream.read())
    return content if yamlkey == 0 else content.get(yamlkey)
import json
import os
from string import Template
import yaml
import requests
from Util.common import get_test_data
# YApi project token; sent as the "token" query parameter by the request helpers below.
# NOTE(review): the literal looks like a placeholder to be replaced with a real token.
token = "yapi 项目token"
# Base URL of the YApi server; API paths are appended to it directly in zrequests().
# NOTE(review): the literal looks like a placeholder to be replaced with a real address.
YApibaseUrl = "yapi请求地址"
# Overrides the DEFAULT_RETRIES attribute of the requests.adapters module.
# NOTE(review): HTTPAdapter presumably binds its default retry count when it is
# defined, so assigning here may have no effect on requests.get() — confirm.
requests.adapters.DEFAULT_RETRIES = 100
#接口信息获取
def get_list_menu(parm):
    """Request the category menu from ``/api/interface/getCatMenu``.

    Args:
        parm: Query parameters forwarded unchanged to ``zrequests``.

    Returns:
        Whatever ``zrequests`` returns for this endpoint.
    """
    print("获取菜单")
    return zrequests('/api/interface/getCatMenu', parm)
def get_list_cat(parm):
    """Request the interfaces of one category from ``/api/interface/list_cat``.

    Args:
        parm: Query parameters forwarded unchanged to ``zrequests``.

    Returns:
        Whatever ``zrequests`` returns for this endpoint.
    """
    print("获取分类下接口id")
    return zrequests('/api/interface/list_cat', parm)
def get_interface(parm):
    """Request the details of one interface from ``/api/interface/get``.

    Args:
        parm: Query parameters forwarded unchanged to ``zrequests``.

    Returns:
        Whatever ``zrequests`` returns for this endpoint.
    """
    print("获取接口详情数据")
    return zrequests('/api/interface/get', parm)
def zrequests(api, parm):
    """Send a GET request to the YApi server, retrying on connection errors.

    Args:
        api: API path appended directly to ``YApibaseUrl``.
        parm: Query parameters passed to ``requests.get``.

    Returns:
        The ``requests`` response object of the first attempt that succeeds.

    Raises:
        requests.exceptions.ConnectionError: When all three attempts fail to
            connect. The last error is re-raised rather than falling through
            with no response bound.
    """
    max_attempts = 3
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            # timeout=(connect, read) in seconds. Only ConnectionError is
            # retried; other request errors propagate to the caller at once.
            return requests.get(url=YApibaseUrl + api, params=parm, timeout=(60, 7))
        except requests.exceptions.ConnectionError as err:
            last_error = err
            print('重试请求' + api, "第" + str(attempt) + "次")
    # No throw-away requests.session() is created here any more: it was never
    # used for the request and was never closed.
    raise last_error
# 创建yaml文件
#生成yaml模板数据
def createYamlData(data):
    """Build the single-entry YAML mapping that describes one interface.

    Args:
        data: Interface detail dict; ``title``, ``path`` and ``method`` are read
            from it, and the whole dict is handed to ``getBody``.

    Returns:
        ``{key: entry}`` where ``key`` is the path with every ``/`` replaced by
        ``_`` and ``entry`` holds ``method``, ``path``, ``parm``, ``BorC`` (the
        first two characters of the title) and ``expect`` (always ``None``).
    """
    title_prefix = str(data['title'])[:2]
    api_path = str(data['path'])
    http_method = str(data['method'])
    request_params = getBody(data)
    entry = {
        'method': http_method,
        'path': api_path,
        'parm': request_params,
        'BorC': title_prefix,
        'expect': None,
    }
    entry_key = api_path.replace(r'/', '_')
    return {entry_key: entry}
#获取接口请求参数
def getBody(data):
    """Summarise the request-body parameters of one interface.

    ``data['req_body_other']`` is expected to be a JSON string whose top-level
    ``properties`` object maps parameter names to their specs.

    Args:
        data: Interface detail dict.

    Returns:
        A dict mapping each parameter name to ``"<description> <type>"``, with
        ``'NULL'`` standing in for a missing field. ``None`` is returned when
        ``req_body_other`` is absent or empty, or when the decoded body has no
        ``properties`` object.

    Raises:
        ValueError: If ``req_body_other`` is non-empty but not valid JSON.
            Malformed data is surfaced rather than silently skipped.
    """
    raw_body = data.get('req_body_other')
    if not raw_body:
        # Absent, None or "" all mean there is no body to describe.
        return None
    schema = json.loads(raw_body)
    properties = schema.get('properties') if isinstance(schema, dict) else None
    if not isinstance(properties, dict):
        return None
    summary = {}
    for name, spec in properties.items():
        if not isinstance(spec, dict):
            spec = {}
        # str() keeps non-string fields (e.g. a list-valued "type") from
        # breaking the concatenation.
        summary[name] = str(spec.get('description', 'NULL')) + ' ' + str(spec.get('type', 'NULL'))
    return summary
def getJsonValue(jsonD, key):
    """Return ``jsonD[key]`` when ``key`` is in ``jsonD``, else the string ``'NULL'``."""
    return jsonD[key] if key in jsonD else 'NULL'
#创建yaml文件
def createDirandYaml(project_id, filedir):
    """Create one directory and one YAML file per YApi interface category.

    For every category of the project except the one named '公共分类', a
    directory ``filedir + desc`` is created (``desc`` is the category's
    description with spaces removed). A file ``<desc>.yaml`` is then written
    inside it with one ``createYamlData`` entry per interface of the category.

    Args:
        project_id: YApi project id used to list the categories.
        filedir: Path prefix the category directory name is appended to. It is
            concatenated as-is, so it should end with a path separator.
    """
    menu_catParm = {"token": token, "project_id": project_id}
    menu_data = get_list_menu(menu_catParm).json()['data']
    for menu in menu_data:
        if menu['name'] == '公共分类':
            continue
        cat_id = menu['_id']
        desc = str(menu['desc']).replace(' ', '')
        menu_dir = filedir + desc
        # Also creates missing parent directories; an existing one is fine.
        os.makedirs(menu_dir, exist_ok=True)

        # Fetch every interface before touching the file, so a failed request
        # does not leave an existing YAML truncated or half written.
        list_catParm = {"token": token, "catid": cat_id, "limit": 1000}
        list_catData = get_list_cat(list_catParm).json()['data']['list']
        entries = []
        for api in list_catData:
            api_catParm = {"token": token, "id": api['_id']}
            api_data = get_interface(api_catParm).json()['data']
            entries.append(createYamlData(api_data))

        # 'w' overwrites any previous file for this category.
        with open(os.path.join(menu_dir, desc + '.yaml'), 'w', encoding='utf-8') as f:
            for entry in entries:
                yaml.dump(entry, f, allow_unicode=True, sort_keys=False)
def createCaseCode(path):
    """Generate a pytest module next to every ``.yaml`` file under ``path``.

    For each ``<name>.yaml`` found while walking ``path``, a ``<name>.py`` is
    written in the same directory. It holds a fixed import header and one
    ``test<key>`` function per top-level key of the YAML file. Each generated
    test loads its own entry from the YAML file beside it, sends it through
    ``requestsUtil`` and compares every key listed under ``expect`` with the
    value ``josnFind`` locates in the response.

    Args:
        path: Root directory to walk.
    """
    import re  # local import: only needed to sanitise generated function names

    # ${yamlfile} and ${yamlkey} receive repr() strings, so quotes or
    # backslashes in a file name or key cannot break the generated source.
    # The YAML is resolved relative to the generated module itself, so the
    # tests do not depend on the working directory pytest is started from.
    case_template = Template(
        "\n\ndef test${casename}():\n"
        "    yamlpath = os.path.join(os.path.dirname(os.path.abspath(__file__)), ${yamlfile})\n"
        "    parm = get_test_data(yamlpath, ${yamlkey})\n"
        "    print('\\n请求数据:' + str(parm))\n"
        "    response = requestsUtil(parm).json()\n"
        "    print('返回数据:' + str(response))\n"
        "    expect = parm['expect']\n"
        "    for key in expect:\n"
        "        assert str(josnFind(response, key)) == str(expect[key])\n"
    )
    # Default imports written at the top of every generated module.
    header = (
        "from TestProject.Mallproject.requestsUtil import requestsUtil\n"
        "from Util.common import get_test_data, josnFind\n"
        "import os\n"
    )
    for root, dirs, files in os.walk(path):
        for file in files:
            stem, ext = os.path.splitext(file)
            if ext != '.yaml':
                continue
            data = get_test_data(os.path.join(root, file))
            with open(os.path.join(root, stem + '.py'), 'w', encoding='utf-8') as f:
                f.write(header)
                # An empty YAML file parses to None; emit only the header then.
                for key in data or {}:
                    f.write(case_template.substitute(
                        # Keys come from URL paths and may hold characters that
                        # are not legal in a Python identifier ('-', '{', ...).
                        casename=re.sub(r'[^0-9A-Za-z_]', '_', str(key)),
                        yamlfile=repr(file),
                        yamlkey=repr(key),
                    ))
            print(data)