本算法原自KDD19的论文:Time-Series Anomaly Detection Service at Microsoft
另参考论文:Saliency Detection A Spectral Residual Approach
算法介绍:
谱残差算法包含三个主要步骤:(1)通过傅里叶变换获得对数振幅频谱;(2)计算谱残差(3)通过反傅里叶变换将序列还原到原来空间域。
Python实现(参考代码)
#util.py
import numpy as np
import scipy as sc
import pandas as pd
IsAnomaly = "isAnomaly"
AnomalyId = "id"
AnomalyScore = "score"
Value = "value"
Timestamp = "timestamp"
MAX_RATIO = 0.25
EPS = 1e-8
THRESHOLD = 3
MAG_WINDOW = 3
SCORE_WINDOW = 100
def average_filter(values, n=3):
"""
Calculate the sliding window average for the give time series.
Mathematically, res[i] = sum_{j=i-t+1}^{i} values[j] / t, where t = min(n, i+1)
:param values: list.
a list of float numbers
:param n: int, default 3.
window size.
:return res: list.
a list of value after the average_filter process.
"""
if n >= len(values):
n = len(values)
res = np.cumsum(values, dtype=float)
res[n:] = res[n:] - res[:-n]
res[n:] = res[n:] / n
for i in range(1, n):
res[i] /= (i + 1)
return res
def getData(X):
if isinstance(X,list):
X=np.array(X)
elif isinstance(X,pd.Series):
X=X.values
elif isinstance(X,np.ndarray):
X=X
else:
raise Exception('This data format (%s) is not supported' % type(X))
return X
def guass_filter(X,k=5):
raise NotImplementedError
#SR.py
import numpy as np
import scipy as sc
import pandas as pd
from util import *
from scipy.fftpack import fft,ifft
class SR():
'''
This module realises a spectral residual method for anomaly detection.
The input data suppose list,np.ndarray and pd.Series
'''
def __init__(self,X=np.array([]),slice_window=3,map_window=3,tresh=1):
self.slice_window=slice_window
self.X=getData(X)
self.map_window=map_window
self.thresh=tresh
#raise NotImplementedError
def run(self):
Smap=self.getSalienceMap(self.X)
result=np.array([1 if i > self.thresh else 0 for i in Smap])
return result
def setdata(self,data):
self.X=getData(data)
def setslicewindow(self,thresh):
self.slice_window=thresh
def plot(self):
raise NotImplementedError
def getSR(self,X):
'''
傅里叶变化、残差谱、反傅里叶变化
'''
X=getData(X)
#spectral_residual_transform
yy=fft(X)
A=yy.real
P=yy.imag
V=np.sqrt(A**2+P**2)
eps_index = np.where(V <= EPS)[0]
V[eps_index]=EPS
L=np.log(V)
L[eps_index]=0
residual=np.exp(L-average_filter(L,self.map_window))
yy.imag=residual*P/V
yy.real=residual*A/V
yy.imag[eps_index]=0
yy.real[eps_index]=0
result=ifft(yy)
S=np.sqrt(result.real**2+result.imag**2)
#guass filter
return S
def getSalienceMap(self,X):
Map=self.getSR(self.extendseries(X))[:len(X)]
ave_mag = average_filter(Map, n=self.slice_window)
ave_mag[np.where(ave_mag <= EPS)] = EPS
return abs(Map - ave_mag) / ave_mag
def estimate(self,X):
'''
get k estimated points which is equal to x(n+1)
x(n+1)=x(n-m+1)+m*g
g=sum(g(x(n),x(n-i)))/m
'''
n=len(X)
gradients=[(X[-1]-v)/(n-1-i) for i, v in enumerate(X[:-1])]
#g=np.sum(gradients)/m
return X[1]+np.sum(gradients)
def extendseries(self,X,k=5):
'''
use k to extend oringe serie;
'''
X=np.append(X,self.estimate(X[-k-2:-1]).repeat(k))
return X