运行环境
python3.6.7、tensorflow1.4.0
数据来源
https://archive.ics.uci.edu/ml/datasets/semeion+handwritten+digit
思路
数据分析
从数据及其说明可以知道,其原始数据共有1593组,每组数据都代表一张手写数字的图片,经过处理后化为256个像素,每个像素取值低于127设为0,高于127设为1,每组数据包含256个0-1特征和10个0-1型one-hot编码,代表了此数字的实际取值。在这里取前1195组数据作为训练集,用后面398组数据进行测试,后398组数据中0~9都有,且分布均匀,可以认为能很好地测试神经网络的分类效果。
模型构建
这里采用含有一个隐含层的神经网络,此隐含层有500个结点,采用ReLU激活函数,输入节点为256个,代表256个像素,输出节点为10个,分别代表0~9的10个数字。损失函数采用交叉熵,为了避免过拟合,引入L2正则化函数,总损失为交叉熵和正则化损失的和,另外为了迭代学习的平缓,采用了指数衰减的学习率进行学习,为了使模型更健壮,采用了滑动平均模型。模型的训练采用了基于batch的随机梯度下降算法,每个batch的数量为100。
实现细节
代码实现总共有四个文件,inference.py、load_data.py、train.py、eval.py。
inference.py是一个工具实现,用来封装神经网络的前向传播过程,可以选择是否提供正则化函数,因为在训练的时候需要提供正则化函数来避免过拟合,而测试验证数据的时候则不需要提供正则化函数,输入要预测的数据的特征张量即可输出预测结果。
load_data.py也是一个工具,是用来从文件中读取数据并将其处理并返回的,返回tr变量包括训练特征、训练标签、测试特征和测试标签四个矩阵。
train.py是用来训练神经网络的,在训练过程中,每10轮训练输出一次当前的损失值,并将当前模型保存至文件,共训练500次。
eval.py是用来测试神经网络的,和训练程序同步运行,每0.2秒读取存在文件中的最新模型,并将此模型用在测试集上,打印输出准确率,持续监视当前的新模型。
源代码
inference.py
# -*- coding: UTF-8 -*-
#Author:Yinli
import tensorflow as tf
'''
这是一个工具,用来封装神经网络的前向传播过程,可以选择是否采用正则化函数
只需传入输入的张量,即可输出最终预测值
因为这里只有一个程序调这个工具,所以命名空间不用声明reuse=true
'''
#定义相关参数,输入节点共784,因为一张图共784个像素点
#输出节点10个,因为0~9共10个数字,中间隐藏层设为500个结点
INPUT_NODE = 256
OUTPUT_NODE = 10
LAYER1_NODE = 500
#在每一层里面都有个weights通过命名空间的不同来达到解耦和的效果,
def get_weight_variable(shape, regularizer):
#取名字为weights的变量记为weights
weights = tf.get_variable("weights", shape, initializer=tf.truncated_normal_initializer(stddev=0.1))
#如果给出了正则化函数,那么将当前的变量正则化之后的损失加入名为losses的集合
if regularizer != None:
tf.add_to_collection("losses", regularizer(weights))
return weights
#定义神经网络的前向传播过程
def inference(input_tensor, regularizer):
#首先在输出层到隐藏层的传播中定义命名空间为layer1
with tf.variable_scope('layer1'):
#给定权重矩阵的形状还有正则化函数,把weights取出来
weights = get_weight_variable([INPUT_NODE, LAYER1_NODE], regularizer)
#初始化偏置项,命名为biases
biases = tf.get_variable("biases", [LAYER1_NODE], initializer=tf.constant_initializer(0.0))
#计算隐藏层的值,用到了激活函数
layer1 = tf.nn.relu(tf.matmul(input_tensor, weights) + biases)
#然后在从隐藏层到输出层的传播中定义命名空间为layer2
with tf.variable_scope('layer2'):
#给定权重矩阵的形状还有正则化函数,把weights取出来
weights = get_weight_variable([LAYER1_NODE, OUTPUT_NODE], regularizer)
#创建初始化偏置项,命名为biases
biases = tf.get_variable("biases", [OUTPUT_NODE], initializer=tf.constant_initializer(0.0))
#计算输出层的值,这里不用激活函数
layer2 = tf.matmul(layer1, weights) + biases
#返回输出层的值
return layer2
load_data.py
# -*- coding: UTF-8 -*-
#Author:Yinli
import numpy as np
import inference
'''
一个工具,用来从文件中读取数据并将其处理并返回结果
返回变量包括训练特征、训练标签、测试特征和测试标签四个矩阵
'''
#定义训练数据和测试数据的数量
NUM_OF_TRAINING = 1195
NUM_OF_TESTING = 1593-1195
def load_data():
#从文件中逐行读取数据
filename = "D:/tensorflow/code/第10次作业/data/semeion.data"
fr = open(filename, 'r', encoding='utf-8')
arrayOfLines = fr.readlines()
#初始化训练特征矩阵,训练标签矩阵,测试特征矩阵和测试标签矩阵
trainingImages = np.zeros([NUM_OF_TRAINING, inference.INPUT_NODE])
trainingLabels = np.zeros([NUM_OF_TRAINING, inference.OUTPUT_NODE])
testingImages = np.zeros([NUM_OF_TESTING, inference.INPUT_NODE])
testingLabels = np.zeros([NUM_OF_TESTING, inference.OUTPUT_NODE])
#处理用于训练的数据
for i in range(NUM_OF_TRAINING):
#去掉每一行末尾的空格和回车
arrayOfLines[i] = arrayOfLines[i].rstrip(' \n')
#将一行数据以空格分开存入列表
currentLine = arrayOfLines[i].split(' ')
#将特征值存入特征矩阵
trainingImages[i][:] = currentLine[:256]
#将标签值存入标签矩阵
trainingLabels[i][:] = currentLine[256:]
#处理用于测试的数据
for i in range(NUM_OF_TESTING):
#去掉每一行末尾的空格和回车
arrayOfLines[i+NUM_OF_TRAINING] = arrayOfLines[i+NUM_OF_TRAINING].rstrip(' \n')
#将一行数据以空格分开存入列表
currentLine = arrayOfLines[i+NUM_OF_TRAINING].split(' ')
#将特征值存入特征矩阵
testingImages[i][:] = currentLine[:256]
#将标签值存入标签矩阵
testingLabels[i][:] = currentLine[256:]
#返回处理后的矩阵
return trainingImages,trainingLabels,testingImages,testingLabels
train.py
# -*- coding: UTF-8 -*-
#Author:Yinli
import os
import tensorflow as tf
import inference
import load_data
'''
训练神经网络的程序,每10轮训练输出一次当前的损失值
并将当前模型保存至文件,共训练500次
'''
#每批训练的数据数
BATCH_SIZE = 100
#定义初始学习率为0.8
LEARNING_RATE_BASE = 0.5
#定义学习率的衰减率为0.99
LEARNING_RATE_DECAY = 0.99
#定义正则化参数为0.0001
REGULARATION_RATE = 0.0001
#定义训练次数为500
TRAINING_STEPS = 500
#定义滑动平均率为0.99
MOVING_AVERAGE_DECAY = 0.99
#定义模型存储路径和文件名
MODEL_SAVE_PATH = "model/"
MODEL_NAME = "model.ckpt"
#定义训练函数
def train():
#先定义输入输出的placeholder
x = tf.placeholder(tf.float32, [None, inference.INPUT_NODE], name='x-input')
y_ = tf.placeholder(tf.float32, [None, inference.OUTPUT_NODE], name='y-input')
#定义正则化函数为l2正则
regularizer = tf.contrib.layers.l2_regularizer(REGULARATION_RATE)
#用工具计算预测结果值
y = inference.inference(x,regularizer)
#初始化全局步数,设定其为不可训练的参数
global_step = tf.Variable(0,trainable=False)
#初始化滑动平均类,给定滑动平均速率和训练轮数的变量
variable_average = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, global_step)
#定义滑动平均的操作,在所有代表神经网络参数的变量上进行滑动平均,除了那些定义trainable=False的变量
#tf.trainable_variables()返回的就是整张计算图中所有没有定义trainable=False的变量
variable_average_op = variable_average.apply(tf.trainable_variables())
#定义交叉熵,sparse_cross_entropy_with_logits是一个tensorflow提供的函数
#logits参数指的是神经网络的前向传播结果,不包含softmax层
#labels参数指的是正确结果,因为传入参数是个长度为10的一维数组,所有要指定argmax函数返回按行比较的最大值索引标号
cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=tf.argmax(y_,1))
#用reduce_mean计算当前batch中所有样例的交叉熵平均值
cross_entropy_mean = tf.reduce_mean(cross_entropy)
#总损失等于交叉熵损失加上正则化损失的和,add_n实现的是一个集合元素的相加,这个集合在工具包里实现过
loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
#设置指数衰减的学习率,先指定基础学习率,再指定当前迭代的轮数
#再指定过完所有训练数据需要迭代的次数为每次迭代的间隔
#最后指定学习率衰减速度
learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE, global_step,
load_data.NUM_OF_TRAINING/BATCH_SIZE, LEARNING_RATE_DECAY)
#使用梯度下降算法优化损失函数loss,这里损失函数包括了交叉熵和正则化损失,学习率是指数衰减的
train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss,global_step=global_step)
#因为每过一遍数据都要更新神经网络的参数,还要更新参数的滑动平均值
#所以把两个操作合起来,下面两行程序相当于
#train_op = tf.group(train_step, variables_average_op)
with tf.control_dependencies([train_step,variable_average_op]):
train_op = tf.no_op(name='train')
#初始化tensorflow持久化类
saver = tf.train.Saver()
#创建一个默认会话
with tf.Session() as sess:
#初始化所有参数
tf.global_variables_initializer().run()
trainingImages, trainingLabels, testingImages, testingLabels = load_data.load_data()
#循环训练
for i in range(TRAINING_STEPS):
#定义这一轮训练使用的一个batch的数据的起点和终点
start = (i*BATCH_SIZE) % load_data.NUM_OF_TRAINING
end = min(start+BATCH_SIZE, load_data.NUM_OF_TRAINING)
#run返回三个结果,一个是训练步骤的返回结果,后续没有用,用_记录
#另外返回训练后的损失值和当前迭代步数
_, loss_value, step = sess.run([train_op, loss, global_step],
feed_dict={x:trainingImages[start:end], y_:trainingLabels[start:end]})
#每10轮训练输出一次当前的损失值,并将当前模型保存
#这里指定了当前的轮数,这样每个保存文件后缀会加上当前轮数
if i%10 == 0:
print("经过%d次训练之后,损失为%g" % (step, loss_value))
saver.save(sess,os.path.join(MODEL_SAVE_PATH, MODEL_NAME),global_step=global_step)
#主函数
def main(argv=None):
train()
#入口
if __name__ == '__main__':
tf.app.run()
eval.py
# -*- coding: UTF-8 -*-
#Author:Yinli
import time
import tensorflow as tf
import inference
import train
import load_data
'''
测试函数,每0.2秒读取存在文件中的最新模型
并将此模型用在测试集上,打印输出准确率
'''
#定义每1秒钟加载一次最新的模型,并在测试数据集上测试最新模型的正确率
EVAL_INTERVAL_SECS = 0.2
def evaluate():
with tf.Graph().as_default() as g:
#先定义好输入输出的格式
x = tf.placeholder(tf.float32, [None, inference.INPUT_NODE], name='x-input')
y_ = tf.placeholder(tf.float32, [None, inference.OUTPUT_NODE], name='y-input')
trainingImages, trainingLabels, testingImages, testingLabels = load_data.load_data()
#定义好输入输出的数据集
validate_feed = {x:testingImages, y_:testingLabels}
#计算前向传播的结果,因为测试的时候用不着正则化,所有将正则化函数设为None
y = inference.inference(x,None)
#计算正确率
correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction,tf.float32))
#通过变量重命名的方式来加载模型
variable_averages = tf.train.ExponentialMovingAverage(train.MOVING_AVERAGE_DECAY)
variables_to_restore = variable_averages.variables_to_restore()
saver = tf.train.Saver(variables_to_restore)
while True:
with tf.Session() as sess:
#get_checkpoint_state会通过checkpoint自动查找指定目录里的最新模型的文件名
ckpt = tf.train.get_checkpoint_state(train.MODEL_SAVE_PATH)
#如果有模型存在
if ckpt and ckpt.model_checkpoint_path:
#加载模型
saver.restore(sess, ckpt.model_checkpoint_path)
#把此模型保存的迭代的轮数分离出来
global_step = ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1]
#计算此模型在测试集上的准确率,并打印输出
accuracy_score = sess.run(accuracy, feed_dict=validate_feed)
print("经过%s次训练后,测试集正确率为%g" % (global_step, accuracy_score))
#没有保存的模型则返回
else:
print("No checkpoint file found")
return
#线程挂起0.2秒钟
time.sleep(EVAL_INTERVAL_SECS)
#主函数
def main(argv=None):
evaluate()
#入口
if __name__ == '__main__':
tf.app.run()
运行结果
结果分析
从结果中可以看得到,经过训练,总损失逐渐变小,到100次训练之后,总损失维持在0.07的水平,模型趋近收敛。而在100次训练后的模型在测试集上的表现也甚佳,可以稳定在90%的正确率,在最终经过了约500次训练之后,模型在测试集上的最终正确率为91.2%,可以认为此模型对这个semeion手写数字数据集的识别效果很好。