目录
- 前置
- 异常突增突降检测
- 实际代码
- 周期性数据使用PCA检测
- 平稳性数据使用PCA检测
- 周期性数据突增突降主流程
- 前置
· - 流程
- 前置
- 参考文章
前置
- 之前总结过推荐系统,里面包含了机器学习基础业务实战场景(十四)推荐系统
- 简单的分类算法机器学习之分类算法
异常突增突降检测
- 本文介绍异常突增突降检测无监督学习(特点是训练数据没有明确的标签或类别信息)降维算法(PCA),无监督的聚合算法(CBLOF使用了KMeans),现公司业务生产环境中数据太少不如降维准确简单测试
- 本文模型预处理对于太多无用数据不预测,使用均值或者中位数填充部分数据
- 模型预测完之后,异常数据使用箱型图和zscore+肘部法则对最终异常数处理
实际代码
- 基于PCA降维的代码
- 数据类型excel,可以自己构造, 举例子240202_240206_g_video_not_status_cycle_pca
time bw
2024-02-02 00:00:00 12827388.68
2024-02-02 00:01:00 13225658.56
2024-02-02 00:02:00 13246549.29
周期性数据使用PCA检测
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
def get_g_video_not_status_code_data():
# 这块统计出刚刚开始上涨的数据也很准
df = pd.read_excel('F:\\py_data\240202_240206_g_video_not_status_cycle_pca.xlsx', sheet_name='Sheet1')
df['time'] = df['time'].apply(pd.to_datetime)
df['bw'] = df['bw'].astype(float)
df1 = df.set_index('time', drop=True)
se = df1['bw']
time_series = se.asfreq('T').fillna(method='bfill')
se_test = time_series['2024-02-07 00:00:00': '2024-02-09 23:59:00']
return se_test
def get_g_video_not_status_code_data_v2():
# 这块统计出刚刚开始上涨的数据也很准
df = pd.read_excel('F:\\py_data\\240202_240206_g_video_not_status_cycle_pca.xlsx', sheet_name='Sheet1')
df['time'] = df['time'].apply(pd.to_datetime)
df['bw'] = df['bw'].astype(float)
df1 = df.set_index('time', drop=True)
se = df1['bw']
time_series = se.asfreq('T').fillna(method='bfill')
se_test = time_series['2024-02-02 00:00:00': '2024-02-06 23:59:00']
return se_test
def get_12306_status_code_data():
# 这块统计出刚刚开始上涨的数据也很准
df = pd.read_excel('F:\\py_data\\20240208_20240215_12306_status_code.xlsx', sheet_name='Sheet1')
df['time'] = df['time'].apply(pd.to_datetime)
df['bw'] = df['bw'].astype(float)
df1 = df.set_index('time', drop=True)
se = df1['bw']
time_series = se.asfreq('T').fillna(method='bfill')
se_test = time_series['2024-02-08 10:00': '2024-02-15 08:00']
return se_test
def get_baidu_status_code_data():
# 这块统计出刚刚开始上涨的数据也很准, 算是平稳型数据
df = pd.read_excel('F:\\py_data\\20240208_20240215_baidu_status_code.xlsx', sheet_name='Sheet1')
df['time'] = df['time'].apply(pd.to_datetime)
df['bw'] = df['bw'].astype(float)
df1 = df.set_index('time', drop=True)
se = df1['bw']
time_series = se.asfreq('T').fillna(method='bfill')
se_test = time_series['2024-02-08 10:00': '2024-02-15 08:00']
return se_test
if __name__ == '__main__':
se_test = get_g_video_not_status_code_data()
# 计算残差 比之前用seasonal_decompose好,因为seasonal_decompose 有时效性, 数据都是一天的周期
trend = se_test.rolling(window=1440).mean()
residuals = se_test - trend
# 使用向前填充, 前面数据可以不用,数据量不大,填充的价值很小
residuals.fillna(method='bfill', inplace=True)
# 绘制分解后的组件
plt.figure(figsize=(12, 8))
plt.subplot(411)
plt.plot(se_test, label='Original')
plt.legend(loc='best')
plt.subplot(412)
plt.plot(residuals, label='Residuals')
plt.legend(loc='best')
plt.tight_layout()
plt.show()
# 使用PCA计算异常得分, 经过实验这种周期性明显的数据, 离线生成阈值,再结合得分算异常,不如直接取多天比如四天的数据再统计更准确
pca = PCA(n_components=1)
pca_fit = pca.fit(residuals.values.reshape(-1, 1))
# 将这些数据重塑为一个二维数组,其中每一行代表一个数据点,每一列代表一个特征。-1是一个占位符,表示该维度的大小应自动计算
principalComponents = pca_fit.transform(residuals.values.reshape(-1, 1))
# principalComponents ** 2将每个主成分值平方。np.sum(principalComponents ** 2, axis=1)沿着每个数据点的维度(即,沿着列)求和,得到每个数据点的主成分得分的平方。np.sqrt(...)则取平方根,得到每个数据点的主成分得分
scores = np.sqrt(np.sum(principalComponents ** 2, axis=1))
# 通过箱型图确定异常阈值
q1 = np.percentile(residuals, 25)
q3 = np.percentile(residuals, 75)
iqr = q3 - q1
lower_bound = q1 - 10 * iqr
upper_bound = q3 + 1.5 * iqr
# 找出异常的残差
outliers_residual = (residuals < lower_bound) | (residuals > upper_bound)
# 结合PCA异常得分和箱型图异常阈值确定最终的异常数据,outliers_residual这个就比较准
# 这里假设只有残差箱型图和PCA得分同时满足异常条件时,才认为是真正的异常
# 该阈值是均值加上两倍的标准差。在统计学中,一个数据点如果其值超出了均值加减两倍标准差的范围,通常被认为是一个相对比较极端的值(在正态分布的情况下,大约95%的数据会落在这个范围内,所以超出这个范围的值是相对罕见的)
final_outliers = (outliers_residual) & (scores > np.mean(scores) + 2 * np.std(scores))
# 打印异常数据的索引 todo 最后告警要有连续的几分钟比如十分钟才告警来
res = final_outliers[final_outliers].index.tolist()
print("异常数据索引:", res)
平稳性数据使用PCA检测
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from pyod.models.cblof import CBLOF
import pandas as pd
def CBLOF_fit_check(data_scaled_inner_2):
# 使用CBLOF计算异常得分
cblof = CBLOF()
y_pred_cblof = cblof.fit_predict(data_scaled_inner_2[['value']])
scores_cblof = cblof.decision_function(data_scaled_inner_2[['value']])
# 将scores_cblof转换为Pandas Series(如果它还不是的话)
scores_series = pd.Series(scores_cblof)
# 计算四分位数Q1和Q3
Q1 = scores_series.quantile(0.25)
Q3 = scores_series.quantile(0.75)
# 计算四分位距IQR
IQR = Q3 - Q1
# 定义异常值范围, 动态值变化下
lower_bound = Q1 - 10 * IQR
upper_bound = Q3 + 10 * IQR
# 找出异常值,todo 思路是对的找数据的异常值, 还没处理不过没关系
outliers = np.any((scores_cblof < lower_bound) | (scores_cblof > upper_bound))
# 打印异常值
print("Outliers in CBLOF space:", outliers)
if __name__ == '__main__':
val = [9.07, 9.28, 9.37, 9.61, 9.89, 10.11, 10.29, 10.42, 10.45, 10.39,
10.83, 10.79, 10.75, 10.83, 10.85, 10.75, 10.70, 10.64, 10.65, 10.68,
10.83, 10.79, 10.75, 10.83, 15.85, 10.75, 6.75, 8.75, 126.75, 118.75,
116.75]
ind = ['2022-07-24 08:00:00', '2022-07-24 08:01:00', '2022-07-24 08:02:00', '2022-07-24 08:03:00',
'2022-07-24 08:04:00', '2022-07-24 08:05:00', '2022-07-24 08:06:00', '2022-07-24 08:07:00',
'2022-07-24 08:08:00', '2022-07-24 08:09:00', '2022-07-24 08:10:00', '2022-07-24 08:11:00',
'2022-07-24 08:12:00', '2022-07-24 08:13:00', '2022-07-24 08:14:00', '2022-07-24 08:15:00',
'2022-07-24 08:16:00', '2022-07-24 08:17:00', '2022-07-24 08:18:00', '2022-07-24 08:19:00',
'2022-07-24 08:20:00', '2022-07-24 08:21:00', '2022-07-24 08:22:00', '2022-07-24 08:23:00',
'2022-07-24 08:24:00', '2022-07-24 08:25:00', '2022-07-24 08:26:00', '2022-07-24 08:27:00',
'2022-07-24 08:28:00', '2022-07-24 08:29:00', '2022-07-24 08:30:00']
se = pd.Series(val, index=ind)
# 将Series转换为DataFrame
data = pd.DataFrame(se, columns=['value'])
# 期望结果为一个二维数组,所以添加一个时间序列的整数表示列
data['time'] = range(len(data))
# 标准化
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)
# 使用PCA
pca = PCA(n_components=1)
# 训练数据 data_scaled可以改变, 这里数据量不大暂时取相同的
pca_data_train = pca.fit_transform(data_scaled)
# 真正测试
pca_data_test = pca.fit_transform(data_scaled)
# 识别异常值
Q11 = np.percentile(pca_data_train, 25, axis=0)
Q31 = np.percentile(pca_data_train, 75, axis=0)
IQR1 = Q31 - Q11
# 定义异常值范围
lower_bound = Q11 - 1.5 * IQR1
upper_bound = Q31 + 1.5 * IQR1
# 找出异常值
outliers = np.any((pca_data_test < lower_bound) | (pca_data_test > upper_bound), axis=1)
# 打印异常值
print("Outliers in PCA space:", outliers)
# CBLOF检测 也可以识别出来不过会有误告,相对来说上面数据集PCA准确一些,可以结合起来,一个非常大三个比较大徒增数据PCA只会
# 报一个,三个比较大PCA都会报,CBLOF也会报但总有误报
#CBLOF_fit_check(data)
# IForest, 聚类这块不太准确
# 使用IForest计算异常得分, contamination参数表示数据集中异常点所占的比例
# 使用IForest计算异常得分, 箱型图计算的有点问题
# clf = IsolationForest(contamination=0.1) # contamination参数表示数据集中异常点所占的比例
# preds = clf.fit_predict(data[['value']])
# scores = -clf.decision_function(data[['value']]) # IForest的异常得分,负数表示异常程度
# # # 计算Z-score
# z_scores = np.abs(stats.zscore(data['value']))
# # 使用箱型图计算阈值
# Q1 = data['value'].quantile(0.25)
# Q3 = data['value'].quantile(0.75)
# IQR = Q3 - Q1
# threshold_box = Q1 - 1.5 * IQR # 箱型图的异常值阈值
# # 结合Z-score和箱型图阈值
# threshold_combined = np.max([threshold_box, np.mean(z_scores) + 2 * np.std(z_scores)]) # 2是Z-score的标准差倍数阈值
# # 识别异常数据
# outliers_iforest = data[scores < threshold_combined]
# # 打印结果
# print("IForest identified outliers:")
# print(outliers_iforest)
周期性数据突增突降主流程
前置
- 判断是周期型数据,或者平稳性数据,或者既不是周期型也不是平稳型数据,周期型数据判断是根据fft频谱与自相关系数判断是否具有周期性,当然也训练时有一天以上数据, 平稳型数据根据ADF检验是否平稳
- 自动判断周期型数据的周期根据ACF自相关系数判断
流程
主要分为两个阶段
fit阶段: 每天获取两天前的三天前到两天前的数据进行训练
通过上下四分位数的差计算出四分位距,绝对下界为下四分位 - 四分位距 * 传入的常数, 绝对上界等于上四分位 加 四分位距 * * 传入的常数,这种绝对上下界在数据预处理异常数据等用重要应用,类似统计学的箱线图算法,随机森林异常值检测也用到四分位距
predict阶段: 每分钟获取两天前到现在的数据进行预测,主要预测最近三十分钟数据是否有异常