本文代码位于GitHub。
1. 概述
下图是一个典型的自适应滤波场景, 输入信号经过一个位置的系统变换后得到参考信号。 自适应滤波器的目标是找出一组滤波器系数来逼近系统, 使输入信号经过变换后,与参考信号误差最小。
若使用FIR滤波器设计, 自适应滤波器的输出就是输入信号与滤波器权值的卷积:
自适应滤波器的算法就是以误差为最小化目标,迭代求出最优的滤波器系数。
2. LMS和NLMS算法
LMS是最广泛应用的自适应滤波算法,以MSE误差为目标函数,以梯度下降为优化算法。并且通常情况下,LMS以最新的输入计算的瞬时梯度替代实际梯度计算,类似于机器学习的随机梯度下降法。
NLMS是使用输入的功率对步长进行归一化的方法,可以取得更好的收敛性能。
时域上实现LMS和NLMS算法的参考资料很多,这里不赘述,下面列出算法迭代步骤。
输入:
输入向量最新样本
期望输出最新样本
输出:
滤波器系数,长度为M的FIR滤波器
滤波器输出
滤波器输出与期望间的误差
初始化:
滤波器系数
迭代过程:
for n = 0, 1, 2...
1.读取输入样本 和期望输出样本
2.滤波:
3.计算误差:
4.更新系数:
LMS:
NLMS:
算法复杂度:
LMS: (次乘法和次加法) * 序列长度
NLMS: (次乘法和次加法) * 序列长度
3. Block LMS算法
LMS算法对输入数据是串行处理的,每输入一个样本,都需要进行次乘法和次加法,对于长度为的信号,计算时间复杂度为。可以通过将输入数据分段并行处理,并且利用频域FFT来做快速卷积,大大减少计算复杂度。
首先需要将串行的LMS算法转变为分块处理,也就是Block LMS(BLMS)。 每次迭代,输入数据被分成长度为的块进行处理。和LMS使用瞬时梯度来进行滤波器参数更新不同,BLMS使用L点的平均梯度来进行参数更新。 也就是机器学习里面Stochastic Gradient Descent 和 Mini-Batch Gradient Descent的区别。 对第块数据,BLMS算法递推公式为:
输入:
输入向量
期望输出
输出:
滤波器系数,长度为M的FIR滤波器
滤波器输出
滤波器输出与期望间的误差
初始化:
滤波器系数
迭代过程:
for k = 0, 1, 2 ... N/L,读入第k块数据,
1.
2. for i = 0, 1, 2 ... L-1
2.1 滤波:
2.2 计算误差:
2.3 累计梯度:
3 更新系数:
算法复杂度:
次乘法和次加法
4. 快速卷积
前面一节描述的BLMS算法跟LMS算法相比,除了用块平均梯度代替瞬时梯度外,并没有不同。 为了提升卷积计算的复杂度,我们需要引入快速卷积。 也就是用FFT计算循环卷积,实现线性卷积的快速分块计算。
长度为L的和长度为M的,线性卷积的结果是长度为的。 时域卷积,频域是相乘,因此有
傅立叶变换在频域上是连续的,计算机做傅立叶运算会将频域离散化采样,转化成点离散傅立叶变换(DFT),并且用FFT算法做快速计算。 将卷积的输入和都补零成点,做一个点的DFT,其逆变换就是循环卷积的结果,时域上是周期为的重复信号。
如上图所示,只有取DFT点数,才能防止卷积结果在时域上面混叠。IDFT结果的前个值就是所需的卷积结果。
于是两组有限长信号的卷积,则转换成3次DFT计算。
直接计算卷积, 大约需要次乘法和次加法。
采用FFT算法,3次DFT计算需要次复数乘法和次复数加法,相对直接计算算法复杂度从降到了。
前面说的是两段有限长信号作一次卷积的流程。回到BLMS,对信号进行分段处理。每段输入长度是, 滤波器长度是。
如果在后面补个零,而在当前块前部保留前一块的最后个点,做点快速卷积,结果中最后点是有效输出,而前点可丢弃。这种分块处理长卷积的方法,叫Overlap-and-Save方法。用FFT快速卷积+Overlap-and-Save方法,就可以高效处理BLMS滤波。
5. LMS算法的快速计算:频域自适应滤波
使用快速卷积的方法实现BLMS,就是频域实现的LMS自适应滤波器。称作Fast Block LMS (FBLMS) 或者 Frequency Domain Adaptive Filter (FDAF)。
对于长度为的滤波器,FBLMS一般采用点FFT,且使用Overlap-and-Savee的快速卷积方法。也就是说,滤波器向量做FFT前须补个零值。
输入向量的分块长度也设为,则FFT的输入前点是保存前一块的数据,后点是当前块数据。
使用overlap-save快速卷积方法实现BLMS中的卷积部分,当前块输出向量为IFFT的后点。
梯度的计算,可以将误差与输入信号放到频域来做,具体推导参考Block Adaptive Filters and Frequency Domain Adaptive Filters
输入:
分块输入信号, 块长度为M
分块参考信号, 块长度为M
输出:
更新的滤波器系数,长度为M的FIR滤波器
分块输出信号, 块长度为M
滤波器输出与参考信号间的误差
初始化:
滤波器系数
初始数据块
迭代过程:
for ,读入第块数据,
1.
2. 计算块输出
3. 计算误差:
4. 计算梯度:
5. 更新滤波器:
算法复杂度:
def process(self, x_b, d_b, update=True):
'''
Process a block data, and updates the adaptive filter (optional)
Parameters
----------
x_b: float
the new input block signal
d_b: float
the new reference block signal
update: bool, optional
whether or not to update the filter coefficients
'''
self.x = np.concatenate([self.x[self.B:], x_b])
# block-update parameters
X = fft(self.x)
y_2B = ifft( X * self.W)
y = np.real(y_2B[self.B:])
e = d_b - y
# Update the parameters of the filter
if self.update:
E = fft(np.concatenate([np.zeros(self.B), e])) # (2B)
if self.nlms:
norm = np.abs(X)**2 + 1e-10
E = E/norm
# Set the upper bound of E, to prevent divergence
m_errThreshold = 0.2
Enorm = np.abs(E) # (2B)
# print(E)
for eidx in range(2*self.B):
if Enorm[eidx]>m_errThreshold:
E[eidx] = m_errThreshold*E[eidx]/(Enorm[eidx]+1e-10)
# Compute the correlation vector (gradient constraint)
phi = np.einsum('i,i->i',X.conj(),E) # (2B)
phi = ifft(phi)
phi[self.B:] = 0
phi = fft(phi)
self.W = self.W + self.mu*phi
self.w = np.real(ifft(self.W)[:self.B])
return y, e
6. 减少时延:滤波器分割的频域自适应滤波
前面是将输入信号分块处理,提高算法效率。当FIR滤波器抽头数量很大时,FBLMS每M点计算一次输出和更新滤波器,造成比较大的延时。
一种想法是将滤波器也进行分割,这种改进延时的滤波器有几种名字:
Partitioned Fast Block LMS (PFBLMS),Frequency Domain Adaptive Filter (PBFDAF), Multi-DelayBlock Filter(MDF), 都是一回事。
如果将长度为的滤波器等分为长度为的小段,。则卷积的结果可以分解为个卷积之叠加。
于是一段线性卷积被分解成个线性卷积,并且可以用FFT+OLS分别计算这个卷积。 这样做好处是,每次迭代只需要输入长度为的信号块,保留原buffer中的后段,与最新的一段作为新的输入,就可以重复以上的段卷积叠加。每次迭代,可以更新点输出信号。
PFBLMS算法的时延就只有FBLMS的,极大地改善了滤波器的可用性。Speex和WebRTC的回声消除代码都使用了这种结构的滤波器。
输入:
分块输入信号, 块长度为B
分块参考信号, 块长度为B
输出:
滤波器系数,长度为M=PB的FIR滤波器
分块输出信号, 块长度为B
滤波器输出与期望间的误差
初始化:
滤波器系数
初始数据块
迭代过程:
for k = 0, 1, 2 ... N/B,读入第k块数据,
1.
2. 滑动重组输入信号,并且计算FFT。
(实际只需计算最后一列,前P-1列可以使用上一次迭代保留结果)
2. 计算块输出
3. 计算误差:
4. 计算梯度:
5. 更新滤波器:
算法复杂度:
与FBLMS相仿
def process(self, x_b, d_b, update=True):
'''
Process a block data, and updates the adaptive filter (optional)
Parameters
----------
x_b: float
the new input block signal
d_b: float
the new reference block signal
update: bool, optional
whether or not to update the filter coefficients
'''
self.x = np.concatenate([self.x[self.B:], x_b])
Xf_b = fft(self.x)
# block-update parameters
self.Xf[1:] = self.Xf[:self.M-1] # Xf : Mx2B sliding window
self.Xf[0] = Xf_b # Xf : Mx2B sliding window
y_2B = ifft(np.sum(self.Xf * self.Wf, axis=0)) # [Px2B] element multiply [Px2B] , then ifft
y = np.real(y_2B[self.B:])
e = d_b - y
if update:
E = fft(np.concatenate([np.zeros(self.B), e])) # (2B)
if self.nlms:
norm = np.abs(Xf_b)**2 + 1e-6
E = E/norm
# Set the upper bound of E, to prevent divergence
m_errThreshold = 0.2
Enorm = np.abs(E) # (2B)
# print(E)
for eidx in range(2*self.B):
if Enorm[eidx]>m_errThreshold:
E[eidx] = m_errThreshold*E[eidx]/(Enorm[eidx]+1e-10)
# Update the parameters of the filter
self.Wf = self.Wf + self.mu*E*self.Xf.conj()
# Compute the correlation vector (gradient constraint)
if self.constrained:
waux = ifft(self.Wf, axis=1)
waux[:, self.B:] = 0
self.Wf = fft(waux, axis=1)
self.w = np.real(ifft(self.Wf, axis=1)[:, :self.B].flatten())
return y, e
参考文献
[1] B. Farhang-Boroujeny, Adaptive Filters: theory and applications. John Wiley & Sons, 2013.
[2] Block Adaptive Filters and Frequency Domain Adaptive Filters