本文是对官方文档 的学习笔记。
Keras 支持RNN 的理念:
- 好用: 内建
keras.layers.RNN,keras.layers.LSTM,keras.layers.GRU层,帮助你快速搭建 RNN - 易于定制: 可以迅速定制一个RNN Cell, 并加入到现有架构中, 快速验证idea。
内建RNN Layer : 一个简单的例子
有三种内建 RNN Layer:
-
keras.layers.SimpleRNN, 内部全连接,讲输入传递到输出
-
keras.layers.GRU, first proposed in Cho et al., 2014.keras.layers.LSTM, first proposed in Hochreiter & Schmidhuber, 1997.
这是一个顺序模型的简单示例,该模型处理整数序列,将每个整数嵌入到64维向量中,然后使用LSTM层处理向量序列。
model = keras.Sequential()
# Add an Embedding layer expecting input vocab of size 1000, and
# output embedding dimension of size 64.
model.add(layers.Embedding(input_dim=1000, output_dim=64))
# Add a LSTM layer with 128 internal units.
model.add(layers.LSTM(128))
# Add a Dense layer with 10 units.
model.add(layers.Dense(10))
model.summary()
内建 RNN 支持一些列有用的 feature
- dropout ,and recurrent_dropout
- 以倒序的方式处理输入 go_backwards
- unroll, 可以在CPU 上大大加速对短输入的训练
更多详情, RNN API documentation.
输出和状态
一般来说, RNN 会在接受整个序列后,输出一个 Vector (多对一), 输出Vector
维度在函数中只定。 当然,也可以通过设置 return_sequences=True 是的 RNN 对每一个输入的Sample (序列中每个元素)输出一个Vector(多对多)。 比如多层LSTM, 中间层LSTM 就会输出序列。
model = keras.Sequential()
model.add(layers.Embedding(input_dim=1000, output_dim=64))
# The output of GRU will be a 3D tensor of shape (batch_size, timesteps, 256)
model.add(layers.GRU(256, return_sequences=True))
# The output of SimpleRNN will be a 2D tensor of shape (batch_size, 128)
model.add(layers.SimpleRNN(128))
model.add(layers.Dense(10))
model.summary()
另外,RNN层可以返回其最终内部状态。返回的状态可用于稍后恢复RNN执行,或初始化另一个RNN。此设置通常在编码器-解码器序列到序列模型中使用,其中编码器的最终状态用作解码器的初始状态。
要将RNN图层配置为返回其内部状态,请在创建图层时将return_state参数设置为True。请注意,LSTM具有2个状态 Tensor ,但GRU仅具有1个。
要配置图层的初始状态,只需使用其他关键字参数initial_state调用图层即可。请注意,状态的形状需要与图层的单位大小匹配,如以下示例所示。
encoder_vocab = 1000
decoder_vocab = 2000
encoder_input = layers.Input(shape=(None,))
encoder_embedded = layers.Embedding(input_dim=encoder_vocab, output_dim=64)(
encoder_input
)
# Return states in addition to output
output, state_h, state_c = layers.LSTM(64, return_state=True, name="encoder")(
encoder_embedded
)
encoder_state = [state_h, state_c]
decoder_input = layers.Input(shape=(None,))
decoder_embedded = layers.Embedding(input_dim=decoder_vocab, output_dim=64)(
decoder_input
)
# Pass the 2 states to a new LSTM layer, as initial state
decoder_output = layers.LSTM(64, name="decoder")(
decoder_embedded, initial_state=encoder_state
)
output = layers.Dense(10)(decoder_output)
model = keras.Model([encoder_input, decoder_input], output)
model.summary()
RNN Layer and RNN cells
对应 RNN Layer Tensorflow 2 还提供 RNN Cells。 与 Layer 每次处理一个序列不同, 每个 RNN Cell 每次只能处理一个 Timestamp。 RNN Cell 的意义在于提供给开发者供开发者自己组装 RNN Layer ,一般用于研究。
内置的 RNN Cell
keras.layers.SimpleRNNCellcorresponds to theSimpleRNNlayer.keras.layers.GRUCellcorresponds to theGRUlayer.keras.layers.LSTMCellcorresponds to theLSTMlayer.
跨批次状态
跨批次状态 (cross-batch statefulness) 会用在处理非常长的sequnce(甚至是无限长)。
RNN 会认为每个 Sample 都是独立的, 所以在每次 Batch 结束后都会重置状态。 但是如果处理非常长的序列的时候, 有时候需要先把长序列给分成一组一组的短序列,然后再把短序列送给 RNN处理。 如果每次batch 都清空状态的话, 就无法实现用短序列拼接出来一个长序列的目的了。
如果想让 RNN在 batch 以后不重置状态, 可以设置 stateful=True 。
如果有个序列 s = [t0, t1, ... t1546, t1547], 可以将其分为
s1 = [t0, t1, ... t100]
s2 = [t101, ... t201]
...
s16 = [t1501, ... t1547]
处理的时候
lstm_layer = layers.LSTM(64, stateful=True)
for s in sub_sequences:
output = lstm_layer(s)
完整的例子
paragraph1 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph2 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph3 = np.random.random((20, 10, 50)).astype(np.float32)
lstm_layer = layers.LSTM(64, stateful=True)
output = lstm_layer(paragraph1)
output = lstm_layer(paragraph2)
output = lstm_layer(paragraph3)
# reset_states() will reset the cached state to the original initial_state.
# If no initial_state was provided, zero-states will be used by default.
lstm_layer.reset_states()
RNN 状态复用
如果想获取 RNN 层的状态, 并将其用在其他层中, 则需要从 layer.states 获取状态。(不在layer.weights() 中 )。 设置初始化状态使用 new_layer(inputs, initial_state=layer.states)
paragraph1 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph2 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph3 = np.random.random((20, 10, 50)).astype(np.float32)
lstm_layer = layers.LSTM(64, stateful=True)
output = lstm_layer(paragraph1)
output = lstm_layer(paragraph2)
existing_state = lstm_layer.states
new_lstm_layer = layers.LSTM(64)
new_output = new_lstm_layer(paragraph3, initial_state=existing_state)
双向 RNN
某些序列,比如文本, 双向处理可以带来更好的效果。 Keras 提供 keras.layers.Bidirectional
用来构建双向 RNN 网络。
model = keras.Sequential()
model.add(
layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(5, 10))
)
model.add(layers.Bidirectional(layers.LSTM(32)))
model.add(layers.Dense(10))
model.summary()
关于双向 RNN , 更多信息在 the API docs.
性能优化与 CuDNN 核
TF2.0 中 当发现有GPU的时候 LSTM , GRU 会自动使用CuDNN . 但是如果改了他们的默认设置, 他们可能不会自动使用 CuDNN 。 例如:
- 把 activation 从 tanh 修改成其他函数
- 把 recurrent_activation 从 sigmoid 换成函数
- recurrent_dropout > 0
- unroll = TRUE
- use_bias = False
- 当输入数据未严格右填充时使用屏蔽(如果掩码对应于严格右填充数据,则仍可以使用CuDNN。这是最常见的情况)。
尽可能使用 CuDNN
建立一个简单的LSTM模型来演示性能差异。将MNIST数字的行序列(作为时间步长处理每一行像素)用作输入序列,并预测该数字的标签。
batch_size = 64
# Each MNIST image batch is a tensor of shape (batch_size, 28, 28).
# Each input sequence will be of size (28, 28) (height is treated like time).
input_dim = 28
units = 64
output_size = 10 # labels are from 0 to 9
# Build the RNN model
def build_model(allow_cudnn_kernel=True):
# CuDNN is only available at the layer level, and not at the cell level.
# This means `LSTM(units)` will use the CuDNN kernel,
# while RNN(LSTMCell(units)) will run on non-CuDNN kernel.
if allow_cudnn_kernel:
# The LSTM layer with default options uses CuDNN.
lstm_layer = keras.layers.LSTM(units, input_shape=(None, input_dim))
else:
# Wrapping a LSTMCell in a RNN layer will not use CuDNN.
lstm_layer = keras.layers.RNN(
keras.layers.LSTMCell(units), input_shape=(None, input_dim)
)
model = keras.models.Sequential(
[
lstm_layer,
keras.layers.BatchNormalization(),
keras.layers.Dense(output_size),
]
)
return model
mnist = keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
sample, sample_label = x_train[0], y_train[0]
model = build_model(allow_cudnn_kernel=True)
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer="sgd",
metrics=["accuracy"],
)
model.fit(
x_train, y_train, validation_data=(x_test, y_test), batch_size=batch_size, epochs=1
)
可以将其和如下不用 CuDNN的比较一下性能
noncudnn_model = build_model(allow_cudnn_kernel=False)
noncudnn_model.set_weights(model.get_weights())
noncudnn_model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer="sgd",
metrics=["accuracy"],
)
noncudnn_model.fit(
x_train, y_train, validation_data=(x_test, y_test), batch_size=batch_size, epochs=1
)
在安装了NVIDIA GPU和CuDNN的计算机上运行时,与使用常规TensorFlow内核的模型相比,使用CuDNN构建的模型的训练速度要快得多。
相同的支持CuDNN的模型也可以用于在仅CPU的环境中运行。下面的tf.device注释只是强制放置设备。如果没有可用的GPU,默认情况下该模型将在CPU上运行。
import matplotlib.pyplot as plt
with tf.device("CPU:0"):
cpu_model = build_model(allow_cudnn_kernel=True)
cpu_model.set_weights(model.get_weights())
result = tf.argmax(cpu_model.predict_on_batch(tf.expand_dims(sample, 0)), axis=1)
print(
"Predicted result is: %s, target result is: %s" % (result.numpy(), sample_label)
)
plt.imshow(sample, cmap=plt.get_cmap("gray"))
将 list/dict 作为RNN的输入(或嵌套输入)
嵌套结构允许实施者在单个时间步之内包括更多信息。例如,一个视频帧可以同时具有音频和视频输入。在这种情况下,数据形状可能是:
[batch, timestep, {"video": [height, width, channel], "audio": [frequency]}]
在另一个示例中,笔迹数据可以具有笔的当前位置的坐标x和y以及压力信息。因此数据表示可以是:
[batch, timestep, {"location": [x, y], "pressure": [force]}]
以下代码提供了一个示例,说明如何构建接受此类结构化输入的自定义RNN单元。
定制化Cell 以支持嵌套输入
class NestedCell(keras.layers.Layer):
def __init__(self, unit_1, unit_2, unit_3, **kwargs):
self.unit_1 = unit_1
self.unit_2 = unit_2
self.unit_3 = unit_3
self.state_size = [tf.TensorShape([unit_1]), tf.TensorShape([unit_2, unit_3])]
self.output_size = [tf.TensorShape([unit_1]), tf.TensorShape([unit_2, unit_3])]
super(NestedCell, self).__init__(**kwargs)
def build(self, input_shapes):
# expect input_shape to contain 2 items, [(batch, i1), (batch, i2, i3)]
i1 = input_shapes[0][1]
i2 = input_shapes[1][1]
i3 = input_shapes[1][2]
self.kernel_1 = self.add_weight(
shape=(i1, self.unit_1), initializer="uniform", name="kernel_1"
)
self.kernel_2_3 = self.add_weight(
shape=(i2, i3, self.unit_2, self.unit_3),
initializer="uniform",
name="kernel_2_3",
)
def call(self, inputs, states):
# inputs should be in [(batch, input_1), (batch, input_2, input_3)]
# state should be in shape [(batch, unit_1), (batch, unit_2, unit_3)]
input_1, input_2 = tf.nest.flatten(inputs)
s1, s2 = states
output_1 = tf.matmul(input_1, self.kernel_1)
output_2_3 = tf.einsum("bij,ijkl->bkl", input_2, self.kernel_2_3)
state_1 = s1 + output_1
state_2_3 = s2 + output_2_3
output = (output_1, output_2_3)
new_states = (state_1, state_2_3)
return output, new_states
def get_config(self):
return {"unit_1": self.unit_1, "unit_2": unit_2, "unit_3": self.unit_3}
RNN Model 以支持嵌套输入/输出
unit_1 = 10
unit_2 = 20
unit_3 = 30
i1 = 32
i2 = 64
i3 = 32
batch_size = 64
num_batches = 10
timestep = 50
cell = NestedCell(unit_1, unit_2, unit_3)
rnn = keras.layers.RNN(cell)
input_1 = keras.Input((None, i1))
input_2 = keras.Input((None, i2, i3))
outputs = rnn((input_1, input_2))
model = keras.models.Model([input_1, input_2], outputs)
model.compile(optimizer="adam", loss="mse", metrics=["accuracy"])
用随机生成数据进行训练
input_1_data = np.random.random((batch_size * num_batches, timestep, i1))
input_2_data = np.random.random((batch_size * num_batches, timestep, i2, i3))
target_1_data = np.random.random((batch_size * num_batches, unit_1))
target_2_data = np.random.random((batch_size * num_batches, unit_2, unit_3))
input_data = [input_1_data, input_2_data]
target_data = [target_1_data, target_2_data]
model.fit(input_data, target_data, batch_size=batch_size)