In this experiment we construct a family of deformed sine curves:

X = np.sin(T, dtype=np.float32) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10

They look roughly like this:

The prediction task: given the first N points of a curve (N is a random number between 400 and 1600), predict the next 200 points. In other words, we have to handle a variable-length input, which is where RaggedTensor comes in.
The main points covered here can be summarized as follows:
- Using RaggedTensor. The from_tensor method builds a RaggedTensor that stores a batch of sequences of unequal lengths (a minimal sketch follows this list).
- Handling variable-length sequences with Keras. Because the Keras LSTM is a high-level API, it can consume a RaggedTensor directly.
- The seq2seq framework. We use the seq2seq framework from tensorflow_addons; since it is geared mainly toward text processing, our (comparatively simple) real-valued sequence case actually requires some customization, chiefly of the sampler (see the second sketch after this list).
- Early stopping. The Keras EarlyStopping callback can be used as-is.
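
A minimal sketch of the first two points (the toy lengths and layer size here are illustrative, not taken from the experiment): tf.RaggedTensor.from_tensor turns a zero-padded batch plus a vector of true lengths into a RaggedTensor, and a Keras LSTM consumes it directly:

import numpy as np
import tensorflow as tf

# Two zero-padded sequences whose true lengths are 3 and 5.
padded = np.zeros((2, 5, 1), dtype=np.float32)
padded[0, :3, 0] = [1.0, 2.0, 3.0]
padded[1, :5, 0] = [1.0, 2.0, 3.0, 4.0, 5.0]

# from_tensor + lengths drops the padding while keeping the ragged structure.
ragged = tf.RaggedTensor.from_tensor(padded, lengths=[3, 5])
print(ragged.shape)  # (2, None, 1)

# The LSTM processes only the valid timesteps of each sequence.
print(tf.keras.layers.LSTM(4)(ragged).shape)  # (2, 4)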
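
And a sketch of the sampler customization from the third point (toy sizes, standalone from the model below): for real-valued decoding there is no vocabulary to sample from, so sample_fn is the identity, end_fn never signals termination, and maximum_iterations is what bounds the output length:

import tensorflow as tf
import tensorflow_addons as tfa

cell = tf.keras.layers.LSTMCell(8)
sampler = tfa.seq2seq.InferenceSampler(
    sample_fn=lambda outputs: outputs,  # the "sample" is just the raw cell output
    sample_shape=[8],                   # one sample per step, sized like the cell output
    sample_dtype=tf.float32,
    end_fn=lambda sample: False)        # never stop early...
decoder = tfa.seq2seq.BasicDecoder(cell, sampler, maximum_iterations=5)  # ...cap at 5 steps instead

start = tf.zeros([2, 8])  # batch of 2; also serves as the first decoder input
outputs, state, lengths = decoder(start, initial_state=cell.get_initial_state(start))
print(outputs.rnn_output.shape)  # (2, 5, 8)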
Prediction results:
Full code:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Time axis and one sample curve.
T = np.arange(2000) * 0.05
X = np.sin(T, dtype=np.float32) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10
plt.plot(X)
batch_size = 32
max_time = 200     # number of decoder steps (points to predict)
hidden_size = 128
YLEN = max_time
def generateTrain(records=32):
    # Build one batch: zero-padded inputs, targets, and the true lengths.
    trainXs = []
    trainYs = []
    XLens = []
    YLens = []
    for i in range(records):
        X = (np.sin(T) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10).astype(np.float32)
        xi = X[:1800].copy()
        sepre = np.random.randint(400, 1600)  # random visible prefix length
        xi[sepre:] = 0                        # zero out everything past the prefix
        yi = X[sepre:sepre + YLEN].copy()     # the next YLEN points are the target
        trainXs.append(np.expand_dims(xi, axis=0))
        trainYs.append(np.expand_dims(yi, axis=0))
        XLens.append(sepre)
        YLens.append(YLEN)
    trainX = np.concatenate(trainXs, axis=0)
    trainY = np.concatenate(trainYs, axis=0)
    seqLen = np.array(XLens)
    outLen = np.array(YLens)
    return (trainX, trainY, seqLen, outLen)
def generateTest():
    # One full-length test curve: the first 1800 points in, the next 200 as target.
    X = (np.sin(T) * 0.01 * np.random.rand() * (T + np.random.rand() * 10) + np.random.rand() * 10).astype(np.float32)
    xi = X[:1800].copy()
    yi = X[1800:1800 + YLEN].copy()
    testX = np.expand_dims(xi, axis=0)
    testY = np.expand_dims(yi, axis=0)
    seqLen_test = np.array([1800])
    outLen_test = np.array([YLEN])
    return (testX, testY, seqLen_test, outLen_test)
def gen_raged_train():
    # Endlessly yield (ragged inputs, dense targets) batches for tf.data.
    while True:
        trainX, trainY, seqLen, _ = generateTrain()
        # from_tensor + lengths strips the zero padding into a RaggedTensor.
        raged_train_x = tf.RaggedTensor.from_tensor(trainX, lengths=seqLen)
        raged_train_x = tf.expand_dims(raged_train_x, -1)  # add the feature axis
        tensor_train_y = tf.convert_to_tensor(trainY)
        yield raged_train_x, tensor_train_y

dataset = tf.data.Dataset.from_generator(
    gen_raged_train,
    output_signature=(
        tf.RaggedTensorSpec(shape=(batch_size, None, 1), dtype=tf.float32, ragged_rank=1),
        tf.TensorSpec(shape=(batch_size, 200), dtype=tf.float32))
)
import tensorflow_addons as tfa

# Encoder: an LSTM that reads the ragged input and returns its final state.
inputs = tf.keras.layers.Input(shape=[None, 1], ragged=True)
encoding, state_h, state_c = tf.keras.layers.LSTM(hidden_size, return_state=True)(inputs)
encoder_state = [state_h, state_c]

# Decoder: tfa's seq2seq is built around token sampling, so for real-valued
# sequences the sampler is customized: sample_fn is the identity (the "sample"
# is the raw cell output) and end_fn never fires, so maximum_iterations is
# what caps decoding at 200 steps.
decoder_cell = tf.keras.layers.LSTMCell(hidden_size)
sample_fn = lambda x: x
end_fn = lambda x: False
sampler = tfa.seq2seq.InferenceSampler(
    sample_fn=sample_fn, sample_shape=[hidden_size],
    sample_dtype=tf.float32, end_fn=end_fn)
decoder = tfa.seq2seq.BasicDecoder(decoder_cell, sampler, maximum_iterations=200)

# The encoding doubles as the first decoder input; the encoder's final state
# seeds the decoder state.
output, state, lengths = decoder(
    tf.convert_to_tensor(encoding), initial_state=encoder_state)
logits = output.rnn_output

# Project each decoder hidden state down to a single real value per timestep.
output_layer = tf.keras.layers.Dense(1)
out_seq = tf.squeeze(output_layer(logits), axis=-1)
print(out_seq.shape)

model = tf.keras.Model(inputs=inputs, outputs=out_seq)
model.compile(optimizer="adam", loss="mse", metrics=["mse", "mae", "mape"])
testX, testY, seqLen_test, _ = generateTest()  # val_data: one fixed test curve in the same ragged format
val_x = tf.expand_dims(tf.RaggedTensor.from_tensor(testX, lengths=seqLen_test), -1)
val_data = (val_x, tf.convert_to_tensor(testY))
early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=100)
model.fit(dataset.take(128), validation_data=val_data, epochs=1024, callbacks=[early_stop])
model.save('saved_model/my_model')
plt.figure()
plt.plot(X)
# Predict from the whole 2000-point curve and plot the forecast after it.
pred_in = tf.expand_dims(tf.RaggedTensor.from_tensor(X[np.newaxis, :].astype(np.float32)), -1)
plt.plot([np.nan] * 2000 + model.predict(pred_in)[0].tolist())
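
If the saved model is needed later, it should be reloadable with Keras (a sketch; tensorflow_addons is imported first so the tfa ops in the saved graph are registered, and exact behavior can depend on the TF/TFA version pairing):

import tensorflow as tf
import tensorflow_addons as tfa  # register tfa ops before loading

reloaded = tf.keras.models.load_model('saved_model/my_model')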