Below is the core Python implementation of the optimization plan above, focused on the logic of self-optimizing the model's predictive capability. (Note: since deploying a full large language model is complex, the code simulates large-model self-optimization with a lightweight prediction model plus an incremental fine-tuning approach in the spirit of LoRA, while still covering the core workflow: data augmentation, bias diagnosis, parameter self-tuning, feedback loop, and decision generation.)
Based on an analysis of the search results and further reflection, I found several key points where the earlier plan can be improved:
Key Optimization Reflections
1. Focused model choice: with LightGBM, GPU training can actually be slower than CPU (roughly a tenth of CPU throughput in the cited results), so the design should focus entirely on CPU optimization rather than GPU compatibility.
2. Incremental learning strategy: LightGBM supports true incremental learning, so the whole model does not need frequent retraining; this significantly reduces compute cost.
3. Memory optimization first: without a GPU, memory optimization matters more than compute optimization, and memory-related parameters need dedicated configuration.
4. Simplified feature engineering: complex automatic feature generation is too expensive on CPU; rely on a core set of domain-knowledge features plus the model's built-in feature importance.
5. Adjusted parallelism: CPU parallelism needs careful configuration to avoid the context-switching overhead of over-parallelization (see the sketch after this list).
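As a hedged illustration of the memory and parallelism points above, here is the kind of CPU-focused LightGBM parameter block this plan has in mind. It is a minimal sketch: the specific values (`max_bin`, the core-count fallback) are assumptions for illustration, not taken from the implementation below.

```python
import psutil

# A CPU-only tuning sketch: leave one physical core for the OS and prefer
# column-wise histogram construction, which the LightGBM docs recommend
# when reducing memory cost is the priority.
physical_cores = psutil.cpu_count(logical=False) or 2  # cpu_count can return None
cpu_params = {
    'objective': 'regression',
    'n_jobs': max(1, physical_cores - 1),  # avoid thread oversubscription
    'force_col_wise': True,                # skip the col-/row-wise auto-probe
    'max_bin': 127,                        # smaller histograms, smaller footprint
    'verbose': -1,
}
```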
Optimized Python Implementation
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import json
import os
import logging
from typing import Dict, List, Optional, Tuple, Union
import lightgbm as lgb
from sklearn.metrics import mean_absolute_percentage_error, mean_squared_error
from sklearn.model_selection import TimeSeriesSplit
import joblib
import psutil
import gc
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('price_prediction.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger('VegPricePredictor')
class MemoryOptimizedFeatureEngine:
"""内存优化的特征引擎,避免在CPU环境下产生过多特征"""
def __init__(self, max_lag_days: int = 30, seasonal_periods: List[int] = [7, 30, 365]):
self.max_lag_days = max_lag_days
self.seasonal_periods = seasonal_periods
self.feature_stats = {}
def generate_features(self, df: pd.DataFrame, target_col: str = 'price') -> pd.DataFrame:
"""生成核心特征,控制内存使用"""
df = df.copy()
df['date'] = pd.to_datetime(df['date'])
        # Basic time features (lightweight)
df['day_of_week'] = df['date'].dt.dayofweek
df['month'] = df['date'].dt.month
df['day_of_month'] = df['date'].dt.day
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
        # Holiday feature (needs external data; simplified here)
        df['is_holiday'] = 0  # a real deployment needs an actual holiday calendar
        # Price-related features
if target_col in df.columns:
            # Lag features (cap the number of lags)
for lag in [1, 3, 7, 14, 30]:
if lag <= self.max_lag_days:
df[f'price_lag_{lag}'] = df[target_col].shift(lag)
            # Price change rates
df['price_change_1d'] = df[target_col].pct_change(1)
df['price_change_7d'] = df[target_col].pct_change(7)
            # Seasonal features
for period in self.seasonal_periods:
if period <= len(df):
df[f'seasonal_mean_{period}'] = df[target_col].rolling(window=period, min_periods=1).mean()
        # Memory optimization: drop columns that are no longer needed
cols_to_drop = ['date']
df = df.drop(columns=[c for c in cols_to_drop if c in df.columns])
        # Fill missing values
df = df.ffill().bfill()
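        # Caveat: bfill() copies later values into earlier rows, which leaks a
        # little future information; tolerable here only for the warm-up rows
        # created by the lag features.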
        # Record feature statistics
self.feature_stats['num_features'] = len(df.columns)
self.feature_stats['memory_usage_mb'] = df.memory_usage().sum() / 1024 / 1024
logger.info(
f"Generated {len(df.columns)} features, Memory usage: {self.feature_stats['memory_usage_mb']:.2f} MB")
return df
class SelfOptimizingLGBPredictor:
"""自优化的LightGBM预测器,专注于CPU环境优化"""
def __init__(self,
config_path: str = 'predictor_config.json',
model_dir: str = 'models',
max_memory_usage_percent: float = 70.0):
"""
初始化自优化预测器
参数:
config_path: 配置文件路径
model_dir: 模型保存目录
max_memory_usage_percent: 最大内存使用百分比,超过则触发优化
"""
self.config_path = config_path
self.model_dir = model_dir
self.max_memory_usage_percent = max_memory_usage_percent
        # Make sure the model directory exists
os.makedirs(model_dir, exist_ok=True)
        # Load or initialize the configuration
self.config = self._load_or_init_config()
        # Initialize components
self.feature_engine = MemoryOptimizedFeatureEngine(
max_lag_days=self.config.get('max_lag_days', 30)
)
self.model = None
self.model_version = 0
self.performance_history = []
self.last_optimization_time = None
        # Load an existing model if one is available
self._load_latest_model()
def _load_or_init_config(self) -> Dict:
"""加载或初始化配置"""
default_config = {
'model_params': {
'objective': 'regression',
'metric': ['mape', 'rmse'],
'boosting_type': 'gbdt',
'num_leaves': 31,
'learning_rate': 0.05,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'verbose': -1,
                'n_jobs': max(1, (psutil.cpu_count(logical=False) or 2) - 1),  # leave one CPU core for the system; cpu_count can return None
'max_depth': 8,
'min_data_in_leaf': 20,
'lambda_l1': 0.1,
'lambda_l2': 0.1
},
'training_params': {
'num_boost_round': 200,
'early_stopping_rounds': 20,
'validation_size': 0.2
},
'optimization_params': {
'min_data_for_optimization': 1000,
'optimization_frequency_days': 7,
                'performance_threshold': 0.15  # MAPE threshold
},
'max_lag_days': 30,
'max_model_history': 5
}
if os.path.exists(self.config_path):
try:
with open(self.config_path, 'r') as f:
config = json.load(f)
logger.info("Loaded existing configuration")
                # Merge in defaults so every required key exists
for key, value in default_config.items():
if key not in config:
config[key] = value
return config
except Exception as e:
logger.warning(f"Error loading config: {e}, using default config")
return default_config
else:
logger.info("Creating new configuration file")
with open(self.config_path, 'w') as f:
json.dump(default_config, f, indent=2)
return default_config
def _load_latest_model(self):
"""加载最新的模型"""
model_files = [f for f in os.listdir(self.model_dir) if f.startswith('model_v') and f.endswith('.pkl')]
if model_files:
latest_model = max(model_files, key=lambda x: int(x.split('_v')[1].split('.')[0]))
try:
model_path = os.path.join(self.model_dir, latest_model)
self.model = joblib.load(model_path)
self.model_version = int(latest_model.split('_v')[1].split('.')[0])
logger.info(f"Loaded model version {self.model_version} from {model_path}")
return True
except Exception as e:
logger.error(f"Error loading model: {e}")
return False
def _save_model(self, performance_metrics: Dict):
"""保存模型和性能指标"""
self.model_version += 1
model_path = os.path.join(self.model_dir, f'model_v{self.model_version}.pkl')
metrics_path = os.path.join(self.model_dir, f'metrics_v{self.model_version}.json')
try:
            # Save the model
joblib.dump(self.model, model_path)
            # Save performance metrics
performance_metrics.update({
'model_version': self.model_version,
'timestamp': datetime.now().isoformat(),
'config': self.config
})
with open(metrics_path, 'w') as f:
json.dump(performance_metrics, f, indent=2)
            # Append to the performance history
self.performance_history.append(performance_metrics)
            # Clean up old model versions
self._cleanup_old_models()
logger.info(f"Saved model version {self.model_version} with MAPE: {performance_metrics.get('mape', 0):.4f}")
return True
except Exception as e:
logger.error(f"Error saving model: {e}")
return False
def _cleanup_old_models(self):
"""清理旧模型,保留最近的几个"""
max_history = self.config.get('max_model_history', 5)
model_files = sorted([f for f in os.listdir(self.model_dir) if f.startswith('model_v')],
key=lambda x: int(x.split('_v')[1].split('.')[0]))
if len(model_files) > max_history:
for old_file in model_files[:-max_history]:
try:
os.remove(os.path.join(self.model_dir, old_file))
metrics_file = old_file.replace('model_', 'metrics_').replace('.pkl', '.json')
if os.path.exists(os.path.join(self.model_dir, metrics_file)):
os.remove(os.path.join(self.model_dir, metrics_file))
logger.info(f"Cleaned up old model: {old_file}")
except Exception as e:
logger.warning(f"Error cleaning up {old_file}: {e}")
def _check_memory_usage(self) -> bool:
"""检查内存使用是否超过阈值"""
memory_percent = psutil.virtual_memory().percent
logger.info(f"Current memory usage: {memory_percent:.1f}%")
if memory_percent > self.max_memory_usage_percent:
logger.warning(f"Memory usage ({memory_percent:.1f}%) exceeds threshold ({self.max_memory_usage_percent}%)")
            # Trigger garbage collection
gc.collect()
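            # Note: gc.collect() only forces collection of cyclic garbage; large
            # DataFrames are actually freed once their last reference is dropped.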
return False
return True
def _optimize_model_params(self, X: pd.DataFrame, y: pd.Series):
"""轻量级模型参数优化"""
logger.info("Starting model parameter optimization...")
        # Time-series cross-validation (preserves temporal order, avoids look-ahead leakage)
tscv = TimeSeriesSplit(n_splits=3)
best_params = self.config['model_params'].copy()
best_score = float('inf')
        # Simplified parameter grid
param_grid = {
'num_leaves': [15, 31, 63],
'learning_rate': [0.01, 0.05, 0.1],
'max_depth': [6, 8, 10]
}
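        # 3 x 3 x 3 = 27 parameter combinations, each evaluated on 3 CV folds
        # (81 training runs in total), so the grid is deliberately kept small.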
for num_leaves in param_grid['num_leaves']:
for lr in param_grid['learning_rate']:
for max_depth in param_grid['max_depth']:
params = self.config['model_params'].copy()
params.update({
'num_leaves': num_leaves,
'learning_rate': lr,
'max_depth': max_depth
})
scores = []
for train_idx, val_idx in tscv.split(X):
X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
train_data = lgb.Dataset(X_train, label=y_train, free_raw_data=False)
val_data = lgb.Dataset(X_val, label=y_val, free_raw_data=False)
callbacks = [lgb.early_stopping(stopping_rounds=self.config['training_params']['early_stopping_rounds']),
lgb.log_evaluation(period=0)]
model = lgb.train(
params,
train_data,
num_boost_round=self.config['training_params']['num_boost_round'],
valid_sets=[val_data],
callbacks=callbacks
)
y_pred = model.predict(X_val)
mape = mean_absolute_percentage_error(y_val, y_pred)
scores.append(mape)
avg_score = np.mean(scores)
logger.info(f"Params: leaves={num_leaves}, lr={lr}, depth={max_depth}, MAPE={avg_score:.4f}")
if avg_score < best_score:
best_score = avg_score
best_params = params
        # Persist the updated configuration
self.config['model_params'] = best_params
with open(self.config_path, 'w') as f:
json.dump(self.config, f, indent=2)
logger.info(f"Optimization complete. Best MAPE: {best_score:.4f}, Params: {best_params}")
self.last_optimization_time = datetime.now()
return best_params
def _should_optimize(self, new_data_count: int) -> bool:
"""判断是否需要进行模型优化"""
opt_config = self.config['optimization_params']
        # Check data volume
if new_data_count < opt_config['min_data_for_optimization']:
return False
        # Check time since the last optimization
if self.last_optimization_time is not None:
days_since_opt = (datetime.now() - self.last_optimization_time).days
if days_since_opt < opt_config['optimization_frequency_days']:
return False
        # Check recent performance; if MAPE is already below the threshold, skip optimization
if self.performance_history:
recent_mape = [h['mape'] for h in self.performance_history[-3:]]
if np.mean(recent_mape) < opt_config['performance_threshold']:
return False
return True
def incremental_train(self, new_data: pd.DataFrame, target_col: str = 'price'):
"""
增量训练模型
参数:
new_data: 包含新数据的DataFrame,必须包含'date'和目标列
target_col: 目标列名
"""
logger.info(f"Starting incremental training with {len(new_data)} new records")
        # Memory check
if not self._check_memory_usage():
logger.warning("High memory usage, skipping training")
return False
        # Data preprocessing
new_data = new_data.copy()
if 'date' not in new_data.columns:
raise ValueError("Input data must contain 'date' column")
        # Feature engineering
feature_df = self.feature_engine.generate_features(new_data, target_col)
        # Prepare training data
if target_col not in new_data.columns:
raise ValueError(f"Target column '{target_col}' not found in input data")
X = feature_df.drop(columns=[target_col], errors='ignore')
y = new_data[target_col]
        # Make sure features and target are aligned
if len(X) != len(y):
min_len = min(len(X), len(y))
X = X.iloc[:min_len]
y = y.iloc[:min_len]
logger.warning(f"Data alignment issue fixed, using {min_len} records")
        # Ensure the feature columns match the training-time set (if a model already exists)
if self.model is not None:
train_features = self.model.feature_name()
missing_features = [f for f in train_features if f not in X.columns]
extra_features = [f for f in X.columns if f not in train_features]
            # Drop extra features
if extra_features:
X = X.drop(columns=extra_features)
logger.debug(f"Removed extra features: {extra_features}")
            # Add missing features (filled with 0)
for feat in missing_features:
X[feat] = 0
logger.debug(f"Added missing feature {feat} with default value 0")
            # Reorder columns to match the training-time order
X = X[train_features]
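            # Note: by default Booster.predict matches features by position rather
            # than by name, so the column order must be identical to fit time.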
        # Train/validation split (time-based: validation is the most recent slice)
val_size = int(len(X) * self.config['training_params']['validation_size'])
        if val_size < 10:  # make sure the validation set has enough samples
val_size = min(10, len(X) // 2)
X_train, X_val = X.iloc[:-val_size], X.iloc[-val_size:]
y_train, y_val = y.iloc[:-val_size], y.iloc[-val_size:]
logger.info(f"Training with {len(X_train)} samples, validating with {len(X_val)} samples")
        # Prepare LightGBM datasets
train_data = lgb.Dataset(X_train, label=y_train, free_raw_data=False)
val_data = lgb.Dataset(X_val, label=y_val, free_raw_data=False)
        # Decide whether hyperparameters should be re-optimized first
if self._should_optimize(len(new_data)):
self._optimize_model_params(X_train, y_train)
        # Model training
try:
if self.model is None:
                # First-time training
logger.info("Training first model...")
callbacks = [lgb.early_stopping(stopping_rounds=self.config['training_params']['early_stopping_rounds']),
lgb.log_evaluation(period=50)]
self.model = lgb.train(
self.config['model_params'],
train_data,
num_boost_round=self.config['training_params']['num_boost_round'],
valid_sets=[val_data],
callbacks=callbacks
)
else:
                # Incremental training
logger.info("Performing incremental training...")
callbacks = [lgb.early_stopping(stopping_rounds=10),
lgb.log_evaluation(period=25)]
self.model = lgb.train(
self.config['model_params'],
train_data,
                    num_boost_round=50,  # fewer rounds for an incremental update
valid_sets=[val_data],
                    init_model=self.model,  # key step: continue training from the existing model
callbacks=callbacks
)
            # Evaluate the model
y_pred = self.model.predict(X_val)
mape = mean_absolute_percentage_error(y_val, y_pred)
rmse = np.sqrt(mean_squared_error(y_val, y_pred))
performance_metrics = {
'mape': float(mape),
'rmse': float(rmse),
'training_samples': len(X_train),
'validation_samples': len(X_val),
'feature_count': len(X.columns),
'num_trees': self.model.num_trees() if self.model else 0
}
logger.info(f"Model performance - MAPE: {mape:.4f}, RMSE: {rmse:.4f}")
            # Save the model
self._save_model(performance_metrics)
return True
except Exception as e:
logger.error(f"Training failed: {e}")
            # Fall back to the previously saved model
self._load_latest_model()
return False
def predict(self, future_dates: List[datetime], external_features: Optional[pd.DataFrame] = None) -> pd.DataFrame:
"""
预测未来价格
参数:
future_dates: 要预测的日期列表
external_features: 外部特征DataFrame(可选)
返回:
包含预测结果的DataFrame
"""
if self.model is None:
raise ValueError("No trained model available. Please train the model first.")
logger.info(f"Predicting for {len(future_dates)} future dates")
        # Build the prediction frame
future_df = pd.DataFrame({'date': future_dates})
        # Merge in external features (if provided)
if external_features is not None:
future_df = pd.merge(future_df, external_features, on='date', how='left')
        # Generate features
feature_df = self.feature_engine.generate_features(future_df)
        # Make sure the feature columns match the training-time set
train_features = self.model.feature_name()
missing_features = [f for f in train_features if f not in feature_df.columns]
if missing_features:
logger.warning(f"Missing features for prediction: {missing_features}")
            # Fill missing features with 0 (a real deployment should handle this more carefully)
for feat in missing_features:
feature_df[feat] = 0
        # Keep only the features used at training time
feature_df = feature_df[train_features]
        # Run the prediction
try:
predictions = self.model.predict(feature_df)
result_df = pd.DataFrame({
'date': future_dates,
'predicted_price': predictions
})
            # Add a prediction interval (simple version)
if hasattr(self, 'performance_history') and self.performance_history:
recent_mape = np.mean([h['mape'] for h in self.performance_history[-3:]])
result_df['lower_bound'] = result_df['predicted_price'] * (1 - recent_mape)
result_df['upper_bound'] = result_df['predicted_price'] * (1 + recent_mape)
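            # Note: symmetric MAPE-based bands are a rough heuristic; a quantile
            # objective (objective='quantile') would give better-calibrated bounds.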
logger.info(f"Prediction completed successfully")
return result_df
except Exception as e:
logger.error(f"Prediction failed: {e}")
raise
def get_feature_importance(self) -> pd.DataFrame:
"""获取特征重要性"""
if self.model is None:
raise ValueError("No trained model available")
importance = self.model.feature_importance(importance_type='gain')
feature_names = self.model.feature_name()
importance_df = pd.DataFrame({
'feature': feature_names,
'importance': importance
}).sort_values('importance', ascending=False)
return importance_df
def health_check(self) -> Dict:
"""系统健康检查"""
memory_info = psutil.virtual_memory()
disk_info = psutil.disk_usage(self.model_dir)
health_status = {
'memory_usage_percent': memory_info.percent,
'available_memory_gb': memory_info.available / 1024 ** 3,
'disk_usage_percent': disk_info.percent,
'model_version': self.model_version,
'last_optimization': self.last_optimization_time.isoformat() if self.last_optimization_time else None,
'recent_performance': self.performance_history[-1] if self.performance_history else None
}
return health_status
# Usage example
if __name__ == "__main__":
    # Create the predictor instance
predictor = SelfOptimizingLGBPredictor(
config_path='vegetable_price_config.json',
model_dir='models/vegetable_price',
max_memory_usage_percent=80.0
)
    # Simulated historical data (load from a database in production)
dates = pd.date_range(start='2023-01-01', end='2024-12-31', freq='D')
np.random.seed(42)
prices = 10 + np.sin(np.arange(len(dates)) * 2 * np.pi / 365) * 2 + np.random.normal(0, 1, len(dates))
historical_data = pd.DataFrame({
'date': dates,
'price': prices,
'volume': np.random.randint(100, 1000, len(dates))
})
    # Initial training
print("Initial training...")
predictor.incremental_train(historical_data)
    # Simulate newly arriving data
new_dates = pd.date_range(start='2025-01-01', end='2025-01-31', freq='D')
new_prices = 11 + np.sin(np.arange(len(new_dates)) * 2 * np.pi / 365) * 2 + np.random.normal(0, 1, len(new_dates))
new_data = pd.DataFrame({
'date': new_dates,
'price': new_prices,
'volume': np.random.randint(100, 1000, len(new_dates))
})
    # Incremental training
print("\nIncremental training with new data...")
predictor.incremental_train(new_data)
    # Predict the following 28 days
future_dates = pd.date_range(start='2025-02-01', end='2025-02-28', freq='D')
print("\nMaking predictions...")
predictions = predictor.predict(future_dates)
print("\nPredictions:")
print(predictions.head())
    # Feature importance
print("\nFeature Importance:")
importance = predictor.get_feature_importance()
print(importance.head(10))
    # Health check
print("\nSystem Health Check:")
health = predictor.health_check()
print(json.dumps(health, indent=2))
Core Optimization Highlights
1. True incremental learning: LightGBM's `init_model` parameter enables genuine incremental training, avoiding full retraining (see the standalone sketch after this list).
2. Memory-aware design:
   - memory monitoring and optimization in the feature engine
   - explicit garbage collection under memory pressure
   - automatic cleanup of old model versions
3. CPU parallelism: `n_jobs` is set dynamically, leaving one CPU core for the system to avoid resource contention.
4. Lightweight self-optimization:
   - optimization triggered by performance thresholds and time intervals
   - a simplified grid search in place of heavier Bayesian optimization
   - time-series cross-validation to protect generalization
5. Improved robustness:
   - full error handling with fallback to the last saved model
   - system health checks
   - memory monitoring and automatic cleanup
6. Prediction interval estimation: intervals are computed automatically from recent performance (MAPE), providing a basic uncertainty estimate.
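As a minimal, self-contained sketch of point 1 (synthetic data and arbitrary round counts, assumed purely for illustration), LightGBM's `init_model` continuation works like this:

```python
import numpy as np
import lightgbm as lgb

# Synthetic regression data arriving in two batches
rng = np.random.default_rng(42)
X1, y1 = rng.normal(size=(500, 5)), rng.normal(size=500)
X2, y2 = rng.normal(size=(200, 5)), rng.normal(size=200)

params = {'objective': 'regression', 'verbose': -1}

# Initial model fitted on the first batch
booster = lgb.train(params, lgb.Dataset(X1, label=y1), num_boost_round=100)

# Continuation: init_model keeps the existing trees and appends new ones
# fitted on the fresh batch only, with no full retraining
booster = lgb.train(
    params,
    lgb.Dataset(X2, label=y2),
    num_boost_round=20,
    init_model=booster,
)
print(booster.num_trees())  # 120 trees after continuation
```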
This implementation runs entirely on CPU and, through careful design, gives the model genuine self-optimization capability while remaining lightweight and efficient.