1. 从encoder-decoder说起
encoder-decoder是一个框架,在生成模型中使用广泛,这里以翻译系统为例。
- 添加encoder-decoder图片
encoder侧输入源句子:source = (x1, x2, ...xn),source经过encoder进行编码成C(非线性变换为中间语义),decoder拿着C和历史信息生成target = (y1, y2,...yn),当source和target为不同语种时就是翻译系统。
- 添加翻译动图
encoder负责解释source的语义,decoder负责编译语义至target
需要注意的是这里的encoder和decoder可以是RNN模型也可以是attention模型,且encoder和decoder可以分别使用不同的模型。
2. attention机制
- 添加注意力机制的模型
基本款的seq2seq模型使用RNN作为基础模型,局限性在于当句子很长时,输入句子浓缩成固定维度的语义向量C不能表达所有的语义信息而进行后续的decoder。因此attention机制的思路是:与其将encoder处理完的最后一个向量交给decoder去萃取整个句子信息, 不如将encoder生成的每个向量都给decoder,decoder在生成新的序列时,自己决定要将注意力放在哪些向量上面,即组合使用哪些向量。
attention机制流程:
- 拿 Decoder 當下的紅色隱狀態向量 ht 跟 Encoder 所有藍色隱狀態向量 hs 做比較,利用 score 函式計算出 ht 對每個 hs 的注意程度
- 以此注意程度為權重,加權平均所有 Encoder 隱狀態 hs 以取得上下文向量 context vector
- 將此上下文向量與 Decoder 隱狀態結合成一個注意向量 attention vector 並作為該時間的輸出
- 該注意向量會作為 Decoder 下個時間點的輸入
那什么是self-attention呢?
中心思想是在建立序列的每个元素的rep时,同时去注意同序列中其他元素的信息,结合后作为上下文咨询作为自己的rep。
待补充....
3. Transformer
attention机制一般嵌入在seq2seq模型中,而之前的seq2seq模型的encoder和decoder通常使用RNN作为基础模型,虽然attention解决了携带信息的问题,但不能并行运算,两者的结合就是transformer,
在trasformer中,decoder利用Encoder-Deocder Attention关注encoder序列,encoder和decoder各自利用Self-Attention处理自己的序列,没有使用RNN,可并行运算。预计在后序的Seq2Seq模型中,大概率就是transformer一统天下了。
4. 手撕transformer
假设经过一系列的word2vec处理和预处理后得到源语言和目标语言向量:
sour: tf.Tensor(
[[8135 105 10 1304 7925 8136 0 0]
[8135 17 3905 6013 12 2572 7925 8136]], shape=(2, 8), dtype=int64)
tar: tf.Tensor(
[[4201 10 241 80 27 3 4202 0 0 0]
[4201 162 467 421 189 14 7 553 3 4202]], shape=(2, 10), dtype=int64)
shape中的2表示batch_size大小,8和10分别表示源语言和目标语言长度,不足的用0补齐,8135和8136,4201和4202是源语言和目标语言的开头和结尾标识。每个单词再向量化后再多一维得到的shape=(batch_size, seq_len, d_model), 其中d_model为词嵌入空间维度。
4.1 scaled dot product attention
首先这里用到了遮罩的概念,遮罩分为padding mask和look ahead mask,padding mask是将序列中补零的地方遮住不让transformer看到,look ahead mask是确保Decoder只看之前产生的信息,在实际处理时遮罩处的位置置为1,所以不论哪种遮罩,那些值为1的位置就是遮罩存在的地方。
def create_padding_mask(seq):
# padding mask的工作就是把索引序列中為为0的位置置为1
mask = tf.cast(tf.equal(seq, 0), tf.float32)
# broadcasting
return mask[:, tf.newaxis, tf.newaxis, :]
# 遮罩为右上角三角形
def create_look_ahead_mask(size):
mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
return mask # (seq_len, seq_len)
例如:
emb_tar的look_ahead_mask tf.Tensor(
[[0. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 0. 1. 1. 1. 1. 1. 1. 1. 1.]
[0. 0. 0. 1. 1. 1. 1. 1. 1. 1.]
[0. 0. 0. 0. 1. 1. 1. 1. 1. 1.]
[0. 0. 0. 0. 0. 1. 1. 1. 1. 1.]
[0. 0. 0. 0. 0. 0. 1. 1. 1. 1.]
[0. 0. 0. 0. 0. 0. 0. 1. 1. 1.]
[0. 0. 0. 0. 0. 0. 0. 0. 1. 1.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 1.]
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]], shape=(10, 10), dtype=float32)
注意力机制的本质就是拿一个查询(query)去跟一组(key)做运算,最后产生一个输出,并行就是拿多个query去跟一组key做运算而已。每个value的权值由value对应的key跟query计算匹配程度得到。即:
首先准备Q,K,V,这里Q和K直接使用向量化的源语言emb_sour,V是跟Q和K形状相同的二值化张量。直观见下图:
def scaled_dot_product_attention(q, k, v, mask):
matmul_qk = tf.matmul(q, k, transpose_b=True) # (..., seq_len_q, seq_len_k)
dk = tf.cast(tf.shape(k)[-1], tf.float32)
scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
# 加入遮罩
if mask is not None:
scaled_attention_logits += (mask * -1e9)
# 对v进行softmax加权平均
attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1) # (..., seq_len_q, seq_len_k)
output = tf.matmul(attention_weights, v) # (..., seq_len_q, depth_v)
return output, attention_weights
4.2 Multi-head attention
num_heads * depth = d_model
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads # 指定要將'd_model'拆成几个heads
self.d_model = d_model # 在split_heads之前的维度
assert d_model % self.num_heads == 0
self.depth = d_model // self.num_heads # 每個 head维度
self.wq = tf.keras.layers.Dense(d_model) # 分別給q, k, v的3个线性转换
self.wk = tf.keras.layers.Dense(d_model) # 沒有指定activation func
self.wv = tf.keras.layers.Dense(d_model)
self.dense = tf.keras.layers.Dense(d_model) # 多heads串接后通过线性变换
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, v, k, q, mask):
batch_size = tf.shape(q)[0]
# 将q, k, v线性转换到d_model维空间
q = self.wq(q) # (batch_size, seq_len, d_model)
k = self.wq(k) # (batch_size, seq_len, d_model)
v = self.wq(v) # (batch_size, seq_len, d_model)
# 将d_model分为num_heads个depth维度
q = self.split_heads(q, batch_size)
k = self.split_heads(k, batch_size)
v = self.split_heads(v, batch_size)
# 利用broadcasting让每个句子的每个head的qi, ki, vi各自进行注意力机制
# 输出会多一个head维度
# scaled_attention.shape == (batch_size, num_heads, seq_len_q, depth)
# attention_weights.shape == (batch_size, num_heads, seq_len_q, seq_len_k)
scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
# 先transpose在reshape将num_heads个depth维度串接回原来的d_model维
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
# (batch_size, seq_len_q, num_heads, depth)
concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
# (batch_size, seq_len_q, d_model)
# 通过最后一个线性转换
output = self.dense(concat_attention) # (batch_size, seq_len_q, d_model)
return output, attention_weights
- 添加动画
4.3 叠加transformer
- 添加动画
层次结构
Transformer
Encoder
输入 Embedding
位置 Encoding
N个Encoder layers
sub-layer 1: Encoder 自注意力机制
sub-layer 2: Feed Forward
Decoder
输出 Embedding
位置 Encoding
N个Decoder layers
sub-layer 1: Decoder 自注意力机制
sub-layer 2: Decoder-Encoder 注意力机制
sub-layer 3: Feed Forward
Final Dense Layer
4.3.1 Encoder Layer
从下图可知,encoder layer中含有两个sub-layer,分别为FFN和MHA,在Add&Norm中每个sub-layer间存在残差连结来防止梯度消失,同时每个sub-layer都会对最后一维d-model做layer normalization,使其均值和方差接近0和1后输出。
FFN(Feed-Forward Networks)是在encoder layer和decoder layer中都有的feed-forward元件,输入输出维度一样。
def point_wise_feed_forward_network(d_model, dff):
# 对输入做两次线性变换,中间加一ReLU
return tf.keras.Sequential([
tf.keras.layers.Dense(dff, activation='relu'), # (batch_size, seq_len, dff)
tf.keras.layers.Dense(d_model) # (batch_size, seq_len, d_model)
])
class EncoderLayer(tf.keras.layer.Layer):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super(EncoderLayer, self).__init__()
self.mha = MultiHeadAattention(d_model, num_heads)
self.ffn = point_wise_feed_forward_network(d_model, dff)
# layer norm 很常在RNN-based的模型被使用。一个sub-layer 一个layer norm
self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
# 同样的一个sub-layer一个dropout layer
self.dropout1 = tf.keras.layers.Dropout(rate)
self.dropout2 = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
# 传入training是因为dropout在训练和测试的行为不同
# 除了atten,其他张量的shape均为(batch_size, input_seq_len, d_model)
# attn.shape == (batch_size, num_heads, input_seq_len, input_seq_len)
# sub-layer_1: MHA
attn_output, attn = self.mha(x, x, x, mask)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(x + attn_output)
# sub_layer_2: FFN
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training) # 記得 training
out2 = self.layernorm2(out1 + ffn_output)
return out2
4.3.2 Decoder layer
整体上,decoder layer包含3部分:
- decoder自身的mask MHA1
- Decoder和Encoder之间的MHA2
- FFN
class DecoderLayer(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super(DecoderLayer, self).__init__()
self.mha1 = MultiHeadAttention(d_model, num_heads)
self.mha2 = MultiHeadAttention(d_model, num_heads)
self.ffn = point_wise_feed_forward_network(d_model, dff)
self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = tf.keras.layers.Dropout(rate)
self.dropout2 = tf.keras.layers.Dropout(rate)
self.dropout3 = tf.keras.layers.Dropout(rate)
def call(self, x, enc_output, training, combined_mask, inp_padding_mask):
# 所有sub-layer的输出都是(batch_size, target_seq_len, d_model)
# enc_output为Encoder的输出,shape为(batch_size, input_seq_len, d_model)
# attn_weights_block_1的shape为(batch_size, num_heads, target_seq_len, target_seq_len)
# attn_weights_block_2的shape为(batch_size, num_heads, target_seq_len, input_seq_len)
# sub-layer 1: Decoder做self-attention,v, k, q都是x
# 同时需要decoder的look ahead mask和输出序列的padding mask
attn1, attn_weight1 = self.mha1(x, x, x, combined_mask)
attnn1 = self.dropout1(attn1, training=training)
out1 = self.layernorm1(attn1)
# sub-layer 2: Decoder layer关注Encoder的输出序列
# v, k是enc_output,q为MHA1的结果out1
# 需要用到padding mask避免关注到<pad>
attn2, attn_weight2 = self.mha2(enc_output, enc_output, out1, inp_padding_mask)
attn2 = self.dropout2(attn2, training=training)
out2 = self.layernorm2(attn2 + out1)
# sub-layer 3: FFN
ffn_output = self.ffn(out2)
ffn_output = self.dropout3(ffn_output, training=training)
out3 = self.layernorm3(ffn_output + out2)
return out3, attn_weights_block1, attn_weights_block2
上面的代码中在第一个mha中存在一个参数combined_mask,是自己的(tar的而不是sour)padding mask和look ahead mask的结合,结合方式如下,只需要把两个遮罩取大即可,第二个mha的padding mask是sour的。
tar_padding_mask = create_padding_mask(tar)
look_ahead_mask = create_look_ahead_mask(tar.shape[-1])
combined_mask = tf.maximum(tar_padding_mask, look_ahead_mask)
4.3.3 Positional encoding
注意力机制使得序列之间的观察通过O(1)计算就可达到,从而解决了长依赖的问题,但是无法很好的表达顺序信息,所以加入Positional encoding位置编码给transformer,直接加到word embedding中,维度与d_model相同。
此函数具有一个很好的特性,就是给定一个位置的编码PE(pos),离它k个单位位置的编码PE(pos+k)可以用PE(pos)线性表示出来。
def get_angles(pos, i, d_model):
angle_rates = 1 / np.power(10000, (2 * (i//2)) / np.float32(d_model))
return pos * angle_rates
def positional_encoding(position, d_model):
angle_rads = get_angles(np.arange(position)[:, np.newaxis],
np.arange(d_model)[np.newaxis, :],
d_model)
# apply sin to even indices in the array; 2i
sines = np.sin(angle_rads[:, 0::2])
# apply cos to odd indices in the array; 2i+1
cosines = np.cos(angle_rads[:, 1::2])
pos_encoding = np.concatenate([sines, cosines], axis=-1)
pos_encoding = pos_encoding[np.newaxis, ...]
return tf.cast(pos_encoding, dtype=tf.float32)
4.3.4 Encoder
Encoder包含3个组件:
- 输入的词嵌入层
- 位置编码
- N个Encoder layer
输入:(batch_size, seq_len)
输出:(batch_size, seq_len, d_model)
class Encoder(tf.keras.layers.Layer):
# num_layers: EncoderLayer层数
# input_vocab_size: 把索引转换成词嵌入向量
def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, rate=0.1):
super(Encoder, self).__init__()
self.d_model = d_model
self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
self.pos_encoding = positional_encoding(input_vocab_size, self.d_model)
# 建立num_layers个Encoder Layers
self.enc_layer = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
self.dropout = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
# x.shape = (batch_size, input_seq_len)
input_seq_len = tf.shape(x)[1]
# 将2维的索引序列转换成3维的词嵌入向量,根据论文乘上sqrt(d_model),再加上对应的位置编码
x = self.embedding(x)
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x += self.pos_encoding(:, :input_seq_len, :)
# 对embedding和位置编码的结合通过dropout做regularization
x = self.dropout(x, training=training)
# 通过n个encoder layer编码
for i,enc_layer in enumerate(self.enc_layers):
x = enc_layer(x, training, mask)
return x
4.3.5 Decoder
跟Encoder相同
class Decoder(tf.keras.layers.Layer):
def __init__(self, num_layers, d_model, dff, target_vocab_size, rate=0.1):
super(Decoder, self).__init__()
self.d_model = d_model
self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
self.pos_encoding = positional_encoding(target_vocab_size, self.d_model)
self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
self.dropout = tf.keras.layers.Dropout(rate)
def call(self, x, enc_output, training, combined_mask, inp_padding_mask):
tar_seq_len = tf.shape(x)[1]
attention_weights = {} # 用来存放每个Decoder layer的注意权重
x = self.embedding(x)
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x += self.pos_encoding[:, :tar_seq_len, :]
x = self.dropout(x, training=training)
for i, dec_layer in enumerate(self.dec_layers):
x, block1, block2 = dec_layer(x, enc_output, training, combined_mask, inp_padding_mask)
attention_weights['decoder_layer{}_block1'.format(i + 1)] = block1
attention_weights['decoder_layer{}_block2'.format(i + 1)] = block2
return x, attention_weights
4.3.6 组合transformer
由3部分组成:
- Encoder
- Decoder
- Final linear layer
输入:
sour序列:(batch_size, sour_seq_len)
tar序列:(batch_size, tar_seq_len)
输出:
生成序列:(batch_size, tar_seq_len, tar_vocab_size)
注意力权重
class Transformer(tf.keras.Model):
def __init__(self, num_layers, num_heads, dff, input_vocab_size, target_vocab_size, rate=0.1):
super(Transformer, self).__init__()
self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, rate)
self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, rate)
self.final_layer = tf.keras.Dense(target_vocab_size)
def call(self, sour, tar, training, enc_paading_mask, combined_mask, dec_padding_mask):
enc_output = self.encoder(sour, training, enc_padding_mask)
# enc_output = (batch_size, sour_seq_len, d_model)
dec_output, attention_weights = self.decoder(tar, enc_output, training, combined_mask, dec_padding_mask)
# dec_output = (batch_size, tar_seq_len, d_model)
# 将decoder输出经过最后的linear layer
final_output = self.final_layer(dec_output)
# (batch_size, tar_seq_len, target_vocab_size)
retutrn final_output, attention_weights
- 补充最后的视频