2025-12-01

# Chat2DB from Local to Remote: A Configuration Guide for Cross-Network Database Management

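## 1. From Local to Remote: Chat2DB Architecture Overview

### Core Architecture Upgrade Strategy

Turning a local Chat2DB setup into a remotely accessible service hinges on two things: **exposing it as a service** and **controlling access securely**. The original monolithic application has to be split into a client-server architecture: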

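```
Architecture comparison:

Local mode:
├── Application layer (Chat2DB GUI)
├── Database driver layer
├── Local database connections
└── File system storage

Remote mode:
├── Client layer (Web/desktop GUI)
├── Network communication layer (HTTP/WebSocket)
├── Server layer (business logic)
├── Connection pool management
├── Security and authentication layer
└── Database proxy layer
```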

### Technology Choices and Preparation

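```python
# requirements.txt - dependencies for the remote service
fastapi==0.104.0
uvicorn[standard]==0.24.0
websockets==12.0
sqlalchemy==2.0.23
pymysql==1.1.0
psycopg2-binary==2.9.9
cryptography==41.0.7
PyJWT==2.8.0  # provides the `jwt` module the server imports; the PyPI package named `jwt` is a different library
redis==5.0.1  # for session management
```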

## 2. Server Deployment and Configuration

### Basic Server Implementation

```python

# server/main.py - core of the Chat2DB remote service
import asyncio
import hashlib
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

import jwt
from fastapi import FastAPI, Depends, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL
from sqlalchemy.pool import QueuePool

app = FastAPI(title="Chat2DB Remote Server", version="1.0.0")

# CORS configuration - allow cross-origin access.
# Browsers reject a wildcard origin combined with credentials, so the allowed
# origins are listed explicitly (the same values as in config/production.yaml).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com", "http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Database connection pool management
class DatabaseManager:
    _instance = None
    _connections: Dict[str, Dict] = {}

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def create_engine_from_config(self, config: Dict):
        """Create (or reuse) a database engine for the given config."""
        db_type = config.get('type', 'mysql')
        host = config.get('host', 'localhost')
        # Fall back to the default port of the selected database type
        port = int(config.get('port') or (5432 if db_type == 'postgresql' else 3306))
        database = config.get('database', '')
        username = config.get('username', '')
        password = config.get('password', '')

        # Build the cache key. The password is part of the key so that a
        # request with wrong credentials cannot reuse an engine that was
        # created with the right ones. SHA-256 is used instead of MD5.
        conn_id = hashlib.sha256(
            f"{db_type}:{host}:{port}:{database}:{username}:{password}".encode()
        ).hexdigest()

        if conn_id in self._connections:
            self._connections[conn_id]['last_used'] = datetime.now()
            return self._connections[conn_id]['engine']

        # Build the connection URL for the database type. URL.create escapes
        # special characters (such as "@" or "/") in the username and password.
        if db_type == 'mysql':
            connection_url = URL.create(
                "mysql+pymysql", username=username, password=password,
                host=host, port=port, database=database,
            )
        elif db_type == 'postgresql':
            connection_url = URL.create(
                "postgresql", username=username, password=password,
                host=host, port=port, database=database,
            )
        elif db_type == 'sqlite':
            connection_url = URL.create("sqlite", database=database)
        else:
            raise ValueError(f"Unsupported database type: {db_type}")

        # Create the engine with a connection pool
        engine = create_engine(
            connection_url,
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,
            pool_recycle=3600,
            echo=False,  # keep SQL echo off in production
        )

        self._connections[conn_id] = {
            'engine': engine,
            'config': config,
            'last_used': datetime.now()
        }

        return engine

# JWT authentication
class AuthManager:
    # Read the signing key from the environment; the fallback is for local development only
    SECRET_KEY = os.environ.get("JWT_SECRET", "your-secret-key-change-in-production")
    ALGORITHM = "HS256"

    @staticmethod
    def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None):
        """Create an access token (valid for 24 hours unless expires_delta is given)."""
        to_encode = data.copy()
        expire = datetime.now(timezone.utc) + (expires_delta or timedelta(hours=24))
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, AuthManager.SECRET_KEY, algorithm=AuthManager.ALGORITHM)

    @staticmethod
    def verify_token(token: str):
        """Verify a JWT and return its payload."""
        try:
            payload = jwt.decode(token, AuthManager.SECRET_KEY, algorithms=[AuthManager.ALGORITHM])
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token has expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token")

# Dependency: resolve the current user
security = HTTPBearer()

async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the payload of the authenticated user's token."""
    token = credentials.credentials
    payload = AuthManager.verify_token(token)
    return payload

# REST API endpoints
@app.post("/api/v1/connect")
def connect_database(
    config: Dict,
    current_user: Dict = Depends(get_current_user)
):
    """Test a database connection."""
    # Declared with plain `def` so FastAPI runs the blocking database call
    # in its thread pool instead of on the event loop.
    try:
        db_manager = DatabaseManager()
        engine = db_manager.create_engine_from_config(config)

        # Test the connection
        with engine.connect() as conn:
            conn.execute(text("SELECT 1")).fetchone()

        return {
            "success": True,
            "message": "Database connection succeeded",
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Database connection failed: {str(e)}")

@app.post("/api/v1/query")
def execute_query(
    request: Dict,
    current_user: Dict = Depends(get_current_user)
):
    """Execute a SQL statement."""
    # Plain `def`: FastAPI runs the blocking database call in its thread pool.
    if 'config' not in request or 'sql' not in request:
        raise HTTPException(status_code=400, detail="Request must contain 'config' and 'sql'")

    try:
        db_manager = DatabaseManager()
        engine = db_manager.create_engine_from_config(request['config'])

        sql = request['sql']
        params = request.get('parameters', {})

        with engine.connect() as conn:
            result = conn.execute(text(sql), params)

            # Ask the driver whether the statement produced rows instead of
            # guessing from the first keyword (which misses WITH, EXPLAIN,
            # DESCRIBE and similar statements).
            if result.returns_rows:
                columns = list(result.keys())
                rows = [dict(zip(columns, row)) for row in result.fetchall()]
                conn.commit()  # harmless for SELECT, needed for INSERT ... RETURNING

                return {
                    "success": True,
                    "type": "query",
                    "columns": columns,
                    "rows": rows,
                    "row_count": len(rows)
                }

            # Statements without a result set: commit and report affected rows
            conn.commit()

            return {
                "success": True,
                "type": "update",
                "affected_rows": result.rowcount
            }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Query execution failed: {str(e)}")

@app.post("/api/v1/databases")
def list_databases(
    config: Dict,
    current_user: Dict = Depends(get_current_user)
):
    """List all databases."""
    # POST rather than GET: the connection config travels in the request
    # body, and browsers cannot send a body with a GET request.
    try:
        db_manager = DatabaseManager()
        engine = db_manager.create_engine_from_config(config)

        with engine.connect() as conn:
            # The catalog query depends on the database type
            db_type = config.get('type', 'mysql')

            if db_type == 'mysql':
                result = conn.execute(text("SHOW DATABASES"))
            elif db_type == 'postgresql':
                result = conn.execute(text("SELECT datname FROM pg_database WHERE datistemplate = false"))
            else:
                # SQLite has no database catalog, so list the tables of the opened file
                result = conn.execute(text("SELECT name FROM sqlite_master WHERE type='table'"))

            databases = [row[0] for row in result.fetchall()]

            return {
                "success": True,
                "databases": databases
            }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list databases: {str(e)}")

# WebSocket real-time communication
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        # Iterate over a copy so a disconnect during the loop cannot skip entries
        for connection in list(self.active_connections):
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """WebSocket endpoint for real-time data transfer."""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                message_data = None

            if not isinstance(message_data, dict):
                await manager.send_personal_message(json.dumps({
                    "type": "error",
                    "message": "Messages must be JSON objects"
                }), websocket)
                continue

            # Dispatch on the message type
            message_type = message_data.get('type')

            if message_type == 'query':
                # Run the query and return the result
                response = await execute_query_async(message_data)
                await manager.send_personal_message(json.dumps(response), websocket)

            elif message_type == 'ping':
                # Heartbeat
                await manager.send_personal_message(json.dumps({
                    "type": "pong",
                    "timestamp": datetime.now().isoformat()
                }), websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        # Broadcast JSON, because the web client parses every message as JSON
        await manager.broadcast(json.dumps({
            "type": "disconnect",
            "client_id": client_id,
            "message": f"Client {client_id} disconnected"
        }))

async def execute_query_async(request_data: Dict):
    """Placeholder for asynchronous query execution; it only echoes the request."""
    return {"status": "processed", "data": request_data}

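# Start the server
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",  # listen on all network interfaces
        port=8000,
        reload=True,  # hot reload for development; uvicorn ignores `workers` while reload is on
        # For production, set reload=False and workers=<number of CPU cores>.
    )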

```
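To make the token flow in `AuthManager` more concrete, the sketch below shows roughly what HS256 signing and verification amount to, using only the standard library. It is an illustration of the mechanism, not a replacement for the `jwt.encode` / `jwt.decode` calls in the server; the names `sign_token` and `verify_token` and the demo secret are assumptions made for this example.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, the encoding JWTs use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_token(payload: dict, secret: str, ttl_seconds: int = 24 * 3600) -> str:
    """Return header.payload.signature with an `exp` claim added (hypothetical helper)."""
    header = {"alg": "HS256", "typ": "JWT"}
    claims = dict(payload, exp=int(time.time()) + ttl_seconds)
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)


def verify_token(token: str, secret: str) -> dict:
    """Check signature and expiry; raise ValueError when either check fails."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, claims_b64, signature_b64 = parts
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{claims_b64}".encode(), hashlib.sha256
    ).digest()
    # compare_digest avoids leaking the match position through timing
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("invalid signature")
    claims = json.loads(_b64url_decode(claims_b64))
    if claims.get("exp", 0) < time.time():
        raise ValueError("token expired")
    return claims
```

A round trip looks like `verify_token(sign_token({"sub": "alice"}, "demo-secret"), "demo-secret")`, which returns the claims including `exp`. Verifying with a different secret, or after the expiry time, raises `ValueError`, which corresponds to the two 401 branches in `AuthManager.verify_token`.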

### Server Configuration Management

```yaml
# config/production.yaml
server:
  host: "0.0.0.0"
  port: 8000
  workers: 4
  reload: false
  log_level: "info"

database:
  connections:
    max_pool_size: 20
    pool_recycle: 3600
    pool_timeout: 30

security:
  jwt_secret: "${JWT_SECRET}"  # read from the environment
  token_expire_hours: 24
  cors_origins:
    - "https://yourdomain.com"
    - "http://localhost:3000"

rate_limiting:
  enabled: true
  max_requests: 100
  window_seconds: 60

logging:
  file: "logs/chat2db_server.log"
  max_size: "100MB"
  backup_count: 10
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

monitoring:
  prometheus_enabled: true
  health_check_interval: 30

```
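YAML itself does not expand `${JWT_SECRET}`; whatever loads the file has to substitute the value. Below is a minimal sketch of that step, assuming the file has already been parsed into nested dicts and lists. The helper name `expand_env` and the sample values are hypothetical and not part of the server code above.

```python
import os
import re

# Matches ${NAME} where NAME is a valid environment variable name
_PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def expand_env(value, env=None):
    """Recursively replace ${VAR} placeholders in strings with environment values."""
    env = os.environ if env is None else env
    if isinstance(value, dict):
        return {key: expand_env(item, env) for key, item in value.items()}
    if isinstance(value, list):
        return [expand_env(item, env) for item in value]
    if isinstance(value, str):
        def replace(match):
            name = match.group(1)
            if name not in env:
                # Fail loudly rather than start with an empty secret
                raise KeyError(f"environment variable {name} is not set")
            return env[name]
        return _PLACEHOLDER.sub(replace, value)
    return value  # numbers, booleans and None pass through unchanged


# Sample data shaped like config/production.yaml (values are made up)
config = {
    "security": {"jwt_secret": "${JWT_SECRET}", "token_expire_hours": 24},
    "server": {"port": 8000},
}
resolved = expand_env(config, env={"JWT_SECRET": "s3cret"})
```

Raising on a missing variable is a deliberate choice in this sketch: starting the service with a literal `${JWT_SECRET}` as the signing key would be worse than refusing to start.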

### Startup Script

```bash
#!/bin/bash
# start_server.sh

set -e

# Environment checks
check_environment() {
    echo "Checking the runtime environment..."

    # Check that Python 3 is available
    if ! command -v python3 &> /dev/null; then
        echo "Error: python3 not found"
        exit 1
    fi

    python_version=$(python3 --version | cut -d' ' -f2)
    echo "Python version: $python_version"

    # Check whether the port is already in use
    if lsof -Pi :8000 -sTCP:LISTEN -t >/dev/null ; then
        echo "Error: port 8000 is already in use"
        exit 1
    fi

    # Check the dependency list
    if [ ! -f "requirements.txt" ]; then
        echo "Error: requirements.txt not found"
        exit 1
    fi
}

# Install dependencies
install_dependencies() {
    echo "Installing Python dependencies..."

    if [ ! -d "venv" ]; then
        echo "Creating virtual environment..."
        python3 -m venv venv
    fi

    source venv/bin/activate

    # Upgrade pip
    pip install --upgrade pip

    # Install the pinned packages
    if [ -f "requirements.txt" ]; then
        pip install -r requirements.txt
    fi

    # Install extra packages
    pip install gunicorn
}

# Configure environment variables
setup_environment() {
    echo "Configuring environment variables..."

    # Generate a JWT secret when none is set. A generated secret changes on
    # every start, which invalidates tokens issued earlier; set JWT_SECRET
    # persistently in production.
    if [ -z "$JWT_SECRET" ]; then
        export JWT_SECRET=$(openssl rand -hex 32)
        echo "Generated JWT_SECRET"
    fi

    # Set the database connection
    if [ -z "$DATABASE_URL" ]; then
        export DATABASE_URL="sqlite:///./chat2db.db"
    fi

    # Create the required directories
    mkdir -p logs backups
}

# Start the service
start_service() {
    echo "Starting the Chat2DB remote service..."

    # Pick the launcher based on the environment
    if [ "$ENVIRONMENT" = "production" ]; then
        # Production: gunicorn with uvicorn workers
        gunicorn main:app \
            --workers 4 \
            --worker-class uvicorn.workers.UvicornWorker \
            --bind 0.0.0.0:8000 \
            --access-logfile logs/access.log \
            --error-logfile logs/error.log \
            --log-level info \
            --timeout 120 \
            --keep-alive 5
    else
        # Development: uvicorn with hot reload
        source venv/bin/activate
        python -m uvicorn main:app \
            --host 0.0.0.0 \
            --port 8000 \
            --reload \
            --log-level debug
    fi
}

# Entry point
main() {
    ENVIRONMENT=${1:-development}

    echo "=== Chat2DB remote service startup ==="
    echo "Environment: $ENVIRONMENT"

    check_environment
    install_dependencies
    setup_environment
    start_service
}

# Handle termination signals
trap 'echo "Service stopped"; exit 0' INT TERM

main "$@"

```
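The port check in `check_environment` relies on `lsof`, which is not installed on every system. As an alternative, the same question can be answered from Python by trying to bind the port. This is a sketch under that assumption; `port_is_free` is a hypothetical helper, not part of the script above.

```python
import socket


def port_is_free(port: int, host: str = "0.0.0.0") -> bool:
    """Return True when host:port can be bound, i.e. nothing is listening there."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            # Typically "address already in use"; other bind errors also count as not free
            return False
    return True
```

From the shell script it could be called as `python3 -c "..."` or from a small preflight module before starting uvicorn. The check is inherently racy: another process can take the port between the check and the actual server start, so the server's own bind error still has to be handled.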

## 3. Adapting the Client

### Web Client Implementation

```html

<!-- client/index.html - main page of the web client -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Chat2DB Remote Client</title>

    <style>

        * { margin: 0; padding: 0; box-sizing: border-box; }


        body {

            font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;

            background: #f5f5f5;

        }


        .container {

            display: flex;

            height: 100vh;

        }


        .sidebar {

            width: 300px;

            background: white;

            border-right: 1px solid #e0e0e0;

            display: flex;

            flex-direction: column;

        }


        .main-content {

            flex: 1;

            display: flex;

            flex-direction: column;

        }


        .connection-panel {

            padding: 20px;

            border-bottom: 1px solid #e0e0e0;

        }


        .database-tree {

            flex: 1;

            overflow-y: auto;

            padding: 10px;

        }


        .editor-container {

            flex: 1;

            padding: 20px;

            display: flex;

            flex-direction: column;

        }


        .sql-editor {

            flex: 1;

            border: 1px solid #ddd;

            border-radius: 4px;

            font-family: 'Monaco', 'Menlo', monospace;

            font-size: 14px;

            padding: 10px;

            resize: none;

        }


        .toolbar {

            padding: 10px 20px;

            background: white;

            border-bottom: 1px solid #e0e0e0;

            display: flex;

            gap: 10px;

        }


        .result-panel {

            padding: 20px;

            background: white;

            border-top: 1px solid #e0e0e0;

            overflow: auto;

        }


        .status-bar {

            padding: 5px 20px;

            background: #333;

            color: white;

            font-size: 12px;

        }


        .btn {

            padding: 8px 16px;

            background: #007bff;

            color: white;

            border: none;

            border-radius: 4px;

            cursor: pointer;

        }


        .btn:hover {

            background: #0056b3;

        }


        .btn-secondary {

            background: #6c757d;

        }


        table {

            width: 100%;

            border-collapse: collapse;

        }


        th, td {

            padding: 8px;

            border: 1px solid #ddd;

            text-align: left;

        }


        th {

            background: #f8f9fa;

            font-weight: 600;

        }

    </style>

</head>

<body>

    <div class="container">
        <!-- Left sidebar -->
        <div class="sidebar">
            <div class="connection-panel">
                <h3>Database Connections</h3>
                <div id="connection-list"></div>
                <button class="btn" onclick="showConnectionModal()">New Connection</button>
            </div>
            <div class="database-tree">
                <h4>Databases</h4>
                <div id="database-tree"></div>
            </div>
        </div>

        <!-- Main content -->
        <div class="main-content">
            <!-- Toolbar -->
            <div class="toolbar">
                <button class="btn" onclick="executeQuery()">Run Query (F5)</button>
                <button class="btn btn-secondary" onclick="formatSQL()">Format</button>
                <button class="btn" onclick="saveQuery()">Save Query</button>
                <button class="btn btn-secondary" onclick="clearEditor()">Clear</button>
                <select id="database-select" onchange="switchDatabase()">
                    <option value="">Select database</option>
                </select>
                <div style="flex: 1"></div>
                <span id="connection-status">Not connected</span>
            </div>

            <!-- SQL editor -->
            <div class="editor-container">
                <textarea id="sql-editor" class="sql-editor"
                          placeholder="Enter a SQL statement..."
                          onkeydown="handleEditorKeydown(event)"></textarea>
            </div>

            <!-- Result panel -->
            <div class="result-panel">
                <h3>Query Results</h3>
                <div id="result-container"></div>
            </div>

            <!-- Status bar -->
            <div class="status-bar">
                <span id="row-count">0 rows</span>
                <span style="margin-left: 20px" id="execution-time">Execution time: 0ms</span>
            </div>
        </div>
    </div>

    <!-- Connection settings modal -->
    <div id="connection-modal" style="display: none; position: fixed; top: 0; left: 0; width: 100%; height: 100%; background: rgba(0,0,0,0.5);">
        <div style="background: white; width: 500px; margin: 100px auto; padding: 20px; border-radius: 8px;">
            <h3>New Database Connection</h3>
            <form id="connection-form">
                <div style="margin-bottom: 15px;">
                    <label>Connection name</label>
                    <input type="text" id="conn-name" required style="width: 100%; padding: 8px;">
                </div>
                <div style="margin-bottom: 15px;">
                    <label>Database type</label>
                    <select id="db-type" required style="width: 100%; padding: 8px;">
                        <option value="mysql">MySQL</option>
                        <option value="postgresql">PostgreSQL</option>
                        <option value="sqlite">SQLite</option>
                    </select>
                </div>
                <div style="display: flex; gap: 10px; margin-bottom: 15px;">
                    <div style="flex: 1">
                        <label>Host</label>
                        <input type="text" id="db-host" required style="width: 100%; padding: 8px;">
                    </div>
                    <div style="width: 100px">
                        <label>Port</label>
                        <input type="number" id="db-port" style="width: 100%; padding: 8px;">
                    </div>
                </div>
                <div style="margin-bottom: 15px;">
                    <label>Database name</label>
                    <input type="text" id="db-name" required style="width: 100%; padding: 8px;">
                </div>
                <div style="margin-bottom: 15px;">
                    <label>Username</label>
                    <input type="text" id="db-username" required style="width: 100%; padding: 8px;">
                </div>
                <div style="margin-bottom: 20px;">
                    <label>Password</label>
                    <input type="password" id="db-password" required style="width: 100%; padding: 8px;">
                </div>
                <div style="display: flex; gap: 10px; justify-content: flex-end;">
                    <button type="button" class="btn btn-secondary" onclick="hideConnectionModal()">Cancel</button>
                    <button type="submit" class="btn">Test and Save</button>
                </div>
            </form>
        </div>
    </div>


    <script>

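        // Global configuration
        const API_BASE_URL = 'http://localhost:8000/api/v1';
        let currentConnection = null;
        let accessToken = null;

        // Initialize once the page has loaded
        document.addEventListener('DOMContentLoaded', function() {
            loadConnections();
            setupWebSocket();
            loadSavedQueries();
        });

        // Minimal placeholder: restore the last saved query into the editor
        function loadSavedQueries() {
            const saved = localStorage.getItem('saved_query');
            if (saved) {
                document.getElementById('sql-editor').value = saved;
            }
        }

        // WebSocket connection
        function setupWebSocket() {
            const ws = new WebSocket('ws://localhost:8000/ws/client-' + Date.now());

            ws.onopen = function() {
                updateStatus('Connected', 'green');
            };

            ws.onmessage = function(event) {
                // Ignore frames that are not valid JSON instead of throwing
                let data;
                try {
                    data = JSON.parse(event.data);
                } catch (error) {
                    console.warn('Ignoring non-JSON WebSocket message:', event.data);
                    return;
                }
                handleWebSocketMessage(data);
            };

            ws.onclose = function() {
                updateStatus('Disconnected', 'red');
                // Try to reconnect
                setTimeout(setupWebSocket, 5000);
            };
        }

        // Minimal placeholder: log server messages until real handling is added
        function handleWebSocketMessage(data) {
            console.log('WebSocket message:', data);
        }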


        // Authentication
        // Assumes the server exposes POST /auth/login returning {token};
        // the server code shown earlier does not define this route.
        async function login(username, password) {
            try {
                const response = await fetch(API_BASE_URL + '/auth/login', {
                    method: 'POST',
                    headers: {'Content-Type': 'application/json'},
                    body: JSON.stringify({username, password})
                });

                if (response.ok) {
                    const data = await response.json();
                    accessToken = data.token;
                    localStorage.setItem('access_token', accessToken);
                    return true;
                }
                return false;
            } catch (error) {
                console.error('Login failed:', error);
                return false;
            }
        }

        // Build the authentication headers
        function getAuthHeaders() {
            const token = accessToken || localStorage.getItem('access_token');
            return {
                'Content-Type': 'application/json',
                'Authorization': 'Bearer ' + token
            };
        }

        // Database connection management
        async function testConnection(config) {
            try {
                const response = await fetch(API_BASE_URL + '/connect', {
                    method: 'POST',
                    headers: getAuthHeaders(),
                    body: JSON.stringify(config)
                });

                const result = await response.json();
                if (!response.ok) {
                    // FastAPI reports errors in the `detail` field
                    return {success: false, message: result.detail || 'Connection test failed'};
                }
                return result;
            } catch (error) {
                return {success: false, message: 'Connection test failed: ' + error.message};
            }
        }


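        async function saveConnection(config) {
            const connections = JSON.parse(localStorage.getItem('connections') || '[]');
            connections.push({
                id: Date.now().toString(),
                ...config
            });
            localStorage.setItem('connections', JSON.stringify(connections));
        }

        function loadConnections() {
            const connections = JSON.parse(localStorage.getItem('connections') || '[]');
            const connectionList = document.getElementById('connection-list');

            connectionList.innerHTML = '';
            connections.forEach(conn => {
                // Build the entry with textContent so a connection name cannot inject HTML
                const div = document.createElement('div');
                div.style.cssText = 'padding: 10px; border: 1px solid #ddd; margin-bottom: 5px; cursor: pointer;';

                const name = document.createElement('strong');
                name.textContent = conn.name;
                const info = document.createElement('small');
                info.textContent = `${conn.type} - ${conn.host}`;

                div.append(name, document.createElement('br'), info);
                div.addEventListener('click', () => selectConnection(conn.id));
                connectionList.appendChild(div);
            });
        }

        // Make the clicked connection the target of subsequent queries
        function selectConnection(id) {
            const connections = JSON.parse(localStorage.getItem('connections') || '[]');
            currentConnection = connections.find(conn => conn.id === id) || null;
        }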


        // SQL execution
        async function executeQuery() {
            const sql = document.getElementById('sql-editor').value;
            if (!sql.trim() || !currentConnection) {
                alert('Select a database connection and enter a SQL statement first');
                return;
            }

            const startTime = Date.now();

            try {
                const response = await fetch(API_BASE_URL + '/query', {
                    method: 'POST',
                    headers: getAuthHeaders(),
                    body: JSON.stringify({
                        config: currentConnection,
                        sql: sql,
                        parameters: {}
                    })
                });

                const result = await response.json();
                if (!response.ok) {
                    // FastAPI reports errors in the `detail` field
                    result.success = false;
                    result.message = result.detail || 'Query failed';
                }
                displayResult(result);

                const executionTime = Date.now() - startTime;
                document.getElementById('execution-time').textContent =
                    `Execution time: ${executionTime}ms`;

            } catch (error) {
                showError('Query execution failed: ' + error.message);
            }
        }


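        // Escape text before it is inserted through innerHTML
        function escapeHtml(value) {
            return String(value)
                .replace(/&/g, '&amp;')
                .replace(/</g, '&lt;')
                .replace(/>/g, '&gt;')
                .replace(/"/g, '&quot;')
                .replace(/'/g, '&#39;');
        }

        // Result rendering
        function displayResult(result) {
            const container = document.getElementById('result-container');

            if (!result.success) {
                // FastAPI error responses carry the text in `detail`
                const message = result.message || result.detail || 'Unknown error';
                container.innerHTML = `<div style="color: red;">${escapeHtml(message)}</div>`;
                return;
            }

            if (result.type === 'query') {
                // Render a table
                let html = `<div>${result.row_count} rows returned</div>`;

                if (result.rows.length > 0) {
                    html += '<table>';

                    // Header row
                    html += '<tr>';
                    result.columns.forEach(col => {
                        html += `<th>${escapeHtml(col)}</th>`;
                    });
                    html += '</tr>';

                    // Data rows
                    result.rows.forEach(row => {
                        html += '<tr>';
                        result.columns.forEach(col => {
                            html += `<td>${escapeHtml(row[col])}</td>`;
                        });
                        html += '</tr>';
                    });

                    html += '</table>';
                }

                container.innerHTML = html;
                document.getElementById('row-count').textContent =
                    `${result.row_count} rows`;

            } else {
                container.innerHTML = `<div>Affected rows: ${escapeHtml(result.affected_rows)}</div>`;
            }
        }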


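        // Helper functions
        function showConnectionModal() {
            document.getElementById('connection-modal').style.display = 'block';
        }

        function hideConnectionModal() {
            document.getElementById('connection-modal').style.display = 'none';
            document.getElementById('connection-form').reset();
        }

        function updateStatus(text, color) {
            const elem = document.getElementById('connection-status');
            elem.textContent = text;
            elem.style.color = color;
        }

        function showError(message) {
            alert('Error: ' + message);
        }

        // Minimal placeholders for the toolbar handlers referenced in the markup
        function clearEditor() {
            document.getElementById('sql-editor').value = '';
        }

        function formatSQL() {
            // Placeholder: only trims surrounding whitespace
            const editor = document.getElementById('sql-editor');
            editor.value = editor.value.trim();
        }

        function saveQuery() {
            localStorage.setItem('saved_query', document.getElementById('sql-editor').value);
        }

        function switchDatabase() {
            const name = document.getElementById('database-select').value;
            if (currentConnection && name) {
                currentConnection = {...currentConnection, database: name};
            }
        }

        // Keyboard shortcuts
        function handleEditorKeydown(event) {
            if (event.key === 'F5') {
                event.preventDefault();
                executeQuery();
            }
        }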


        // Form submission
        document.getElementById('connection-form').addEventListener('submit', async function(e) {
            e.preventDefault();

            const type = document.getElementById('db-type').value;
            // Default port depends on the database type
            const defaultPort = type === 'postgresql' ? 5432 : 3306;

            const config = {
                name: document.getElementById('conn-name').value,
                type: type,
                host: document.getElementById('db-host').value,
                port: parseInt(document.getElementById('db-port').value, 10) || defaultPort,
                database: document.getElementById('db-name').value,
                username: document.getElementById('db-username').value,
                password: document.getElementById('db-password').value
            };

            const testResult = await testConnection(config);
            if (testResult.success) {
                await saveConnection(config);
                loadConnections();
                hideConnectionModal();
                alert('Connection test succeeded; the connection has been saved');
            } else {
                alert('Connection test failed: ' + (testResult.message || testResult.detail));
            }
        });

    </script>

</body>

</html>

```
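For scripting against the same API without a browser, the request that `executeQuery()` sends can be assembled with the Python standard library. The sketch below only builds the request and does not send it; `build_query_request` and the sample connection values are hypothetical, chosen to mirror the JSON body used by the web client.

```python
import json
import urllib.request
from typing import Optional


def build_query_request(api_base: str, token: str, config: dict, sql: str,
                        parameters: Optional[dict] = None) -> urllib.request.Request:
    """Build the POST /query request with the same body shape as the web client."""
    body = json.dumps({
        "config": config,
        "sql": sql,
        "parameters": parameters or {},
    }).encode("utf-8")
    return urllib.request.Request(
        url=api_base.rstrip("/") + "/query",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer " + token,
        },
    )


# Example values only; replace them with a real token and connection config
request = build_query_request(
    "http://localhost:8000/api/v1",
    token="example-token",
    config={"type": "mysql", "host": "localhost", "port": 3306,
            "database": "demo", "username": "demo", "password": "demo"},
    sql="SELECT 1",
)
```

Sending it is then a matter of `urllib.request.urlopen(request)` and parsing the JSON response. Keeping request construction separate from sending makes the payload easy to inspect or log before anything reaches the server.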

### Desktop Client Integration

```python

# client/desktop_client.py - desktop client
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import requests
import json
import threading
from datetime import datetime

class Chat2DBRemoteClient:
    def __init__(self, root):
        self.root = root
        self.root.title("Chat2DB Remote Client")
        self.root.geometry("1200x800")

        self.api_base = "http://localhost:8000/api/v1"
        self.access_token = None
        self.connections = []  # saved connection configs; save_settings() reads this

        self.setup_ui()
        self.load_settings()

    def setup_ui(self):
        """Build the user interface."""
        # Menu bar
        menubar = tk.Menu(self.root)
        self.root.config(menu=menubar)

        # File menu
        file_menu = tk.Menu(menubar, tearoff=0)
        menubar.add_cascade(label="File", menu=file_menu)
        file_menu.add_command(label="New Connection", command=self.new_connection)
        file_menu.add_separator()
        file_menu.add_command(label="Exit", command=self.root.quit)

        # Main layout
        main_panel = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
        main_panel.pack(fill=tk.BOTH, expand=True)

        # Left connection panel
        left_frame = ttk.Frame(main_panel)
        main_panel.add(left_frame, weight=1)

        # Connection list
        ttk.Label(left_frame, text="Database Connections").pack(pady=5)
        self.connection_listbox = tk.Listbox(left_frame, height=15)
        self.connection_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.connection_listbox.bind('<<ListboxSelect>>', self.on_connection_select)

        # Right main panel
        right_panel = ttk.PanedWindow(main_panel, orient=tk.VERTICAL)
        main_panel.add(right_panel, weight=3)

        # SQL editor
        editor_frame = ttk.LabelFrame(right_panel, text="SQL Editor")
        right_panel.add(editor_frame, weight=2)

        self.sql_editor = scrolledtext.ScrolledText(
            editor_frame,
            wrap=tk.WORD,
            font=("Consolas", 11)
        )
        self.sql_editor.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # Toolbar
        toolbar = ttk.Frame(editor_frame)
        toolbar.pack(fill=tk.X, padx=5, pady=2)

        ttk.Button(toolbar, text="Run (F5)",
                  command=self.execute_query).pack(side=tk.LEFT, padx=2)
        ttk.Button(toolbar, text="Clear",
                  command=self.clear_editor).pack(side=tk.LEFT, padx=2)
        ttk.Button(toolbar, text="Save",
                  command=self.save_query).pack(side=tk.LEFT, padx=2)

        # Result panel
        result_frame = ttk.LabelFrame(right_panel, text="Query Results")
        right_panel.add(result_frame, weight=1)

        # Result table (placeholder columns until a query returns real ones)
        columns = ("#", "Field 1", "Field 2", "Field 3")
        self.result_tree = ttk.Treeview(result_frame, columns=columns, show="headings")

        for col in columns:
            self.result_tree.heading(col, text=col)
            self.result_tree.column(col, width=100)

        vsb = ttk.Scrollbar(result_frame, orient="vertical",
                          command=self.result_tree.yview)
        hsb = ttk.Scrollbar(result_frame, orient="horizontal",
                          command=self.result_tree.xview)

        self.result_tree.configure(yscrollcommand=vsb.set,
                                  xscrollcommand=hsb.set)

        self.result_tree.grid(row=0, column=0, sticky="nsew")
        vsb.grid(row=0, column=1, sticky="ns")
        hsb.grid(row=1, column=0, sticky="ew")

        result_frame.grid_rowconfigure(0, weight=1)
        result_frame.grid_columnconfigure(0, weight=1)

        # Status bar
        self.status_bar = ttk.Label(self.root, text="Ready", relief=tk.SUNKEN)
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)

        # Keyboard shortcut
        self.root.bind('<F5>', lambda e: self.execute_query())


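    def load_settings(self):
        """Load the saved token and connection list from settings.json."""
        self.connections = []
        try:
            with open('settings.json', 'r', encoding='utf-8') as f:
                settings = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # First run or unreadable file: start with an empty connection list
            return

        self.access_token = settings.get('access_token')
        # Keep the configs in memory so on_connection_select can index them
        self.connections = settings.get('connections', [])
        for conn in self.connections:
            self.connection_listbox.insert(tk.END, conn['name'])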


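    def save_settings(self):
        """Save the token and connection list to settings.json."""
        # Note: this writes the token and database passwords in plain text;
        # restrict the file's permissions or use an OS keyring for real use.
        settings = {
            'access_token': self.access_token,
            'connections': getattr(self, 'connections', [])
        }
        with open('settings.json', 'w', encoding='utf-8') as f:
            json.dump(settings, f, indent=2, ensure_ascii=False)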


    def new_connection(self):

        """新建连接对话框"""

        dialog = tk.Toplevel(self.root)

        dialog.title("新建数据库连接")

        dialog.geometry("400x350")


        # 连接配置表单

        ttk.Label(dialog, text="连接名称:").grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)

        name_entry = ttk.Entry(dialog, width=30)

        name_entry.grid(row=0, column=1, padx=5, pady=5)


        ttk.Label(dialog, text="数据库类型:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)

        db_type = ttk.Combobox(dialog, values=["mysql", "postgresql", "sqlite"], width=27)

        db_type.grid(row=1, column=1, padx=5, pady=5)

        db_type.current(0)


        ttk.Label(dialog, text="主机地址:").grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)

        host_entry = ttk.Entry(dialog, width=30)

        host_entry.grid(row=2, column=1, padx=5, pady=5)

        host_entry.insert(0, "localhost")


        ttk.Label(dialog, text="端口:").grid(row=3, column=0, padx=5, pady=5, sticky=tk.W)

        port_entry = ttk.Entry(dialog, width=30)

        port_entry.grid(row=3, column=1, padx=5, pady=5)

        port_entry.insert(0, "3306")


        ttk.Label(dialog, text="数据库名:").grid(row=4, column=0, padx=5, pady=5, sticky=tk.W)

        db_entry = ttk.Entry(dialog, width=30)

        db_entry.grid(row=4, column=1, padx=5, pady=5)


        ttk.Label(dialog, text="用户名:").grid(row=5, column=0, padx=5, pady=5, sticky=tk.W)

        user_entry = ttk.Entry(dialog, width=30)

        user_entry.grid(row=5, column=1, padx=5, pady=5)


        ttk.Label(dialog, text="密码:").grid(row=6, column=0, padx=5, pady=5, sticky=tk.W)

        pass_entry = ttk.Entry(dialog, width=30, show="*")

        pass_entry.grid(row=6, column=1, padx=5, pady=5)


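        def test_and_save():
            """Test the connection and save it if the test succeeds."""
            # Validate the port first so a non-numeric value shows a message
            # instead of raising ValueError inside the Tk callback
            try:
                port = int(port_entry.get())
            except ValueError:
                messagebox.showerror("错误", "端口必须是数字")
                return

            config = {
                'name': name_entry.get(),
                'type': db_type.get(),
                'host': host_entry.get(),
                'port': port,
                'database': db_entry.get(),
                'username': user_entry.get(),
                'password': pass_entry.get()
            }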


            if self.test_connection(config):

                self.save_connection(config)

                self.connection_listbox.insert(tk.END, config['name'])

                dialog.destroy()

                messagebox.showinfo("成功", "连接测试成功并已保存")

            else:

                messagebox.showerror("错误", "连接测试失败")


        ttk.Button(dialog, text="测试连接",

                  command=test_and_save).grid(row=7, column=0, padx=5, pady=10)

        ttk.Button(dialog, text="取消",

                  command=dialog.destroy).grid(row=7, column=1, padx=5, pady=10)


    def test_connection(self, config):

        """测试数据库连接"""

        try:

            response = requests.post(

                f"{self.api_base}/connect",

                headers=self.get_auth_headers(),

                json=config,

                timeout=10

            )

            return response.status_code == 200

        except Exception as e:

            self.status_bar.config(text=f"连接测试失败: {str(e)}")

            return False


    def save_connection(self, config):

        """保存连接配置"""

        if not hasattr(self, 'connections'):

            self.connections = []


        self.connections.append(config)

        self.save_settings()


    def on_connection_select(self, event):

        """选择连接"""

        selection = self.connection_listbox.curselection()

        if selection:

            index = selection[0]

            self.current_connection = self.connections[index]

            self.status_bar.config(text=f"已选择: {self.current_connection['name']}")


    def execute_query(self):

        """执行SQL查询"""

        if not hasattr(self, 'current_connection'):

            messagebox.showwarning("警告", "请先选择数据库连接")

            return


        sql = self.sql_editor.get("1.0", tk.END).strip()

        if not sql:

            messagebox.showwarning("警告", "请输入SQL语句")

            return


        # 在新线程中执行查询,避免界面卡顿

        thread = threading.Thread(target=self._execute_query_thread, args=(sql,))

        thread.daemon = True

        thread.start()


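    def _execute_query_thread(self, sql):
        """Worker thread: run the query, then hand UI updates to the Tk main loop."""
        # Tkinter widgets are not thread-safe, so UI changes are scheduled on
        # the main loop with root.after instead of being made from this thread.
        def ui(func, *args, **kwargs):
            self.root.after(0, lambda: func(*args, **kwargs))

        try:
            ui(self.status_bar.config, text="执行中...")

            response = requests.post(
                f"{self.api_base}/query",
                headers=self.get_auth_headers(),
                json={
                    'config': self.current_connection,
                    'sql': sql,
                    'parameters': {}
                },
                timeout=30
            )

            if response.status_code == 200:
                # display_result also sets the final status text (row count)
                ui(self.display_result, response.json())
            else:
                ui(self.status_bar.config, text="查询失败")
                ui(messagebox.showerror, "错误", f"查询失败: {response.text}")

        except Exception as e:
            ui(self.status_bar.config, text=f"执行错误: {str(e)}")
            ui(messagebox.showerror, "错误", str(e))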


    def display_result(self, result):
        """Show a query result in the result tree."""
        # Clear the previous result
        for item in self.result_tree.get_children():
            self.result_tree.delete(item)

        if result['type'] == 'query':
            columns = result['columns']
            # Prepend a "#" column so the row number added below has its own
            # cell and the data values stay aligned with their headings
            tree_columns = ["#"] + list(columns)
            self.result_tree['columns'] = tree_columns

            for col in tree_columns:
                self.result_tree.heading(col, text=col)
                self.result_tree.column(col, width=100)

            # Add the data rows, numbered from 1
            for i, row in enumerate(result['rows'], 1):
                values = [i] + [row.get(col, '') for col in columns]
                self.result_tree.insert('', tk.END, values=values)

            self.status_bar.config(text=f"返回 {len(result['rows'])} 行")
        else:
            self.status_bar.config(text=f"影响行数: {result.get('affected_rows', 0)}")


    def clear_editor(self):

        """清空编辑器"""

        self.sql_editor.delete("1.0", tk.END)


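    def save_query(self):
        """Save the editor contents to a .sql file."""
        # Import the submodule explicitly; "import tkinter as tk" alone does
        # not guarantee that tk.filedialog is loaded
        from tkinter import filedialog

        sql = self.sql_editor.get("1.0", tk.END).strip()
        if not sql:
            messagebox.showwarning("警告", "没有内容可保存")
            return

        filename = filedialog.asksaveasfilename(
            defaultextension=".sql",
            filetypes=[("SQL文件", "*.sql"), ("所有文件", "*.*")]
        )

        if filename:
            try:
                # UTF-8 keeps non-ASCII text in the SQL intact on every platform
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(sql)
                self.status_bar.config(text=f"已保存到: {filename}")
            except Exception as e:
                messagebox.showerror("错误", f"保存失败: {str(e)}")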


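    def get_auth_headers(self):
        """Build the request headers, adding the bearer token only when one is set."""
        headers = {'Content-Type': 'application/json'}
        # Avoid sending a literal "Bearer None" before the user has logged in
        if getattr(self, 'access_token', None):
            headers['Authorization'] = f'Bearer {self.access_token}'
        return headers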

def main():

    root = tk.Tk()

    app = Chat2DBRemoteClient(root)

    root.mainloop()

if __name__ == "__main__":

    main()

```
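
The row-shaping step inside `display_result` can be pulled out into a pure function, which makes it testable without opening a Tk window. The sketch below assumes the `/query` response has the shape the client code reads (`type`, `columns`, `rows` as a list of dicts); `to_table` is a hypothetical helper name, not part of Chat2DB.

```python
from typing import Any, Dict, List, Tuple


def to_table(result: Dict[str, Any]) -> Tuple[List[str], List[List[Any]]]:
    """Turn a query response into (headings, rows) for a table widget.

    A leading "#" heading is added so the 1-based row number has its own
    column and the data values stay aligned with their headings.
    """
    if result.get("type") != "query":
        # Non-query statements carry no rows, only an affected-row count
        return [], []

    columns = list(result.get("columns", []))
    headings = ["#"] + columns
    rows = [
        [number] + [row.get(col, "") for col in columns]
        for number, row in enumerate(result.get("rows", []), start=1)
    ]
    return headings, rows


if __name__ == "__main__":
    sample = {
        "type": "query",
        "columns": ["id", "name"],
        "rows": [{"id": 7, "name": "alice"}, {"id": 9}],
    }
    headings, rows = to_table(sample)
    print(headings)
    for row in rows:
        print(row)
```

A missing key in a row becomes an empty string, which matches the `row.get(col, '')` behaviour of the client.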

## 4. Security Configuration and Network Optimization

### Nginx Reverse Proxy Configuration

```nginx

# nginx/nginx.conf

user nginx;

worker_processes auto;

error_log /var/log/nginx/error.log warn;

pid /var/run/nginx.pid;

events {

    worker_connections 1024;

    use epoll;

}

http {

    include /etc/nginx/mime.types;

    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '

                    '$status $body_bytes_sent "$http_referer" '

                    '"$http_user_agent" "$http_x_forwarded_for"';

    access_log /var/log/nginx/access.log main;

    sendfile on;

    tcp_nopush on;

    tcp_nodelay on;

    keepalive_timeout 65;

    types_hash_max_size 2048;

    # Gzip压缩

    gzip on;

    gzip_vary on;

    gzip_min_length 1024;

    gzip_types text/plain text/css text/xml text/javascript

              application/json application/javascript application/xml+rss;

    # 上游服务配置

    upstream chat2db_backend {

        server localhost:8000;

        keepalive 32;

    }

    server {

        listen 80;

        server_name yourdomain.com;


        # HTTP重定向到HTTPS

        return 301 https://$server_name$request_uri;

    }

    server {

        listen 443 ssl http2;

        server_name yourdomain.com;

        # SSL证书配置

        ssl_certificate /etc/nginx/ssl/yourdomain.crt;

        ssl_certificate_key /etc/nginx/ssl/yourdomain.key;

        ssl_session_timeout 1d;

        ssl_session_cache shared:SSL:50m;

        ssl_session_tickets off;

        # SSL协议和加密套件

        ssl_protocols TLSv1.2 TLSv1.3;

        ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;

        ssl_prefer_server_ciphers off;

        # HSTS

        add_header Strict-Transport-Security "max-age=63072000" always;

        # 安全头

        add_header X-Frame-Options "SAMEORIGIN" always;

        add_header X-Content-Type-Options "nosniff" always;

        add_header X-XSS-Protection "1; mode=block" always;

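        # Reverse proxy to the Chat2DB service
        location / {
            proxy_pass http://chat2db_backend;
            proxy_http_version 1.1;
            # Clear the Connection header so the upstream keepalive pool is
            # actually reused; WebSocket upgrades are handled by /ws/ below
            proxy_set_header Connection "";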

            proxy_set_header Host $host;

            proxy_set_header X-Real-IP $remote_addr;

            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

            proxy_set_header X-Forwarded-Proto $scheme;


            # 代理超时设置

            proxy_connect_timeout 60s;

            proxy_send_timeout 60s;

            proxy_read_timeout 60s;


            # 缓冲区优化

            proxy_buffering on;

            proxy_buffer_size 4k;

            proxy_buffers 8 4k;

            proxy_busy_buffers_size 8k;

        }

        # WebSocket支持

        location /ws/ {

            proxy_pass http://chat2db_backend;

            proxy_http_version 1.1;

            proxy_set_header Upgrade $http_upgrade;

            proxy_set_header Connection "Upgrade";

            proxy_set_header Host $host;

            proxy_set_header X-Real-IP $remote_addr;


            # WebSocket超时设置

            proxy_read_timeout 3600s;

            proxy_send_timeout 3600s;

        }

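        # Static files
        location /static/ {
            alias /path/to/static/files/;
            expires 1y;
            add_header Cache-Control "public, immutable";
            # add_header directives are inherited from the server level only
            # when this location defines none of its own, so the security
            # headers have to be repeated here
            add_header Strict-Transport-Security "max-age=63072000" always;
            add_header X-Frame-Options "SAMEORIGIN" always;
            add_header X-Content-Type-Options "nosniff" always;
        }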

        # 健康检查端点

        location /health {

            access_log off;

            proxy_pass http://chat2db_backend/health;

        }

        # 限制请求大小

        client_max_body_size 10m;

    }

}

```
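
After reloading Nginx it is worth confirming that the security headers configured above actually reach the client. The sketch below is a hypothetical helper (the names `EXPECTED_HEADERS` and `missing_security_headers` are not part of Nginx or Chat2DB); it compares a response's headers with the values from the `server` block and matches header names case-insensitively.

```python
from typing import Dict, List, Mapping

# Header values copied from the server block above
EXPECTED_HEADERS: Dict[str, str] = {
    "Strict-Transport-Security": "max-age=63072000",
    "X-Frame-Options": "SAMEORIGIN",
    "X-Content-Type-Options": "nosniff",
}


def missing_security_headers(headers: Mapping[str, str]) -> List[str]:
    """Return the expected header names that are absent or carry another value."""
    # HTTP header names are case-insensitive, so normalize before comparing
    received = {name.lower(): value.strip() for name, value in headers.items()}
    return [
        name
        for name, expected in EXPECTED_HEADERS.items()
        if received.get(name.lower()) != expected
    ]


if __name__ == "__main__":
    sample = {
        "strict-transport-security": "max-age=63072000",
        "X-Frame-Options": "SAMEORIGIN",
    }
    # X-Content-Type-Options is not in the sample, so it is reported
    print(missing_security_headers(sample))
```

In practice the mapping would come from a real response, for example the `headers` attribute of the object returned by `urllib.request.urlopen`.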

### Firewall and Security Configuration

```bash

#!/bin/bash

# security_setup.sh

set -e

echo "=== 安全配置脚本 ==="

# 1. 配置防火墙

setup_firewall() {

    echo "配置防火墙..."


    # 检查ufw是否安装

    if command -v ufw &> /dev/null; then

        ufw --force reset

        ufw default deny incoming

        ufw default allow outgoing


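        # Open only the ports that must be reachable from outside.
        # Port 8000 stays closed: Nginx proxies to the backend over localhost,
        # and opening it would let clients bypass TLS.
        ufw allow 22/tcp comment 'SSH'
        ufw allow 80/tcp comment 'HTTP'
        ufw allow 443/tcp comment 'HTTPS'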


        ufw --force enable

        ufw status verbose

    else

        echo "警告: ufw未安装,跳过防火墙配置"

    fi

}

# 2. 创建专用用户

setup_user() {

    echo "创建专用用户..."


    if id -u chat2db >/dev/null 2>&1; then

        echo "用户chat2db已存在"

    else

        useradd -r -s /usr/sbin/nologin -M chat2db

        echo "已创建用户chat2db"

    fi

}

# 3. 配置目录权限

setup_permissions() {

    echo "配置目录权限..."


    # 创建数据目录

    mkdir -p /var/lib/chat2db/{data,logs,backups}


    # 设置所有权

    chown -R chat2db:chat2db /var/lib/chat2db

    chmod -R 750 /var/lib/chat2db


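    # Config file permissions. The check keeps "set -e" from aborting the
    # script when the file does not exist yet; the service user must own
    # the file to read it with mode 600.
    if [ -f /etc/chat2db/config.yaml ]; then
        chown chat2db:chat2db /etc/chat2db/config.yaml
        chmod 600 /etc/chat2db/config.yaml
    fi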

}

# 4. 配置SSL证书

setup_ssl() {

    echo "配置SSL证书..."


    SSL_DIR="/etc/nginx/ssl"

    mkdir -p "$SSL_DIR"


    if [ ! -f "$SSL_DIR/yourdomain.crt" ] || [ ! -f "$SSL_DIR/yourdomain.key" ]; then

        echo "生成自签名证书..."


        openssl req -x509 -nodes -days 365 -newkey rsa:2048 \

            -keyout "$SSL_DIR/yourdomain.key" \

            -out "$SSL_DIR/yourdomain.crt" \

            -subj "/C=CN/ST=Beijing/L=Beijing/O=Chat2DB/CN=yourdomain.com"


        chmod 600 "$SSL_DIR/yourdomain.key"

        chmod 644 "$SSL_DIR/yourdomain.crt"

    else

        echo "SSL证书已存在"

    fi

}

# 5. 配置系统服务

setup_service() {

    echo "配置系统服务..."


    cat > /etc/systemd/system/chat2db.service << 'EOF'

[Unit]

Description=Chat2DB Remote Server

After=network.target

Requires=network.target

[Service]

Type=simple

User=chat2db

Group=chat2db

WorkingDirectory=/var/lib/chat2db

ExecStart=/var/lib/chat2db/venv/bin/python -m uvicorn main:app \
    --host 127.0.0.1 \
    --port 8000 \
    --workers 4

ExecReload=/bin/kill -HUP $MAINPID

Restart=on-failure

RestartSec=5s

TimeoutStopSec=30

# 安全配置

NoNewPrivileges=true

PrivateTmp=true

ProtectSystem=strict

ProtectHome=true

ReadWritePaths=/var/lib/chat2db

# 资源限制

LimitNOFILE=65536

LimitNPROC=4096

[Install]

WantedBy=multi-user.target

EOF

    systemctl daemon-reload

    systemctl enable chat2db.service

    systemctl start chat2db.service


    echo "服务状态:"

    systemctl status chat2db.service --no-pager

}

# 6. 配置日志轮转

setup_logrotate() {

    echo "配置日志轮转..."


    cat > /etc/logrotate.d/chat2db << 'EOF'

/var/lib/chat2db/logs/*.log {

    daily

    missingok

    rotate 30

    compress

    delaycompress

    notifempty

    create 640 chat2db chat2db

    sharedscripts

    postrotate

        systemctl reload chat2db.service > /dev/null 2>&1 || true

    endscript

}

EOF

}

# 7. 监控配置

setup_monitoring() {

    echo "配置监控..."


    # 安装并配置Prometheus exporter

    if [ ! -f "/usr/local/bin/prometheus_exporter.py" ]; then

        cat > /usr/local/bin/prometheus_exporter.py << 'EOF'

#!/usr/bin/env python3

from prometheus_client import start_http_server, Gauge

import time

import psutil

import requests

# 定义监控指标

cpu_usage = Gauge('chat2db_cpu_usage', 'CPU使用率')

memory_usage = Gauge('chat2db_memory_usage', '内存使用率')

active_connections = Gauge('chat2db_active_connections', '活跃连接数')

query_count = Gauge('chat2db_query_count', '查询计数')

def collect_metrics():

    """收集监控指标"""

    # 系统指标

    cpu_usage.set(psutil.cpu_percent())

    memory_usage.set(psutil.virtual_memory().percent)


    # 应用指标

    try:

        response = requests.get('http://localhost:8000/health', timeout=5)

        if response.status_code == 200:

            data = response.json()

            active_connections.set(data.get('active_connections', 0))

            query_count.set(data.get('query_count', 0))

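    except (requests.RequestException, ValueError):
        # Backend unreachable or response is not JSON: keep the previous values
        pass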

if __name__ == '__main__':

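    # Serve Prometheus metrics on port 9100 (this is also node_exporter's
    # default port, so pick another one if node_exporter runs on this host)
    start_http_server(9100)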


    while True:

        collect_metrics()

        time.sleep(15)

EOF

        chmod +x /usr/local/bin/prometheus_exporter.py


        # 创建systemd服务

        cat > /etc/systemd/system/chat2db-exporter.service << 'EOF'

[Unit]

Description=Chat2DB Prometheus Exporter

After=network.target

[Service]

Type=simple

User=chat2db

ExecStart=/usr/local/bin/prometheus_exporter.py

Restart=always

[Install]

WantedBy=multi-user.target

EOF

        systemctl daemon-reload

        systemctl enable chat2db-exporter.service

        systemctl start chat2db-exporter.service

    fi

}

# 主函数

main() {

    setup_firewall

    setup_user

    setup_permissions

    setup_ssl

    setup_service

    setup_logrotate

    setup_monitoring


    echo "=== 安全配置完成 ==="

    echo "请确保:"

    echo "1. 修改默认密码"

    echo "2. 更新SSL证书"

    echo "3. 配置数据库访问限制"

}

main "$@"

```
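
The exporter in `setup_monitoring` expects `/health` to return JSON with `active_connections` and `query_count`, but this section does not show the server side of that endpoint. Below is a minimal sketch of the bookkeeping, assuming in-process counters guarded by a lock are enough; the `HealthStats` class and its method names are hypothetical.

```python
import threading
from typing import Dict


class HealthStats:
    """Thread-safe counters behind a /health payload."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._active_connections = 0
        self._query_count = 0

    def connection_opened(self) -> None:
        with self._lock:
            self._active_connections += 1

    def connection_closed(self) -> None:
        with self._lock:
            # Never report a negative number of connections
            self._active_connections = max(0, self._active_connections - 1)

    def query_executed(self) -> None:
        with self._lock:
            self._query_count += 1

    def snapshot(self) -> Dict[str, object]:
        """Return the dict a /health handler could serialize as JSON."""
        with self._lock:
            return {
                "status": "ok",
                "active_connections": self._active_connections,
                "query_count": self._query_count,
            }
```

One caveat: with `--workers 4` each worker process keeps its own counters, so successive `/health` requests may report different numbers. Aggregated values would need a shared store such as the Redis instance already listed in the requirements.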

## 5. Deployment and Maintenance

### Docker Deployment

```dockerfile

# Dockerfile

FROM python:3.11-slim

# 设置环境变量

ENV PYTHONDONTWRITEBYTECODE=1

ENV PYTHONUNBUFFERED=1

ENV PORT=8000

# 创建工作目录

WORKDIR /app

# 安装系统依赖

RUN apt-get update \

    && apt-get install -y --no-install-recommends \

        gcc \

        g++ \

        libpq-dev \

        default-libmysqlclient-dev \

        pkg-config \

    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件

COPY requirements.txt .

# 安装Python依赖

RUN pip install --no-cache-dir --upgrade pip \

    && pip install --no-cache-dir -r requirements.txt

# 复制应用代码

COPY . .

# 创建非root用户

RUN useradd -m -u 1000 chat2db \

    && chown -R chat2db:chat2db /app

USER chat2db

# 暴露端口

EXPOSE $PORT

# 启动命令

CMD ["sh", "-c", "python -m uvicorn main:app --host 0.0.0.0 --port ${PORT} --workers 4"]

```
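
The Dockerfile above does not define a health check. Because the image already ships Python, a probe can be a few lines of standard-library code. The sketch below assumes the application serves `/health` on `PORT`, as the Nginx configuration implies; the file name `healthcheck.py` and the functions `is_healthy` and `main` are hypothetical.

```python
# healthcheck.py (hypothetical file name)
import os
import urllib.error
import urllib.request


def is_healthy(url: str, timeout: float = 3.0) -> bool:
    """Return True when the URL answers with HTTP 200 within the timeout."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError, ValueError):
        # Connection refused, timeout, HTTP error status, or malformed URL
        return False


def main() -> int:
    """Return 0 when the local service is healthy, 1 otherwise."""
    port = os.environ.get("PORT", "8000")
    return 0 if is_healthy(f"http://127.0.0.1:{port}/health") else 1
```

If this file sits in `/app`, it could be wired into the image with a line such as `HEALTHCHECK --interval=30s --timeout=5s CMD python -c "import healthcheck, sys; sys.exit(healthcheck.main())"`, since Docker treats exit code 0 as healthy and 1 as unhealthy.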

### Usage Documentation

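````markdown

# Chat2DB Remote Deployment Guide

## Quick Start

### 1. Server Deployment

```bash
# Clone the repository
git clone https://github.com/yourname/chat2db-remote.git
cd chat2db-remote/server

# Install dependencies
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

# Configure environment variables
cp .env.example .env
# Edit .env to set the JWT secret and other values

# Start the service
python main.py
```

### 2. Client Access

**Web client:**

Open `http://localhost:3000` in a browser.

**Desktop client:**

```bash
cd client
python desktop_client.py
```

## Advanced Configuration

### Multi-Environment Deployment

**Development:**

```bash
export ENVIRONMENT=development
python main.py
```

**Production:**

```bash
export ENVIRONMENT=production
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker
```

### Database Support

The following database types are supported:

- MySQL 5.7+
- PostgreSQL 10+
- SQLite 3

## Security Recommendations

1. **Change default passwords**: change them immediately after the first deployment
2. **Enable HTTPS**: production environments must use HTTPS
3. **Restrict IP access**: limit the allowed source addresses with the firewall
4. **Back up regularly**: configure an automatic backup policy
5. **Review logs**: monitor the access and security logs

## Troubleshooting

### Common Problems

1. **Connection refused**: check the firewall and port configuration
2. **Authentication failed**: verify the JWT token and the database credentials
3. **Performance problems**: check the database connection pool and indexes

### Getting Help

- View the log file: `logs/chat2db_server.log`
- Check the service status: `systemctl status chat2db`
- Community support: [GitHub Issues](https://github.com/yourname/chat2db-remote/issues)

````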

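With the configuration above, a local Chat2DB setup can be turned into a remotely accessible database management tool. The approach keeps the familiar Chat2DB workflow, adds cross-network access, and protects that access with TLS, token authentication, and firewall rules. It suits both individual use and team collaboration.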
