from flask import Flask, request, render_template_string, jsonify
import os
import signal
import subprocess
import time
import hmac
import hashlib
import base64
import yaml
import csv
from werkzeug.utils import secure_filename
from itertools import cycle
import re
app = Flask(__name__)
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'csv'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# 用于存储 Taurus 测试进程的 PID 和结果
taurus_process = None
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>动态 API 压测工具</title>
<script>
function addApiField() {
var container = document.getElementById("apiContainer");
var index = document.querySelectorAll(".apiBlock").length + 1;
var apiBlock = document.createElement("div");
apiBlock.classList.add("apiBlock");
apiBlock.innerHTML = `
<hr>
<h3>接口 ${index}</h3>
<label>URL:</label><br>
<input type="text" name="url_${index}" size="80"><br><br>
<label>请求方法:</label><br>
<select name="method_${index}">
<option value="GET">GET</option>
<option value="POST">POST</option>
<option value="PUT">PUT</option>
<option value="DELETE">DELETE</option>
</select><br><br>
<label>Query 参数 (仅 GET 请求, 例如 symbol=ETH_USDT_PERP):</label><br>
<input type="text" name="query_params_${index}" size="80"><br><br>
<label>Header (JSON 格式):</label><br>
<textarea name="headers_${index}" rows="4" cols="80"></textarea><br><br>
<label>Body (JSON 格式, 仅 POST/PUT 需要):</label><br>
<textarea name="body_${index}" rows="6" cols="80"></textarea><br><br>
<label>期望状态码 (例如 200):</label><br>
<input type="text" name="expected_status_${index}" size="10"><br><br>
<label>期望响应包含的关键字 (可选, 例如 "success"):</label><br>
<input type="text" name="expected_body_${index}" size="40"><br><br>
<button type="button" onclick="removeApiField(this)">删除该接口</button>
`;
container.appendChild(apiBlock);
}
function removeApiField(button) {
button.parentElement.remove();
}
function toggleIterationsInput() {
var useIterationsCheckbox = document.getElementById('use_iterations');
var iterationsInput = document.getElementById('iterations_input');
iterationsInput.disabled = !useIterationsCheckbox.checked;
}
// 查询 Taurus PID
function queryTaurusPID() {
fetch('/get-taurus-pid')
.then(response => response.json())
.then(data => {
if (data.status === 'running') {
document.getElementById('pid-display').innerText = `当前 Taurus PID: ${data.pid}`;
} else {
document.getElementById('pid-display').innerText = data.message;
}
});
}
// 停止 Taurus 测试
function stopTaurus() {
var pid = document.getElementById('pid').value;
if (!pid) {
alert('请输入有效的 PID');
return;
}
fetch('/kill-taurus', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded'
},
body: `pid=${pid}`
})
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
alert(data.message);
} else {
alert('停止失败: ' + data.message);
}
});
}
</script>
</head>
<body>
<h2>动态 API 压测工具</h2>
<form method="post" action="/run-taurus" enctype="multipart/form-data">
<h3>全局压力测试参数</h3>
<label>并发数 (concurrency):</label><br>
<select name="concurrency">
<option value="1">1 (单线程)</option>
<option value="2">2</option>
<option value="5">5</option>
<option value="10">10</option>
<option value="20">20</option>
<option value="50">50</option>
<option value="100">100</option>
<option value="200">200</option>
</select><br><br>
<label>压测持续时间 (hold-for, 单位为秒):</label><br>
<input type="text" name="hold_for" value="600"><br><br>
<label>每个场景的迭代次数 (iterations):</label><br>
<input type="checkbox" name="use_iterations" id="use_iterations" onchange="toggleIterationsInput()">
<label for="use_iterations">启用迭代次数</label><br>
<input type="text" name="iterations" id="iterations_input" placeholder="迭代次数" disabled><br><br>
<label>上传 API Key 文件 (CSV 格式, 列: api_key,api_secret):</label><br>
<input type="file" name="api_keys_file" accept=".csv"><br><br>
<hr>
<h3>添加 API 接口</h3>
<div id="apiContainer"></div>
<button type="button" onclick="addApiField()">+ 添加接口</button>
<br><br>
<input type="submit" value="生成配置并运行 Taurus">
</form>
<button id="stop_button" disabled>停止测试</button>
<button onclick="window.location.href='/results'">查看最后一次的结果</button>
<h3>手动停止 Taurus 测试</h3>
<form id="kill-form" method="post" action="/kill-taurus">
<label for="pid">输入 Taurus 的 PID:</label>
<input type="text" id="pid" name="pid" required>
<button type="button" onclick="stopTaurus()">确认停止</button>
</form>
<h3>查询 Taurus PID</h3>
<button onclick="queryTaurusPID()">查询 PID</button>
<p id="pid-display"></p>
</body>
</html>
"""
def parse_jtl_file(jtl_filename):
results = []
status_code_count = {}
try:
with open(jtl_filename, "r", encoding="utf-8-sig") as jtl_file:
reader = csv.DictReader(jtl_file)
for row in reader:
row = {key.strip(): value for key, value in row.items()}
if not row.get("label") and not row.get("responseCode"):
continue
status_code = row.get("responseCode", "N/A")
if status_code not in status_code_count:
status_code_count[status_code] = 0
status_code_count[status_code] += 1
results.append({
"url": row.get("label", "N/A"),
"status_code": status_code,
"response_time": row.get("elapsed", "N/A"),
"response_body": row.get("responseMessage", "N/A"),
"assertion_result": row.get("success", "true")
})
except FileNotFoundError:
print(f"JTL 文件未找到: {jtl_filename}")
except Exception as e:
print(f"解析 JTL 文件时出错: {e}")
return results, status_code_count
def extract_artifacts_dir(output):
match = re.search(r"Artifacts dir: (.+)", output)
if match:
return match.group(1).strip()
return None
@app.route("/", methods=["GET"])
def index():
return render_template_string(HTML_TEMPLATE)
@app.route("/run-taurus", methods=["POST"])
def run_taurus():
global taurus_process
concurrency = int(request.form.get("concurrency", "1"))
hold_for = request.form.get("hold_for", "1")
use_iterations = "use_iterations" in request.form
iterations = int(request.form.get("iterations", "1")) if use_iterations else None
file = request.files.get('api_keys_file')
if not file or not allowed_file(file.filename):
return "请上传有效的 API Key 文件 (CSV 格式)"
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
api_keys = load_api_keys(file_path)
if not api_keys:
return "API Key 文件为空或格式错误"
key_cycle = cycle(api_keys)
scenarios = {}
execution = []
index = 1
while f"url_{index}" in request.form:
api_name = f"api_test_{index}"
url = request.form[f"url_{index}"]
method = request.form[f"method_{index}"].upper()
query_params = request.form.get(f"query_params_{index}", "").strip()
headers_str = request.form.get(f"headers_{index}", "{}")
body = request.form.get(f"body_{index}", "").strip()
expected_status = request.form.get(f"expected_status_{index}", "").strip()
expected_body = request.form.get(f"expected_body_{index}", "").strip()
assertions = []
if expected_status:
assertions.append({"contains": expected_status, "subject": "http-code"})
if expected_body:
assertions.append({"contains": expected_body, "subject": "body"})
request_path = "/" + "/".join(url.split("/")[3:])
try:
headers = yaml.safe_load(headers_str) if headers_str else {}
except Exception as e:
return f"接口 {index} 的 Header 格式错误: {e}"
api_data = next(key_cycle)
api_key = api_data["api_key"]
api_secret = api_data["api_secret"]
signature, ts = generate_signature(api_secret, method, request_path, query_params, body)
headers.update({
"key": api_key,
"signatureMethod": "HmacSHA256",
"signatureVersion": "2",
"signTimestamp": ts,
"signature": signature
})
full_url = f"{url}?{query_params}" if query_params else url
scenarios[api_name] = {
"requests": [
{
"url": full_url,
"method": method,
"headers": headers,
"body": body if method in ["POST", "PUT"] else None,
"assert": assertions
}
]
}
execution.append({
"scenario": api_name,
"concurrency": concurrency,
"hold-for": hold_for,
"iterations": iterations
})
index += 1
if not scenarios:
return "请至少添加一个 API 接口"
taurus_config = {
"execution": execution,
"scenarios": scenarios,
"modules": {
"jmeter": {
"write-xml-jtl": "full"
}
}
}
yaml_config = yaml.dump(taurus_config, sort_keys=False)
config_filename = "temp_taurus_config.yaml"
with open(config_filename, "w", encoding="utf-8") as f:
f.write(yaml_config)
try:
# 使用 Popen 启动 Taurus 测试,确保异步运行
taurus_process = subprocess.Popen(["bzt", config_filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, error = taurus_process.communicate() # 等待测试完成并获取输出
taurus_process = None # 清理进程变量
except Exception as e:
return f"Taurus 测试启动失败: {str(e)}"
# 提取结果目录
artifacts_dir = extract_artifacts_dir(output.decode("utf-8"))
if not artifacts_dir:
return f"未找到 Artifacts dir. 请检查 Taurus 是否成功运行。<br><pre>{output}</pre>"
# 解析 JTL 文件并生成报告
all_results = []
status_code_summary = {}
for filename in os.listdir(artifacts_dir):
if filename.endswith(".jtl"):
jtl_filename = os.path.join(artifacts_dir, filename)
results, status_code_count = parse_jtl_file(jtl_filename)
all_results.extend(results)
for code, count in status_code_count.items():
if code not in status_code_summary:
status_code_summary[code] = 0
status_code_summary[code] += count
# 分离断言成功和失败的记录
success_results = [r for r in all_results if r.get("assertion_result", "true").lower() == "true"]
failure_results = [r for r in all_results if r.get("assertion_result", "true").lower() != "true"]
# 分离成功和失败的状态码统计
success_status_code_summary = {}
failure_status_code_summary = {}
for code, count in status_code_summary.items():
success_count = sum(1 for r in success_results if r["status_code"] == code)
failure_count = sum(1 for r in failure_results if r["status_code"] == code)
if success_count > 0:
success_status_code_summary[code] = success_count
if failure_count > 0:
failure_status_code_summary[code] = failure_count
# 渲染报告页面
report_template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Taurus 执行结果</title>
<style>
.container { display: flex; gap: 20px; }
.tab { flex: 1; border: 1px solid #ccc; padding: 10px; box-shadow: 0 0 10px rgba(0,0,0,0.1); }
.tab h3 { margin-top: 0; }
table { width: 100%; border-collapse: collapse; margin-top: 10px; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
</style>
</head>
<body>
<a href="/">返回</a>
<h3>Taurus 执行结果</h3>
<pre>{{ output }}</pre>
<h3>错误信息</h3>
<pre>{{ error }}</pre>
<hr>
<div class="container">
<div class="tab">
<h3>断言成功的接口响应结果</h3>
<p>总响应条数: {{ success_count }}</p>
<table>
<tr><th>状态码</th><th>数量</th></tr>
{% for code, count in success_status_code_summary.items() %}
<tr><td>{{ code }}</td><td>{{ count }}</td></tr>
{% endfor %}
</table>
</div>
<div class="tab">
<h3>断言失败的接口响应结果</h3>
<p>总响应条数: {{ failure_count }}</p>
<table>
<tr><th>状态码</th><th>数量</th></tr>
{% for code, count in failure_status_code_summary.items() %}
<tr><td>{{ code }}</td><td>{{ count }}</td></tr>
{% endfor %}
</table>
</div>
</div>
<a href="/">返回</a>
</body>
</html>
"""
return render_template_string(
report_template,
output=output.decode("utf-8"),
error=error.decode("utf-8"),
success_count=len(success_results),
failure_count=len(failure_results),
success_status_code_summary=success_status_code_summary,
failure_status_code_summary=failure_status_code_summary
)
@app.route("/get-taurus-pid", methods=["GET"])
def get_taurus_pid():
global taurus_process
if taurus_process and taurus_process.poll() is None: # 检查进程是否仍在运行
return jsonify({"status": "running", "pid": taurus_process.pid})
else:
return jsonify({"status": "stopped", "message": "没有正在运行的 Taurus 测试"})
@app.route("/kill-taurus", methods=["POST"])
def kill_taurus():
global taurus_process
pid = request.form.get("pid")
if not pid:
return jsonify({"status": "error", "message": "请输入有效的 PID"})
try:
os.kill(int(pid), signal.SIGTERM)
if taurus_process and taurus_process.pid == int(pid):
taurus_process = None
return jsonify({"status": "success", "message": f"Taurus 测试进程 (PID: {pid}) 已被终止"})
except Exception as e:
return jsonify({"status": "error", "message": str(e)})
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def load_api_keys(file_path):
keys = []
with open(file_path, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
keys.append({
"api_key": row["api_key"].strip(),
"api_secret": row["api_secret"].strip()
})
return keys
def generate_signature(api_secret, method, request_path, query_params="", request_body=""):
ts = get_timestamp()
if method.upper() in ["GET", "DELETE"]:
params = f"{query_params}&signTimestamp={ts}" if query_params else f"signTimestamp={ts}"
else:
params = f"requestBody={request_body}&signTimestamp={ts}"
request_string = f"{method.upper()}\n{request_path}\n{params}"
signature = hmac.new(api_secret.encode("utf-8"), request_string.encode("utf-8"), hashlib.sha256).digest()
return base64.b64encode(signature).decode("utf-8"), ts
def get_timestamp():
return str(int(time.time() * 1000))
if __name__ == '__main__':
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
app.run(debug=True)
带有PID的Taurus,可以停止服务
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- 做自己喜欢做的事,然后把它做到最好! 最好的人才是免费的,因为他赚取的利润早就把他的薪水给盖住了。 注意他人成功的...
- 事实上,接单车辆不一致等问题非常普遍,非注册车主被封是完全可以解封的。 毕竟,这不 是一个司机的错。 因此,需要申...
- 方案一 :返回的时候,执行一个js,关闭播放器 方案二 :返回的时候,让webView加载一次空白页,例如:
- 今天我来分享一下,我在地产的时候开单的情况。我所在的楼盘属于新农村,相信大家都知道新农村这个词,就是属于政府补贴的...