U-Net实现语音分离的代码分析

不是自己对着论文复现的,是网上找来自己改的

用到的包

  • Python 3.5
  • Chainer 3.0:一个柔性的神经网络框架,能够简单直观的写出复杂的网络。Chainer 对应地采用了一种叫做 “边定义边运行” 的机制, 即, 网络可以在实际进行前向计算的时候同时被定义。
  • librosa 0.5.0:python的音频处理库
  • cupy 2.0: 一个通过利用CUDA GPU库在Nvidia GPU上实现Numpy数组的库

代码

预处理:ProcessDSD.py

数据集处理,将DSD100数据集的音频文件转换为时频声谱。
DSD 包含两个文件夹,一个是混合音频的文件夹"Mixtures", 另一个是人声、鼓、贝司、其他乐器的分轨音频"Sources"。每个文件夹里包含两个子文件夹,"Dev" 是训练集,"Test"是测试集。

import numpy as np
from librosa.core import load
import util
import os

PATH_DSD_SOURCE = ["DSD100/Sources/Dev", "DSD100/Sources/Test"]
PATH_DSD_MIXTURE = ["DSD100/Mixtures/Dev", "DSD100/Mixtures/Test"]

FILE_MIX = "mixture.wav"
FILE_BASS = "bass.wav"
FILE_DRUMS = "drums.wav"
FILE_OTHER = "other.wav"
FILE_VOCAL = "vocals.wav"


list_source_dir = [os.path.join(PATH_DSD_SOURCE[0], f)
                   for f in os.listdir(PATH_DSD_SOURCE[0])]
list_source_dir.extend([os.path.join(PATH_DSD_SOURCE[1], f)
                        for f in os.listdir(PATH_DSD_SOURCE[1])])
list_source_dir = sorted(list_source_dir)

list_mix_dir = [os.path.join(PATH_DSD_MIXTURE[0], f)
                for f in os.listdir(PATH_DSD_MIXTURE[0])]
list_mix_dir.extend([os.path.join(PATH_DSD_MIXTURE[1], f)
                     for f in os.listdir(PATH_DSD_MIXTURE[1])])
list_mix_dir = sorted(list_mix_dir)


for mix_dir, source_dir in zip(list_mix_dir,  list_source_dir):
    assert(mix_dir.split("/")[-1] == source_dir.split("/")[-1])
    fname = mix_dir.split("/")[-1]
    print("Processing: " + fname)
    y_mix, sr = load(os.path.join(mix_dir, FILE_MIX), sr=None)
    y_vocal, _ = load(os.path.join(source_dir, FILE_VOCAL), sr=None)
    y_inst = sum([load(os.path.join(source_dir, f), sr=None)[0]
                  for f in [FILE_DRUMS, FILE_BASS, FILE_OTHER]])

    assert(y_mix.shape == y_vocal.shape)
    assert(y_mix.shape == y_inst.shape)

    util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)


rand_voc = np.random.randint(100, size=50)
rand_bass = np.random.randint(100, size=50)
rand_drums = np.random.randint(100, size=50)
rand_other = np.random.randint(100, size=50)

count = 1
print("Generating random mix...")
for i_voc, i_bass, i_drums, i_other in \
        zip(rand_voc, rand_bass, rand_drums, rand_other):
    y_vocal, _ = load(os.path.join(list_source_dir[i_voc], FILE_VOCAL), sr=None)
    y_bass, _ = load(os.path.join(list_source_dir[i_bass], FILE_BASS), sr=None)
    y_drums, _ = load(os.path.join(list_source_dir[i_drums], FILE_DRUMS), sr=None)
    y_other, _ = load(os.path.join(list_source_dir[i_other], FILE_OTHER), sr=None)

    minsize = min([y_vocal.size, y_bass.size, y_drums.size, y_other.size])

    y_vocal = y_vocal[:minsize]
    y_inst = y_bass[:minsize] + y_drums[:minsize] + y_other[:minsize]
    y_mix = y_vocal + y_inst

    fname = "dsd_random%02d" % count
    util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)
    print("Saved:" + fname)
    count += 1

