一、什么是迁移学习
聪明人都喜欢"偷懒",因为这样的偷懒能帮我们节省大量的时间提高效率。有一种偷懒是 "站在巨人的肩膀上",也就是表示要善于学习先辈的经验。这句话放在机器学习中就是指的迁移学习。
迁移学习是一种机器学习技术,顾名思义就是指将知识从一个领域迁移到另一个领域的能力。
我们知道,神经网络需要用数据来训练,它从数据中获得信息,进而把它们转换成相应的权重。这些权重能够被提取出来,迁移到其他的神经网络中,我们"迁移"了这些学来的特征,就不需要从零开始训练一个神经网络了 。
迁移学习的价值
复用现有知识域数据,已有的大量工作不至于完全丢弃;
不需要再去花费巨大代价去重新采集和标定庞大的新数据集,也有可能数据根本无法获取;
对于快速出现的新领域,能够快速迁移和应用,体现时效性优势。
二、迁移学习的载体:预训练模型
在计算机视觉领域中,迁移学习通常是通过使用预训练模型来体现的。预训练模型是在大型基准数据集上训练的模型,用于解决相似的问题。由于训练这种模型的计算成本较高,因此,导入已发布的成果并使用相应的模型是比较常见的做法。
1、keras.Application
Kera的应用模块Application提供了带有预训练权重的Keras模型,这些模型可以用来进行预测、特征提取和finetune。
目前,Keras 包含有 5 个预训练模型,分别为:VGG16,VGG19,ResNet50,InceptionV3,Xception,MobileNet
(1)VGG16/ VGG19
Keras 导入 VGG16 和 VGG19 模型及默认参数如下:
from keras.applications import vgg16
from keras.applications import vgg19
vgg16.VGG16(include_top=True, weights='imagenet', input_tensor=None, input_shape=None, pooling=None, classes=1000)
vgg19.VGG19(include_top=True, weights='imagenet', input_tensor=None, input_shape=None, pooling=None, classes=1000)
(2)ResNet50
Keras 导入 ResNet50 模型及默认参数如下:
from keras.applications import resnet50
resnet50.ResNet50(include_top=True, weights='imagenet', input_tensor=None, input_shape=None, pooling=None, classes=1000)
(3)InceptionV3
Keras 导入 InceptionV3 模型及默认参数如下:
from keras.applications import inception_v3
inception_v3.InceptionV3(include_top=True, weights='imagenet', input_tensor=None, input_shape=None, pooling=None, classes=1000)
(4)Xception
Keras 导入 Xception 模型及默认参数如下
from keras.applications import xception
xception.Xception(include_top=True, weights='imagenet', input_tensor=None, input_shape=None, pooling=None, classes=1000)
(5)MobileNet
Keras 导入 MobileNet 模型及默认参数如下:
from keras.applications import mobilenet
mobilenet.MobileNet(input_shape=None, alpha=1.0, depth_multiplier=1, dropout=1e-3, include_top=True, weights='imagenet', input_tensor=None, pooling=None, classes=1000)
举例:使用预训练模型输出图像分类预测
import numpy as np
from keras.preprocessing import image
from keras.applications.inception_v3 import InceptionV3
from keras.applications.inception_v3 import preprocess_input
from keras.applications.inception_v3 import decode_predictions
# 新建模型,此处实际上是导入预训练模型
model = InceptionV3()
model.summary()
# 按照 InceptionV3 模型的默认输入尺寸,载入 demo1 图像
img = image.load_img('demo1.jpg', target_size=(299, 299))
# 提取特征
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)
# 预测并输出概率最高的三个类别
preds = model.predict(x)
print('Predicted:', decode_predictions(preds, top=3)[0])
2、自己下载预训练权重
VGG16:
WEIGHTS_PATH = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.1/vgg16_weights_tf_dim_ordering_tf_kernels.h5’
WEIGHTS_PATH_NO_TOP = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.1/vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5’
VGG19:
WEIGHTS_PATH = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.4/xception_weights_tf_dim_ordering_tf_kernels.h5’
WEIGHTS_PATH_NO_TOP = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.4/xception_weights_tf_dim_ordering_tf_kernels_notop.h5’
RESNET50:
WEIGHTS_PATH = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.2/resnet50_weights_tf_dim_ordering_tf_kernels.h5’
WEIGHTS_PATH_NO_TOP = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.2/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5’
INCEPTIONS_V3:
WEIGHTS_PATH = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.5/inception_v3_weights_tf_dim_ordering_tf_kernels.h5’
WEIGHTS_PATH_NO_TOP = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.5/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5’
XCEPTION:
WEIGHTS_PATH = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.4/xception_weights_tf_dim_ordering_tf_kernels.h5’
WEIGHTS_PATH_NO_TOP = ‘https://github.com/fchollet/deep-learning-models/releases/download/v0.4/xception_weights_tf_dim_ordering_tf_kernels_notop.h5’
三、代码实现
现在应用迁移学习来实现一个特定的图像分类任务
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import os, shutil
FILE_DIR = os.path.dirname(os.path.abspath(__file__))
from tqdm import tqdm
from sklearn.datasets import load_files
# 载入画图所需要的库 matplotlib
import matplotlib.pyplot as plt
# 导入karas神经网络框架
import keras
from keras.optimizers import Adam
from keras.models import Model
from keras.utils import np_utils,plot_model
from keras.preprocessing import image
from keras.applications import inception_v3
from keras.layers import Conv2D, GlobalAveragePooling2D, Activation, Dropout, Dense
from keras.callbacks import ModelCheckpoint,EarlyStopping,ReduceLROnPlateau,TensorBoard
#========================================================
# 全局参数
#========================================================
# 训练参数
num_epochs = 10
batch_size = 32
# 模型参数存储
weight_url = 'D://saved_models/V3.hdf5'
best_weight_url = 'D://saved_models/V3_best.hdf5'
#========================================================
# 文件准备
#========================================================
def image_preparation(original_dir, base_dir, labels):
'''
图像分类文件准备, 将文件复制到训练\验证\测试集目录
INPUT -> 原始数据集地址, 数据集存放地址, 分类列表
'''
# 定义文件地址
train_dir = os.path.join(base_dir, 'train')
if not os.path.exists(train_dir):
os.mkdir(train_dir)
validation_dir = os.path.join(base_dir, 'validation')
if not os.path.exists(validation_dir):
os.mkdir(validation_dir)
test_dir = os.path.join(base_dir, 'test')
if not os.path.exists(test_dir):
os.mkdir(test_dir)
names = locals()
# 图片迁移
for label in labels:
names["train_"+str(label)+"dir"] = os.path.join(train_dir, str(label))
if not os.path.exists(names["train_"+str(label)+"dir"]):
os.mkdir(names["train_"+str(label)+"dir"])
names["validation_"+str(label)+"dir"] = os.path.join(validation_dir, str(label))
if not os.path.exists(names["validation_"+str(label)+"dir"]):
os.mkdir(names["validation_"+str(label)+"dir"])
names["test_"+str(label)+"dir"] = os.path.join(test_dir, str(label))
if not os.path.exists(names["test_"+str(label)+"dir"]):
os.mkdir(names["test_"+str(label)+"dir"])
fnames = [str(label)+'.{}.jpg'.format(i) for i in range(1000)]
for fname in fnames:
src = os.path.join(original_dir, fname)
dst = os.path.join(names["train_"+str(label)+"dir"], fname)
shutil.copyfile(src, dst)
fnames = [str(label)+'.{}.jpg'.format(i) for i in range(1000, 1500)]
for fname in fnames:
src = os.path.join(original_dir, fname)
dst = os.path.join(names["validation_"+str(label)+"dir"], fname)
shutil.copyfile(src, dst)
fnames = [str(label)+'.{}.jpg'.format(i) for i in range(1500, 2000)]
for fname in fnames:
src = os.path.join(original_dir, fname)
dst = os.path.join(names["test_"+str(label)+"dir"], fname)
shutil.copyfile(src, dst)
print('total train '+str(label)+' images:', len(os.listdir(names["train_"+str(label)+"dir"])))
print('total validation '+str(label)+' images:', len(os.listdir(names["validation_"+str(label)+"dir"])))
print('total test '+str(label)+' images:', len(os.listdir(names["test_"+str(label)+"dir"])))
# 将数据分别存到各个文件夹
originial_dataset_dir = 'D:\download\kaggle_original_data'
base_dir = 'D:\cats_and_dogs'
if not os.path.exists(base_dir):
os.mkdir(base_dir)
image_preparation(originial_dataset_dir, base_dir, ['cat','dog'])
# 分类数
n_classes = 0
for fn in os.listdir(os.path.join(base_dir, 'train')):
n_classes += 1
#========================================================
# 图像加载
#========================================================
def load_dataset(path):
data = load_files(path)
data_files = np.array(data['filenames'])
data_targets = np_utils.to_categorical(np.array(data['target']), n_classes)
return data_files, data_targets
train_files, train_targets = load_dataset(os.path.join(base_dir, 'train'))
valid_files, valid_targets = load_dataset(os.path.join(base_dir, 'validation'))
#========================================================
# 图像预处理
#========================================================
def path_to_tensor(img_path):
'''单个图片格式处理'''
img = image.load_img(img_path, target_size=(299, 299))
x = image.img_to_array(img)
# 将3维张量转化为格式为(1, 299, 299, 3)的4维张量并进行归一化到0-1
return np.expand_dims(x, axis=0).astype('float32')/255.0
def paths_to_tensor(img_paths):
'''批量图片格式处理'''
list_of_tensors = [path_to_tensor(img_path) for img_path in tqdm(img_paths)]
return np.vstack(list_of_tensors)
train_tensors = paths_to_tensor(train_files)
valid_tensors = paths_to_tensor(valid_files)
#========================================================
# 模型声明
#========================================================
def InceptionV3_model(lr=0.005):
'''构造基于InceptionV3的迁移学习模型'''
base_model = inception_v3.InceptionV3(weights='imagenet', include_top=False)
# 冻结base_model所有层,这样就可以正确获得bottleneck特征
for layer in base_model.layers:
layer.trainable = False
x = base_model.output
# 重新配置全连接层,添加自己的全链接分类层
x = GlobalAveragePooling2D(name='average_pooling2d_new')(x)
x = Dense(1024, activation='relu', name='dense_new')(x)
predictions = Dense(n_classes, activation='softmax', name='dense_output')(x)
# 创建最终模型
model = Model(inputs=base_model.input, outputs=predictions)
# 模型编译
adam = Adam(lr=lr, beta_1=0.9, beta_2=0.999, epsilon=1e-8)
model.compile(loss='categorical_crossentropy',
optimizer=adam,
metrics=['accuracy'])
# model.summary()
# plot_model(model, to_file='V3_model.png')
return model
# 实例化模型
V3_model = InceptionV3_model()
#========================================================
# 模型训练
#========================================================
def train(X, Y, X_val, Y_val, model):
# 载入已保存的权重, 继续训练
if os.path.exists(weight_url):
model.load_weights(weight_url)
# 训练过程中的回调函数(检查点\早期停止\动态学习率\训练日志)
Checkpoint = ModelCheckpoint(filepath=best_weight_url,
save_best_only=True,
verbose=1)
EarlyStop = EarlyStopping(monitor='val_loss',
patience=5,
mode='auto',
verbose=1)
lrate = ReduceLROnPlateau(monitor='val_loss',
factor=0.1, # 每次减少学习率的因子,学习率将以lr = lr*factor的形式被减少
patience=3, # 当patience个epoch过去而模型性能不提升时,学习率减少的动作会被触发
mode='auto',
min_delta=0.0001, # 阈值,用来确定是否进入检测值的“平原区”
cooldown=0, # 学习率减少后,会经过cooldown个epoch才重新进行正常操作
min_lr=0, # 学习率的下限
verbose=1)
tb = TensorBoard(log_dir=FILE_DIR, # log 目录
histogram_freq=1, # 按照何等频率(epoch)来计算直方图,0为不计算
batch_size=batch_size, # 用多大量的数据计算直方图
write_graph=True, # 是否存储网络结构图
write_grads=False, # 是否可视化梯度直方图
write_images=False, # 是否可视化参数
embeddings_freq=0,
embeddings_layer_names=None,
embeddings_metadata=None)
history_ft = model.fit(X, Y,
validation_data = (X_val, Y_val),
# validation_split = 0.2,
epochs=num_epochs,
batch_size=batch_size,
# steps_per_epoch=None, # steps_per_epoch=10,则就是将一个epoch分为10份,不能和batch_size共同使用
# validation_steps=None, # 当steps_per_epoch被启用的时候才有用,验证集的batch_size
callbacks=[Checkpoint, EarlyStop, lrate, tb],
verbose=1
)
# 参数保存,留待下次继续训练
model.save_weights(weight_url, overwrite=True)
return history_ft
def plot_training(data):
'''绘制模型正确率曲线和损失曲线'''
acc = history.history['acc']
val_acc = history.history['val_acc']
loss = history.history['loss']
val_loss = history.history['val_loss']
# 正确率曲线
plt.figure()
plt.title('Train and valid accuracy')
plt.plot(data.epoch,acc,label="train_acc")
plt.plot(data.epoch,val_acc,label="val_acc")
plt.scatter(data.epoch,data.history['acc'],marker='*')
plt.scatter(data.epoch,data.history['val_acc'],marker='*')
plt.legend()
plt.show()
# 损失曲线
plt.figure()
plt.title('Train and valid loss')
plt.plot(data.epoch,loss,label="train_loss")
plt.plot(data.epoch,val_loss,label="val_loss")
plt.scatter(data.epoch,data.history['loss'],marker='*')
plt.scatter(data.epoch,data.history['val_loss'],marker='*')
plt.legend()
plt.show()
history = train(X=train_tensors, Y=train_targets, X_val=valid_tensors, Y_val=valid_targets, model=V3_model)
plot_training(history)
#========================================================
# 模型预测
#========================================================
def img_predict(model, img_path):
'''判断单张图片'''
prediction = model.predict(path_to_tensor(img_path))
index = np.argmax(prediction)
return index
# 加载最佳的模型参数
V3_model.load_weights(best_weight_url)
img_predict(V3_model, 'D://cats_and_dogs/test/cat/cat.8.jpg')