# 带有 PID 的 Taurus,可以停止服务

from flask import Flask, request, render_template_string, jsonify
import os
import signal
import subprocess
import time
import hmac
import hashlib
import base64
import yaml
import csv
from werkzeug.utils import secure_filename
from itertools import cycle
import re

# Flask application object; every route in this file registers on it.
app = Flask(__name__)

# Folder that receives uploaded API-key files, and the file extensions
# accepted for upload.
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'csv'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Handle of the Taurus subprocess started by /run-taurus (None when no test
# is being tracked); its PID is what /get-taurus-pid reports.
taurus_process = None

# Single-page UI rendered by the "/" route. It contains:
#   * a multipart form posting global load parameters, an API-key CSV and any
#     number of dynamically added "url_N / method_N / ..." blocks to /run-taurus;
#   * a PID input whose button POSTs to /kill-taurus via fetch();
#   * a button that queries /get-taurus-pid and shows the answer.
# NOTE(review): the "stop_button" element is rendered disabled and no script
# in this template enables it or attaches a handler.
# NOTE(review): addApiField() numbers a new block as (current block count + 1),
# so after a block is removed the field indices can have gaps or repeats.
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>动态 API 压测工具</title>
    <script>
        function addApiField() {
            var container = document.getElementById("apiContainer");
            var index = document.querySelectorAll(".apiBlock").length + 1;
            var apiBlock = document.createElement("div");
            apiBlock.classList.add("apiBlock");
            apiBlock.innerHTML = `
                <hr>
                <h3>接口 ${index}</h3>
                <label>URL:</label><br>
                <input type="text" name="url_${index}" size="80"><br><br>
                <label>请求方法:</label><br>
                <select name="method_${index}">
                    <option value="GET">GET</option>
                    <option value="POST">POST</option>
                    <option value="PUT">PUT</option>
                    <option value="DELETE">DELETE</option>
                </select><br><br>
                <label>Query 参数 (仅 GET 请求, 例如 symbol=ETH_USDT_PERP):</label><br>
                <input type="text" name="query_params_${index}" size="80"><br><br>
                <label>Header (JSON 格式):</label><br>
                <textarea name="headers_${index}" rows="4" cols="80"></textarea><br><br>
                <label>Body (JSON 格式, 仅 POST/PUT 需要):</label><br>
                <textarea name="body_${index}" rows="6" cols="80"></textarea><br><br>
                <label>期望状态码 (例如 200):</label><br>
                <input type="text" name="expected_status_${index}" size="10"><br><br>
                <label>期望响应包含的关键字 (可选, 例如 "success"):</label><br>
                <input type="text" name="expected_body_${index}" size="40"><br><br>
                <button type="button" onclick="removeApiField(this)">删除该接口</button>
            `;
            container.appendChild(apiBlock);
        }

        function removeApiField(button) {
            button.parentElement.remove();
        }

        function toggleIterationsInput() {
            var useIterationsCheckbox = document.getElementById('use_iterations');
            var iterationsInput = document.getElementById('iterations_input');
            iterationsInput.disabled = !useIterationsCheckbox.checked;
        }

        // 查询 Taurus PID
        function queryTaurusPID() {
            fetch('/get-taurus-pid')
                .then(response => response.json())
                .then(data => {
                    if (data.status === 'running') {
                        document.getElementById('pid-display').innerText = `当前 Taurus PID: ${data.pid}`;
                    } else {
                        document.getElementById('pid-display').innerText = data.message;
                    }
                });
        }

        // 停止 Taurus 测试
        function stopTaurus() {
            var pid = document.getElementById('pid').value;
            if (!pid) {
                alert('请输入有效的 PID');
                return;
            }
            fetch('/kill-taurus', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded'
                },
                body: `pid=${pid}`
            })
            .then(response => response.json())
            .then(data => {
                if (data.status === 'success') {
                    alert(data.message);
                } else {
                    alert('停止失败: ' + data.message);
                }
            });
        }
    </script>
