背景
运维时,往往需要对多个接口做性能测试。Locust只能对单个py文件测试,若不优化需人工处理每个文件。
实现功能
- 执行指定目录下py文件中的指定标签
- log、csv文件保存到指定目录下
def get_file_tags(case_path):
"""
获取指定py文件中的标签值,以列表的形式返回
注:文件中不能存在已注释的tag标签
:param case_path: py文件路径
:return: 标签列表
"""
version_list = set()
with open(case_path, mode='r', encoding="utf-8") as project_file:
file_str = project_file.readlines()
for line_str in file_str:
if re.search("@tag", line_str):
version_list.update(re.findall(r"v\d+", line_str))
return version_list
LOCUST_ALL_FILE_ALL_TAG, LOCUST_ALL_FILE_SELECTED_TAG, LOCUST_SELECTED_FILE_ALL_TAG, LOCUST_SELECTED_FILE_SELECTED_TAG\
= ["all_file_all_tag", "all_file_selected_tag", "selected_file_all_tag", "selected_file_selected_tag"]
def make_out_put_dir():
"""
创建输出文件文件夹
:return: log路径,csv路径
"""
now_time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
base_log_dir = os.path.join(base_dir, "Outputs", 'logs')
base_csv_dir = os.path.join(base_dir, "Outputs", 'reports')
# 创建输出文件夹
os.mkdir(os.path.join(base_log_dir, now_time_str))
os.mkdir(os.path.join(base_csv_dir, now_time_str))
return os.path.join(base_log_dir, now_time_str), os.path.join(base_csv_dir, now_time_str)
def get_case_file_name(case_directory):
"""
递归函数,获取指定目录下的所有的load_*.py文件目录列表
"""
case_file_list = list()
if not os.path.isdir(case_directory):
return case_file_list
for child_file in os.listdir(case_directory):
if os.path.isdir(os.path.join(case_directory, child_file)):
case_file_list.extend(get_case_file_name(os.path.join(case_directory, child_file)))
else:
ret = re.match(r"^load_.*\.py$", child_file)
if ret:
case_file_list.append(os.path.join(case_directory, ret.group()))
return case_file_list
def write_case_to_file(case_directory):
"""
将指定目录下要执行的load_*.py文件都记录在跟目录下的case_file.txt里面。
"""
case_file_list = get_case_file_name(case_directory)
with open(os.path.join(base_dir, "case_file.txt"), mode='w') as file:
for case_name in case_file_list:
file.write("{}{}".format(case_name.split(base_dir)[1], "\r"))
def get_case_data():
"""
从根目录下的case_file.txt中读取要进行压测的load_*.py文件,存放到列表中。
"""
case_file_name = list()
with open(os.path.join(base_dir, "case_file.txt"), mode='r') as file:
while True:
case_name = file.readline()
if not case_name:
return case_file_name
# linux 系统下文件路径格式变更
if sys.platform.startswith("linux"):
match_obj = re.search(r"TestCases\\(.*)\\(.*)\\(load_.*[.]py)", case_name)
if match_obj:
case_name = os.path.join(base_dir, "TestCases", match_obj.group(1), match_obj.group(2),
match_obj.group(3))
case_file_name.append(case_name)
def start_locust(run_type, case_path=os.path.join(os.path.dirname(os.path.dirname((os.path.abspath(__file__)))), "TestCases")):
"""
根据run_type类型确认执行脚本及tag
:param run_type: LOCUST_ALL_FILE_ALL_TAG 所有文件所有标签
LOCUST_ALL_FILE_SELECTED_TAG 所有文件指定标签
LOCUST_SELECTED_FILE_ALL_TAG 指定文件所有标签
LOCUST_SELECTED_FILE_SELECTED_TAG 指定文件指定标签
:param case_path: 运行所有标签时,读取该目录下的脚本
:return:
"""
# 执行所有文件时,读取case_path目录下所有脚本并保存到case_file.txt中
if run_type in [LOCUST_ALL_FILE_ALL_TAG, LOCUST_ALL_FILE_SELECTED_TAG]:
write_case_to_file(case_path)
log_dir, csv_dir = make_out_put_dir()
# 获取所有执行脚本
case_files = get_case_data()
for case_file in case_files:
ret = re.search(r"(load_.*)\.py$", case_file)
if ret:
# 执行语句
locust_parse = "locust -f {}{} --host={} --headless --users {} --spawn-rate {} --run-time {} "\
.format(base_dir, case_file[:-1],
configr.cp.get('api', 'pre_url'),
configr.cp.get('locust_run', 'clients'),
configr.cp.get('locust_run', 'spawn_rate'),
configr.cp.get('locust_run', 'run_times'))
# 运行类型为指定标签,则在locust_parse中增加 -T
if run_type in [LOCUST_ALL_FILE_SELECTED_TAG, LOCUST_SELECTED_FILE_SELECTED_TAG]:
if list(set(eval(configr.cp.get("run_tags", "tags"))) & get_file_tags(base_dir+case_file[:-1])):
# 当前文件包含配置相中的tag, 在locust_parse中增加 -T
search_res = re.findall(r"v\d+", configr.cp.get("run_tags", "tags"))
locust_parse += "--tags "
locust_parse += " ".join(search_res) + " "
else:
# 结束当前循环,开始下一个循环
continue
try:
os.mkdir(os.path.join(csv_dir, ret.group(1)))
except Exception as e: # 避免出现名称相同的csv文件存在而导致运行失败
logging.error(e.__repr__())
else:
log_path = os.path.join(log_dir, ret.group(1)) + '.log'
cvs_path = os.path.join(csv_dir, ret.group(1), ret.group(1))
locust_parse += "--loglevel=INFO --logfile={} --csv={}".format(log_path, cvs_path)
# 执行
os.system(locust_parse)