# Chat2DB 本地转远程:跨网络数据库管理配置指南
## 一、从本地到远程:Chat2DB架构解析
### 核心架构升级策略
将本地Chat2DB转换为远程可用服务,关键在于**服务化改造**和**安全访问控制**。原本的单体应用需要拆分为客户端-服务端架构:
```
架构升级对比:
本地模式:
├── 应用层 (Chat2DB GUI)
├── 数据库驱动层
├── 本地数据库连接
└── 文件系统存储
远程模式:
├── 客户端层 (Web/桌面GUI)
├── 网络通信层 (HTTP/WebSocket)
├── 服务端层 (业务逻辑处理)
├── 连接池管理
├── 安全认证层
└── 数据库代理层
```
### 技术选型与准备
```text
# requirements.txt - dependencies required by the remote service
fastapi==0.104.0
uvicorn[standard]==0.24.0
websockets==12.0
sqlalchemy==2.0.23
pymysql==1.1.0
psycopg2-binary==2.9.9
cryptography==41.0.7
# PyJWT is the distribution that provides the `jwt` module API used by
# server/main.py (jwt.encode/decode, ExpiredSignatureError, InvalidTokenError).
# The PyPI project literally named "jwt" is a different, incompatible library.
PyJWT==2.8.0
redis==5.0.1  # used for session management
```
## 二、服务端部署与配置
### 基础服务端实现
```python
# server/main.py - Chat2DB远程服务核心
import asyncio
import hashlib
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

import jwt
from fastapi import FastAPI, Depends, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool
app = FastAPI(title="Chat2DB Remote Server", version="1.0.0")

# CORS configuration. Allowed origins come from the comma-separated
# CORS_ORIGINS environment variable; when it is unset every origin is allowed.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # Credentialed (cookie) requests are only enabled for an explicit origin
    # list: a wildcard origin combined with credentials would let any website
    # issue authenticated requests from a visitor's browser.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Database connection-pool management
class DatabaseManager:
    """Process-wide singleton that caches one pooled SQLAlchemy engine per config.

    Engines are keyed by a digest of the full connection identity (including
    the password), so a request carrying a wrong password can never be served
    from an engine that was created with the right one.
    """

    _instance = None
    # conn_id -> {'engine': Engine, 'config': dict, 'last_used': datetime}
    _connections: Dict[str, Dict] = {}

    # Supported database type -> (SQLAlchemy driver name, default port)
    _DRIVERS = {
        'mysql': ('mysql+pymysql', 3306),
        'postgresql': ('postgresql', 5432),
        'sqlite': ('sqlite', None),
    }

    def __new__(cls):
        # Always hand back the same instance so the engine cache is shared.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def create_engine_from_config(self, config: Dict):
        """Return the (possibly cached) engine described by *config*.

        Args:
            config: Mapping with optional keys ``type`` (default ``'mysql'``),
                ``host``, ``port``, ``database``, ``username`` and ``password``.

        Returns:
            A SQLAlchemy engine backed by a ``QueuePool``.

        Raises:
            ValueError: If ``type`` is not mysql, postgresql or sqlite.
        """
        db_type = config.get('type', 'mysql')
        if db_type not in self._DRIVERS:
            raise ValueError(f"不支持的数据库类型: {db_type}")
        drivername, default_port = self._DRIVERS[db_type]

        host = config.get('host', 'localhost')
        # Fall back to the port that matches the database type, not always 3306.
        port = config.get('port') or default_port
        database = config.get('database', '')
        username = config.get('username', '')
        password = config.get('password', '')

        # The password is part of the identity on purpose (see class docstring).
        conn_id = hashlib.sha256(
            f"{db_type}:{host}:{port}:{database}:{username}:{password}".encode()
        ).hexdigest()

        cached = self._connections.get(conn_id)
        if cached is not None:
            cached['last_used'] = datetime.now()
            return cached['engine']

        # Imported here so the module's import block does not need to change.
        from sqlalchemy.engine import URL

        # URL.create escapes special characters ('@', ':', '/') in credentials,
        # which plain string formatting would turn into a broken URL.
        if db_type == 'sqlite':
            url = URL.create(drivername, database=database or None)
        else:
            url = URL.create(
                drivername,
                username=username or None,
                password=password or None,
                host=host,
                port=port,
                database=database or None,
            )

        # Engine with an explicit connection pool
        engine = create_engine(
            url,
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,
            pool_recycle=3600,
            echo=False  # keep SQL echo off in production
        )
        self._connections[conn_id] = {
            'engine': engine,
            'config': config,
            'last_used': datetime.now()
        }
        return engine
# JWT authentication management
class AuthManager:
    """Issue and verify HS256-signed JSON Web Tokens."""

    # Signing key: read from the JWT_SECRET environment variable. The literal
    # fallback only exists so a local run works without any configuration.
    SECRET_KEY = os.environ.get("JWT_SECRET", "your-secret-key-change-in-production")
    ALGORITHM = "HS256"

    @staticmethod
    def create_access_token(data: Dict, expires_delta: Optional[timedelta] = None):
        """Create a signed access token.

        Args:
            data: Claims to embed; the mapping is copied, not mutated.
            expires_delta: Token lifetime; defaults to 24 hours when omitted.

        Returns:
            The encoded JWT.
        """
        to_encode = data.copy()
        lifetime = expires_delta if expires_delta is not None else timedelta(hours=24)
        # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12.
        to_encode["exp"] = datetime.now(timezone.utc) + lifetime
        return jwt.encode(to_encode, AuthManager.SECRET_KEY, algorithm=AuthManager.ALGORITHM)

    @staticmethod
    def verify_token(token: str):
        """Decode *token* and return its payload.

        Raises:
            HTTPException: 401 when the token is expired or otherwise invalid.
        """
        try:
            return jwt.decode(token, AuthManager.SECRET_KEY, algorithms=[AuthManager.ALGORITHM])
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="令牌已过期")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="无效令牌")
# Dependency: resolve the authenticated user
security = HTTPBearer()


async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the payload of the bearer token supplied with the request.

    Verification is delegated to ``AuthManager.verify_token``.
    """
    return AuthManager.verify_token(credentials.credentials)
# REST API endpoints
@app.post("/api/v1/connect")
async def connect_database(
    config: Dict,
    current_user: Dict = Depends(get_current_user)
):
    """Open (or reuse) an engine for *config* and verify it with ``SELECT 1``.

    Returns:
        A success payload with an ISO timestamp.

    Raises:
        HTTPException: 500 when the engine cannot be created or the probe fails.
    """
    def _probe() -> None:
        engine = DatabaseManager().create_engine_from_config(config)
        # Connection test
        with engine.connect() as conn:
            conn.execute(text("SELECT 1")).fetchone()

    try:
        # The database drivers block, so run the probe in a worker thread
        # instead of stalling the event loop for every other request.
        await asyncio.to_thread(_probe)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"数据库连接失败: {str(e)}") from e
    return {
        "success": True,
        "message": "数据库连接成功",
        "timestamp": datetime.now().isoformat()
    }
def _run_sql(engine, sql: str, params: Dict) -> Dict:
    """Execute one statement synchronously and shape the API response."""
    with engine.connect() as conn:
        result = conn.execute(text(sql), params)
        # Ask the driver whether rows came back instead of guessing from the
        # SQL prefix; this also covers WITH / EXPLAIN / DESCRIBE / ... RETURNING.
        if result.returns_rows:
            columns = list(result.keys())
            rows = [dict(zip(columns, row)) for row in result.fetchall()]
            # Harmless for pure reads, required for INSERT ... RETURNING.
            conn.commit()
            return {
                "success": True,
                "type": "query",
                "columns": columns,
                "rows": rows,
                "row_count": len(rows)
            }
        affected = result.rowcount
        conn.commit()
        return {
            "success": True,
            "type": "update",
            "affected_rows": affected
        }


@app.post("/api/v1/query")
async def execute_query(
    request: Dict,
    current_user: Dict = Depends(get_current_user)
):
    """Execute a SQL statement against the database described in the request.

    Args:
        request: Mapping with ``config`` (connection settings), ``sql`` and an
            optional ``parameters`` mapping of bind values.

    Returns:
        ``type == "query"`` with columns/rows/row_count when the statement
        returns rows, otherwise ``type == "update"`` with ``affected_rows``.

    Raises:
        HTTPException: 400 when ``config`` or ``sql`` is missing, 500 when
            execution fails.
    """
    # Validate outside the try block so a malformed request is reported as a
    # client error rather than as a failed query.
    if 'config' not in request or 'sql' not in request:
        raise HTTPException(status_code=400, detail="请求必须包含 config 和 sql 字段")
    sql = request['sql']
    params = request.get('parameters') or {}
    try:
        engine = DatabaseManager().create_engine_from_config(request['config'])
        # Blocking driver call: keep it off the event loop.
        return await asyncio.to_thread(_run_sql, engine, sql, params)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"查询执行失败: {str(e)}") from e
# Registered for POST as well: browsers cannot attach a JSON body to a GET
# request, and this endpoint reads the connection config from the body.
@app.post("/api/v1/databases")
@app.get("/api/v1/databases")
async def list_databases(
    config: Dict,
    current_user: Dict = Depends(get_current_user)
):
    """List the databases visible through the connection described by *config*.

    Returns:
        ``{"success": True, "databases": [...]}``.

    Raises:
        HTTPException: 500 when the connection or the catalog query fails.
    """
    # Catalog query per database type
    catalog_sql = {
        'mysql': "SHOW DATABASES",
        'postgresql': "SELECT datname FROM pg_database WHERE datistemplate = false",
    }
    # SQLite: list the attached databases (main, temp, ...), not the tables.
    sqlite_sql = "SELECT name FROM pragma_database_list"

    def _fetch_names():
        engine = DatabaseManager().create_engine_from_config(config)
        statement = catalog_sql.get(config.get('type', 'mysql'), sqlite_sql)
        with engine.connect() as conn:
            return [row[0] for row in conn.execute(text(statement)).fetchall()]

    try:
        # Blocking driver call: keep it off the event loop.
        databases = await asyncio.to_thread(_fetch_names)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"获取数据库列表失败: {str(e)}") from e
    return {
        "success": True,
        "databases": databases
    }
# WebSocket real-time communication
class ConnectionManager:
    """Registry of the currently accepted WebSocket connections."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept *websocket* and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking *websocket*; a no-op if it was already removed."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send *message* to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send *message* to every tracked connection.

        A connection whose send fails is dropped from the registry so that one
        dead peer cannot prevent delivery to the remaining ones.
        """
        # Iterate over a snapshot: the list may change while we await.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except Exception:
                self.disconnect(connection)
manager = ConnectionManager()


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """WebSocket endpoint for real-time traffic.

    Every frame, in both directions, is a JSON object with a ``type`` field.
    ``query`` frames are passed to ``execute_query_async``; ``ping`` frames are
    answered with ``pong``. Malformed frames get an ``error`` reply instead of
    terminating the connection.

    NOTE(review): unlike the REST endpoints this socket performs no token
    check - confirm whether that is intended before exposing it publicly.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                message_data = None
            if not isinstance(message_data, dict):
                await manager.send_personal_message(json.dumps({
                    "type": "error",
                    "message": "invalid message: expected a JSON object"
                }), websocket)
                continue

            # Dispatch on the message type
            message_type = message_data.get('type')
            if message_type == 'query':
                # Run the query and send the result back
                response = await execute_query_async(message_data)
                await manager.send_personal_message(json.dumps(response), websocket)
            elif message_type == 'ping':
                # Heartbeat
                await manager.send_personal_message(json.dumps({
                    "type": "pong",
                    "timestamp": datetime.now().isoformat()
                }), websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        # Sent as JSON like every other frame so clients can parse it.
        await manager.broadcast(json.dumps({
            "type": "disconnect",
            "message": f"客户端 {client_id} 已断开连接"
        }))
    except Exception:
        # Do not leave a dead socket registered when the handler crashes.
        manager.disconnect(websocket)
        raise
async def execute_query_async(request_data: Dict):
    """Placeholder for asynchronous query execution.

    Currently echoes *request_data* back under ``data`` with status
    ``processed``.
    """
    # Real asynchronous query logic can be implemented here
    response = {"status": "processed"}
    response["data"] = request_data
    return response
# Start the service
if __name__ == "__main__":
    import os

    import uvicorn

    # uvicorn cannot combine auto-reload with multiple workers, so pick one:
    # hot reload for development, several workers when ENVIRONMENT=production.
    is_production = os.environ.get("ENVIRONMENT") == "production"
    uvicorn.run(
        "main:app",
        host="0.0.0.0",  # listen on all network interfaces
        port=8000,
        reload=not is_production,
        workers=4 if is_production else 1,  # tune to the number of CPU cores
    )
```
### 服务端配置管理
```yaml
# config/production.yaml
# NOTE(review): the nesting below is reconstructed from the flattened listing
# (YAML is indentation-sensitive); confirm it against the loader that reads it.
server:
  host: "0.0.0.0"
  port: 8000
  workers: 4
  reload: false
  log_level: "info"

database:
  connections:
    max_pool_size: 20
    pool_recycle: 3600
    pool_timeout: 30

security:
  jwt_secret: "${JWT_SECRET}"  # read from the environment
  token_expire_hours: 24
  cors_origins:
    - "https://yourdomain.com"
    - "http://localhost:3000"
  rate_limiting:
    enabled: true
    max_requests: 100
    window_seconds: 60

logging:
  file: "logs/chat2db_server.log"
  max_size: "100MB"
  backup_count: 10
  format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

monitoring:
  prometheus_enabled: true
  health_check_interval: 30
```
### 启动脚本
```bash
#!/bin/bash
# start_server.sh
set -e
# Environment checks: Python 3 present, port 8000 free, requirements.txt present.
# Exits with status 1 on the first failed check.
check_environment() {
    echo "检查运行环境..."

    # Python 3 must be on PATH
    command -v python3 &> /dev/null || {
        echo "错误: 未找到Python3"
        exit 1
    }
    python_version=$(python3 --version | cut -d' ' -f2)
    echo "Python版本: $python_version"

    # Nothing may already be listening on port 8000
    if lsof -Pi :8000 -sTCP:LISTEN -t >/dev/null ; then
        echo "错误: 端口8000已被占用"
        exit 1
    fi

    # The dependency manifest must exist in the current directory
    [ -f "requirements.txt" ] || {
        echo "错误: 未找到requirements.txt"
        exit 1
    }
}
# Install dependencies into ./venv (created on first run) and activate it
# for the remainder of the script.
install_dependencies() {
    echo "安装Python依赖..."

    [ -d "venv" ] || {
        echo "创建虚拟环境..."
        python3 -m venv venv
    }
    source venv/bin/activate

    # Upgrade pip first
    pip install --upgrade pip

    # Project requirements
    [ ! -f "requirements.txt" ] || pip install -r requirements.txt

    # Extra package used by the production start path
    pip install gunicorn
}
# Configure environment variables and create the runtime directories.
setup_environment() {
    echo "配置环境变量..."

    # JWT signing secret. When the caller did not provide one, generate it
    # once and keep it in .jwt_secret: a new random secret on every start
    # would invalidate every token issued before the restart.
    if [ -z "$JWT_SECRET" ]; then
        local secret_file=".jwt_secret"
        if [ ! -s "$secret_file" ]; then
            # umask 077 -> the secret file is readable by its owner only
            (umask 077 && openssl rand -hex 32 > "$secret_file")
            echo "已生成JWT_SECRET"
        fi
        # Assign and export separately so a failing command is not masked
        JWT_SECRET=$(cat "$secret_file")
        export JWT_SECRET
    fi

    # Default database connection
    if [ -z "$DATABASE_URL" ]; then
        export DATABASE_URL="sqlite:///./chat2db.db"
    fi

    # Required directories
    mkdir -p logs backups
}
# Start the service: gunicorn with uvicorn workers when ENVIRONMENT is
# "production", otherwise uvicorn with auto-reload.
start_service() {
    echo "启动Chat2DB远程服务..."

    case "$ENVIRONMENT" in
        production)
            # Production: gunicorn
            gunicorn main:app \
                --workers 4 \
                --worker-class uvicorn.workers.UvicornWorker \
                --bind 0.0.0.0:8000 \
                --access-logfile logs/access.log \
                --error-logfile logs/error.log \
                --log-level info \
                --timeout 120 \
                --keep-alive 5
            ;;
        *)
            # Development: uvicorn
            source venv/bin/activate
            python -m uvicorn main:app \
                --host 0.0.0.0 \
                --port 8000 \
                --reload \
                --log-level debug
            ;;
    esac
}
# Entry point. $1 selects the environment (default: development).
main() {
    ENVIRONMENT=${1:-development}

    echo "=== Chat2DB远程服务启动 ==="
    echo "环境: $ENVIRONMENT"

    local step
    for step in check_environment install_dependencies setup_environment start_service; do
        "$step"
    done
}

# Report and exit cleanly on Ctrl-C / termination
trap 'echo "服务停止"; exit 0' INT TERM

main "$@"
```
## 三、客户端适配改造
### Web客户端实现
```html
<!-- client/index.html - Web客户端主界面 -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>远程Chat2DB客户端</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #f5f5f5;
}
.container {
display: flex;
height: 100vh;
}
.sidebar {
width: 300px;
background: white;
border-right: 1px solid #e0e0e0;
display: flex;
flex-direction: column;
}
.main-content {
flex: 1;
display: flex;
flex-direction: column;
}
.connection-panel {
padding: 20px;
border-bottom: 1px solid #e0e0e0;
}
.database-tree {
flex: 1;
overflow-y: auto;
padding: 10px;
}
.editor-container {
flex: 1;
padding: 20px;
display: flex;
flex-direction: column;
}
.sql-editor {
    flex: 1;
    border: 1px solid #ddd;
    border-radius: 4px;
    font-family: 'Monaco', 'Menlo', monospace;
    font-size: 14px;
    padding: 10px;
    resize: none;
}
.toolbar {
padding: 10px 20px;
background: white;
border-bottom: 1px solid #e0e0e0;
display: flex;
gap: 10px;
}
.result-panel {
padding: 20px;
background: white;
border-top: 1px solid #e0e0e0;
overflow: auto;
}
.status-bar {
padding: 5px 20px;
background: #333;
color: white;
font-size: 12px;
}
.btn {
padding: 8px 16px;
background: #007bff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
.btn:hover {
background: #0056b3;
}
.btn-secondary {
background: #6c757d;
}
table {
width: 100%;
border-collapse: collapse;
}
th, td {
padding: 8px;
border: 1px solid #ddd;
text-align: left;
}
th {
background: #f8f9fa;
font-weight: 600;
}
</style>
</head>
<body>
<div class="container">
<!-- 左侧边栏 -->
<div class="sidebar">
<div class="connection-panel">
<h3>数据库连接</h3>
<div id="connection-list"></div>
<button class="btn" onclick="showConnectionModal()">新建连接</button>
</div>
<div class="database-tree">
<h4>数据库</h4>
<div id="database-tree"></div>
</div>
</div>
<!-- 主内容区 -->
<div class="main-content">
<!-- 工具栏 -->
<div class="toolbar">
<button class="btn" onclick="executeQuery()">执行查询 (F5)</button>
<button class="btn btn-secondary" onclick="formatSQL()">格式化</button>
<button class="btn" onclick="saveQuery()">保存查询</button>
<button class="btn btn-secondary" onclick="clearEditor()">清空</button>
<select id="database-select" onchange="switchDatabase()">
<option value="">选择数据库</option>
</select>
<div style="flex: 1"></div>
<span id="connection-status">未连接</span>
</div>
<!-- SQL编辑器 -->
<div class="editor-container">
<textarea id="sql-editor" class="sql-editor"
placeholder="输入SQL语句..."
onkeydown="handleEditorKeydown(event)"></textarea>
</div>
<!-- 结果面板 -->
<div class="result-panel">
<h3>查询结果</h3>
<div id="result-container"></div>
</div>
<!-- 状态栏 -->
<div class="status-bar">
<span id="row-count">0 行</span>
<span style="margin-left: 20px" id="execution-time">执行时间: 0ms</span>
</div>
</div>
</div>
<!-- 连接配置模态框 -->
<div id="connection-modal" style="display: none; position: fixed; top: 0; left: 0; width: 100%; height: 100%; background: rgba(0,0,0,0.5);">
<div style="background: white; width: 500px; margin: 100px auto; padding: 20px; border-radius: 8px;">
<h3>新建数据库连接</h3>
<form id="connection-form">
<div style="margin-bottom: 15px;">
<label>连接名称</label>
<input type="text" id="conn-name" required style="width: 100%; padding: 8px;">
</div>
<div style="margin-bottom: 15px;">
<label>数据库类型</label>
<select id="db-type" required style="width: 100%; padding: 8px;">
<option value="mysql">MySQL</option>
<option value="postgresql">PostgreSQL</option>
<option value="sqlite">SQLite</option>
</select>
</div>
<div style="display: flex; gap: 10px; margin-bottom: 15px;">
<div style="flex: 1">
<label>主机</label>
<input type="text" id="db-host" required style="width: 100%; padding: 8px;">
</div>
<div style="width: 100px">
<label>端口</label>
<input type="number" id="db-port" style="width: 100%; padding: 8px;">
</div>
</div>
<div style="margin-bottom: 15px;">
<label>数据库名</label>
<input type="text" id="db-name" required style="width: 100%; padding: 8px;">
</div>
<div style="margin-bottom: 15px;">
<label>用户名</label>
<input type="text" id="db-username" required style="width: 100%; padding: 8px;">
</div>
<div style="margin-bottom: 20px;">
<label>密码</label>
<input type="password" id="db-password" required style="width: 100%; padding: 8px;">
</div>
<div style="display: flex; gap: 10px; justify-content: flex-end;">
<button type="button" class="btn btn-secondary" onclick="hideConnectionModal()">取消</button>
<button type="submit" class="btn">测试连接并保存</button>
</div>
</form>
</div>
</div>
<script>
// Global configuration.
// The API base can be overridden by defining window.CHAT2DB_API_BASE before
// this script runs; otherwise the local development server is used.
const API_BASE_URL = window.CHAT2DB_API_BASE || 'http://localhost:8000/api/v1';
let currentConnection = null;
let accessToken = null;

// Initialise once the DOM is ready
document.addEventListener('DOMContentLoaded', function() {
    loadConnections();
    setupWebSocket();
    // loadSavedQueries is not defined in this page; only call it when another
    // script provides it, instead of throwing a ReferenceError on load.
    if (typeof loadSavedQueries === 'function') {
        loadSavedQueries();
    }
});
// WebSocket connection. Reconnects five seconds after every close.
function setupWebSocket() {
    // Derive the socket origin from the REST base (http -> ws, https -> wss)
    // so both always point at the same server.
    const wsBase = API_BASE_URL.replace(/^http/, 'ws').replace(/\/api\/v1\/?$/, '');
    const ws = new WebSocket(wsBase + '/ws/client-' + Date.now());

    ws.onopen = function() {
        updateStatus('已连接', 'green');
    };

    ws.onmessage = function(event) {
        let data;
        try {
            data = JSON.parse(event.data);
        } catch (err) {
            // Ignore frames that are not JSON instead of breaking the handler
            console.warn('Ignoring non-JSON WebSocket message:', event.data);
            return;
        }
        // handleWebSocketMessage is not defined in this page; call it only
        // when another script provides it.
        if (typeof handleWebSocketMessage === 'function') {
            handleWebSocketMessage(data);
        }
    };

    ws.onclose = function() {
        updateStatus('连接断开', 'red');
        // Try to reconnect
        setTimeout(setupWebSocket, 5000);
    };
}
// Authentication.
// POSTs the credentials to /auth/login; on success stores the returned token
// in memory and in localStorage. Resolves to true on success, false otherwise.
async function login(username, password) {
    let response;
    try {
        response = await fetch(API_BASE_URL + '/auth/login', {
            method: 'POST',
            headers: {'Content-Type': 'application/json'},
            body: JSON.stringify({username, password})
        });
        if (!response.ok) {
            return false;
        }
        const payload = await response.json();
        accessToken = payload.token;
        localStorage.setItem('access_token', accessToken);
        return true;
    } catch (error) {
        console.error('登录失败:', error);
        return false;
    }
}
// Build the request headers. The Authorization header is only added when a
// token is actually available, so the server never receives "Bearer null".
function getAuthHeaders() {
    const token = accessToken || localStorage.getItem('access_token');
    const headers = {'Content-Type': 'application/json'};
    if (token) {
        headers['Authorization'] = 'Bearer ' + token;
    }
    return headers;
}
// Database connection management.
// Asks the server to open the connection described by `config`.
// Always resolves to an object with `success` and `message`.
async function testConnection(config) {
    try {
        const response = await fetch(API_BASE_URL + '/connect', {
            method: 'POST',
            headers: getAuthHeaders(),
            body: JSON.stringify(config)
        });
        const payload = await response.json();
        if (!response.ok) {
            // Server errors arrive as {"detail": ...}; map them onto the
            // {success, message} shape the caller displays.
            const detail = payload.detail;
            return {
                success: false,
                message: typeof detail === 'string' ? detail : JSON.stringify(detail)
            };
        }
        return payload;
    } catch (error) {
        return {success: false, message: '连接测试失败: ' + error.message};
    }
}
// Append `config` (tagged with a timestamp-based id) to the connection list
// kept in localStorage.
// NOTE(review): the stored entry includes the database password in plain
// text - consider whether that is acceptable for your deployment.
async function saveConnection(config) {
    const stored = localStorage.getItem('connections') || '[]';
    const connections = JSON.parse(stored);
    const entry = {id: Date.now().toString(), ...config};
    connections.push(entry);
    localStorage.setItem('connections', JSON.stringify(connections));
}
// Render the saved connections into #connection-list.
// Entries are built with DOM APIs and textContent so that a connection name
// or host containing markup is shown as text rather than executed.
function loadConnections() {
    const connections = JSON.parse(localStorage.getItem('connections') || '[]');
    const connectionList = document.getElementById('connection-list');
    connectionList.innerHTML = '';

    connections.forEach(conn => {
        const card = document.createElement('div');
        card.style.cssText =
            'padding: 10px; border: 1px solid #ddd; margin-bottom: 5px; cursor: pointer;';
        card.addEventListener('click', () => selectConnection(conn.id));

        const title = document.createElement('strong');
        title.textContent = conn.name;
        const detail = document.createElement('small');
        detail.textContent = `${conn.type} - ${conn.host}`;
        card.append(title, document.createElement('br'), detail);

        const wrapper = document.createElement('div');
        wrapper.appendChild(card);
        connectionList.appendChild(wrapper);
    });
}
// Run the SQL in the editor against the selected connection and show the
// result together with the round-trip time.
async function executeQuery() {
    const sql = document.getElementById('sql-editor').value;
    if (!sql.trim() || !currentConnection) {
        alert('请先选择数据库连接并输入SQL语句');
        return;
    }

    const startTime = Date.now();
    try {
        const response = await fetch(API_BASE_URL + '/query', {
            method: 'POST',
            headers: getAuthHeaders(),
            body: JSON.stringify({
                config: currentConnection,
                sql: sql,
                parameters: {}
            })
        });
        const result = await response.json();
        if (!response.ok) {
            // Server errors arrive as {"detail": ...} without a `success` flag
            const detail = result.detail;
            showError('查询执行失败: ' +
                (typeof detail === 'string' ? detail : JSON.stringify(detail)));
            return;
        }
        displayResult(result);

        const executionTime = Date.now() - startTime;
        document.getElementById('execution-time').textContent =
            `执行时间: ${executionTime}ms`;
    } catch (error) {
        showError('查询执行失败: ' + error.message);
    }
}
// Render a query response into #result-container.
// Everything is inserted through textContent: column names and cell values
// come from the database and must never be interpreted as HTML.
function displayResult(result) {
    const container = document.getElementById('result-container');
    container.innerHTML = '';

    if (!result.success) {
        const errorBox = document.createElement('div');
        errorBox.style.color = 'red';
        errorBox.textContent = result.message;
        container.appendChild(errorBox);
        return;
    }

    if (result.type === 'query') {
        // Row-count summary followed by the result table
        const summary = document.createElement('div');
        summary.textContent = `返回 ${result.row_count} 行`;
        container.appendChild(summary);

        if (result.rows.length > 0) {
            const table = document.createElement('table');

            // Header row
            const headerRow = table.insertRow();
            result.columns.forEach(col => {
                const th = document.createElement('th');
                th.textContent = col;
                headerRow.appendChild(th);
            });

            // Data rows
            result.rows.forEach(row => {
                const tr = table.insertRow();
                result.columns.forEach(col => {
                    tr.insertCell().textContent = String(row[col]);
                });
            });
            container.appendChild(table);
        }

        document.getElementById('row-count').textContent =
            `${result.row_count} 行`;
    } else {
        const summary = document.createElement('div');
        summary.textContent = `影响行数: ${result.affected_rows}`;
        container.appendChild(summary);
    }
}
// Helper functions
function showConnectionModal() {
    document.getElementById('connection-modal').style.display = 'block';
}

function hideConnectionModal() {
    document.getElementById('connection-modal').style.display = 'none';
    document.getElementById('connection-form').reset();
}

function updateStatus(text, color) {
    const elem = document.getElementById('connection-status');
    elem.textContent = text;
    elem.style.color = color;
}

function showError(message) {
    alert('错误: ' + message);
}

// Make the saved connection with the given id the active one. The connection
// list calls this on click; without it currentConnection would never be set
// and no query could run.
function selectConnection(id) {
    const connections = JSON.parse(localStorage.getItem('connections') || '[]');
    const match = connections.find(conn => conn.id === id);
    if (match) {
        currentConnection = match;
    }
}

// Handler of the toolbar's clear button: empties the SQL editor.
function clearEditor() {
    document.getElementById('sql-editor').value = '';
}

// NOTE(review): the toolbar also references formatSQL(), saveQuery() and
// switchDatabase(), which are not defined anywhere in this page.
// Keyboard shortcut: F5 inside the editor runs the query instead of
// reloading the page.
function handleEditorKeydown(event) {
    if (event.key !== 'F5') {
        return;
    }
    event.preventDefault();
    executeQuery();
}
// Form submission: test the connection and save it only when the test passes.
document.getElementById('connection-form').addEventListener('submit', async function(e) {
    e.preventDefault();

    // Default port per database type, used when the port field is left empty
    const defaultPorts = {mysql: 3306, postgresql: 5432};
    const dbType = document.getElementById('db-type').value;

    const config = {
        name: document.getElementById('conn-name').value,
        type: dbType,
        host: document.getElementById('db-host').value,
        port: parseInt(document.getElementById('db-port').value, 10)
            || defaultPorts[dbType] || 3306,
        database: document.getElementById('db-name').value,
        username: document.getElementById('db-username').value,
        password: document.getElementById('db-password').value
    };

    const testResult = await testConnection(config);
    if (testResult.success) {
        await saveConnection(config);
        loadConnections();
        hideConnectionModal();
        alert('连接测试成功并已保存');
    } else {
        alert('连接测试失败: ' + testResult.message);
    }
});
</script>
</body>
</html>
```
### 桌面客户端集成
```python
# client/desktop_client.py - 桌面客户端
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import requests
import json
import threading
from datetime import datetime
class Chat2DBRemoteClient:
    """Tkinter desktop client for the Chat2DB remote REST API.

    Connection definitions and the access token are persisted in
    ``settings.json`` in the current working directory.
    NOTE(review): that file holds database passwords and the token in plain
    text - confirm this is acceptable for your environment.
    """

    def __init__(self, root):
        """Build the UI on *root* and load previously saved settings."""
        self.root = root
        self.root.title("Chat2DB远程客户端")
        self.root.geometry("1200x800")
        self.api_base = "http://localhost:8000/api/v1"
        self.access_token = None
        # Initialised up front so every method can rely on these attributes.
        self.connections = []
        self.current_connection = None
        self.setup_ui()
        self.load_settings()

    def setup_ui(self):
        """Create the menu, connection list, SQL editor, result grid and status bar."""
        # Menu bar
        menubar = tk.Menu(self.root)
        self.root.config(menu=menubar)

        # File menu
        file_menu = tk.Menu(menubar, tearoff=0)
        menubar.add_cascade(label="文件", menu=file_menu)
        file_menu.add_command(label="新建连接", command=self.new_connection)
        file_menu.add_separator()
        file_menu.add_command(label="退出", command=self.root.quit)

        # Main layout
        main_panel = ttk.PanedWindow(self.root, orient=tk.HORIZONTAL)
        main_panel.pack(fill=tk.BOTH, expand=True)

        # Left: connection panel
        left_frame = ttk.Frame(main_panel)
        main_panel.add(left_frame, weight=1)

        # Connection list
        ttk.Label(left_frame, text="数据库连接").pack(pady=5)
        self.connection_listbox = tk.Listbox(left_frame, height=15)
        self.connection_listbox.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
        self.connection_listbox.bind('<<ListboxSelect>>', self.on_connection_select)

        # Right: editor above results
        right_panel = ttk.PanedWindow(main_panel, orient=tk.VERTICAL)
        main_panel.add(right_panel, weight=3)

        # SQL editor
        editor_frame = ttk.LabelFrame(right_panel, text="SQL编辑器")
        right_panel.add(editor_frame, weight=2)
        self.sql_editor = scrolledtext.ScrolledText(
            editor_frame,
            wrap=tk.WORD,
            font=("Consolas", 11)
        )
        self.sql_editor.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)

        # Toolbar
        toolbar = ttk.Frame(editor_frame)
        toolbar.pack(fill=tk.X, padx=5, pady=2)
        ttk.Button(toolbar, text="执行(F5)",
                   command=self.execute_query).pack(side=tk.LEFT, padx=2)
        ttk.Button(toolbar, text="清除",
                   command=self.clear_editor).pack(side=tk.LEFT, padx=2)
        ttk.Button(toolbar, text="保存",
                   command=self.save_query).pack(side=tk.LEFT, padx=2)

        # Result panel
        result_frame = ttk.LabelFrame(right_panel, text="查询结果")
        right_panel.add(result_frame, weight=1)

        # Result grid; these placeholder columns are replaced by display_result
        columns = ("#", "字段1", "字段2", "字段3")
        self.result_tree = ttk.Treeview(result_frame, columns=columns, show="headings")
        for col in columns:
            self.result_tree.heading(col, text=col)
            self.result_tree.column(col, width=100)
        vsb = ttk.Scrollbar(result_frame, orient="vertical",
                            command=self.result_tree.yview)
        hsb = ttk.Scrollbar(result_frame, orient="horizontal",
                            command=self.result_tree.xview)
        self.result_tree.configure(yscrollcommand=vsb.set,
                                   xscrollcommand=hsb.set)
        self.result_tree.grid(row=0, column=0, sticky="nsew")
        vsb.grid(row=0, column=1, sticky="ns")
        hsb.grid(row=1, column=0, sticky="ew")
        result_frame.grid_rowconfigure(0, weight=1)
        result_frame.grid_columnconfigure(0, weight=1)

        # Status bar
        self.status_bar = ttk.Label(self.root, text="就绪", relief=tk.SUNKEN)
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)

        # Keyboard shortcut
        self.root.bind('<F5>', lambda e: self.execute_query())

    def load_settings(self):
        """Load the token and connection list from settings.json, if present."""
        try:
            with open('settings.json', 'r', encoding='utf-8') as f:
                settings = json.load(f)
        except FileNotFoundError:
            return
        except json.JSONDecodeError as e:
            self.status_bar.config(text=f"settings.json 无法解析: {e}")
            return
        self.access_token = settings.get('access_token')
        # Keep the loaded definitions: the listbox index is later used to look
        # the selected connection up in self.connections.
        self.connections = settings.get('connections', [])
        for conn in self.connections:
            self.connection_listbox.insert(tk.END, conn['name'])

    def save_settings(self):
        """Write the token and connection list to settings.json."""
        settings = {
            'access_token': self.access_token,
            'connections': self.connections
        }
        with open('settings.json', 'w', encoding='utf-8') as f:
            json.dump(settings, f, indent=2)

    def new_connection(self):
        """Open the dialog that collects, tests and saves a new connection."""
        dialog = tk.Toplevel(self.root)
        dialog.title("新建数据库连接")
        dialog.geometry("400x350")

        def add_row(row, label, **entry_options):
            # One labelled entry per grid row
            ttk.Label(dialog, text=label).grid(row=row, column=0, padx=5, pady=5, sticky=tk.W)
            entry = ttk.Entry(dialog, width=30, **entry_options)
            entry.grid(row=row, column=1, padx=5, pady=5)
            return entry

        # Connection form
        name_entry = add_row(0, "连接名称:")
        ttk.Label(dialog, text="数据库类型:").grid(row=1, column=0, padx=5, pady=5, sticky=tk.W)
        db_type = ttk.Combobox(dialog, values=["mysql", "postgresql", "sqlite"], width=27)
        db_type.grid(row=1, column=1, padx=5, pady=5)
        db_type.current(0)
        host_entry = add_row(2, "主机地址:")
        host_entry.insert(0, "localhost")
        port_entry = add_row(3, "端口:")
        port_entry.insert(0, "3306")
        db_entry = add_row(4, "数据库名:")
        user_entry = add_row(5, "用户名:")
        pass_entry = add_row(6, "密码:", show="*")

        def test_and_save():
            """Test the entered connection and save it when the test succeeds."""
            try:
                port = int(port_entry.get())
            except ValueError:
                messagebox.showerror("错误", "端口必须是数字")
                return
            config = {
                'name': name_entry.get(),
                'type': db_type.get(),
                'host': host_entry.get(),
                'port': port,
                'database': db_entry.get(),
                'username': user_entry.get(),
                'password': pass_entry.get()
            }
            if self.test_connection(config):
                self.save_connection(config)
                self.connection_listbox.insert(tk.END, config['name'])
                dialog.destroy()
                messagebox.showinfo("成功", "连接测试成功并已保存")
            else:
                messagebox.showerror("错误", "连接测试失败")

        ttk.Button(dialog, text="测试连接",
                   command=test_and_save).grid(row=7, column=0, padx=5, pady=10)
        ttk.Button(dialog, text="取消",
                   command=dialog.destroy).grid(row=7, column=1, padx=5, pady=10)

    def test_connection(self, config):
        """Return True when the server can connect with *config*."""
        try:
            response = requests.post(
                f"{self.api_base}/connect",
                headers=self.get_auth_headers(),
                json=config,
                timeout=10
            )
            return response.status_code == 200
        except requests.RequestException as e:
            self.status_bar.config(text=f"连接测试失败: {str(e)}")
            return False

    def save_connection(self, config):
        """Add *config* to the connection list and persist the settings."""
        self.connections.append(config)
        self.save_settings()

    def on_connection_select(self, event):
        """Make the connection selected in the listbox the active one."""
        selection = self.connection_listbox.curselection()
        if selection and selection[0] < len(self.connections):
            self.current_connection = self.connections[selection[0]]
            self.status_bar.config(text=f"已选择: {self.current_connection['name']}")

    def execute_query(self):
        """Validate the editor content and run it on a background thread."""
        if self.current_connection is None:
            messagebox.showwarning("警告", "请先选择数据库连接")
            return
        sql = self.sql_editor.get("1.0", tk.END).strip()
        if not sql:
            messagebox.showwarning("警告", "请输入SQL语句")
            return
        # Updated here, on the UI thread, before the worker starts.
        self.status_bar.config(text="执行中...")
        # Run the request on a worker thread so the window stays responsive.
        thread = threading.Thread(target=self._execute_query_thread, args=(sql,))
        thread.daemon = True
        thread.start()

    def _on_ui(self, func, *args):
        """Schedule ``func(*args)`` on the Tk main loop."""
        self.root.after(0, func, *args)

    def _execute_query_thread(self, sql):
        """Worker-thread body: POST the query and hand the outcome to the UI.

        Tk widgets are not thread-safe, so every UI update is scheduled via
        ``_on_ui`` instead of being performed from this thread.
        """
        try:
            response = requests.post(
                f"{self.api_base}/query",
                headers=self.get_auth_headers(),
                json={
                    'config': self.current_connection,
                    'sql': sql,
                    'parameters': {}
                },
                timeout=30
            )
            if response.status_code == 200:
                self._on_ui(self._show_query_success, response.json())
            else:
                self._on_ui(messagebox.showerror, "错误", f"查询失败: {response.text}")
        except Exception as e:
            # Pass the text, not the exception: `e` is unbound once the except
            # block ends, which is before the scheduled callback runs.
            self._on_ui(self._show_query_error, str(e))

    def _show_query_success(self, result):
        """UI-thread callback: render *result* and report completion."""
        self.display_result(result)
        self.status_bar.config(text="查询执行完成")

    def _show_query_error(self, message):
        """UI-thread callback: report a failed request."""
        self.status_bar.config(text=f"执行错误: {message}")
        messagebox.showerror("错误", message)

    def display_result(self, result):
        """Show a query response in the result grid and status bar."""
        # Clear the previous result
        for item in self.result_tree.get_children():
            self.result_tree.delete(item)

        if result['type'] == 'query':
            columns = list(result['columns'])
            # Each row is prefixed with its 1-based number, so the grid needs
            # one extra leading column; otherwise every value is shifted right
            # and the last one is cut off.
            grid_columns = ['row_no'] + columns
            self.result_tree['columns'] = grid_columns
            for col_id, title in zip(grid_columns, ['#'] + columns):
                self.result_tree.heading(col_id, text=title)
                self.result_tree.column(col_id, width=100)

            # Data rows
            for i, row in enumerate(result['rows'], 1):
                values = [i] + [row.get(col, '') for col in columns]
                self.result_tree.insert('', tk.END, values=values)
            self.status_bar.config(text=f"返回 {len(result['rows'])} 行")
        else:
            self.status_bar.config(text=f"影响行数: {result.get('affected_rows', 0)}")

    def clear_editor(self):
        """Empty the SQL editor."""
        self.sql_editor.delete("1.0", tk.END)

    def save_query(self):
        """Save the editor content to a file chosen by the user."""
        # tkinter.filedialog is a submodule that `import tkinter` does not
        # load, so it has to be imported explicitly.
        from tkinter import filedialog

        sql = self.sql_editor.get("1.0", tk.END).strip()
        if not sql:
            messagebox.showwarning("警告", "没有内容可保存")
            return
        filename = filedialog.asksaveasfilename(
            defaultextension=".sql",
            filetypes=[("SQL文件", "*.sql"), ("所有文件", "*.*")]
        )
        if filename:
            try:
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(sql)
                self.status_bar.config(text=f"已保存到: {filename}")
            except Exception as e:
                messagebox.showerror("错误", f"保存失败: {str(e)}")

    def get_auth_headers(self):
        """Return the request headers; Authorization is added only when a token is set."""
        headers = {'Content-Type': 'application/json'}
        if self.access_token:
            headers['Authorization'] = f'Bearer {self.access_token}'
        return headers
def main():
    """Create the Tk root window, attach the client and run the event loop."""
    window = tk.Tk()
    client = Chat2DBRemoteClient(window)
    window.mainloop()


if __name__ == "__main__":
    main()
```
## 四、安全配置与网络优化
### Nginx反向代理配置
```nginx
# nginx/nginx.conf
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;

events {
    worker_connections 1024;
    use epoll;
}

http {
    include /etc/nginx/mime.types;
    default_type application/octet-stream;

    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                    '$status $body_bytes_sent "$http_referer" '
                    '"$http_user_agent" "$http_x_forwarded_for"';
    access_log /var/log/nginx/access.log main;

    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;

    # Gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_types text/plain text/css text/xml text/javascript
               application/json application/javascript application/xml+rss;

    # Send "Connection: upgrade" upstream only when the client asked for a
    # protocol upgrade; otherwise close after the response.
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

    # Upstream service
    upstream chat2db_backend {
        server localhost:8000;
        keepalive 32;
    }

    server {
        listen 80;
        server_name yourdomain.com;

        # Redirect HTTP to HTTPS
        return 301 https://$server_name$request_uri;
    }

    server {
        listen 443 ssl http2;
        server_name yourdomain.com;

        # SSL certificate
        ssl_certificate /etc/nginx/ssl/yourdomain.crt;
        ssl_certificate_key /etc/nginx/ssl/yourdomain.key;
        ssl_session_timeout 1d;
        ssl_session_cache shared:SSL:50m;
        ssl_session_tickets off;

        # SSL protocols and cipher suites
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
        ssl_prefer_server_ciphers off;

        # HSTS
        add_header Strict-Transport-Security "max-age=63072000" always;

        # Security headers
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-Content-Type-Options "nosniff" always;
        add_header X-XSS-Protection "1; mode=block" always;

        # Limit the request body size
        client_max_body_size 10m;

        # Reverse proxy to the Chat2DB service (plain HTTP API traffic)
        location / {
            proxy_pass http://chat2db_backend;
            proxy_http_version 1.1;
            # An empty Connection header is what allows the upstream
            # "keepalive 32" pool to be reused; forcing "upgrade" here would
            # disable it. WebSocket traffic is handled by /ws/ below.
            proxy_set_header Connection "";
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Proxy timeouts
            proxy_connect_timeout 60s;
            proxy_send_timeout 60s;
            proxy_read_timeout 60s;

            # Buffering
            proxy_buffering on;
            proxy_buffer_size 4k;
            proxy_buffers 8 4k;
            proxy_busy_buffers_size 8k;
        }

        # WebSocket support
        location /ws/ {
            proxy_pass http://chat2db_backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;

            # Long timeouts for long-lived sockets
            proxy_read_timeout 3600s;
            proxy_send_timeout 3600s;
        }

        # Static files
        location /static/ {
            alias /path/to/static/files/;
            expires 1y;
            add_header Cache-Control "public, immutable";
            # add_header directives are inherited from the server block only
            # when a location defines none of its own, so the security headers
            # have to be repeated here.
            add_header Strict-Transport-Security "max-age=63072000" always;
            add_header X-Content-Type-Options "nosniff" always;
        }

        # Health-check endpoint
        location /health {
            access_log off;
            proxy_pass http://chat2db_backend/health;
        }
    }
}
```
### 防火墙与安全配置
```bash
#!/bin/bash
# security_setup.sh
set -e
echo "=== 安全配置脚本 ==="
# 1. Configure the firewall (skipped with a warning when ufw is not installed)
setup_firewall() {
    echo "配置防火墙..."

    if command -v ufw &> /dev/null; then
        # WARNING: "reset" discards every existing ufw rule before the rule
        # set below is rebuilt.
        ufw --force reset
        ufw default deny incoming
        ufw default allow outgoing

        # Open only the ports that must be reachable from outside.
        ufw allow 22/tcp comment 'SSH'
        ufw allow 80/tcp comment 'HTTP'
        ufw allow 443/tcp comment 'HTTPS'
        # Port 8000 is deliberately NOT opened: opening it would expose the
        # API over plain HTTP and bypass the TLS reverse proxy on 443.

        ufw --force enable
        ufw status verbose
    else
        echo "警告: ufw未安装,跳过防火墙配置"
    fi
}
# 2. Create the dedicated system user "chat2db" (no login shell, no home
#    directory) unless it already exists.
setup_user() {
    echo "创建专用用户..."

    if ! id -u chat2db >/dev/null 2>&1; then
        useradd -r -s /usr/sbin/nologin -M chat2db
        echo "已创建用户chat2db"
        return
    fi
    echo "用户chat2db已存在"
}
# 3. Configure directory permissions
setup_permissions() {
    echo "配置目录权限..."

    # Data directories
    mkdir -p /var/lib/chat2db/{data,logs,backups}

    # Ownership and mode
    chown -R chat2db:chat2db /var/lib/chat2db
    chmod -R 750 /var/lib/chat2db

    # Config file: readable by the service user only. Nothing in this script
    # creates the file, so guard the call - under "set -e" a chmod on a
    # missing path would abort the whole setup.
    local config_file="/etc/chat2db/config.yaml"
    if [ -f "$config_file" ]; then
        chown chat2db:chat2db "$config_file"
        chmod 600 "$config_file"
    else
        echo "警告: 未找到 $config_file,跳过配置文件权限设置"
    fi
}
# 4. Configure the SSL certificate: generate a self-signed key pair under
#    /etc/nginx/ssl unless both files already exist.
setup_ssl() {
    echo "配置SSL证书..."

    SSL_DIR="/etc/nginx/ssl"
    mkdir -p "$SSL_DIR"

    if [ -f "$SSL_DIR/yourdomain.crt" ] && [ -f "$SSL_DIR/yourdomain.key" ]; then
        echo "SSL证书已存在"
        return
    fi

    echo "生成自签名证书..."
    openssl req -x509 -nodes -days 365 -newkey rsa:2048 \
        -keyout "$SSL_DIR/yourdomain.key" \
        -out "$SSL_DIR/yourdomain.crt" \
        -subj "/C=CN/ST=Beijing/L=Beijing/O=Chat2DB/CN=yourdomain.com"
    # Private key: owner only; certificate: world-readable
    chmod 600 "$SSL_DIR/yourdomain.key"
    chmod 644 "$SSL_DIR/yourdomain.crt"
}
# 5. Install, enable and start the systemd unit for the API server.
setup_service() {
    echo "配置系统服务..."

    # Quoted delimiter: the unit file is written verbatim ($MAINPID is left
    # for systemd to expand). The server binds to 127.0.0.1 so it is reachable
    # only through a local reverse proxy, never directly from the network.
    cat > /etc/systemd/system/chat2db.service << 'EOF'
[Unit]
Description=Chat2DB Remote Server
After=network.target
Requires=network.target

[Service]
Type=simple
User=chat2db
Group=chat2db
WorkingDirectory=/var/lib/chat2db
ExecStart=/var/lib/chat2db/venv/bin/python -m uvicorn main:app \
    --host 127.0.0.1 \
    --port 8000 \
    --workers 4
ExecReload=/bin/kill -HUP $MAINPID
Restart=on-failure
RestartSec=5s
TimeoutStopSec=30

# 安全配置
NoNewPrivileges=true
PrivateTmp=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/chat2db

# 资源限制
LimitNOFILE=65536
LimitNPROC=4096

[Install]
WantedBy=multi-user.target
EOF

    systemctl daemon-reload
    systemctl enable chat2db.service
    systemctl start chat2db.service

    echo "服务状态:"
    # "status" exits non-zero for a unit that is not active; this call is
    # informational only and must not abort the remaining steps under "set -e".
    systemctl status chat2db.service --no-pager || true
}
# 6. Configure log rotation
# Writes /etc/logrotate.d/chat2db: *.log files under /var/lib/chat2db/logs are
# rotated daily, 30 rotations are kept, rotated files are compressed (the
# newest one only on the following cycle), empty logs are skipped, and new log
# files are created as chat2db:chat2db with mode 640. After rotation the
# chat2db service is reloaded; a failed reload is ignored.
setup_logrotate() {
echo "配置日志轮转..."
# Quoted heredoc delimiter: the policy below is written verbatim, without
# shell expansion.
cat > /etc/logrotate.d/chat2db << 'EOF'
/var/lib/chat2db/logs/*.log {
daily
missingok
rotate 30
compress
delaycompress
notifempty
create 640 chat2db chat2db
sharedscripts
postrotate
systemctl reload chat2db.service > /dev/null 2>&1 || true
endscript
}
EOF
}
# 7. Monitoring: install a small Prometheus exporter and its systemd unit.
#    Everything is skipped when the exporter script already exists.
#    NOTE(review): the exporter imports prometheus_client, psutil and requests,
#    which this script does not install - make sure they are available to the
#    system python3.
setup_monitoring() {
    echo "配置监控..."

    if [ ! -f "/usr/local/bin/prometheus_exporter.py" ]; then
        # Quoted delimiter: the Python source is written verbatim. Its
        # indentation is significant and must be kept exactly as below.
        cat > /usr/local/bin/prometheus_exporter.py << 'EOF'
#!/usr/bin/env python3
from prometheus_client import start_http_server, Gauge
import time
import psutil
import requests

# 定义监控指标
cpu_usage = Gauge('chat2db_cpu_usage', 'CPU使用率')
memory_usage = Gauge('chat2db_memory_usage', '内存使用率')
active_connections = Gauge('chat2db_active_connections', '活跃连接数')
query_count = Gauge('chat2db_query_count', '查询计数')


def collect_metrics():
    """收集监控指标"""
    # 系统指标
    cpu_usage.set(psutil.cpu_percent())
    memory_usage.set(psutil.virtual_memory().percent)
    # 应用指标
    try:
        response = requests.get('http://localhost:8000/health', timeout=5)
        if response.status_code == 200:
            data = response.json()
            active_connections.set(data.get('active_connections', 0))
            query_count.set(data.get('query_count', 0))
    except (requests.RequestException, ValueError):
        # 服务不可达或返回非JSON时跳过本次采集
        pass


if __name__ == '__main__':
    # 在9100端口启动Prometheus metrics服务
    start_http_server(9100)
    while True:
        collect_metrics()
        time.sleep(15)
EOF
        chmod +x /usr/local/bin/prometheus_exporter.py

        # systemd unit for the exporter
        cat > /etc/systemd/system/chat2db-exporter.service << 'EOF'
[Unit]
Description=Chat2DB Prometheus Exporter
After=network.target

[Service]
Type=simple
User=chat2db
ExecStart=/usr/local/bin/prometheus_exporter.py
Restart=always

[Install]
WantedBy=multi-user.target
EOF

        systemctl daemon-reload
        systemctl enable chat2db-exporter.service
        systemctl start chat2db-exporter.service
    fi
}
# Entry point
# Runs every setup step in its fixed order, then prints the post-install
# checklist for the operator.
main() {
    local _main_step

    # The order matters: the account and directories must exist before the
    # service, log rotation and monitoring steps reference them.
    for _main_step in \
        setup_firewall \
        setup_user \
        setup_permissions \
        setup_ssl \
        setup_service \
        setup_logrotate \
        setup_monitoring
    do
        "$_main_step"
    done

    # One line per argument, same text as before.
    printf '%s\n' \
        "=== 安全配置完成 ===" \
        "请确保:" \
        "1. 修改默认密码" \
        "2. 更新SSL证书" \
        "3. 配置数据库访问限制"
}

main "$@"
```
## 五、部署与维护
### Docker化部署
```dockerfile
# Dockerfile
# Builds the Chat2DB remote server image: Python 3.11 slim base, system build
# dependencies, Python requirements, application code, non-root runtime user.
FROM python:3.11-slim

# Environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PORT=8000

# Working directory
WORKDIR /app

# System dependencies
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        gcc \
        g++ \
        libpq-dev \
        default-libmysqlclient-dev \
        pkg-config \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest first so the pip layer is cached until it changes
COPY requirements.txt .

# Python dependencies
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r requirements.txt

# Create the non-root user before copying the code and give it the working
# directory itself, so the application can still create files under /app.
RUN useradd -m -u 1000 chat2db \
    && chown chat2db:chat2db /app

# Copy the application code already owned by the runtime user. A separate
# "chown -R" after COPY would duplicate every file in an additional layer.
COPY --chown=chat2db:chat2db . .

USER chat2db

# Exposed port
EXPOSE $PORT

# Start command. The shell is needed to expand ${PORT}; "exec" replaces that
# shell with uvicorn so the server runs as PID 1 and receives the SIGTERM
# sent by "docker stop" (otherwise the stop waits for the kill timeout).
CMD ["sh", "-c", "exec python -m uvicorn main:app --host 0.0.0.0 --port ${PORT} --workers 4"]
```
### 使用说明文档
````markdown
# Chat2DB远程部署指南
## 快速开始
### 1. 服务端部署
```bash
# 克隆仓库
git clone https://github.com/yourname/chat2db-remote.git
cd chat2db-remote/server
# 安装依赖
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
# 配置环境变量
cp .env.example .env
# 编辑.env文件,设置JWT密钥等
# 启动服务
python main.py
```
### 2. 客户端访问
**Web客户端:**
打开浏览器访问 `http://localhost:3000`
**桌面客户端:**
```bash
cd client
python desktop_client.py
```
## 高级配置
### 多环境部署
**开发环境:**
```bash
export ENVIRONMENT=development
python main.py
```
**生产环境:**
```bash
export ENVIRONMENT=production
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker
```
### 数据库支持
支持以下数据库类型:
- MySQL 5.7+
- PostgreSQL 10+
- SQLite 3
## 安全建议
1. **修改默认密码**: 首次部署后立即修改默认密码
2. **启用HTTPS**: 生产环境必须使用HTTPS
3. **限制IP访问**: 通过防火墙限制访问来源
4. **定期备份**: 配置自动备份策略
5. **日志监控**: 监控访问日志和安全日志
## 故障排除
### 常见问题
1. **连接被拒绝**: 检查防火墙和端口配置
2. **认证失败**: 验证JWT令牌和数据库凭据
3. **性能问题**: 检查数据库连接池和索引
### 获取帮助
- 查看日志文件: `logs/chat2db_server.log`
- 检查服务状态: `systemctl status chat2db`
- 社区支持: [GitHub Issues](https://github.com/yourname/chat2db-remote/issues)
````
通过以上配置,可以将本地Chat2DB轻松转换为远程可用的数据库管理工具。这种方案不仅保持了Chat2DB原有的易用性,还增加了跨网络访问的能力,同时通过多重安全措施保障了数据访问的安全性。无论是个人使用还是团队协作,都能获得良好的使用体验。