</head>
<body>
    <h2>动态 API 压测工具</h2>
    <form method="post" action="/run-taurus" enctype="multipart/form-data">
        <h3>全局压力测试参数</h3>
        <label>并发数 (concurrency):</label><br>
        <select name="concurrency">
            <option value="1">1 (单线程)</option>
            <option value="2">2</option>
            <option value="5">5</option>
            <option value="10">10</option>
            <option value="20">20</option>
            <option value="50">50</option>
            <option value="100">100</option>
            <option value="200">200</option>
        </select><br><br>
        <label>压测持续时间 (hold-for, 单位为秒):</label><br>
        <input type="text" name="hold_for" value="600"><br><br>
        <label>每个场景的迭代次数 (iterations):</label><br>
        <input type="checkbox" name="use_iterations" id="use_iterations" onchange="toggleIterationsInput()">
        <label for="use_iterations">启用迭代次数</label><br>
        <input type="text" name="iterations" id="iterations_input" placeholder="迭代次数" disabled><br><br>
        <label>上传 API Key 文件 (CSV 格式, 列: api_key,api_secret):</label><br>
        <input type="file" name="api_keys_file" accept=".csv"><br><br>
        <hr>
        <h3>添加 API 接口</h3>
        <div id="apiContainer"></div>
        <button type="button" onclick="addApiField()">+ 添加接口</button>
        <br><br>
        <input type="submit" value="生成配置并运行 Taurus">
    </form>
    <button id="stop_button" disabled>停止测试</button>
    <button onclick="window.location.href='/results'">查看最后一次的结果</button>

    <h3>手动停止 Taurus 测试</h3>
    <form id="kill-form" method="post" action="/kill-taurus">
        <label for="pid">输入 Taurus 的 PID:</label>
        <input type="text" id="pid" name="pid" required>
        <button type="button" onclick="stopTaurus()">确认停止</button>
    </form>

    <h3>查询 Taurus PID</h3>
    <button onclick="queryTaurusPID()">查询 PID</button>
    <p id="pid-display"></p>
