CPU-only 民生商品价格预测与自我优化系统
以下是一个纯CPU实现的轻量级系统,聚焦大模型预测能力的自我优化核心逻辑,专为民生商品价格预测设计。
import json
import pandas as pd
import numpy as np
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import time
import logging
class CPUPricePredictionSystem:
    """Commodity price forecaster that delegates tuning and prediction to an LLM API.

    No model is trained or run locally: both the "self-optimization" step and
    the prediction step are chat-completion HTTP calls, so only CPU resources
    are needed on this side.

    Workflow (see ``predict_commodity_price``):

    1. Format the recent price history as text.
    2. Ask the model to re-balance the factor weights and calibrate the
       confidence bands.
    3. Ask the model for daily/weekly/monthly forecasts using that strategy.
    4. Record both results in ``optimization_history`` and adopt the new
       weights if they are well-formed.
    """

    # Chat-completions endpoint used by ``call_llm_api``.
    API_URL = "https://api.openai.com/v1/chat/completions"
    # Per-request timeout in seconds (two minutes).
    REQUEST_TIMEOUT_SECONDS = 120

    # The five factors every prompt refers to, with their initial weights.
    DEFAULT_FACTOR_WEIGHTS = {
        'seasonality': 0.3,
        'holiday_effect': 0.25,
        'weather_impact': 0.2,
        'supply_demand': 0.15,
        'market_sentiment': 0.1
    }

    # Confidence bands used when the optimization result does not provide any.
    DEFAULT_CONFIDENCE = {'daily': '±8%', 'weekly': '±5%', 'monthly': '±3%'}

    # JSON templates are plain (non-f) strings so their braces need no escaping.
    _OPTIMIZATION_SCHEMA = """{
"optimized_weights": {
"seasonality": 0.0-1.0,
"holiday_effect": 0.0-1.0,
"weather_impact": 0.0-1.0,
"supply_demand": 0.0-1.0,
"market_sentiment": 0.0-1.0
},
"optimization_insights": "详细的优化分析和建议",
"confidence_calibration": {
"daily": "百分比字符串,如'±5%'",
"weekly": "百分比字符串,如'±3%'",
"monthly": "百分比字符串,如'±2%'"
},
"data_processing_improvements": ["具体改进建议1", "具体改进建议2"],
"key_monitoring_indicators": ["需要监控的关键指标1", "关键指标2"]
}
"""

    _PREDICTION_SCHEMA = """{
"predictions": {
"daily": [
{
"date": "YYYY-MM-DD",
"predicted_price": 数字,
"confidence_interval": ["下限", "上限"],
"key_drivers": ["主要因素1", "主要因素2"]
}
],
"weekly": [
{
"week_start": "YYYY-MM-DD",
"week_end": "YYYY-MM-DD",
"avg_price": 数字,
"confidence_interval": ["下限", "上限"]
}
],
"monthly": [
{
"month": "YYYY-MM",
"avg_price": 数字,
"confidence_interval": ["下限", "上限"]
}
]
},
"factor_analysis": {
"dominant_factors": ["当前主导因素"],
"seasonal_pattern": "季节性模式描述",
"holiday_impact": "节假日影响分析"
},
"risk_assessment": {
"high_risk_periods": [
{
"period": "时间段描述",
"risk_level": "高/中/低",
"trigger_conditions": ["触发条件"]
}
],
"uncertainty_sources": ["不确定性来源"]
},
"decision_recommendations": {
"procurement": {
"best_timing": "最佳采购时间",
"recommended_quantity": "建议采购量百分比",
"risk_level": "风险等级"
},
"inventory_management": {
"safety_stock_level": "安全库存建议",
"reorder_point": "重新订购点"
},
"pricing_strategy": {
"suggested_adjustment": "价格调整建议",
"competitor_response": "竞争对手反应预期"
}
}
}
"""

    def __init__(self, api_key: str, model_name: str = "gpt-3.5-turbo"):
        """Initialize the system.

        Args:
            api_key: API key sent as a Bearer token with every request.
            model_name: Name of the chat model to request.
        """
        self.api_key = api_key
        self.model_name = model_name
        self.optimization_history = []
        self.performance_metrics = {}
        # Copy so that instances never mutate the class-level defaults.
        self.factor_weights = dict(self.DEFAULT_FACTOR_WEIGHTS)
        self.logger = self._setup_logger()

    def _setup_logger(self):
        """Configure root logging (INFO level) and return this module's logger."""
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(levelname)s - %(message)s')
        return logging.getLogger(__name__)

    def _normalize_weights(self, candidate) -> Optional[Dict[str, float]]:
        """Validate a factor-weight mapping and rescale it to sum to 1.0.

        Args:
            candidate: Object expected to be a dict holding a numeric,
                non-negative value for each key of ``DEFAULT_FACTOR_WEIGHTS``.

        Returns:
            A new dict with exactly the five known factors, scaled so the
            values sum to 1.0, or ``None`` when ``candidate`` is unusable
            (not a dict, missing key, non-numeric, negative, non-finite, or
            all zeros).
        """
        if not isinstance(candidate, dict):
            return None
        cleaned = {}
        for factor in self.DEFAULT_FACTOR_WEIGHTS:
            try:
                value = float(candidate[factor])
            except (KeyError, TypeError, ValueError):
                return None
            if not np.isfinite(value) or value < 0:
                return None
            cleaned[factor] = value
        total = sum(cleaned.values())
        if total <= 0:
            return None
        return {factor: value / total for factor, value in cleaned.items()}

    def prepare_historical_data(self, commodity_type: str,
                                data: pd.DataFrame,
                                lookback_days: int = 90) -> str:
        """Format the most recent price rows as CSV-like text plus a summary.

        Args:
            commodity_type: Commodity name; not used in the formatting, kept
                for interface compatibility.
            data: Frame with a ``date`` column (datetime-like or parseable by
                ``pd.to_datetime``) and a numeric ``price`` column, ordered
                oldest to newest.
            lookback_days: Number of trailing rows to include.

        Returns:
            One line per row (date, price, change versus the previous row in
            percent) followed by mean price, volatility and date range.

        Raises:
            ValueError: If ``data`` contains no rows.
        """
        recent_data = data.tail(lookback_days)
        if recent_data.empty:
            raise ValueError("historical data is empty")

        dates = pd.to_datetime(recent_data['date'])
        prices = recent_data['price'].astype(float)

        # Basic statistics; volatility is the coefficient of variation in percent.
        avg_price = prices.mean()
        price_std = prices.std()
        if avg_price > 0 and not pd.isna(price_std):
            price_volatility = price_std / avg_price * 100
        else:
            price_volatility = 0.0

        # Row-over-row change computed positionally with shift(), so it does
        # not depend on the frame's index labels. The first row (no previous
        # price) and rows whose previous price is not positive report 0.
        previous = prices.shift(1)
        changes = ((prices - previous) / previous * 100).where(previous > 0, 0.0)

        lines = ["日期,价格(元/公斤),环比变化(%)"]
        for day, price, change in zip(dates, prices, changes):
            lines.append(f"{day.strftime('%Y-%m-%d')},{price:.2f},{change:.2f}")
        historical_str = "\n".join(lines) + "\n"

        summary = "\n【数据统计摘要】\n"
        summary += f"平均价格: {avg_price:.2f}元/公斤\n"
        summary += f"价格波动率: {price_volatility:.1f}%\n"
        summary += f"数据范围: {dates.min().strftime('%Y-%m-%d')} 至 {dates.max().strftime('%Y-%m-%d')}\n"
        return historical_str + summary

    def generate_optimization_prompt(self, commodity_type: str,
                                     forecast_horizon: str,
                                     historical_summary: str,
                                     previous_predictions: Optional[List[Dict]] = None) -> str:
        """Build the prompt asking the model to re-tune weights and confidence bands.

        Args:
            commodity_type: Commodity name inserted into the prompt.
            forecast_horizon: Horizon label inserted into the prompt.
            historical_summary: Text produced by ``prepare_historical_data``.
            previous_predictions: Optional past prediction records; only the
                last three are included. The keys read are
                ``prediction_date``, ``accuracy`` and ``error_analysis``.

        Returns:
            The complete prompt text, ending with the expected JSON schema.
        """
        # Review of recent prediction performance, if any was supplied.
        optimization_context = ""
        if previous_predictions:
            optimization_context = "【历史预测表现回顾】\n"
            for pred in previous_predictions[-3:]:
                optimization_context += f"预测日期: {pred.get('prediction_date', '未知')}\n"
                optimization_context += f"预测准确率: {pred.get('accuracy', 'N/A')}%\n"
                optimization_context += f"主要误差原因: {pred.get('error_analysis', '未记录')}\n\n"

        # Current factor weights.
        weights_context = "【当前影响因素权重】\n"
        for factor, weight in self.factor_weights.items():
            weights_context += f"{factor}: {weight:.2f}\n"

        prompt = f"""
你是一位民生商品价格预测专家,专注于{commodity_type}的价格趋势分析。你的任务是优化预测模型,提高预测准确性。
{optimization_context}
{weights_context}
【当前市场环境】
- 商品类型: {commodity_type}
- 预测周期: {forecast_horizon}
- 历史数据统计: {historical_summary}
【优化任务】
1. **因素权重调整**: 基于历史表现,重新分配以下因素的权重(总和必须为1.0):
- 季节性 (seasonality): 当前{self.factor_weights['seasonality']:.2f}
- 节假日效应 (holiday_effect): 当前{self.factor_weights['holiday_effect']:.2f}
- 天气影响 (weather_impact): 当前{self.factor_weights['weather_impact']:.2f}
- 供需关系 (supply_demand): 当前{self.factor_weights['supply_demand']:.2f}
- 市场情绪 (market_sentiment): 当前{self.factor_weights['market_sentiment']:.2f}
2. **预测策略优化**:
- 识别过去预测中的主要误差模式
- 建议改进的数据处理方法
- 推荐更合适的预测时间粒度
3. **不确定性校准**:
- 为不同预测周期设置合理的置信区间
- 识别高风险预测时段的预警信号
【输出要求】
以严格的JSON格式输出,包含以下字段:
"""
        # The schema is appended as a plain string: literal JSON braces inside
        # the f-string above would be parsed as replacement fields.
        return prompt + self._OPTIMIZATION_SCHEMA

    def generate_prediction_prompt(self, commodity_type: str,
                                   forecast_horizon: str,
                                   historical_data: str,
                                   optimization_result: Dict) -> str:
        """Build the forecasting prompt, embedding the optimization outcome.

        Args:
            commodity_type: Commodity name inserted into the prompt.
            forecast_horizon: Horizon label inserted into the prompt.
            historical_data: Text produced by ``prepare_historical_data``.
            optimization_result: Parsed optimization response. Missing or
                malformed ``optimized_weights`` fall back to the current
                ``factor_weights``; missing ``confidence_calibration``
                entries fall back to built-in defaults.

        Returns:
            The complete prompt text, ending with the expected JSON schema.
        """
        weights = self._normalize_weights(optimization_result.get('optimized_weights'))
        if weights is None:
            # The model response is untrusted; never let it break formatting.
            weights = (self._normalize_weights(self.factor_weights)
                       or dict(self.DEFAULT_FACTOR_WEIGHTS))
        confidence = optimization_result.get('confidence_calibration')
        if not isinstance(confidence, dict):
            confidence = {}

        prompt = f"""
你是一位专业的民生商品价格预测专家,专注于{commodity_type}的价格趋势预测。请基于历史数据和优化后的预测策略,提供准确的价格预测。
【商品信息】
- 商品类型: {commodity_type}
- 预测周期: {forecast_horizon}
【历史价格数据】
{historical_data}
【优化后的预测策略】
- 因素权重分配:
• 季节性: {weights['seasonality']:.2f}
• 节假日效应: {weights['holiday_effect']:.2f}
• 天气影响: {weights['weather_impact']:.2f}
• 供需关系: {weights['supply_demand']:.2f}
• 市场情绪: {weights['market_sentiment']:.2f}
- 置信区间校准:
• 日预测: {confidence.get('daily', '±8%')}
• 周预测: {confidence.get('weekly', '±5%')}
• 月预测: {confidence.get('monthly', '±3%')}
【预测要求】
1. **生成具体预测值**:
- 日预测: 提供未来7天的每日价格预测
- 周预测: 提供未来4周的每周平均价格预测
- 月预测: 提供未来3个月的月平均价格预测
2. **分析关键驱动因素**:
- 识别影响价格的主要因素及其权重
- 分析季节性模式和周期性规律
- 评估节假日和天气的潜在影响
3. **风险评估**:
- 识别价格波动的高风险时段
- 量化预测的不确定性范围
- 提供风险预警信号
4. **决策建议**:
- 为采购、库存、定价提供具体建议
- 推荐最佳的执行时间窗口
- 评估不同决策方案的风险收益
【输出格式要求】
以严格的JSON格式输出,包含以下字段:
"""
        return prompt + self._PREDICTION_SCHEMA

    def call_llm_api(self, prompt: str, temperature: float = 0.3) -> Dict:
        """Send one prompt to the chat-completions API and parse the JSON reply.

        Any failure (network error, non-200 status, unexpected response
        shape, invalid JSON, non-object JSON) is logged and converted into
        the result of ``_get_default_fallback_result`` so callers never see
        an exception from this method.

        Args:
            prompt: User message content.
            temperature: Sampling temperature forwarded to the API.

        Returns:
            The parsed JSON object, or the fallback dict (which carries
            ``"is_fallback": True``).
        """
        try:
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}"
            }
            payload = {
                "model": self.model_name,
                "messages": [
                    {
                        "role": "system",
                        "content": "你是一位专业的民生商品价格预测专家,提供准确、可操作的预测和决策建议。"
                    },
                    {
                        "role": "user",
                        "content": prompt
                    }
                ],
                "temperature": temperature,
                "response_format": {"type": "json_object"}
            }
            self.logger.info("正在调用API进行优化/预测...")
            response = requests.post(
                self.API_URL,
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT_SECONDS
            )
            if response.status_code != 200:
                self.logger.error(f"API调用失败: {response.status_code}, {response.text}")
                raise RuntimeError(f"API调用失败: {response.status_code}")

            content = response.json()['choices'][0]['message']['content']
            try:
                parsed_result = json.loads(content)
            except json.JSONDecodeError as e:
                self.logger.error(f"JSON解析失败: {e}, 原始内容: {content}")
                raise
            if not isinstance(parsed_result, dict):
                # Callers index the result by key; a list or scalar is unusable.
                raise ValueError("API响应不是JSON对象")
            self.logger.info("API调用成功,获得优化/预测结果")
            return parsed_result
        except Exception as e:
            # Deliberate best-effort boundary: degrade to a fallback result.
            self.logger.error(f"API调用异常: {e}")
            return self._get_default_fallback_result()

    def _get_default_fallback_result(self) -> Dict:
        """Return a placeholder result used when the API call fails.

        The weights are a copy of the *current* ``factor_weights`` so a
        failed call never resets learned weights. The prices are fixed
        placeholders (25.0), not forecasts; ``"is_fallback": True`` lets
        consumers tell this apart from a real model response.
        """
        now = datetime.now()
        return {
            "is_fallback": True,
            "optimized_weights": dict(self.factor_weights),
            "confidence_calibration": dict(self.DEFAULT_CONFIDENCE),
            "predictions": {
                "daily": [{"date": now.strftime('%Y-%m-%d'), "predicted_price": 25.0}],
                "weekly": [{"week_start": now.strftime('%Y-%m-%d'), "avg_price": 25.0}],
                "monthly": [{"month": now.strftime('%Y-%m'), "avg_price": 25.0}]
            }
        }

    def update_optimization_history(self, optimization_result: Dict,
                                    prediction_result: Dict,
                                    actual_data: Optional[pd.DataFrame] = None):
        """Append one optimization/prediction round to the history and adopt its weights.

        Args:
            optimization_result: Parsed optimization response.
            prediction_result: Parsed prediction response.
            actual_data: Optional frame of realized prices (``date``,
                ``price``); when given and non-empty, accuracy metrics are
                stored on the record.

        Side effects:
            Appends to ``optimization_history``, rewrites today's history
            file, and replaces ``factor_weights`` when the result carries
            valid, non-fallback weights.
        """
        record = {
            'timestamp': datetime.now().isoformat(),
            'optimization_result': optimization_result,
            'prediction_result': prediction_result,
            'performance_metrics': {}
        }
        if actual_data is not None and not actual_data.empty:
            self._calculate_performance_metrics(prediction_result, actual_data, record)
        self.optimization_history.append(record)
        self._save_optimization_history()

        # Adopt new weights only if they came from the model and validate.
        if not optimization_result.get('is_fallback') and 'optimized_weights' in optimization_result:
            new_weights = self._normalize_weights(optimization_result['optimized_weights'])
            if new_weights is not None:
                self.factor_weights = new_weights
            else:
                self.logger.warning("优化权重无效,保留当前因素权重")
        self.logger.info(f"优化历史已更新,当前记录数: {len(self.optimization_history)}")

    def _calculate_performance_metrics(self, prediction_result: Dict,
                                       actual_data: pd.DataFrame, record: Dict):
        """Compute daily accuracy and store it in ``record['performance_metrics']``.

        Predictions and actual prices are paired by calendar date.
        ``daily_accuracy`` is ``100 - mean absolute percentage error`` over
        the paired days, clipped to [0, 100]; it is omitted when no day can
        be paired. Failures are logged as warnings and never propagate.

        Args:
            prediction_result: Parsed prediction response; reads
                ``predictions.daily[*].date`` and ``.predicted_price``.
            actual_data: Frame with ``date`` and ``price`` columns.
            record: History record, mutated in place.
        """
        try:
            daily_predictions = (prediction_result.get('predictions') or {}).get('daily') or []
            if daily_predictions and not actual_data.empty:
                # Normalize both sides to 'YYYY-MM-DD' so strings and
                # datetimes compare equal.
                date_keys = pd.to_datetime(actual_data['date']).dt.strftime('%Y-%m-%d')
                actual_by_date = dict(zip(date_keys, actual_data['price']))

                pct_errors = []
                for pred in daily_predictions:
                    try:
                        key = pd.to_datetime(pred.get('date')).strftime('%Y-%m-%d')
                        predicted = float(pred.get('predicted_price'))
                    except (TypeError, ValueError, AttributeError):
                        continue  # unusable prediction entry
                    actual = actual_by_date.get(key)
                    if actual is None or pd.isna(actual) or actual == 0:
                        continue  # no realized price, or percentage error undefined
                    pct_errors.append(abs((actual - predicted) / actual * 100))

                if pct_errors:
                    accuracy = 100 - sum(pct_errors) / len(pct_errors)
                    record['performance_metrics']['daily_accuracy'] = max(0, min(100, accuracy))
            record['performance_metrics']['timestamp'] = datetime.now().isoformat()
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            self.logger.warning(f"性能指标计算失败: {e}")

    def _save_optimization_history(self):
        """Write the full history to ``optimization_history_<YYYYMMDD>.json``.

        The file is created in the current working directory and overwritten
        on every call. Failures are logged as warnings and never propagate.
        """
        try:
            with open(f'optimization_history_{datetime.now().strftime("%Y%m%d")}.json', 'w', encoding='utf-8') as f:
                json.dump(self.optimization_history, f, ensure_ascii=False, indent=2)
        except (OSError, TypeError, ValueError) as e:
            self.logger.warning(f"保存优化历史失败: {e}")

    def predict_commodity_price(self, commodity_type: str,
                                historical_data: pd.DataFrame,
                                forecast_horizon: str = "daily",
                                previous_predictions: Optional[List[Dict]] = None) -> Dict:
        """Run one full optimize-then-predict round for a commodity.

        Args:
            commodity_type: Commodity name used in the prompts.
            historical_data: Frame with ``date`` and ``price`` columns.
            forecast_horizon: Horizon label used in the prompts.
            previous_predictions: Optional past prediction records passed to
                the optimization prompt.

        Returns:
            The prediction dict with an added ``metadata`` entry. In
            ``metadata``, ``optimization_applied`` is ``False`` when the
            optimization step fell back to defaults. If any step raises, a
            dict with ``error``, ``fallback_prediction`` and ``metadata`` is
            returned instead.
        """
        start_time = time.time()
        self.logger.info(f"开始预测 {commodity_type} 价格,预测周期: {forecast_horizon}")
        try:
            # 1. Format the history.
            historical_summary = self.prepare_historical_data(commodity_type, historical_data)
            # 2. Optimization round.
            optimization_prompt = self.generate_optimization_prompt(
                commodity_type, forecast_horizon, historical_summary, previous_predictions
            )
            optimization_result = self.call_llm_api(optimization_prompt, temperature=0.2)
            # 3. Prediction round.
            prediction_prompt = self.generate_prediction_prompt(
                commodity_type, forecast_horizon, historical_summary, optimization_result
            )
            prediction_result = self.call_llm_api(prediction_prompt, temperature=0.3)
            # 4. Persist the round and adopt the new weights.
            self.update_optimization_history(optimization_result, prediction_result)
            # 5. Attach metadata.
            prediction_result['metadata'] = {
                'commodity_type': commodity_type,
                'forecast_horizon': forecast_horizon,
                'prediction_timestamp': datetime.now().isoformat(),
                'processing_time_seconds': round(time.time() - start_time, 2),
                'model_used': self.model_name,
                'optimization_applied': not optimization_result.get('is_fallback', False)
            }
            self.logger.info(f"预测完成!处理时间: {round(time.time() - start_time, 2)}秒")
            return prediction_result
        except Exception as e:
            # Top-level boundary: report the error instead of crashing the caller.
            self.logger.error(f"预测过程出错: {e}")
            return {
                'error': str(e),
                'fallback_prediction': self._get_default_fallback_result(),
                'metadata': {
                    'commodity_type': commodity_type,
                    'forecast_horizon': forecast_horizon,
                    'error_timestamp': datetime.now().isoformat()
                }
            }
