本文是对官方文档 的学习笔记。
Keras 支持RNN 的理念:
- 好用: 内建
keras.layers.RNN
,keras.layers.LSTM
,keras.layers.GRU
层,帮助你快速搭建 RNN - 易于定制: 可以迅速定制一个RNN Cell, 并加入到现有架构中, 快速验证idea。
内建RNN Layer : 一个简单的例子
有三种内建 RNN Layer:
-
keras.layers.SimpleRNN
, 内部全连接,讲输入传递到输出
-
keras.layers.GRU
, first proposed in Cho et al., 2014.keras.layers.LSTM
, first proposed in Hochreiter & Schmidhuber, 1997.
这是一个顺序模型的简单示例,该模型处理整数序列,将每个整数嵌入到64维向量中,然后使用LSTM层处理向量序列。
model = keras.Sequential()
# Add an Embedding layer expecting input vocab of size 1000, and
# output embedding dimension of size 64.
model.add(layers.Embedding(input_dim=1000, output_dim=64))
# Add a LSTM layer with 128 internal units.
model.add(layers.LSTM(128))
# Add a Dense layer with 10 units.
model.add(layers.Dense(10))
model.summary()
内建 RNN 支持一些列有用的 feature
- dropout ,and recurrent_dropout
- 以倒序的方式处理输入 go_backwards
- unroll, 可以在CPU 上大大加速对短输入的训练
更多详情, RNN API documentation.
输出和状态
一般来说, RNN 会在接受整个序列后,输出一个 Vector (多对一), 输出Vector
维度在函数中只定。 当然,也可以通过设置 return_sequences=True 是的 RNN 对每一个输入的Sample (序列中每个元素)输出一个Vector(多对多)。 比如多层LSTM, 中间层LSTM 就会输出序列。
model = keras.Sequential()
model.add(layers.Embedding(input_dim=1000, output_dim=64))
# The output of GRU will be a 3D tensor of shape (batch_size, timesteps, 256)
model.add(layers.GRU(256, return_sequences=True))
# The output of SimpleRNN will be a 2D tensor of shape (batch_size, 128)
model.add(layers.SimpleRNN(128))
model.add(layers.Dense(10))
model.summary()
另外,RNN层可以返回其最终内部状态。返回的状态可用于稍后恢复RNN执行,或初始化另一个RNN。此设置通常在编码器-解码器序列到序列模型中使用,其中编码器的最终状态用作解码器的初始状态。
要将RNN图层配置为返回其内部状态,请在创建图层时将return_state参数设置为True。请注意,LSTM具有2个状态 Tensor ,但GRU仅具有1个。
要配置图层的初始状态,只需使用其他关键字参数initial_state调用图层即可。请注意,状态的形状需要与图层的单位大小匹配,如以下示例所示。
encoder_vocab = 1000
decoder_vocab = 2000
encoder_input = layers.Input(shape=(None,))
encoder_embedded = layers.Embedding(input_dim=encoder_vocab, output_dim=64)(
encoder_input
)
# Return states in addition to output
output, state_h, state_c = layers.LSTM(64, return_state=True, name="encoder")(
encoder_embedded
)
encoder_state = [state_h, state_c]
decoder_input = layers.Input(shape=(None,))
decoder_embedded = layers.Embedding(input_dim=decoder_vocab, output_dim=64)(
decoder_input
)
# Pass the 2 states to a new LSTM layer, as initial state
decoder_output = layers.LSTM(64, name="decoder")(
decoder_embedded, initial_state=encoder_state
)
output = layers.Dense(10)(decoder_output)
model = keras.Model([encoder_input, decoder_input], output)
model.summary()
RNN Layer and RNN cells
对应 RNN Layer Tensorflow 2 还提供 RNN Cells。 与 Layer 每次处理一个序列不同, 每个 RNN Cell 每次只能处理一个 Timestamp。 RNN Cell 的意义在于提供给开发者供开发者自己组装 RNN Layer ,一般用于研究。
内置的 RNN Cell
keras.layers.SimpleRNNCell
corresponds to theSimpleRNN
layer.keras.layers.GRUCell
corresponds to theGRU
layer.keras.layers.LSTMCell
corresponds to theLSTM
layer.
跨批次状态
跨批次状态 (cross-batch statefulness) 会用在处理非常长的sequnce(甚至是无限长)。
RNN 会认为每个 Sample 都是独立的, 所以在每次 Batch 结束后都会重置状态。 但是如果处理非常长的序列的时候, 有时候需要先把长序列给分成一组一组的短序列,然后再把短序列送给 RNN处理。 如果每次batch 都清空状态的话, 就无法实现用短序列拼接出来一个长序列的目的了。
如果想让 RNN在 batch 以后不重置状态, 可以设置 stateful=True
。
如果有个序列 s = [t0, t1, ... t1546, t1547], 可以将其分为
s1 = [t0, t1, ... t100]
s2 = [t101, ... t201]
...
s16 = [t1501, ... t1547]
处理的时候
lstm_layer = layers.LSTM(64, stateful=True)
for s in sub_sequences:
output = lstm_layer(s)
完整的例子
paragraph1 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph2 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph3 = np.random.random((20, 10, 50)).astype(np.float32)
lstm_layer = layers.LSTM(64, stateful=True)
output = lstm_layer(paragraph1)
output = lstm_layer(paragraph2)
output = lstm_layer(paragraph3)
# reset_states() will reset the cached state to the original initial_state.
# If no initial_state was provided, zero-states will be used by default.
lstm_layer.reset_states()
RNN 状态复用
如果想获取 RNN 层的状态, 并将其用在其他层中, 则需要从 layer.states 获取状态。(不在layer.weights() 中 )。 设置初始化状态使用 new_layer(inputs, initial_state=layer.states)
paragraph1 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph2 = np.random.random((20, 10, 50)).astype(np.float32)
paragraph3 = np.random.random((20, 10, 50)).astype(np.float32)
lstm_layer = layers.LSTM(64, stateful=True)
output = lstm_layer(paragraph1)
output = lstm_layer(paragraph2)
existing_state = lstm_layer.states
new_lstm_layer = layers.LSTM(64)
new_output = new_lstm_layer(paragraph3, initial_state=existing_state)
双向 RNN
某些序列,比如文本, 双向处理可以带来更好的效果。 Keras 提供 keras.layers.Bidirectional
用来构建双向 RNN 网络。
model = keras.Sequential()
model.add(
layers.Bidirectional(layers.LSTM(64, return_sequences=True), input_shape=(5, 10))
)
model.add(layers.Bidirectional(layers.LSTM(32)))
model.add(layers.Dense(10))
model.summary()
关于双向 RNN , 更多信息在 the API docs.
性能优化与 CuDNN 核
TF2.0 中 当发现有GPU的时候 LSTM , GRU 会自动使用CuDNN . 但是如果改了他们的默认设置, 他们可能不会自动使用 CuDNN 。 例如:
- 把 activation 从 tanh 修改成其他函数
- 把 recurrent_activation 从 sigmoid 换成函数
- recurrent_dropout > 0
- unroll = TRUE
- use_bias = False
- 当输入数据未严格右填充时使用屏蔽(如果掩码对应于严格右填充数据,则仍可以使用CuDNN。这是最常见的情况)。
尽可能使用 CuDNN
建立一个简单的LSTM模型来演示性能差异。将MNIST数字的行序列(作为时间步长处理每一行像素)用作输入序列,并预测该数字的标签。
batch_size = 64
# Each MNIST image batch is a tensor of shape (batch_size, 28, 28).
# Each input sequence will be of size (28, 28) (height is treated like time).
input_dim = 28
units = 64
output_size = 10 # labels are from 0 to 9
# Build the RNN model
def build_model(allow_cudnn_kernel=True):
# CuDNN is only available at the layer level, and not at the cell level.
# This means `LSTM(units)` will use the CuDNN kernel,
# while RNN(LSTMCell(units)) will run on non-CuDNN kernel.
if allow_cudnn_kernel:
# The LSTM layer with default options uses CuDNN.
lstm_layer = keras.layers.LSTM(units, input_shape=(None, input_dim))
else:
# Wrapping a LSTMCell in a RNN layer will not use CuDNN.
lstm_layer = keras.layers.RNN(
keras.layers.LSTMCell(units), input_shape=(None, input_dim)
)
model = keras.models.Sequential(
[
lstm_layer,
keras.layers.BatchNormalization(),
keras.layers.Dense(output_size),
]
)
return model
mnist = keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
sample, sample_label = x_train[0], y_train[0]
model = build_model(allow_cudnn_kernel=True)
model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer="sgd",
metrics=["accuracy"],
)
model.fit(
x_train, y_train, validation_data=(x_test, y_test), batch_size=batch_size, epochs=1
)
可以将其和如下不用 CuDNN的比较一下性能
noncudnn_model = build_model(allow_cudnn_kernel=False)
noncudnn_model.set_weights(model.get_weights())
noncudnn_model.compile(
loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer="sgd",
metrics=["accuracy"],
)
noncudnn_model.fit(
x_train, y_train, validation_data=(x_test, y_test), batch_size=batch_size, epochs=1
)
在安装了NVIDIA GPU和CuDNN的计算机上运行时,与使用常规TensorFlow内核的模型相比,使用CuDNN构建的模型的训练速度要快得多。
相同的支持CuDNN的模型也可以用于在仅CPU的环境中运行。下面的tf.device注释只是强制放置设备。如果没有可用的GPU,默认情况下该模型将在CPU上运行。
import matplotlib.pyplot as plt
with tf.device("CPU:0"):
cpu_model = build_model(allow_cudnn_kernel=True)
cpu_model.set_weights(model.get_weights())
result = tf.argmax(cpu_model.predict_on_batch(tf.expand_dims(sample, 0)), axis=1)
print(
"Predicted result is: %s, target result is: %s" % (result.numpy(), sample_label)
)
plt.imshow(sample, cmap=plt.get_cmap("gray"))
将 list/dict 作为RNN的输入(或嵌套输入)
嵌套结构允许实施者在单个时间步之内包括更多信息。例如,一个视频帧可以同时具有音频和视频输入。在这种情况下,数据形状可能是:
[batch, timestep, {"video": [height, width, channel], "audio": [frequency]}]
在另一个示例中,笔迹数据可以具有笔的当前位置的坐标x和y以及压力信息。因此数据表示可以是:
[batch, timestep, {"location": [x, y], "pressure": [force]}]
以下代码提供了一个示例,说明如何构建接受此类结构化输入的自定义RNN单元。
定制化Cell 以支持嵌套输入
class NestedCell(keras.layers.Layer):
def __init__(self, unit_1, unit_2, unit_3, **kwargs):
self.unit_1 = unit_1
self.unit_2 = unit_2
self.unit_3 = unit_3
self.state_size = [tf.TensorShape([unit_1]), tf.TensorShape([unit_2, unit_3])]
self.output_size = [tf.TensorShape([unit_1]), tf.TensorShape([unit_2, unit_3])]
super(NestedCell, self).__init__(**kwargs)
def build(self, input_shapes):
# expect input_shape to contain 2 items, [(batch, i1), (batch, i2, i3)]
i1 = input_shapes[0][1]
i2 = input_shapes[1][1]
i3 = input_shapes[1][2]
self.kernel_1 = self.add_weight(
shape=(i1, self.unit_1), initializer="uniform", name="kernel_1"
)
self.kernel_2_3 = self.add_weight(
shape=(i2, i3, self.unit_2, self.unit_3),
initializer="uniform",
name="kernel_2_3",
)
def call(self, inputs, states):
# inputs should be in [(batch, input_1), (batch, input_2, input_3)]
# state should be in shape [(batch, unit_1), (batch, unit_2, unit_3)]
input_1, input_2 = tf.nest.flatten(inputs)
s1, s2 = states
output_1 = tf.matmul(input_1, self.kernel_1)
output_2_3 = tf.einsum("bij,ijkl->bkl", input_2, self.kernel_2_3)
state_1 = s1 + output_1
state_2_3 = s2 + output_2_3
output = (output_1, output_2_3)
new_states = (state_1, state_2_3)
return output, new_states
def get_config(self):
return {"unit_1": self.unit_1, "unit_2": unit_2, "unit_3": self.unit_3}
RNN Model 以支持嵌套输入/输出
unit_1 = 10
unit_2 = 20
unit_3 = 30
i1 = 32
i2 = 64
i3 = 32
batch_size = 64
num_batches = 10
timestep = 50
cell = NestedCell(unit_1, unit_2, unit_3)
rnn = keras.layers.RNN(cell)
input_1 = keras.Input((None, i1))
input_2 = keras.Input((None, i2, i3))
outputs = rnn((input_1, input_2))
model = keras.models.Model([input_1, input_2], outputs)
model.compile(optimizer="adam", loss="mse", metrics=["accuracy"])
用随机生成数据进行训练
input_1_data = np.random.random((batch_size * num_batches, timestep, i1))
input_2_data = np.random.random((batch_size * num_batches, timestep, i2, i3))
target_1_data = np.random.random((batch_size * num_batches, unit_1))
target_2_data = np.random.random((batch_size * num_batches, unit_2, unit_3))
input_data = [input_1_data, input_2_data]
target_data = [target_1_data, target_2_data]
model.fit(input_data, target_data, batch_size=batch_size)