</body>
</html>
"""

def parse_jtl_file(jtl_filename):
    """Parse a CSV-format JTL results file into per-sample records.

    Args:
        jtl_filename: Path of the JTL file. It is read as UTF-8 and a
            leading byte-order mark is tolerated.

    Returns:
        A ``(results, status_code_count)`` tuple. ``results`` is a list of
        dicts with the keys ``url``, ``status_code``, ``response_time``,
        ``response_body`` and ``assertion_result``. ``status_code_count``
        maps each status-code string to how many rows carried it.

        Parsing is best-effort: if the file is missing or cannot be read,
        a message is printed and whatever was collected so far (possibly
        nothing) is returned.
    """
    results = []
    status_code_count = {}

    def field(row, name, default):
        # DictReader fills columns missing from a short row with None;
        # treat those exactly like an absent column.
        value = row.get(name)
        return default if value is None else value

    try:
        # newline="" lets the csv module handle line endings and embedded
        # newlines inside quoted fields itself.
        with open(jtl_filename, "r", encoding="utf-8-sig", newline="") as jtl_file:
            for raw_row in csv.DictReader(jtl_file):
                # Surplus cells of an over-long row are stored under the key
                # None; skip that entry instead of failing on None.strip().
                row = {
                    key.strip(): value
                    for key, value in raw_row.items()
                    if key is not None
                }
                if not row.get("label") and not row.get("responseCode"):
                    continue
                status_code = field(row, "responseCode", "N/A")
                status_code_count[status_code] = status_code_count.get(status_code, 0) + 1
                results.append({
                    "url": field(row, "label", "N/A"),
                    "status_code": status_code,
                    "response_time": field(row, "elapsed", "N/A"),
                    "response_body": field(row, "responseMessage", "N/A"),
                    "assertion_result": field(row, "success", "true"),
                })
    except FileNotFoundError:
        print(f"JTL 文件未找到: {jtl_filename}")
    except (OSError, UnicodeError, csv.Error) as e:
        print(f"解析 JTL 文件时出错: {e}")
    return results, status_code_count

def extract_artifacts_dir(output):
    """Return the path that follows ``Artifacts dir: `` in *output*.

    Only the first occurrence is used; the captured text runs to the end of
    that line and is stripped of surrounding whitespace. Returns None when
    the marker is not present.
    """
    found = re.search(r"Artifacts dir: (.+)", output)
    return found.group(1).strip() if found else None

@app.route("/", methods=["GET"])
def index():
    """Serve the load-test form page built from HTML_TEMPLATE."""
    page = render_template_string(HTML_TEMPLATE)
    return page

def _api_form_indices(form):
    """Return the numeric suffixes of every ``url_<n>`` form field, ascending."""
    suffixes = set()
    for key in form:
        matched = re.fullmatch(r"url_(\d+)", key)
        if matched:
            suffixes.add(matched.group(1))
    return sorted(suffixes, key=int)


def _count_by_status(records):
    """Count parsed JTL records per ``status_code``, keeping first-seen order."""
    counts = {}
    for record in records:
        code = record["status_code"]
        counts[code] = counts.get(code, 0) + 1
    return counts


def _assertion_passed(record):
    """Tell whether a parsed JTL record is marked successful (missing means yes)."""
    verdict = record.get("assertion_result")
    return verdict is None or verdict.lower() == "true"


@app.route("/run-taurus", methods=["POST"])
def run_taurus():
    """Build a Taurus config from the submitted form, run ``bzt`` and report.

    Reads the global load parameters, the uploaded API-key CSV and every
    ``url_N``/``method_N``/... group from the form, signs each request with
    the next key from the CSV (cycling when there are more APIs than keys),
    writes ``temp_taurus_config.yaml``, runs ``bzt`` on it and waits for it
    to finish. The ``.jtl`` files found in the reported artifacts directory
    are then summarised per status code, split by assertion outcome.

    Returns:
        The rendered HTML report, or a short message describing why the test
        could not be prepared or started.
    """
    global taurus_process
    use_iterations = "use_iterations" in request.form
    try:
        concurrency = int(request.form.get("concurrency", "1"))
        iterations = int(request.form.get("iterations", "1")) if use_iterations else None
    except ValueError:
        # e.g. the iterations box was enabled but left empty.
        return "并发数和迭代次数必须是整数"
    hold_for = request.form.get("hold_for", "1")

    upload = request.files.get('api_keys_file')
    if not upload or not allowed_file(upload.filename):
        return "请上传有效的 API Key 文件 (CSV 格式)"
    filename = secure_filename(upload.filename)
    # The folder is otherwise only created when the file is run as a script.
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    upload.save(file_path)
    try:
        api_keys = load_api_keys(file_path)
    except (KeyError, AttributeError, OSError, UnicodeError, csv.Error):
        # Missing columns, short rows or an unreadable file: report it as a
        # malformed key file instead of failing the request.
        api_keys = []
    if not api_keys:
        return "API Key 文件为空或格式错误"
    key_cycle = cycle(api_keys)

    scenarios = {}
    execution = []
    # Collect the indices actually present in the form: deleting a block in
    # the page leaves gaps, so counting up from 1 until the first missing
    # index would drop every API after the gap.
    for index in _api_form_indices(request.form):
        api_name = f"api_test_{index}"
        url = request.form[f"url_{index}"]
        if not url.strip():
            return f"接口 {index} 的 URL 不能为空"
        method = request.form[f"method_{index}"].upper()
        query_params = request.form.get(f"query_params_{index}", "").strip()
        headers_str = request.form.get(f"headers_{index}", "{}")
        body = request.form.get(f"body_{index}", "").strip()
        expected_status = request.form.get(f"expected_status_{index}", "").strip()
        expected_body = request.form.get(f"expected_body_{index}", "").strip()

        assertions = []
        if expected_status:
            assertions.append({"contains": expected_status, "subject": "http-code"})
        if expected_body:
            assertions.append({"contains": expected_body, "subject": "body"})

        # Everything after "scheme://host" is the path that gets signed.
        request_path = "/" + "/".join(url.split("/")[3:])

        try:
            headers = yaml.safe_load(headers_str) if headers_str else {}
        except yaml.YAMLError as e:
            return f"接口 {index} 的 Header 格式错误: {e}"
        if headers is None:
            # Whitespace-only input parses to None.
            headers = {}
        elif not isinstance(headers, dict):
            return f"接口 {index} 的 Header 格式错误: 必须是 JSON 对象"

        api_data = next(key_cycle)
        api_key = api_data["api_key"]
        api_secret = api_data["api_secret"]
        # NOTE(review): the signature and its timestamp are computed once
        # here and written into the config, so every request sent during the
        # run reuses them.
        signature, ts = generate_signature(api_secret, method, request_path, query_params, body)
        headers.update({
            "key": api_key,
            "signatureMethod": "HmacSHA256",
            "signatureVersion": "2",
            "signTimestamp": ts,
            "signature": signature
        })

        full_url = f"{url}?{query_params}" if query_params else url
        scenarios[api_name] = {
            "requests": [
                {
                    "url": full_url,
                    "method": method,
                    "headers": headers,
                    "body": body if method in ["POST", "PUT"] else None,
                    "assert": assertions
                }
            ]
        }
        execution.append({
            "scenario": api_name,
            "concurrency": concurrency,
            "hold-for": hold_for,
            "iterations": iterations
        })

    if not scenarios:
        return "请至少添加一个 API 接口"

    taurus_config = {
        "execution": execution,
        "scenarios": scenarios,
        "modules": {
            "jmeter": {
                "write-xml-jtl": "full"
            }
        }
    }
    yaml_config = yaml.dump(taurus_config, sort_keys=False)
    config_filename = "temp_taurus_config.yaml"
    with open(config_filename, "w", encoding="utf-8") as f:
        f.write(yaml_config)

    try:
        process = subprocess.Popen(
            ["bzt", config_filename], stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # Publish the handle so /get-taurus-pid can report the PID while this
        # request blocks until the run is over.
        taurus_process = process
        try:
            output, error = process.communicate()
        finally:
            # Never leave a stale handle behind, even if waiting fails.
            taurus_process = None
    except (OSError, subprocess.SubprocessError) as e:
        return f"Taurus 测试启动失败: {str(e)}"

    output_text = output.decode("utf-8", errors="replace")
    error_text = error.decode("utf-8", errors="replace")

    # Locate the results directory; fall back to stderr in case the line was
    # logged there instead of stdout.
    artifacts_dir = extract_artifacts_dir(output_text) or extract_artifacts_dir(error_text)
    if not artifacts_dir:
        # Rendered through the template engine so the console output is shown
        # as text (escaped) rather than as a bytes repr.
        return render_template_string(
            "未找到 Artifacts dir. 请检查 Taurus 是否成功运行。<br><pre>{{ output }}</pre>",
            output=output_text,
        )

    # Gather the samples of every JTL file in the artifacts directory.
    all_results = []
    for entry in os.listdir(artifacts_dir):
        if entry.endswith(".jtl"):
            results, _ = parse_jtl_file(os.path.join(artifacts_dir, entry))
            all_results.extend(results)

    # Split the samples by assertion outcome.
    success_results = [r for r in all_results if _assertion_passed(r)]
    failure_results = [r for r in all_results if not _assertion_passed(r)]

    # Per-status-code totals for each outcome, listed in the order in which
    # the codes first appear across all samples.
    code_order = _count_by_status(all_results)
    success_counts = _count_by_status(success_results)
    failure_counts = _count_by_status(failure_results)
    success_status_code_summary = {
        code: success_counts[code] for code in code_order if code in success_counts
    }
    failure_status_code_summary = {
        code: failure_counts[code] for code in code_order if code in failure_counts
    }

    # Render the report page.
    report_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="UTF-8">
        <title>Taurus 执行结果</title>
        <style>
            .container { display: flex; gap: 20px; }
            .tab { flex: 1; border: 1px solid #ccc; padding: 10px; box-shadow: 0 0 10px rgba(0,0,0,0.1); }
            .tab h3 { margin-top: 0; }
            table { width: 100%; border-collapse: collapse; margin-top: 10px; }
            th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
        </style>
    </head>
    <body>
        <a href="/">返回</a>
        <h3>Taurus 执行结果</h3>
        <pre>{{ output }}</pre>
        <h3>错误信息</h3>
        <pre>{{ error }}</pre>
        <hr>
        <div class="container">
            <div class="tab">
                <h3>断言成功的接口响应结果</h3>
                <p>总响应条数: {{ success_count }}</p>
                <table>
                    <tr><th>状态码</th><th>数量</th></tr>
                    {% for code, count in success_status_code_summary.items() %}
                        <tr><td>{{ code }}</td><td>{{ count }}</td></tr>
                    {% endfor %}
                </table>
            </div>
            <div class="tab">
                <h3>断言失败的接口响应结果</h3>
                <p>总响应条数: {{ failure_count }}</p>
                <table>
                    <tr><th>状态码</th><th>数量</th></tr>
                    {% for code, count in failure_status_code_summary.items() %}
                        <tr><td>{{ code }}</td><td>{{ count }}</td></tr>
                    {% endfor %}
                </table>
            </div>
        </div>
        <a href="/">返回</a>
    </body>
    </html>
    """
    return render_template_string(
        report_template,
        output=output_text,
        error=error_text,
        success_count=len(success_results),
        failure_count=len(failure_results),
        success_status_code_summary=success_status_code_summary,
        failure_status_code_summary=failure_status_code_summary
    )