# Usage example
def create_sample_data(commodity_type: str, days: int = 90) -> pd.DataFrame:
    """Generate synthetic daily prices for demonstration purposes.

    Prices are a per-commodity base price multiplied by a sinusoidal seasonal
    factor (period 365 days, amplitude 20%) and Gaussian noise (sigma 5%).
    Real deployments should load prices from a database or API instead.

    Args:
        commodity_type: One of 'pork', 'vegetable', 'rice', 'oil'
            (case-insensitive); any other value uses a base price of 10.0.
        days: Number of consecutive days to generate, ending today.

    Returns:
        Frame with columns ``date`` (oldest first), ``price`` and ``commodity``.
    """
    today = datetime.now()
    dates = [today - timedelta(days=offset) for offset in range(days - 1, -1, -1)]
    base_prices = {
        'pork': 25.0,
        'vegetable': 5.0,
        'rice': 3.5,
        'oil': 15.0
    }
    base = base_prices.get(commodity_type.lower(), 10.0)
    day_index = np.arange(days)
    seasonal_factor = 1 + 0.2 * np.sin(2 * np.pi * day_index / 365)
    random_factor = 1 + np.random.normal(0, 0.05, size=days)
    prices = np.round(base * seasonal_factor * random_factor, 2).tolist()
    return pd.DataFrame({
        'date': dates,
        'price': prices,
        'commodity': [commodity_type] * days
    })