主要用到util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)

程序总入口:DoExperiment.py

输入音频路径,训练模型或用现有模型,从原始音频获得分离的人声/分离的音频


"""
Code example for training U-Net
"""
import network

Xlist,Ylist = util.LoadDataset(target="vocal")
print("Dataset loaded.")
network.TrainUNet(Xlist,Ylist,savefile="unet.model",epoch=30)


"""
Code example for performing vocal separation with U-Net
"""
import util

fname = "Say Hello.mp3"
mag, phase = util.LoadAudio(fname)
start = 1024
end = 1024+256

mask = util.ComputeMask(mag[:, start:end], unet_model="/Users/yanyingzi/Study/Signal Seperation/UNet-VocalSeparation-Chainer-master/unet.model", hard=False)

util.SaveAudio(
    "vocal-%s" % fname, mag[:, start:end]*mask, phase[:, start:end])
util.SaveAudio(
    "inst-%s" % fname, mag[:, start:end]*(1-mask), phase[:, start:end])
util.SaveAudio(
    "orig-%s" % fname, mag[:, start:end], phase[:, start:end])

调用了五个主要接口:

util.LoadDataset(target) 加载数据集,target就是要分离的目标,这里为“voice”
network.TrainUNet(Xlist,Ylist,savefile="unet.model",epoch=30) 训练网络
util.LoadAudio(fname) 加载音频
util.ComputeMask(input_mag, unet_model="unet.model", hard=True)计算掩码
util.SaveAudio(fname, mag, phase) 保存音频
(如果有现成的模型,则只需要调用后三个函数)

util.py

def LoadDataset(target="vocal"): # 加载前面处理好的数据集的时频声谱
    filelist_fft = find_files(C.PATH_FFT, ext="npz")[:200]
    Xlist = []
    Ylist = []
    for file_fft in filelist_fft:
        dat = np.load(file_fft)
        Xlist.append(dat["mix"])
        if target == "vocal":
            assert(dat["mix"].shape == dat["vocal"].shape)
            Ylist.append(dat["vocal"])
        else:
            assert(dat["mix"].shape == dat["inst"].shape)
            Ylist.append(dat["inst"])
    return Xlist, Ylist

函数返回的Xlist是混合音频的声谱,Ylist是分轨的target音频的声谱。
find_fileslibrosa.util.find_files

def network.TrainUNet(): # 见后面network.py
def LoadAudio(fname):
    y, sr = load(fname, sr=C.SR) #sr: sampling rate, C.SR = 16000
    spec = stft(y, n_fft=C.FFT_SIZE, hop_length=C.H, win_length=C.FFT_SIZE) # C.FFT_SIZE = 1024,C.H = 512
    mag = np.abs(spec)
    mag /= np.max(mag) # 标准化
    phase = np.exp(1.j*np.angle(spec))
    return mag, phase

loadlibrosa.core.load, 加载各种格式的音频

  • 参数为:
    path:音频路径
    sr:音频频率(可以不用原始的音频频率,他有重采样的功能)
    mono:该值为true时候是单通道、否则为双通道
    offset:读音频的开始时间,也就是可以不从头开始读取音频
    duration:持续时间,可以不加载全部时间,通过与offset合作读取其中一段音频
    dtype:返回的音频信号值的数据格式,一般不设置
    res_type:重采样的格式,一般不用
  • 返回值:
    y:音频的信号值,是个numpy一维数组
    sr:音频的采样值,如果参数没有设置返回的是原始采样率

stftlibrosa.core.stft,短时傅里叶变化

  • 参数为:
    y:信号值
    n_fft: 每个傅里叶窗口包含的样本量(建议为2的指数)
    hop_length: 步长,每帧之间的采样数
    win_length: 窗长,小于等于n_fft(默认 win_length = n_fft,使用整帧)
    window: 窗类型,汉明窗等
    center: 特征是信号的中心还是起始点
    dtype: 返回的音频信号值的数据格式,一般不设置
    pad_model: 对于边缘进行pad
  • 返回值:
    D:短时傅里叶变化后的矩阵(时频声谱)D.shape=(number of frequency bins , number of time frames)=(1 + n_fft/2, n_frames)