@app.route("/get-taurus-pid", methods=["GET"])
def get_taurus_pid():
    """Report the PID of the tracked Taurus process while it is running.

    Returns a JSON body with ``status`` "running" and the ``pid``, or
    ``status`` "stopped" and a message when no process is tracked or the
    tracked one has already exited.
    """
    global taurus_process
    # poll() stays None only for as long as the child has not terminated.
    if not taurus_process or taurus_process.poll() is not None:
        return jsonify({"status": "stopped", "message": "没有正在运行的 Taurus 测试"})
    return jsonify({"status": "running", "pid": taurus_process.pid})


@app.route("/kill-taurus", methods=["POST"])
def kill_taurus():
    """Send SIGTERM to the process whose PID was submitted in the form.

    The ``pid`` form field must be a positive integer other than this
    server's own PID. If it matches the tracked Taurus process, the tracked
    handle is cleared as well.

    Returns:
        A JSON body with ``status`` "success" or "error" and a ``message``.
    """
    global taurus_process
    raw_pid = (request.form.get("pid") or "").strip()
    try:
        pid = int(raw_pid)
    except ValueError:
        return jsonify({"status": "error", "message": "请输入有效的 PID"})
    # A PID of 0 or below does not name a single process: the signal would go
    # to a whole process group (or to every process we may signal), taking
    # this server down with it. Our own PID is refused for the same reason.
    if pid <= 0 or pid == os.getpid():
        return jsonify({"status": "error", "message": "请输入有效的 PID"})
    # NOTE(review): any client can still terminate any process owned by the
    # server's user; consider only accepting the PID of the tracked process.
    try:
        os.kill(pid, signal.SIGTERM)
    except (OSError, OverflowError) as e:
        # No such process, no permission, or a number too large for a PID.
        return jsonify({"status": "error", "message": str(e)})
    if taurus_process and taurus_process.pid == pid:
        taurus_process = None
    return jsonify({"status": "success", "message": f"Taurus 测试进程 (PID: {pid}) 已被终止"})