def print_decision_summary(prediction_result: Dict, commodity_name: str):
    """Print the forecasts, recommendations and risk warnings of one result.

    Every section is optional: fields missing from ``prediction_result``
    (as in the API-failure fallback, which has no confidence intervals or
    recommendations) are skipped or shown as 'N/A' instead of raising.

    Args:
        prediction_result: Dict returned by ``predict_commodity_price``.
        commodity_name: Display name used in the printed headings.
    """
    if 'error' in prediction_result:
        print(f"{commodity_name}预测出错: {prediction_result['error']}")
        return
    print(f"\n{commodity_name}价格预测决策摘要:")
    print("=" * 50)

    # Daily forecasts (at most seven).
    daily_preds = (prediction_result.get('predictions') or {}).get('daily', [])
    if daily_preds:
        print("📅 未来7天预测:")
        for pred in daily_preds[:7]:
            try:
                price_text = f"{float(pred.get('predicted_price')):.2f}"
            except (TypeError, ValueError):
                price_text = "N/A"
            interval = pred.get('confidence_interval')
            if isinstance(interval, (list, tuple)) and len(interval) >= 2:
                interval_text = f"(置信区间: {interval[0]}~{interval[1]})"
            else:
                interval_text = "(置信区间: N/A)"
            print(f" {pred.get('date', '未知')}: {price_text}元/公斤 " + interval_text)

    # Decision recommendations.
    if 'decision_recommendations' in prediction_result:
        recs = prediction_result['decision_recommendations']
        print("\n💡 关键决策建议:")
        if 'procurement' in recs:
            proc = recs['procurement']
            print(f" 🛒 采购建议: {proc.get('best_timing', '未指定')}采购,建议量{proc.get('recommended_quantity', 'N/A')}")
            print(f" ⚠️ 风险等级: {proc.get('risk_level', '中等')}")
        if 'inventory_management' in recs:
            inv = recs['inventory_management']
            print(f" 📦 库存建议: 安全库存{inv.get('safety_stock_level', '标准水平')}")

    # Risk warnings (first two only).
    if 'risk_assessment' in prediction_result:
        risks = prediction_result['risk_assessment'].get('high_risk_periods', [])
        if risks:
            print("\n🚨 风险预警:")
            for risk in risks[:2]:
                print(f" • {risk.get('period', '未知时段')}: {risk.get('risk_level', '高')}风险")
                # str() guards against non-string items in the model's list.
                conditions = ', '.join(str(item) for item in risk.get('trigger_conditions', ['未指定']))
                print(f" 触发条件: {conditions}")
    print("=" * 50)


