前言
VGG是Oxford的Visual Geometry Group的组提出的(大家应该能看出VGG名字的由来了)。该网络是在ILSVRC 2014上的相关工作,主要工作是证明了增加网络的深度能够在一定程度上影响网络最终的性能。VGG有两种结构,分别是VGG16和VGG19,两者并没有本质上的区别,只是网络深度不一样。
VGG原理
VGG16相比AlexNet的一个改进是采用连续的几个3x3的卷积核代替AlexNet中的较大卷积核(11x11,7x7,5x5)。对于给定的感受野(与输出有关的输入图片的局部大小),采用堆积的小卷积核是优于采用大的卷积核,因为多层非线性层可以增加网络深度来保证学习更复杂的模式,而且代价还比较小(参数更少)。
简单来说,在VGG中,使用了3个3x3卷积核来代替7x7卷积核,使用了2个3x3卷积核来代替5*5卷积核,这样做的主要目的是在保证具有相同感知野的条件下,提升了网络的深度,在一定程度上提升了神经网络的效果。
比如,3个步长为1的3x3卷积核的一层层叠加作用可看成一个大小为7的感受野(其实就表示3个3x3连续卷积相当于一个7x7卷积),其参数总量为 3x(9xC^2) ,如果直接使用7x7卷积核,其参数总量为 49xC^2 ,这里 C 指的是输入和输出的通道数。很明显,27xC2小于49xC2,即减少了参数;而且3x3卷积核有利于更好地保持图像性质。
这里解释一下为什么使用2个3x3卷积核可以来代替5*5卷积核:
5x5卷积看做一个小的全连接网络在5x5区域滑动,我们可以先用一个3x3的卷积滤波器卷积,然后再用一个全连接层连接这个3x3卷积输出,这个全连接层我们也可以看做一个3x3卷积层。这样我们就可以用两个3x3卷积级联(叠加)起来代替一个 5x5卷积。
具体如下图所示:
至于为什么使用3个3x3卷积核可以来代替7*7卷积核,推导过程与上述类似,大家可以自行绘图理解。
VGG网络结构
下面是VGG网络的结构(VGG16和VGG19都在):
VGG16包含了16个隐藏层(13个卷积层和3个全连接层),如上图中的D列所示
VGG19包含了19个隐藏层(16个卷积层和3个全连接层),如上图中的E列所示
VGG网络的结构非常一致,从头到尾全部使用的是3x3的卷积和2x2的max pooling。
如果你想看到更加形象化的VGG网络,可以使用经典卷积神经网络(CNN)结构可视化工具来查看高清无码的VGG网络。
VGG优缺点
VGG优点
VGGNet的结构非常简洁,整个网络都使用了同样大小的卷积核尺寸(3x3)和最大池化尺寸(2x2)。
几个小滤波器(3x3)卷积层的组合比一个大滤波器(5x5或7x7)卷积层好:
验证了通过不断加深网络结构可以提升性能。
VGG缺点
VGG耗费更多计算资源,并且使用了更多的参数(这里不是3x3卷积的锅),导致更多的内存占用(140M)。其中绝大多数的参数都是来自于第一个全连接层。VGG可是有3个全连接层啊!
PS:有的文章称:发现这些全连接层即使被去除,对于性能也没有什么影响,这样就显著降低了参数数量。
注:很多pretrained的方法就是使用VGG的model(主要是16和19),VGG相对其他的方法,参数空间很大,最终的model有500多m,AlexNet只有200m,GoogLeNet更少,所以train一个vgg模型通常要花费更长的时间,所幸有公开的pretrained model让我们很方便的使用。
关于感受野:
假设你一层一层地重叠了3个3x3的卷积层(层与层之间有非线性激活函数)。在这个排列下,第一个卷积层中的每个神经元都对输入数据体有一个3x3的视野。
代码篇:VGG训练与测试
这里推荐两个开源库,训练请参考tensorflow-vgg,快速测试请参考VGG-in TensorFlow。
代码我就不介绍了,其实跟上述内容一致,跟着原理看code应该会很快。我快速跑了一下VGG-in TensorFlow,代码亲测可用,效果很nice,就是model下载比较烦。
# -- encoding:utf-8 --
"""
Create on 19/5/25 10:06
"""
import os
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
# 定义外部传入的参数
tf.app.flags.DEFINE_bool(flag_name="is_train",
default_value=True,
docstring="给定是否是训练操作,True表示训练,False表示预测!!")
tf.app.flags.DEFINE_string(flag_name="checkpoint_dir",
default_value="./mnist/models/models_vgg",
docstring="给定模型存储的文件夹,默认为./mnist/models/models_vgg")
tf.app.flags.DEFINE_string(flag_name="logdir",
default_value="./mnist/graph/graph_vgg",
docstring="给定模型日志存储的路径,默认为./mnist/graph/graph_vgg")
tf.app.flags.DEFINE_integer(flag_name="batch_size",
default_value=8,
docstring="给定训练的时候每个批次的样本数目,默认为16.")
tf.app.flags.DEFINE_integer(flag_name="store_per_batch",
default_value=100,
docstring="给定每隔多少个批次进行一次模型持久化的操作,默认为100")
tf.app.flags.DEFINE_integer(flag_name="validation_per_batch",
default_value=100,
docstring="给定每隔多少个批次进行一次模型的验证操作,默认为100")
tf.app.flags.DEFINE_float(flag_name="learning_rate",
default_value=0.001,
docstring="给定模型的学习率,默认0.01")
FLAGS = tf.app.flags.FLAGS
def create_dir_with_not_exits(dir_path):
"""
如果文件的文件夹路径不存在,直接创建
:param dir_path:
:return:
"""
if not os.path.exists(dir_path):
os.makedirs(dir_path)
def layer_normalization(net, eps=1e-8):
# 缩放参数、平移参数y=gamma * x + beta
gamma = tf.get_variable('gamma', shape=[],
initializer=tf.constant_initializer(1))
beta = tf.get_variable('beta', shape=[],
initializer=tf.constant_initializer(0))
# 计算当前批次的均值和标准差
mean, variance = tf.nn.moments(net, axes=(1, 2, 3), keep_dims=True)
# 执行批归一化操作
return tf.nn.batch_normalization(net, mean, variance,
offset=beta, scale=gamma, variance_epsilon=eps)
def create_model(input_x, show_image=False):
"""
构建模型(VGG 11)
:param input_x: 占位符,格式为[None, 784]
:param show_image:是否可视化图像
:return:
"""
# 定义一个网络结构: conv3-64 -> LRN -> MaxPooling -> conv3-128 -> MaxPooling -> conv3-256 -> conv3-256 -> MaxPooling -> FC1024 -> FC10
with tf.variable_scope("net",
initializer=tf.random_normal_initializer(0.0, 0.0001)):
with tf.variable_scope("Input"):
# 这里定义一些图像的处理方式,包括:格式转换、基础处理(大小、剪切...)
net = tf.reshape(input_x, shape=[-1, 28, 28, 1])
print(net.get_shape())
if show_image:
# 可视化图像
tf.summary.image(name='image', tensor=net, max_outputs=5)
# 定义一个网络结构
# layers = [
# ["conv", 3, 3, 1, 64, 1, "relu"],
# ["lrn"],
# ["max_pooling", 2, 2, 2],
# ["conv", 3, 3, 1, 128, 1, "relu"],
# ["max_pooling", 2, 2, 2],
# ["conv", 3, 3, 1, 256, 2, "relu"],
# ["max_pooling", 2, 2, 2],
# ["reshape"],
# ["FC", 1024, "relu"],
# ["FC", 10]
# ]
# layers = [
# ["conv", 3, 3, 1, 64, 1, "relu"],
# ["lrn"],
# ["max_pooling", 2, 2, 2],
# ["conv", 3, 3, 1, 128, 2, "relu"],
# ["ln"],
# ["max_pooling", 2, 2, 2],
# ["conv", 3, 3, 1, 256, 2, "relu"],
# ["ln"],
# ["max_pooling", 2, 2, 2],
# ["reshape"],
# ["FC", 1024, "relu"],
# ["FC", 10]
# ]
layers = [
["conv", 3, 3, 1, 32, 2, "relu"],
["max_pooling", 2, 2, 2],
["conv", 3, 3, 1, 64, 2, "relu"],
# 第一个是池化,第二个窗口高度,第三个是窗口的宽度,第四个是步长
["max_pooling", 2, 2, 2],
["reshape"],
["FC", 1024, "relu"],
["FC", 10]
]
for idx, layer in enumerate(layers):
shape = net.get_shape()
name = layer[0]
if "conv" == name:
# a. 获取相关的参数
# ["conv", 3, 3, 1, 64, 1, "relu" ] -> 名称 窗口高度 窗口宽度 步长(一个值) 输出通道数 重复几个卷积 激活函数(None表示不激活)
filter_height, filter_width, stride, out_channels, num_conv = layer[1:6]
try:
ac = layer[6]
except:
ac = None
# 遍历进行卷积层的构建
for i in range(num_conv):
with tf.variable_scope("CONV_{}_{}".format(idx, i)):
# 获取当前卷积的输入的通道数
shape = net.get_shape()
in_channels = shape[-1]
# 构建变量
filter = tf.get_variable(name='w', shape=[filter_height, filter_width,
in_channels, out_channels])
bias = tf.get_variable(name='b', shape=[out_channels])
# 卷积操作
net = tf.nn.conv2d(input=net, filter=filter,
strides=[1, stride, stride, 1], padding='SAME')
net = tf.nn.bias_add(net, bias)
# 做一个激活操作
if ac is not None:
if "relu" == ac:
net = tf.nn.relu(net)
elif "relu6" == ac:
net = tf.nn.relu6(net)
else:
net = tf.nn.sigmoid(net)
if show_image:
# 对于卷积之后的值做一个可视化操作
shape = net.get_shape()
for k in range(shape[-1]):
image_tensor = tf.reshape(net[:, :, :, k], shape=[-1, shape[1], shape[2], 1])
tf.summary.image(name='image', tensor=image_tensor, max_outputs=5)
elif "lrn" == name:
with tf.variable_scope("LRN_{}".format(idx)):
# lrn(input, depth_radius=5, bias=1, alpha=1, beta=0.5, name=None)
# depth_radius就是ppt上的n,bias就是ppt上的k,beta就是β,alpha就是α
net = tf.nn.local_response_normalization(input=net, depth_radius=5,
bias=1, alpha=1, beta=0.5)
elif "max_pooling" == name:
with tf.variable_scope("Max_Pooling_{}".format(idx)):
ksize_height = layer[1]
ksize_width = layer[2]
stride = layer[3]
net = tf.nn.max_pool(value=net,
ksize=[1, ksize_height, ksize_width, 1],
strides=[1, stride, stride, 1], padding='SAME')
elif "FC" == name:
with tf.variable_scope("FC_{}".format(idx)):
# 获取相关变量,输入的维度,输出的维度大小以及激活函数
dim_size = shape[-1]
unit_size = layer[1]
try:
ac = layer[2]
except:
ac = None
w = tf.get_variable(name='w', shape=[dim_size, unit_size])
b = tf.get_variable(name='b', shape=[unit_size])
net = tf.matmul(net, w) + b
# 做一个激活操作
if ac is not None:
if "relu" == ac:
net = tf.nn.relu(net)
elif "relu6" == ac:
net = tf.nn.relu6(net)
else:
net = tf.nn.sigmoid(net)
elif "reshape" == name:
with tf.variable_scope('reshape'):
dim_size = shape[1] * shape[2] * shape[3]
net = tf.reshape(net, shape=[-1, dim_size])
elif "ln" == name:
with tf.variable_scope("LN_{}".format(idx)):
net = layer_normalization(net)
with tf.variable_scope("Prediction"):
# 每行的最大值对应的下标就是当前样本的预测值
predictions = tf.argmax(net, axis=1)
return net, predictions
def create_loss(labels, logits):
"""
基于给定的实际值labels和预测值logits进行一个交叉熵损失函数的构建
:param labels: 是经过哑编码之后的Tensor对象,形状为[n_samples, n_class]
:param logits: 是神经网络的最原始的输出,形状为[n_samples, n_class], 每一行最大值那个位置对应的就是预测类别,没有经过softmax函数转换。
:return:
"""
with tf.name_scope("loss"):
# loss = tf.reduce_mean(-tf.log(tf.reduce_sum(labels * tf.nn.softmax(logits))))
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=logits))
tf.summary.scalar('loss', loss)
return loss
def create_train_op(loss, learning_rate=0.0001, global_step=None):
"""
基于给定的损失函数构建一个优化器,优化器的目的就是让这个损失函数最小化
:param loss:
:param learning_rate:
:param global_step:
:return:
"""
with tf.name_scope("train"):
optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
train_op = optimizer.minimize(loss, global_step=global_step)
return train_op
def create_accuracy(labels, predictions):
"""
基于给定的实际值和预测值,计算准确率
:param labels: 是经过哑编码之后的Tensor对象,形状为[n_samples, n_class]
:param predictions: 实际的预测类别下标,形状为[n_samples,]
:return:
"""
with tf.name_scope("accuracy"):
# 获取实际的类别下标,形状为[n_samples,]
y_labels = tf.argmax(labels, 1)
# 计算准确率
accuracy = tf.reduce_mean(tf.cast(tf.equal(y_labels, predictions), tf.float32))
tf.summary.scalar('accuracy', accuracy)
return accuracy
def train():
# 对于文件是否存在做一个检测
create_dir_with_not_exits(FLAGS.checkpoint_dir)
create_dir_with_not_exits(FLAGS.logdir)
with tf.Graph().as_default():
# 一、执行图的构建
# 0. 相关输入Tensor对象的构建
input_x = tf.placeholder(dtype=tf.float32, shape=[None, 784], name='input_x')
input_y = tf.placeholder(dtype=tf.float32, shape=[None, 10], name='input_y')
global_step = tf.train.get_or_create_global_step()
# 1. 网络结构的构建
logits, predictions = create_model(input_x)
# 2. 构建损失函数
loss = create_loss(input_y, logits)
# 3. 构建优化器
train_op = create_train_op(loss,
learning_rate=FLAGS.learning_rate,
global_step=global_step)
# 4. 构建评估指标
accuracy = create_accuracy(input_y, predictions)
# 二、执行图的运行/训练(数据加载、训练、持久化、可视化、模型的恢复....)
with tf.Session() as sess:
# a. 创建一个持久化对象(默认会将所有的模型参数全部持久化,因为不是所有的都需要的,最好仅仅持久化的训练的模型参数)
var_list = tf.trainable_variables()
# 是因为global_step这个变量是不参与模型训练的,所以模型不会持久化,这里加入之后,可以明确也持久化这个变量。
var_list.append(global_step)
saver = tf.train.Saver(var_list=var_list)
# a. 变量的初始化操作(所有的非训练变量的初始化 + 持久化的变量恢复)
# 所有变量初始化(如果有持久化的,后面做了持久化后,会覆盖的)
sess.run(tf.global_variables_initializer())
# 做模型的恢复操作
ckpt = tf.train.get_checkpoint_state(FLAGS.checkpoint_dir)
if ckpt and ckpt.model_checkpoint_path:
print("进行模型恢复操作...")
# 恢复模型
saver.restore(sess, ckpt.model_checkpoint_path)
# 恢复checkpoint的管理信息
saver.recover_last_checkpoints(ckpt.all_model_checkpoint_paths)
# 获取一个日志输出对象
train_logdir = os.path.join(FLAGS.logdir, 'train')
validation_logdir = os.path.join(FLAGS.logdir, 'validation')
train_writer = tf.summary.FileWriter(logdir=train_logdir, graph=sess.graph)
validation_writer = tf.summary.FileWriter(logdir=validation_logdir, graph=sess.graph)
# 获取所有的summary输出操作
summary = tf.summary.merge_all()
# b. 训练数据的产生/获取(基于numpy随机产生<可以先考虑一个固定的数据集>)
mnist = input_data.read_data_sets(
train_dir='../datas/mnist', # 给定本地磁盘的数据存储路径
one_hot=True, # 给定返回的数据中是否对Y做哑编码
validation_size=5000 # 给定验证数据集的大小
)
# c. 模型训练
batch_size = FLAGS.batch_size
step = sess.run(global_step)
vn_accuracy_ = 0
while True:
# 开始模型训练
x_train, y_train = mnist.train.next_batch(batch_size=batch_size)
_, loss_, accuracy_, summary_ = sess.run([train_op, loss, accuracy, summary], feed_dict={
input_x: x_train,
input_y: y_train
})
print("第{}次训练后模型的损失函数为:{}, 准确率:{}".format(step, loss_, accuracy_))
train_writer.add_summary(summary_, global_step=step)
# 持久化
if step % FLAGS.store_per_batch == 0:
file_name = 'model_%.3f_%.3f_.ckpt' % (loss_, accuracy_)
save_path = os.path.join(FLAGS.checkpoint_dir, file_name)
saver.save(sess, save_path=save_path, global_step=step)
if step % FLAGS.validation_per_batch == 0:
vn_loss_, vn_accuracy_, vn_summary_ = sess.run([loss, accuracy, summary],
feed_dict={
input_x: mnist.validation.images,
input_y: mnist.validation.labels
})
print("第{}次训练后模型在验证数据上的损失函数为:{}, 准确率:{}".format(step,
vn_loss_,
vn_accuracy_))
validation_writer.add_summary(vn_summary_, global_step=step)
# 退出训练(要求当前的训练数据集上的准确率至少为0.8,然后最近一次验证数据上的准确率为0.8)
if accuracy_ > 0.99 and vn_accuracy_ > 0.99:
# 退出之前再做一次持久化操作
file_name = 'model_%.3f_%.3f_.ckpt' % (loss_, accuracy_)
save_path = os.path.join(FLAGS.checkpoint_dir, file_name)
saver.save(sess, save_path=save_path, global_step=step)
break
step += 1
# 关闭输出流
train_writer.close()
validation_writer.close()
def prediction():
# TODO: 参考以前的代码自己把这个区域的内容填充一下。我下周晚上讲。
# 做一个预测(预测的评估,对mnist.test这个里面的数据进行评估效果的查看)
with tf.Graph().as_default():
pass
def main(_):
if FLAGS.is_train:
# 进入训练的代码执行中
print("开始进行模型训练运行.....")
train()
else:
# 进入测试、预测的代码执行中
print("开始进行模型验证、测试代码运行.....")
prediction()
print("Done!!!!")
if __name__ == '__main__':
# 默认情况下,直接调用当前py文件中的main函数
tf.app.run()