def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively;
    a name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS


def load_api_keys(file_path):
    """Load API key/secret pairs from a CSV file.

    The file must have a header row naming the columns ``api_key`` and
    ``api_secret`` (surrounding whitespace in the header is ignored). It is
    read as UTF-8, tolerating the byte-order mark that spreadsheet exports
    commonly prepend.

    Args:
        file_path: Path of the CSV file.

    Returns:
        A list of ``{"api_key": ..., "api_secret": ...}`` dicts with both
        values stripped. Rows where either value is missing or blank are
        skipped, so a file without the expected columns yields an empty list.

    Raises:
        OSError: If the file cannot be opened.
    """
    keys = []
    with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
        for raw_row in csv.DictReader(f):
            # Surplus cells of an over-long row live under the key None.
            row = {
                name.strip(): value
                for name, value in raw_row.items()
                if name is not None
            }
            # Cells missing from a short row come back as None.
            api_key = (row.get("api_key") or "").strip()
            api_secret = (row.get("api_secret") or "").strip()
            if not api_key or not api_secret:
                continue
            keys.append({"api_key": api_key, "api_secret": api_secret})
    return keys


def generate_signature(api_secret, method, request_path, query_params="", request_body=""):
    """Sign a request with HMAC-SHA256 and return ``(signature, timestamp)``.

    The signed text is three lines: the upper-cased method, the request path
    and a parameter string. For GET and DELETE the parameter string is the
    query string (if any) followed by ``signTimestamp=<ts>``; for every other
    method it is ``requestBody=<body>&signTimestamp=<ts>``.

    Returns:
        The Base64-encoded digest as text, together with the timestamp string
        that was signed.
    """
    ts = get_timestamp()
    verb = method.upper()
    stamp = f"signTimestamp={ts}"
    if verb in ("GET", "DELETE"):
        params = f"{query_params}&{stamp}" if query_params else stamp
    else:
        params = f"requestBody={request_body}&{stamp}"
    request_string = f"{verb}\n{request_path}\n{params}"
    mac = hmac.new(
        api_secret.encode("utf-8"),
        request_string.encode("utf-8"),
        hashlib.sha256,
    )
    encoded = base64.b64encode(mac.digest())
    return encoded.decode("utf-8"), ts


def get_timestamp():
    """Return the current Unix time in whole milliseconds, as a string."""
    millis = int(time.time() * 1000)
    return str(millis)


# Script entry point: start the development server.
if __name__ == '__main__':
    # Make sure the upload folder exists before any file is saved into it.
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
    # NOTE(review): debug=True enables Flask's debug mode; confirm this app
    # is only ever started this way on a trusted local machine.
    app.run(debug=True)
# ©著作权归作者所有,转载或内容合作请联系作者
# 平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

# 推荐阅读更多精彩内容