def main():
    """Run the demo: forecast pork, vegetable and rice prices on synthetic data."""
    # 1. Initialize the system (CPU only).
    api_key = "your_openai_api_key_here"  # replace with a real API key
    predictor = CPUPricePredictionSystem(api_key=api_key, model_name="gpt-3.5-turbo")

    # 2. Pork, daily horizon.
    print("=== 猪肉价格预测(日维度)===")
    pork_data = create_sample_data('pork', 90)
    pork_prediction = predictor.predict_commodity_price(
        commodity_type="猪肉",
        historical_data=pork_data,
        forecast_horizon="daily"
    )

    # 3. Vegetables, weekly horizon.
    print("\n=== 蔬菜价格预测(周维度)===")
    vegetable_data = create_sample_data('vegetable', 90)
    vegetable_prediction = predictor.predict_commodity_price(
        commodity_type="蔬菜",
        historical_data=vegetable_data,
        forecast_horizon="weekly"
    )

    # 4. Rice, monthly horizon.
    print("\n=== 粮油价格预测(月维度)===")
    rice_data = create_sample_data('rice', 180)
    rice_prediction = predictor.predict_commodity_price(
        commodity_type="大米",
        historical_data=rice_data,
        forecast_horizon="monthly"
    )

    # 5. Decision summaries.
    print_decision_summary(pork_prediction, "猪肉")
    print_decision_summary(vegetable_prediction, "蔬菜")
    print_decision_summary(rice_prediction, "大米")

    # 6. System state after the three rounds.
    print("\n📊 系统性能:")
    print(f"总优化历史记录数: {len(predictor.optimization_history)}")
    print(f"当前因素权重: {predictor.factor_weights}")