(没想通为什么 number of frequency bins = 1 + n_fft/2 )
网上的解释:The continuous Fourier transform possesses symmetries when computed on real signals (Hermitian symmetry). The discrete version, an FFT (of even length) possesses a slighty twisted symmetry.就是说FFT计算出来的结果是频率上对称的,存在"duplicated" in positive and negative frequencies.)
(phase也看不懂)

mag 是magnitude,做了绝对值
phase = np.exp(1.j*np.angle(spec)) 返回spec的复数角度

def SaveAudio(fname, mag, phase):
    y = istft(mag*phase, hop_length=C.H, win_length=C.FFT_SIZE)
    write_wav(fname, y, C.SR, norm=True)

istftlibrosa.core.istft
write_wavlibrosa.output.write_wav

def SaveSpectrogram(y_mix, y_vocal, y_inst, fname, original_sr=44100):
    y_mix = resample(y_mix, original_sr, C.SR)
    y_vocal = resample(y_vocal, original_sr, C.SR)
    y_inst = resample(y_inst, original_sr, C.SR)

    S_mix = np.abs(
        stft(y_mix, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)
    S_vocal = np.abs(
        stft(y_vocal, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)
    S_inst = np.abs(
        stft(y_inst, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)

    norm = S_mix.max()
    S_mix /= norm
    S_vocal /= norm
    S_inst /= norm

    np.savez(os.path.join(C.PATH_FFT, fname+".npz"),
             mix=S_mix, vocal=S_vocal, inst=S_inst)

stftlibrosa.core.stft,短时傅里叶变化。SaveSpectrogram这个函数是处理数据集时用的,将数据集里的音频转化为声谱。

def ComputeMask(input_mag, unet_model="unet.model", hard=True):
    unet = network.UNet()
    unet.load(unet_model)
    config.train = False
    config.enable_backprop = False
    mask = unet(input_mag[np.newaxis, np.newaxis, 1:, :]).data[0, 0, :, :]
    mask = np.vstack((np.zeros(mask.shape[1], dtype="float32"), mask))
    if hard:  # hard mask
        hard_mask = np.zeros(mask.shape, dtype="float32")
        hard_mask[mask > 0.5] = 1
        return hard_mask
    else:    # soft mask
        return mask

network.UNet() 调用U-Net神经网络,用训练好的模型来计算 hard mask 或 soft mask。

network.np

UNet class

from chainer import Chain, serializers, optimizers, cuda, config
import chainer.links as L
import chainer.functions as F
import numpy as np
#import const

#cp = cuda.cupy

class UNet(Chain):
    def __init__(self):
        super(UNet, self).__init__()
        with self.init_scope():
            self.conv1 = L.Convolution2D(1, 16, 4, 2, 1)
            self.norm1 = L.BatchNormalization(16)
            self.conv2 = L.Convolution2D(16, 32, 4, 2, 1)
            self.norm2 = L.BatchNormalization(32)
            self.conv3 = L.Convolution2D(32, 64, 4, 2, 1)
            self.norm3 = L.BatchNormalization(64)
            self.conv4 = L.Convolution2D(64, 128, 4, 2, 1)
            self.norm4 = L.BatchNormalization(128)
            self.conv5 = L.Convolution2D(128, 256, 4, 2, 1)
            self.norm5 = L.BatchNormalization(256)
            self.conv6 = L.Convolution2D(256, 512, 4, 2, 1)
            self.norm6 = L.BatchNormalization(512)
            self.deconv1 = L.Deconvolution2D(512, 256, 4, 2, 1)
            self.denorm1 = L.BatchNormalization(256)
            self.deconv2 = L.Deconvolution2D(512, 128, 4, 2, 1)
            self.denorm2 = L.BatchNormalization(128)
            self.deconv3 = L.Deconvolution2D(256, 64, 4, 2, 1)
            self.denorm3 = L.BatchNormalization(64)
            self.deconv4 = L.Deconvolution2D(128, 32, 4, 2, 1)
            self.denorm4 = L.BatchNormalization(32)
            self.deconv5 = L.Deconvolution2D(64, 16, 4, 2, 1)
            self.denorm5 = L.BatchNormalization(16)
            self.deconv6 = L.Deconvolution2D(32, 1, 4, 2, 1)

    def __call__(self, X):

        print X.shape

        h1 = F.leaky_relu(self.norm1(self.conv1(X)))
        print h1.shape
        h2 = F.leaky_relu(self.norm2(self.conv2(h1)))
        print h2.shape
        h3 = F.leaky_relu(self.norm3(self.conv3(h2)))
        print h3.shape
        h4 = F.leaky_relu(self.norm4(self.conv4(h3)))
        print h4.shape
        h5 = F.leaky_relu(self.norm5(self.conv5(h4)))
        print h5.shape
        h6 = F.leaky_relu(self.norm6(self.conv6(h5)))
        print h6.shape
        dh = F.relu(F.dropout(self.denorm1(self.deconv1(h6))))
        print dh.shape
        dh = F.relu(F.dropout(self.denorm2(self.deconv2(F.concat((dh, h5))))))
        print dh.shape
        dh = F.relu(F.dropout(self.denorm3(self.deconv3(F.concat((dh, h4))))))
        print dh.shape
        dh = F.relu(self.denorm4(self.deconv4(F.concat((dh, h3)))))
        print dh.shape
        dh = F.relu(self.denorm5(self.deconv5(F.concat((dh, h2)))))
        print dh.shape
        dh = F.sigmoid(self.deconv6(F.concat((dh, h1))))
        print dh.shape
        return dh

    def load(self, fname="unet.model"):
        serializers.load_npz(fname, self)

    def save(self, fname="unet.model"):
        serializers.save_npz(fname, self)

TrainUNet:

class UNetTrainmodel(Chain):
    def __init__(self, unet):
        super(UNetTrainmodel, self).__init__()
        with self.init_scope():
            self.unet = unet

    def __call__(self, X, Y):
        O = self.unet(X)
        self.loss = F.mean_absolute_error(X*O, Y)
        return self.loss


def TrainUNet(Xlist, Ylist, epoch=40, savefile="unet.model"):
    assert(len(Xlist) == len(Ylist))
    unet = UNet()
    model = UNetTrainmodel(unet)
    model.to_gpu(0)
    opt = optimizers.Adam()
    opt.setup(model)
    config.train = True
    config.enable_backprop = True
    itemcnt = len(Xlist)
    itemlength = [x.shape[1] for x in Xlist]
    subepoch = sum(itemlength) // const.PATCH_LENGTH // const.BATCH_SIZE * 4
    for ep in range(epoch):
        sum_loss = 0.0
        for subep in range(subepoch):
            X = np.zeros((const.BATCH_SIZE, 1, 512, const.PATCH_LENGTH),
                         dtype="float32")
            Y = np.zeros((const.BATCH_SIZE, 1, 512, const.PATCH_LENGTH),
                         dtype="float32")
            idx_item = np.random.randint(0, itemcnt, const.BATCH_SIZE)
            for i in range(const.BATCH_SIZE):
                randidx = np.random.randint(
                    itemlength[idx_item[i]]-const.PATCH_LENGTH-1)
                X[i, 0, :, :] = \
                    Xlist[idx_item[i]][1:, randidx:randidx+const.PATCH_LENGTH]
                Y[i, 0, :, :] = \
                    Ylist[idx_item[i]][1:, randidx:randidx+const.PATCH_LENGTH]
            opt.update(model, cp.asarray(X), cp.asarray(Y))
            sum_loss += model.loss.data * const.BATCH_SIZE

        print("epoch: %d/%d  loss=%.3f" % (ep+1, epoch, sum_loss))

    unet.save(savefile)

总结

U-Net其实就是输入空间与输出空间相同的一个映射。在训练时,输入是(MixSpec, TargetSpec) = (混合声谱,目标声谱),用Adam方法优化参数θ来得到一个mask = UNet_θ(MixSpec),MixSpec*mask使得尽可能逼近TargetSpec。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容