前言
在这篇文章中,将会使用python
实现异常检测算法检测服务器中的异常行为,我们将检测每一台服务器上吞吐量(mb/s)和延迟(ms)数据。在服务器运行过程中收集个样本,这些样本都是无标签的,假设这些样本中的大多数数据表示服务器的正常行为,而少数样本的数据是表示服务器的异常行为。
算法实现
数据可视化
首先,通过二维图像可视化显示这些数据集,并且通过这些数据拟合高斯分布,并且可以发现某一些数据出现的概率很低,因此可以认为这些数据是异常数据。
加载数据之后,通过二维可视化之后如下所示:
plt.ion()
data = scio.loadmat('ex8data1.mat')
X = data['X']
Xval = data['Xval']
yval = data['yval'].flatten()
plt.figure()
plt.scatter(X[:, 0], X[:, 1], c='b', marker='x', s=15, linewidth=1)
plt.axis([0, 30, 0, 30])
plt.xlabel('Latency (ms)')
plt.ylabel('Throughput (mb/s')
如下图所示
将数据集可视化显示之后,可以看到,大多数数据落点比较密集,个别数据的落点比较分散,而这些数据很可能就是异常数据。
高斯分布
给定数据集X
,需要计算高斯分布的参数,即也就是均值和方差,高斯分布参数的计算公式如下所示:
,
以上公式的代码实现如下所示:
def estimate_gaussian(X):
# Useful variables
m, n = X.shape
mu = np.zeros(n)
sigma2 = np.zeros(n)
mu = np.mean(X,axis=0)
sigma2 = np.var(X,axis=0)
return mu, sigma2
代码中涉及到多元高斯模型,其计算公式如下所示:
代码实现如下所示:
# 多维高斯分布
def multivariate_gaussian(X, mu, sigma2):
k = mu.size
if sigma2.ndim == 1 or (sigma2.ndim == 2 and (sigma2.shape[1] == 1 or sigma2.shape[0] == 1)):
# 转换为对角矩阵
sigma2 = np.diag(sigma2)
x = X - mu
p = (2 * np.pi) ** (-k / 2) * np.linalg.det(sigma2) ** (-0.5) * np.exp(-0.5*np.sum(np.dot(x, np.linalg.pinv(sigma2)) * x, axis=1))
return p
对数据可视化显示并且以等高线形式显示,实现代码如下所示:
#拟合数据并可视化
def visualize_fit(X, mu, sigma2):
#生成两个方阵,从0开始,到35.5 并且每一列递增速度为0.5
grid = np.arange(0, 35.5, 0.5)
x1, x2 = np.meshgrid(grid, grid)
# x1.flatten('F') 以列优先展开
Z = multivariate_gaussian(np.c_[x1.flatten('F'), x2.flatten('F')], mu, sigma2)
Z = Z.reshape(x1.shape, order='F')
plt.figure()
plt.scatter(X[:, 0], X[:, 1], marker='x', c='b', s=15, linewidth=1)
if np.sum(np.isinf(X)) == 0:
lvls = 10 ** np.arange(-20, 0, 3).astype(np.float)
# 画出等高线图
plt.contour(x1, x2, Z, levels=lvls, colors='r', linewidths=0.7)
最后,可视化显示得到的图像如下所示,以等高线形式表示概率的大小,等高线从外向内,表示概率以此变大,从图像中可以看出大多数数据位于概率较高的地区,而异常数据应该处于概率较低的地区。
visualize_fit(X, mu, sigma2)
plt.xlabel('Latency (ms)')
plt.ylabel('Throughput (mb/s')
input('Program paused. Press ENTER to continue')
参数的选择
为了选择合适的参数,将会使用交叉验证集,其中标签表示此样本为异常样本,而标签表示正常样本,对于每一个交叉验证样本,利用高斯公式会得到一个概率值,最后通过高斯公式计算出所有交叉验证集的概率值,并将其组合得到一个向量,通过此概率向量和输出所组成的向量可以计算参数。
对于给定的参数,如果样本通过高斯分布得到的概率,则该样本为异常样本,引入一个参数评估不同的参数的工作性能。
参数的计算公式如下所示:
其中
- 表示正确的正样本的个数,表示算法能够正确的将其识别为异常样本。
- 表示错误的正样本个数,并且表示算法将正常样本识别为异常样本。
- 表示错误的负样本数量,并且表示算法将异常样本识别为正常样本。
注意:通过高斯分布得到的概率小于,则认为其预测值是1,否则为0,通过交叉验证集得到的一系列概率值与比较可以得到一组二进制向量,对于以上中,参数的计算可以直接通过验证集样本和通过与操作,再通过
np.sum()
函数可以直接得到其数量。
最后,以上过程的实现代码如下所示:
def select_threshold(yval, pval):
f1 = 0
best_eps = 0
best_f1 = 0
for epsilon in np.linspace(np.min(pval), np.max(pval), num=1001):
False(0)'s and True(1)'s of the outlier predictions
predictions = np.less(pval, epsilon) #pval < epsilon = 0
tp = np.sum(np.logical_and(predictions, yval)) #tp = 8
fp = np.sum(np.logical_and(predictions, yval == 0)) # fp = 8
fn = np.sum(np.logical_and(np.logical_not(predictions), yval == 1)) #fn = 2
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = (2 * precision * recall) / (precision + recall)
# 迭代寻找最佳值
if f1 > best_f1:
best_f1 = f1
best_eps = epsilon
return best_eps, best_f1
载入数据,运行以上函数
data = scio.loadmat('ex8data2.mat')
X = data['X']
Xval = data['Xval']
yval = data['yval'].flatten()
# Apply the same steps to the larger dataset
mu, sigma2 = estimate_gaussian(X)
# Training set
p = multivariate_gaussian(X, mu, sigma2)
# Cross Validation set
pval = multivariate_gaussian(Xval, mu, sigma2)
# Find the best threshold
epsilon, f1 = select_threshold(yval, pval)
print('Best epsilon found using cross-validation: {:0.4e}'.format(epsilon))
print('Best F1 on Cross Validation Set: {:0.6f}'.format(f1))
print('# Outliers found: {}'.format(np.sum(np.less(p, epsilon))))
通过运行以上代码,得到的值如下图所示: