说明:本文依据Github上面的一个2000星项目完成。项目作者jinfagang,项目地址,在这里感谢那些开源的程序员,让我们学到更多。
我会尽量将项目进行拆解,希望对大家的学习有所帮助吧。
第十一课 使用RNN生成古诗
1.项目架构
和之前的项目类似,该项目主要包括了三个部分:数据包、数据处理和模型构建、模型训练和结果生成。其中data文件夹放置的是诗词数据,包含有古诗文数据集,每行就代表一首诗,每首诗由标题和内容两部分组成,中间以冒号分割。
2.数据预处理和模型构建
代码位于poems.py文件的process_peoms方法,在里面我做了标注。
主要实现
- 清洗数据,筛选出符合RNN训练标准的诗词;
- 生成诗文向量、字向量、字频率
import collections
import numpy as np
start_token = 'B'
end_token = 'E'
def process_poems(file_name):
# poems -> list of numbers
poems = []
with open(file_name, "r", encoding='utf-8') as f:
for line in f.readlines():
try:
# 将题目和诗文内容分割
title, content = line.strip().split(':')
# 移除空格
content = content.replace(' ', '')
# 对诗文进行过滤(含有特殊字符和过短/过长的诗文)过短或过长会影响到RNN模型的训练
if '_' in content or '(' in content or '(' in content or '《' in content or '[' in content or start_token in content or end_token in content:
continue
if len(content) < 5 or len(content) > 79:
continue
# 处理后的诗文加入前缀B(Begin)和E(End)
content = start_token + content + end_token
poems.append(content)
except ValueError as e:
pass
# 按照诗词字数进行排序
poems = sorted(poems, key=lambda l: len(line))
# 统计每个字出现的次数
all_words = [word for poem in poems for word in poem]
# 计算每个字对应频率
counter = collections.Counter(all_words)
# 按照频率进行倒排
words = sorted(counter.keys(), key=lambda x: counter[x], reverse=True)
words.append(' ')
L = len(words)
# 每个字影射为一个数字ID
word_int_map = dict(zip(words, range(L)))
# 将诗文由字转为对应的数字ID
poems_vector = [list(map(lambda word: word_int_map.get(word, L), poem)) for poem in poems]
# 依次返回数字ID表示的诗句、汉字-ID的映射map、所有的汉字的列表
return poems_vector, word_int_map, words
if __name__ == '__main__':
filepath = r'C:\Users\01\Desktop\机器学习作业\sklearn+tensorflow\[NLP]11POETS\data\poems.txt'
poems_vector, word_to_int, vocabularies = process_poems(filepath)
3.模型构建
代码位于model.py的rnn_model方法,在这里要学的是模型的构建方法,比较值得学习。
# -*- coding: utf-8 -*
import tensorflow as tf
import numpy as np
def rnn_model(model, input_data, output_data, vocab_size, rnn_size=128, num_layers=2, batch_size=64,
learning_rate=0.01):
"""
construct rnn seq2seq model.
:param model: model class
:param input_data: 输入数据占位符
:param output_data: 输出数据占位符
:param vocab_size: words总长度
:param rnn_size: RNN中的单元数
:param num_layers: RNN层数
:param batch_size: 每个batch样本数
:param learning_rate: 学习率
:return: 返回模型状态集
"""
# 声明模型状态集, 由于模型需要返回多个相关值, 故以map集合的形式向外部返回
end_points = {}
# 选择模型的具体cell类型,源代码中使用的是仍是tf.contrib.rnn,在这里做了更新
if model == 'rnn':
cell_fun = tf.nn.rnn_cell.BasicRNNCell
elif model == 'gru':
cell_fun = tf.nn.rnn_cell.GRUCell
elif model == 'lstm':
cell_fun = tf.nn.rnn_cell.LSTMCell
# 构造具体的cell
cell = cell_fun(rnn_size, state_is_tuple=True)
# 将单层的cell变为更深的cell, 以表征更复杂的关联关系
cell = tf.nn.rnn_cell.MultiRNNCell([cell] * num_layers, state_is_tuple=True)
# 初始化cell的状态
if output_data is not None:
# 训练时batch_size容量0
initial_state = cell.zero_state(batch_size, tf.float32)
else:
# 使用时batch_size容量为1
initial_state = cell.zero_state(1, tf.float32)
# tensorflow对于lookup_embedding的操作只能再cpu上进行,其实这个默认是在cpu上操作的。
with tf.device("/cpu:0"):
embedding = tf.get_variable('embedding', initializer=tf.random_uniform(
[vocab_size + 1, rnn_size], -1.0, 1.0))
# 处理之后的shape为(batch_size, n_steps, rnn_size)
inputs = tf.nn.embedding_lookup(embedding, input_data)
outputs, last_state=tf.nn.dynamic_rnn(cell,inputs,initial_state=initial_state)
output=tf.reshape(outputs, [-1,rnn_size])
weights=tf.Variable(tf.truncated_normal([rnn_size,vocab_size+1]))
bias=tf.Variable(tf.zeros(shape=[vocab_size+1]))
logits=tf.nn.bias_add(tf.matmul(output,weights),bias=bias)
if output_data is not None:
labels = tf.one_hot(tf.reshape(output_data, [-1]), depth=vocab_size + 1)
loss = tf.nn.softmax_cross_entropy_with_logits(labels=labels, logits=logits)
total_loss = tf.reduce_mean(loss)
train_op = tf.train.AdamOptimizer(learning_rate).minimize(total_loss)
end_points['initial_state'] = initial_state
end_points['output'] = output
end_points['train_op'] = train_op
end_points['total_loss'] = total_loss
end_points['loss'] = loss
end_points['last_state'] = last_state
else:
prediction = tf.nn.softmax(logits)
end_points['initial_state'] = initial_state
end_points['last_state'] = last_state
end_points['prediction'] = prediction
return end_points