In [1]:
import pickle
import os
import re

def load_text(path):
    # read the whole corpus file into a single string
    input_file = os.path.join(path)
    with open(input_file, 'r') as f:
        text_data = f.read()
    return text_data

def preprocess_and_save_data(text, token_lookup, create_lookup_tables):
    # replace punctuation with single-letter tokens, encode the characters as integers, and pickle everything
    token_dict = token_lookup()
    for key, token in token_dict.items():
        text = text.replace(key, '{}'.format(token))
    text = list(text)
    vocab_to_int, int_to_vocab = create_lookup_tables(text)
    int_text = [vocab_to_int[word] for word in text]
    pickle.dump((int_text, vocab_to_int, int_to_vocab, token_dict), open('preprocess.p', 'wb'))

def load_preprocess():
    return pickle.load(open('preprocess.p', mode='rb'))

def save_params(params):
    pickle.dump(params, open('params.p', 'wb'))

def load_params():
    return pickle.load(open('params.p', mode='rb'))
In [2]:
data_path = './text/modu.txt'
text = load_text(data_path)
text = text.replace(' ', '')
# collapse runs of two or more blank lines into one
text = re.sub(r'\n{2,}', '\n', text)
# replace runs of '.' (e.g. '......') with '。'
text = re.sub(r'\.+', '。', text)
# remove content enclosed in 《》
text = re.sub(r'《.*》', '', text)
# remove the dash ――
text = re.sub(r'――', '', text)
# remove full-width (ideographic) spaces
text = re.sub(r'\u3000', '', text)

# keep only the first 100,000 characters for training
num_words_for_training = 100000
text = text[:num_words_for_training]

lines_of_text = text.split('\n')
print(len(lines_of_text))
print(lines_of_text[:20])
In [3]:
print(lines_of_text[-10:])
In [4]:
def create_lookup_tables(input_data):
    # build character <-> integer mappings from the unique characters in the input
    vocab = set(input_data)
    vocab_to_int = {word: idx for idx, word in enumerate(vocab)}
    int_to_vocab = dict(enumerate(vocab))
    return vocab_to_int, int_to_vocab

def token_lookup():
    """Lookup table for Chinese punctuation."""
    # use a list (not a set) so the symbol-to-token pairing is deterministic across runs
    symbols = ['。', ',', '“', '”', ';', '!', '?', '(', ')', '\n']
    tokens = ["P", "C", "Q", "T", "S", "E", "M", "I", "O", "R"]
    return dict(zip(symbols, tokens))
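As a quick sanity check of these two helpers (a minimal sketch; the sample string below is made up and not part of the corpus):
# Hypothetical mini example: tokenize punctuation, then build and apply the lookup tables
sample = '你好,世界。'
for key, token in token_lookup().items():
    sample = sample.replace(key, token)
# the punctuation marks are now single-letter tokens, e.g. '你好C世界P'
v2i, i2v = create_lookup_tables(list(sample))
encoded = [v2i[ch] for ch in sample]
print(encoded)                           # integer-encoded characters
print(''.join(i2v[i] for i in encoded))  # decodes back to the tokenized string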
In [5]:
# process and save the processed data
preprocess_and_save_data(''.join(lines_of_text), token_lookup, create_lookup_tables)
In [6]:
int_text, vocab_to_int, int_to_vocab, token_dict = load_preprocess()
In [7]:
import warnings
import tensorflow as tf
import numpy as np
# Check the TensorFlow version (this notebook targets TensorFlow 1.x)
print('TensorFlow Version: {}'.format(tf.__version__))

# Check for a GPU
if not tf.test.gpu_device_name():
    warnings.warn('No GPU found. Please use a GPU to train your neural network.')
else:
    print('Default GPU Device: {}'.format(tf.test.gpu_device_name()))
In [8]:
# number of training epochs
num_epochs = 200
# batch size
batch_size = 256
# number of units in each LSTM cell
rnn_size = 512
# size of the embedding layer
embed_dim = 1000
# sequence length used for each training example
seq_length = 60
# learning rate
learning_rate = 0.002
# print training info every this many batches
show_every_n_batches = 60
# where to save the session checkpoints
save_dir = './save'
Create the placeholders for input, targets and learning_rate
In [9]:
def get_inputs():
    # inputs and targets are both integer tensors
    inputs = tf.placeholder(tf.int32, [None, None], name='inputs')
    targets = tf.placeholder(tf.int32, [None, None], name='targets')
    learning_rate = tf.placeholder(tf.float32, name='learning_rate')
    return inputs, targets, learning_rate
Create the RNN cell: use LSTM cells, stack them into layers, apply dropout, and initialize the state
In [10]:
def get_init_cell(batch_size, rnn_size):
    # number of LSTM layers
    num_layers = 2
    # keep probability for dropout
    keep_prob = 0.8

    def build_cell():
        # an LSTM cell with rnn_size units, wrapped with dropout to reduce overfitting
        lstm = tf.contrib.rnn.BasicLSTMCell(rnn_size)
        return tf.contrib.rnn.DropoutWrapper(lstm, output_keep_prob=keep_prob)

    # stack num_layers LSTM layers; build a separate cell instance per layer
    # (reusing one cell object for every layer causes variable-sharing problems in newer TF 1.x releases)
    cell = tf.contrib.rnn.MultiRNNCell([build_cell() for _ in range(num_layers)])
    # initialize the state to zeros
    init_state = cell.zero_state(batch_size, tf.float32)
    # use tf.identity to name init_state, so the cached state can be looked up later when generating text
    init_state = tf.identity(init_state, name='init_state')
    return cell, init_state
Create the embedding layer
In [11]:
def get_embed(input_data, vocab_size, embed_dim):
    # create the embedding matrix of shape (vocab_size, embed_dim)
    embedding = tf.Variable(tf.random_uniform((vocab_size, embed_dim)), dtype=tf.float32)
    return tf.nn.embedding_lookup(embedding, input_data)
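For intuition, the embedding lookup turns a batch of integer IDs into a batch of dense vectors. A minimal sketch with made-up shapes:
# Illustrative shapes only: batch of 2 sequences of length 3, vocab of 10, 4-dimensional embeddings
demo_ids = tf.constant([[1, 2, 3], [4, 5, 6]], dtype=tf.int32)
demo_embed = get_embed(demo_ids, vocab_size=10, embed_dim=4)
print(demo_embed.shape)  # (2, 3, 4): one 4-dimensional vector per input ID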
Create the RNN node and use dynamic_rnn to compute the output and final_state
In [12]:
def build_rnn(cell, inputs):
    # run the RNN over the input sequence and name the final state so it can be retrieved later
    outputs, final_state = tf.nn.dynamic_rnn(cell, inputs, dtype=tf.float32)
    final_state = tf.identity(final_state, name="final_state")
    return outputs, final_state
In [13]:
def build_nn(cell, rnn_size, input_data, vocab_size, embed_dim):
    # embed the input IDs (pass embed_dim here, not rnn_size, so the embedding size hyperparameter is respected)
    embed = get_embed(input_data, vocab_size, embed_dim)
    outputs, final_state = build_rnn(cell, embed)
    # project the RNN outputs to vocabulary-sized logits
    logits = tf.contrib.layers.fully_connected(
        outputs, vocab_size, activation_fn=None,
        weights_initializer=tf.truncated_normal_initializer(stddev=0.1),
        biases_initializer=tf.zeros_initializer())
    return logits, final_state
Define get_batches to train batch by batch
In [14]:
def get_batches(int_text, batch_size, seq_length):
    # how many full batches we can create
    n_batches = len(int_text) // (batch_size * seq_length)
    # the original data, and the same data shifted by one position (the prediction targets)
    batch_origin = np.array(int_text[: n_batches * batch_size * seq_length])
    batch_shifted = np.array(int_text[1: n_batches * batch_size * seq_length + 1])
    # wrap the last element of the shifted data around to the first element of the original data
    batch_shifted[-1] = batch_origin[0]

    batch_origin_reshape = np.split(batch_origin.reshape(batch_size, -1), n_batches, 1)
    batch_shifted_reshape = np.split(batch_shifted.reshape(batch_size, -1), n_batches, 1)

    batches = np.array(list(zip(batch_origin_reshape, batch_shifted_reshape)))
    return batches
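To see what get_batches produces, here is a small illustrative run with made-up numbers:
# Illustrative only: 20 tokens, batch_size=2, seq_length=3 -> 3 batches
demo_batches = get_batches(list(range(20)), batch_size=2, seq_length=3)
print(demo_batches.shape)  # (3, 2, 2, 3): n_batches x (input, target) x batch_size x seq_length
print(demo_batches[0][0])  # first batch of inputs
print(demo_batches[0][1])  # first batch of targets (the inputs shifted by one position)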
In [15]:
from tensorflow.contrib import seq2seq

train_graph = tf.Graph()
with train_graph.as_default():
    vocab_size = len(int_to_vocab)
    input_text, targets, lr = get_inputs()
    input_data_shape = tf.shape(input_text)
    # create the RNN cell and its initial-state node; the cell already includes LSTM layers and dropout
    # rnn_size is the number of units in each LSTM cell
    cell, initial_state = get_init_cell(input_data_shape[0], rnn_size)
    # create the nodes that compute the logits and the final state
    logits, final_state = build_nn(cell, rnn_size, input_text, vocab_size, embed_dim)
    # softmax turns the logits into prediction probabilities
    probs = tf.nn.softmax(logits, name='probs')
    # compute the loss
    cost = seq2seq.sequence_loss(
        logits,
        targets,
        tf.ones([input_data_shape[0], input_data_shape[1]]))
    # use the Adam optimizer for gradient descent
    optimizer = tf.train.AdamOptimizer(lr)
    # clip the gradients so that every gradient value lies in [-1, 1]
    gradients = optimizer.compute_gradients(cost)
    capped_gradients = [(tf.clip_by_value(grad, -1., 1.), var) for grad, var in gradients if grad is not None]
    train_op = optimizer.apply_gradients(capped_gradients)
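sequence_loss averages the per-timestep cross-entropy over the batch, and the tf.ones tensor gives every position equal weight. A tiny illustration with made-up values:
# Illustrative only: one sequence of 2 steps over a 3-word vocabulary
with tf.Graph().as_default(), tf.Session() as demo_sess:
    demo_logits = tf.constant([[[2.0, 0.1, 0.1], [0.1, 2.0, 0.1]]])
    demo_targets = tf.constant([[0, 1]])
    demo_loss = seq2seq.sequence_loss(demo_logits, demo_targets, tf.ones([1, 2]))
    print(demo_sess.run(demo_loss))  # a single scalar: the average cross-entropy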
Now, let's train the model
In [16]:
# build all the training batches
batches = get_batches(int_text, batch_size, seq_length)

# open a session and start training, passing the graph built above to the session
with tf.Session(graph=train_graph) as sess:
    sess.run(tf.global_variables_initializer())

    for epoch_i in range(num_epochs):
        state = sess.run(initial_state, {input_text: batches[0][0]})

        for batch_i, (x, y) in enumerate(batches):
            feed = {
                input_text: x,
                targets: y,
                initial_state: state,
                lr: learning_rate}
            train_loss, state, _ = sess.run([cost, final_state, train_op], feed)

            # print training progress
            if (epoch_i * len(batches) + batch_i) % show_every_n_batches == 0:
                print('Epoch {:>3} Batch {:>4}/{} train_loss = {:.3f}'.format(
                    epoch_i,
                    batch_i,
                    len(batches),
                    train_loss))

    # save the model
    saver = tf.train.Saver()
    saver.save(sess, save_dir)
    print('Model Trained and Saved')

save_params((seq_length, save_dir))
In [27]:
# Save the trained model
save_params((seq_length, save_dir))
In [2]:
import tensorflow as tf
import numpy as np
_, vocab_to_int, int_to_vocab, token_dict = load_preprocess()
seq_length, load_dir = load_params()
To use the saved model, we need to retrieve the saved variables (tensors) by the names we gave them.
In [3]:
def get_tensors(loaded_graph):
    # fetch the tensors we named earlier from the loaded graph
    inputs = loaded_graph.get_tensor_by_name("inputs:0")
    initial_state = loaded_graph.get_tensor_by_name("init_state:0")
    final_state = loaded_graph.get_tensor_by_name("final_state:0")
    probs = loaded_graph.get_tensor_by_name("probs:0")
    return inputs, initial_state, final_state, probs

def pick_word(probabilities, int_to_vocab):
    # collect every character whose probability is at least 0.05
    chances = []
    for idx, prob in enumerate(probabilities):
        if prob >= 0.05:
            chances.append(int_to_vocab[idx])
    if len(chances) < 1:
        # nothing cleared the threshold: fall back to the most likely character
        return str(int_to_vocab[np.argmax(probabilities)])
    else:
        # otherwise pick one of the candidates uniformly at random
        rand = np.random.randint(0, len(chances))
        return str(chances[rand])
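pick_word keeps every character whose probability is at least 0.05 and draws one of them uniformly at random, falling back to the argmax when nothing clears the threshold. A minimal sketch with a made-up distribution:
# Hypothetical vocabulary and probability vectors, for illustration only
demo_i2v = {0: '我', 1: '你', 2: '他', 3: '们'}
print(pick_word(np.array([0.50, 0.30, 0.15, 0.05]), demo_i2v))  # any of the four (all >= 0.05)
print(pick_word(np.array([0.04, 0.03, 0.02, 0.91]), demo_i2v))  # only '们' clears the threshold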
In [4]:
# length of the generated text
gen_length = 500
# the first character of the generated text; it must appear in the training vocabulary
prime_word = '来'

loaded_graph = tf.Graph()
with tf.Session(graph=loaded_graph) as sess:
    # load the saved session
    loader = tf.train.import_meta_graph(load_dir + '.meta')
    loader.restore(sess, load_dir)

    # retrieve the cached tensors by name
    input_text, initial_state, final_state, probs = get_tensors(loaded_graph)

    # prepare to generate text
    gen_sentences = [prime_word]
    prev_state = sess.run(initial_state, {input_text: np.array([[1]])})

    # generate the text character by character
    for n in range(gen_length):
        dyn_input = [[vocab_to_int[word] for word in gen_sentences[-seq_length:]]]
        dyn_seq_length = len(dyn_input[0])

        probabilities, prev_state = sess.run(
            [probs, final_state],
            {input_text: dyn_input, initial_state: prev_state})
        # print(probabilities)
        # print(len(probabilities))

        pred_word = pick_word(probabilities[0][dyn_seq_length - 1], int_to_vocab)
        gen_sentences.append(pred_word)

    # restore the punctuation from the tokens
    novel = ''.join(gen_sentences)
    for key, token in token_dict.items():
        novel = novel.replace(token.upper(), key)
    novel = novel.replace('\n ', '\n')
    novel = novel.replace('( ', '(')

    print(novel)
In [37]:
vocab_size
Out[37]:
In [47]:
len(probabilities)
Out[47]:
In [49]:
max(probabilities[0][0])
Out[49]:
In [ ]: