运行环境
python3.6.7、tensorflow1.4.0
思路
这里用numpy对cos函数进行间隔采样,取10000个点作为训练数据,再接着取1000个点作为测试数据。对于取出的数据,放入一个数组中,即一个离散的cos函数的序列,对于这个序列,取对应的点作为输出y,再对应地取点前面的10个点作为此点对应的输入X,即在此时刻根据这10个点X来预测对应的一个点y,自定义一个函数generate_data来完成从序列中生成输入数据X和输出数据y的功能。
这里的模型采用了一个两层LSTM模型,隐藏结点设为30个,最后在LSTM模型后设定了一个全连接层,输出预测结果。训练次数设定为5000次,训练开始前根据初始的参数观察模型在测试集上的预测结果,经过5000次训练后再观察模型在测试集上的预测结果,进行对比。
源代码
# -*- coding: UTF-8 -*-
#Author:Yinli
import numpy as np
import tensorflow as tf
from matplotlib import pyplot as plt
# LSTM中隐藏结点的个数和层数
HIDDEN_SIZE = 30
NUM_LAYERS = 2
# 训练序列长度和训练轮数以及batch大小
TIMESTEPS = 10
TRAINING_STEPS = 5000
BTACH_SIZE = 32
# 训练数据和测试数据个数以及采样间隔
TRAINING_EXAMPLES = 10000
TESTING_EXAMPLES = 1000
SAMPLE_GAP = 0.01
# 从给定的序列中抽取输入数据和输出数据
def generate_data(seq):
x = []
y = []
# 每次取从i开始包括i的10项作为输入,第i+10项作为输出
for i in range(len(seq) - TIMESTEPS):
x.append([seq[i:i+TIMESTEPS]])
y.append([seq[i+TIMESTEPS]])
return np.array(x,dtype=np.float32), np.array(y,dtype=np.float32)
# LSTM模型
def lstm_model(X,y,is_training):
# 定义多层LSTM结构
cell = tf.nn.rnn_cell.MultiRNNCell([
tf.nn.rnn_cell.BasicLSTMCell(HIDDEN_SIZE) for _ in range(NUM_LAYERS)
])
# 将LSTM结构连接成RNN网络计算前向传播结果
outputs, _ = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
# 因为结果中包括了每一步的输出结果,这里只关心最后一步的输出结果
output = outputs[:,-1,:]
# 结果传入一个全连接层,得到预测结果
predictions = tf.contrib.layers.fully_connected(output, 1, activation_fn=None)
# 如果不是训练过程,则直接返回预测结果
if not is_training:
return predictions, None, None
# 计算损失
loss = tf.losses.mean_squared_error(labels=y, predictions=predictions)
# 定义训练步骤
train_op = tf.contrib.layers.optimize_loss(
loss, tf.train.get_global_step(),
optimizer='Adagrad', learning_rate=0.1
)
# 返回预测结果、损失和训练步骤
return predictions, loss, train_op
# 评估训练结果
def run_eval(sess, test_X, test_y):
# 将测试数据转化为数据集的形式
ds = tf.data.Dataset.from_tensor_slices((test_X,test_y))
ds = ds.batch(1)
X,y = ds.make_one_shot_iterator().get_next()
# 复用训练的参数
with tf.variable_scope("model",reuse=True):
prediction, _, _ = lstm_model(X,[0.0],False)
predictions = []
labels = []
# 将每次测试的结果存到数组中
for i in range(TESTING_EXAMPLES):
p, l = sess.run([prediction,y])
predictions.append(p)
labels.append(l)
# 计算均方差
predictions = np.array(predictions).squeeze()
labels = np.array(labels).squeeze()
rmse = np.sqrt(((predictions-labels) ** 2).mean(axis=0))
print("Mean Square Error is: %f" % rmse)
# 将结果绘制出来并显示
plt.figure()
plt.subplot(211)
plt.ylim(-1,1)
plt.plot(predictions)
plt.ylabel('predictions')
plt.subplot(212)
plt.ylim(-1,1)
plt.plot(labels,'g')
plt.ylabel('real_sin')
plt.show()
def main():
# 计算测试数据的起始和终点
test_start = (TRAINING_EXAMPLES + TIMESTEPS) * SAMPLE_GAP
test_end = test_start + (TESTING_EXAMPLES + TIMESTEPS) * SAMPLE_GAP
# 从0到测试数据起点生成一个序列用来生成训练数据
train_X, train_y = generate_data(
np.cos(np.linspace(0, test_start, TRAINING_EXAMPLES + TIMESTEPS, dtype=np.float32)))
# 从测试数据起点到终点生成一个序列用来生成测试数据
test_X, test_y = generate_data(
np.cos(np.linspace(test_start, test_end, TESTING_EXAMPLES + TIMESTEPS, dtype=np.float32)))
# 将训练数据转化为数据集的形式
ds = tf.data.Dataset.from_tensor_slices((train_X, train_y))
ds = ds.repeat().shuffle(1000).batch(BTACH_SIZE)
X, y = ds.make_one_shot_iterator().get_next()
# 调用模型
with tf.variable_scope("model"):
_, loss, train_op = lstm_model(X, y, True)
with tf.Session() as sess:
# 初始化参数
sess.run(tf.global_variables_initializer())
# 训练前查看模型在测试数据上的表现
run_eval(sess, test_X, test_y)
# 循环训练
for i in range(TRAINING_STEPS):
_, l = sess.run([train_op, loss])
if i % 1000 == 0:
print("训练轮数:", str(i), ",此时损失为:", str(l))
# 训练结束后查看模型在测试数据上的表现
run_eval(sess, test_X, test_y)
if __name__ == '__main__':
main()
运行结果
结果分析
从输出的图像上可以看到,没有训练之前,测试集的数据经过模型输出的结果均在初始值0.0附近,和正确的cos函数图像丝毫不吻合。在经过5000次训练之后,可以看到测试集经过模型输出的预测结果和正确结果之间的均方差大有下降,在图上可视化的结果也可以看到预测值和正确值的图像高度吻合。