if __name__ == "__main__":
    main()
系统核心设计思路
1. 纯CPU架构设计
- API驱动:通过调用大模型API(而非本地训练),完全避免GPU依赖
- 轻量级数据处理:使用pandas进行高效数据处理,CPU友好
- 内存优化:只保留必要的历史数据和优化记录,避免内存溢出
2. 自我优化核心逻辑
# 优化循环的核心步骤
1. 分析历史预测表现 → 2. 调整因素权重 → 3. 校准置信区间 → 4. 改进数据处理 → 5. 应用优化结果
- 动态权重调整:根据历史表现自动调整季节性、节假日、天气等因素的权重
- 置信区间校准:为不同预测周期(日/周/月)设置不同的不确定性范围
- 持续学习:保存优化历史,每次预测都基于之前的经验进行改进
3. 决策导向输出
- 具体数值建议:采购时机、库存水平、价格调整幅度
- 风险量化:明确标注高风险时段和触发条件
- 执行指导:为不同角色(采购、库存、销售)提供具体行动建议
4. 多商品多周期支持
- 商品类型:猪肉、蔬菜、粮油等民生商品
- 预测粒度:
- 日预测:7天详细价格,适合短期采购决策
- 周预测:4周平均价格,适合库存管理
- 月预测:未来3个月的月平均价格,适合战略规划
5. 系统优势
- 零GPU依赖:完全在CPU上运行,部署成本低
- 快速迭代:每次预测都优化系统参数,越用越准
- 业务价值:直接输出可执行的决策建议,而非仅仅是预测数字
- 弹性设计:API调用失败时有回退机制,保证系统稳定性
这个系统充分利用大模型的理解和推理能力,通过自我优化机制不断提升预测准确性,同时保持纯CPU实现的轻量级架构,非常适合资源有限但需要高质量价格预测的民生商品应用场景。