In [1]:
import os
import pickle
from collections import Counter
import numpy as np
In [2]:
data_file = './data/simpsons/moes_tavern_lines.txt'
with open(data_file) as inf:
    text = inf.read()
# Ignore the "notice" section
text = text[81:]
print(text[:100])
len(text)
Out[2]:
In [3]:
view_sentence_range = (0, 10)
In [4]:
# Original starter code (kept for reference; the format() call was left incomplete):
#num_unique_words = len({word: None for word in text.split()})
#print('Roughly the number of unique words: {}'.format(num_unique_words))
## Here we use a Counter instead to find the size of the vocab
wc = Counter(text.lower().split())
print('Vocab:', len(wc))
print('Total words:', sum(wc.values()))
print('Common words:', wc.most_common(10), '...')
In [5]:
scenes = text.split('\n\n')
print('Number of scenes: {}'.format(len(scenes)))
In [6]:
sentence_count_scene = [scene.count('\n') for scene in scenes]
print('Average number of sentences in each scene: {}'.format(np.average(sentence_count_scene)))
In [7]:
sentences = [sentence for scene in scenes for sentence in scene.split('\n')]
print('Number of lines: {}'.format(len(sentences)))
In [8]:
word_count_sentence = [len(sentence.split()) for sentence in sentences]
print('Average number of words in each line: {}'.format(np.average(word_count_sentence)))
In [9]:
print()
print('Sentence {} to {}:'.format(*view_sentence_range))
print()
print('\n'.join(text.split('\n')[view_sentence_range[0]:view_sentence_range[1]]))
In [10]:
# Create vocab_to_int and int_to_vocab.
# This also sorts from most frequent to least frequent.
# For example, integer 0 represents the most frequently used word.
vocab_to_int = {}
int_to_vocab = {}
for i, (word, cnt) in enumerate(wc.most_common()):
    vocab_to_int[word] = i
    int_to_vocab[i] = word
In [11]:
[(k, int_to_vocab[k]) for k in list(int_to_vocab.keys())[:5]]
Out[11]:
In [12]:
[(k, vocab_to_int[k]) for k in list(vocab_to_int.keys())[:5]]
Out[12]:
In [13]:
## make it into a function suitable for the original project
def create_lookup_tables(text):
    """
    Create lookup tables for vocabulary
    :param text: The text of tv scripts split into words
    :return: A tuple of dicts (vocab_to_int, int_to_vocab)
    """
    wc = Counter(text)
    vocab_to_int = {}
    int_to_vocab = {}
    for i, (word, cnt) in enumerate(wc.most_common()):
        vocab_to_int[word] = i
        int_to_vocab[i] = word
    return vocab_to_int, int_to_vocab
In [14]:
## quick test
text2 = "today is an interesting day, is it not?"
text_tokens = text2.lower().split()
v2i, i2v = create_lookup_tables(text_tokens)
In [15]:
v2i
Out[15]:
In [16]:
i2v
Out[16]:
In [17]:
## test with given unit test
import problem_unittests as tests
tests.test_create_lookup_tables(create_lookup_tables)
We need to distinguish between "bye" and "bye!",
so we will translate punctuation marks into tokens. For example, "!" becomes "||Exclamation_Mark||".
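A quick toy illustration (made-up string, not from the script) of why this matters: without tokenizing punctuation, the word counts treat "bye" and "bye!" as two different vocabulary entries.
Counter('bye bye bye!'.split())
# -> Counter({'bye': 2, 'bye!': 1}), so "bye" and "bye!" would get separate ids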
In [18]:
## the string module has a pre-defined list of punctuation characters
import string
type(string.punctuation), string.punctuation
Out[18]:
In [19]:
## create a punctuation-to-token dictionary as the project requires
def token_lookup():
    """
    Generate a dict to turn punctuation into a token.
    :return: Tokenize dictionary where the key is the punctuation and the value is the token
    """
    #import string  # already imported above
    punct_to_token = {'.': '||PERIOD||',
                      ',': '||COMMA||',
                      '"': '||QUOTE||',
                      ';': '||SEMICOLON||',
                      '!': '||EXCMARK||',
                      '?': '||Q||',
                      '(': '||OPENP||',
                      ')': '||CLOSEP||',
                      '--': '||DASH||',
                      '\n': '||NEWLINE||'}
    return punct_to_token
In [20]:
# quick test
token_dict = token_lookup()
token_dict
Out[20]:
In [21]:
## test with built-in test
tests.test_tokenize(token_lookup)
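Before applying the replacement to the whole script in the next cell, a quick sanity check on a made-up line (the sample string is only an illustration):
sample = 'moe_szyslak: hello, is al there?\n'
for key, token in token_lookup().items():
    sample = sample.replace(key, ' {} '.format(token))
print(sample.lower().split())
# ['moe_szyslak:', 'hello', '||comma||', 'is', 'al', 'there', '||q||', '||newline||']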
In [22]:
for key, token in token_dict.items():
    text = text.replace(key, ' {} '.format(token))
text = text.lower()
text = text.split()
vocab_to_int, int_to_vocab = create_lookup_tables(text)
int_text = [vocab_to_int[word] for word in text]
with open('preprocess.p', 'wb') as outf:
    pickle.dump((int_text, vocab_to_int, int_to_vocab, token_dict), outf)
In [23]:
len(int_text)
Out[23]:
In [24]:
_ = [print(int_to_vocab[item], end=" ") for item in int_text[:15]]
In [25]:
_ = [print(int_to_vocab[item], end=" ") for item in int_text[16:32]]
In [26]:
## check current directory
os.listdir('.')
Out[26]:
In [27]:
with open('preprocess.p', mode='rb') as inf:
    int_text, vocab_to_int, int_to_vocab, token_dict = pickle.load(inf)
len(int_text), len(vocab_to_int), len(int_to_vocab), len(token_dict)
Out[27]:
In [28]:
from distutils.version import LooseVersion
import warnings
import tensorflow as tf
# Check TensorFlow Version
assert LooseVersion(tf.__version__) >= LooseVersion('1.0'), 'Please use TensorFlow version 1.0 or newer'
print('TensorFlow Version: {}'.format(tf.__version__))
# Check for a GPU
if not tf.test.gpu_device_name():
    warnings.warn('No GPU found. Please use a GPU to train your neural network.')
else:
    print('Default GPU Device: {}'.format(tf.test.gpu_device_name()))
In [29]:
def get_inputs():
    """
    Create TF Placeholders for input, targets, and learning rate.
    :return: Tuple (input, targets, learning rate)
    """
    input = tf.placeholder(tf.int32, [None, None], name='input')
    targets = tf.placeholder(tf.int32, [None, None], name='targets')
    learning_rate = tf.placeholder(tf.float32, name='learning_rate')
    return (input, targets, learning_rate)
## supplied tests
tests.test_get_inputs(get_inputs)
Stack one or more BasicLSTMCells in a MultiRNNCell.
The RNN size should be set using rnn_size.
In [30]:
def get_init_cell(batch_size, rnn_size):
    """
    Create an RNN Cell and initialize it.
    :param batch_size: Size of batches
    :param rnn_size: Size of RNNs
    :return: Tuple (cell, initial state)
    """
    num_layers = 2
    # This doesn't work in newer TF 1.x releases: reusing one cell object for
    # every layer makes MultiRNNCell try to share variables across layers.
    #lstm_layer = tf.contrib.rnn.BasicLSTMCell(rnn_size)
    #cell = tf.contrib.rnn.MultiRNNCell([lstm_layer] * num_layers)
    # Use a stacked version instead, building a fresh cell per layer:
    stacked_rnn = []
    for i in range(num_layers):
        stacked_rnn.append(tf.nn.rnn_cell.LSTMCell(num_units=rnn_size, state_is_tuple=True))
    cell = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_rnn, state_is_tuple=True)
    # Yet another variant with the same sharing problem:
    #cell = tf.contrib.rnn.MultiRNNCell([lstm_layer, lstm_layer])
    initial_state = cell.zero_state(batch_size, tf.float32)
    initial_state = tf.identity(initial_state, name='initial_state')
    return (cell, initial_state)
tests.test_get_init_cell(get_init_cell)
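For reference, the same stacking can be written as a list comprehension over the contrib API, building a distinct cell per layer; a minimal sketch assuming TF 1.x (get_init_cell_v2 is just an illustrative name, not used elsewhere):
def get_init_cell_v2(batch_size, rnn_size, num_layers=2):
    # A fresh BasicLSTMCell per layer, so no variable scope is reused
    cell = tf.contrib.rnn.MultiRNNCell(
        [tf.contrib.rnn.BasicLSTMCell(rnn_size) for _ in range(num_layers)])
    initial_state = tf.identity(cell.zero_state(batch_size, tf.float32),
                                name='initial_state')
    return cell, initial_state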
In [31]:
def get_embed(input_data, vocab_size, embed_dim):
    """
    Create embedding for <input_data>.
    :param input_data: TF placeholder for text input.
    :param vocab_size: Number of words in vocabulary.
    :param embed_dim: Number of embedding dimensions
    :return: Embedded input.
    """
    #embedding = tf.Variable(tf.random_uniform((vocab_size, embed_dim), -1, 1))
    #embed = tf.nn.embedding_lookup(embedding, input_data)
    embed = tf.contrib.layers.embed_sequence(input_data,
                                             vocab_size=vocab_size,
                                             embed_dim=embed_dim)
    return embed
## supplied test
tests.test_get_embed(get_embed)
You created an RNN Cell in the get_init_cell() function. Time to use that cell to create an RNN.
In [32]:
def build_rnn(cell, inputs):
    """
    Create an RNN using an RNN Cell
    :param cell: RNN Cell
    :param inputs: Input text data
    :return: Tuple (Outputs, Final State)
    """
    outputs, final_state = tf.nn.dynamic_rnn(cell, inputs, dtype=tf.float32)
    final_state = tf.identity(final_state, name="final_state")
    return outputs, final_state
tests.test_build_rnn(build_rnn)
Apply the functions you implemented above to:
- Apply embedding to input_data using your get_embed() function.
- Build the RNN using cell and your build_rnn() function.
- Apply a fully connected layer with a linear activation and vocab_size as the number of outputs.
Return the logits and final state in the following tuple (Logits, FinalState).
In [45]:
def build_nn(cell, rnn_size, input_data, vocab_size, embed_dim):
    """
    Build part of the neural network
    :param cell: RNN cell
    :param rnn_size: Size of rnns
    :param input_data: Input data
    :param vocab_size: Vocabulary size
    :param embed_dim: Number of embedding dimensions
    :return: Tuple (Logits, FinalState)
    """
    embed = get_embed(input_data, vocab_size, embed_dim=embed_dim)
    outputs, final_state = build_rnn(cell, embed)
    # A linear activation (activation_fn=None) is needed so the outputs are raw logits:
    #logits = tf.contrib.layers.fully_connected(outputs, vocab_size)
    logits = tf.contrib.layers.fully_connected(outputs, vocab_size, activation_fn=None)
    return logits, final_state
tests.test_build_nn(build_nn)
In [34]:
## A relaxed version of the supplied test: only type and shape are checked here.
def test_get_batches(get_batches):
    with tf.Graph().as_default():
        test_batch_size = 128
        test_seq_length = 5
        test_int_text = list(range(1000*test_seq_length))
        batches = get_batches(test_int_text, test_batch_size, test_seq_length)
        # Check type
        assert isinstance(batches, np.ndarray),\
            'Batches is not a Numpy array'
        # Check shape
        assert batches.shape == (7, 2, 128, 5),\
            'Batches returned wrong shape. Found {}'.format(batches.shape)
        tests._print_success_message()
Implement get_batches to create batches of input and targets using int_text. The batches should be a Numpy array with the shape (number of batches, 2, batch size, sequence length). Each batch contains two elements:
- The first element is a single batch of input with the shape [batch size, sequence length].
- The second element is a single batch of targets with the shape [batch size, sequence length].
If you can't fill the last batch with enough data, drop the last batch.
For example, get_batches([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15], 2, 3) would return a Numpy array of the following:
[
  # First Batch
  [
    # Batch of Input
    [[ 1  2  3], [ 7  8  9]],
    # Batch of targets
    [[ 2  3  4], [ 8  9 10]]
  ],
  # Second Batch
  [
    # Batch of Input
    [[ 4  5  6], [10 11 12]],
    # Batch of targets
    [[ 5  6  7], [11 12 13]]
  ]
]
In [35]:
def get_batches(int_text, batch_size, seq_length):
    """
    Return batches of input and target
    :param int_text: Text with the words replaced by their ids
    :param batch_size: The size of batch
    :param seq_length: The length of sequence
    :return: Batches as a Numpy array
    """
    # Skip the first 16 word ids before batching
    int_text = int_text[16:]
    n_batches = (len(int_text) - 1) // (batch_size * seq_length)
    int_text = int_text[:n_batches * batch_size * seq_length + 1]
    # Inputs: consecutive sequences of length seq_length
    int_text_sequences = [int_text[i*seq_length:i*seq_length + seq_length]
                          for i in range(0, n_batches * batch_size)]
    # Targets: the same sequences shifted by one word
    int_text = int_text[1:]
    int_text_targets = [int_text[i*seq_length:i*seq_length + seq_length]
                        for i in range(0, n_batches * batch_size)]
    output = []
    for batch in range(n_batches):
        inputs = []
        targets = []
        for size in range(batch_size):
            inputs.append(int_text_sequences[size * n_batches + batch])
            targets.append(int_text_targets[size * n_batches + batch])
        output.append([inputs, targets])
    return np.array(output)
## The supplied test checks exact batch values; the 16-word offset above shifts
## them, so it fails even though the shapes are right. Use the relaxed test instead.
#tests.test_get_batches(get_batches)
test_get_batches(get_batches)
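For comparison, a reshape-based sketch of the same batching scheme, without the 16-word offset, checked against the worked example above (get_batches_reshape is just an illustrative name):
def get_batches_reshape(int_text, batch_size, seq_length):
    # One extra token is needed because targets are the inputs shifted by one word
    n_batches = (len(int_text) - 1) // (batch_size * seq_length)
    n_words = n_batches * batch_size * seq_length
    inputs = np.array(int_text[:n_words]).reshape(batch_size, -1)
    targets = np.array(int_text[1:n_words + 1]).reshape(batch_size, -1)
    # Split the long rows into n_batches chunks of seq_length columns, then pair them up
    return np.array(list(zip(np.split(inputs, n_batches, axis=1),
                             np.split(targets, n_batches, axis=1))))

get_batches_reshape(list(range(1, 16)), 2, 3)
# shape (2, 2, 2, 3); matches the [[1 2 3], [7 8 9]] / [[2 3 4], [8 9 10]] example above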
Tune hyperparameters:
In [63]:
num_epochs = 120
batch_size = 101
rnn_size = 256
seq_length = 20
learning_rate = 0.01
embed_dim = 300
# Show stats for every n number of batches
show_every_n_batches = 100
#show_every_n_batches= get_batches(int_text, batch_size, seq_length).shape[0]
print(show_every_n_batches)
save_dir = './save'
In [64]:
## determine a reasonable batch size (approach from another student)
total_words = len(int_text[16:])
print('total words:', total_words)
print("Batch Size -> words missed\n")
for i in range(-5, 5):
    try_batch_size = batch_size + i
    batches = get_batches(int_text, try_batch_size, seq_length)
    flag = ""
    if i == 0:
        flag = "<<< Current choice. Should be minimum words missed."
    print("{:>10} {:>5} {}".format(try_batch_size, total_words - try_batch_size * seq_length * batches.shape[0], flag))
In [65]:
from tensorflow.contrib import seq2seq

train_graph = tf.Graph()
with train_graph.as_default():
    vocab_size = len(int_to_vocab)
    input_text, targets, lr = get_inputs()
    input_data_shape = tf.shape(input_text)
    cell, initial_state = get_init_cell(input_data_shape[0], rnn_size)
    logits, final_state = build_nn(cell, rnn_size, input_text, vocab_size, embed_dim)
    # Probabilities for generating words
    probs = tf.nn.softmax(logits, name='probs')
    # Loss function
    cost = seq2seq.sequence_loss(
        logits,
        targets,
        tf.ones([input_data_shape[0], input_data_shape[1]]))
    # Optimizer
    optimizer = tf.train.AdamOptimizer(lr)
    # Gradient Clipping
    gradients = optimizer.compute_gradients(cost)
    capped_gradients = [(tf.clip_by_value(grad, -1., 1.), var) for grad, var in gradients]
    train_op = optimizer.apply_gradients(capped_gradients)
In [66]:
batches = get_batches(int_text, batch_size, seq_length)
with tf.Session(graph=train_graph) as sess:
    sess.run(tf.global_variables_initializer())
    for epoch_i in range(num_epochs):
        state = sess.run(initial_state, {input_text: batches[0][0]})
        for batch_i, (x, y) in enumerate(batches):
            feed = {
                input_text: x,
                targets: y,
                initial_state: state,
                lr: learning_rate}
            train_loss, state, _ = sess.run([cost, final_state, train_op], feed)
            # Show every <show_every_n_batches> batches
            if (epoch_i * len(batches) + batch_i) % show_every_n_batches == 0:
                print('Epoch {:>3} Batch {:>4}/{} train_loss = {:.3f}'.format(
                    epoch_i,
                    batch_i,
                    len(batches),
                    train_loss))
    # Save Model
    saver = tf.train.Saver()
    saver.save(sess, save_dir)
    print('Model Trained and Saved')
In [67]:
# Save parameters for checkpoint
params = (seq_length, save_dir)
pickle.dump(params, open('params.p', 'wb'))
In [68]:
import tensorflow as tf
import numpy as np
import problem_unittests as tests
_, vocab_to_int, int_to_vocab, token_dict = pickle.load(open('preprocess.p', mode='rb'))
seq_length, load_dir = pickle.load(open('params.p', mode='rb'))
Get tensors from loaded_graph using the function get_tensor_by_name(). Get the tensors using the following names:
- "input:0"
- "initial_state:0"
- "final_state:0"
- "probs:0"
Return the tensors in the following tuple (InputTensor, InitialStateTensor, FinalStateTensor, ProbsTensor).
In [69]:
def get_tensors(loaded_graph):
    """
    Get input, initial state, final state, and probabilities tensor from <loaded_graph>
    :param loaded_graph: TensorFlow graph loaded from file
    :return: Tuple (InputTensor, InitialStateTensor, FinalStateTensor, ProbsTensor)
    """
    InputTensor = loaded_graph.get_tensor_by_name("input:0")
    InitialStateTensor = loaded_graph.get_tensor_by_name("initial_state:0")
    FinalStateTensor = loaded_graph.get_tensor_by_name("final_state:0")
    ProbsTensor = loaded_graph.get_tensor_by_name("probs:0")
    return (InputTensor, InitialStateTensor, FinalStateTensor, ProbsTensor)
tests.test_get_tensors(get_tensors)
In [70]:
import random
def weighted_choice(choices):
    # From http://stackoverflow.com/questions/3679694/a-weighted-version-of-random-choice
    total = sum(w for c, w in choices)
    r = random.uniform(0, total)
    upto = 0
    for c, w in choices:
        if upto + w >= r:
            return c
        upto += w
    assert False, "Shouldn't get here"

def pick_from_top_5(choices):
    top5 = []
    for i in range(min(len(choices), 5)):
        index = np.argmax(choices)
        top5.append((index, choices[index]))
        choices.itemset(index, 0)  # Avoid picking this index as argmax again
    return weighted_choice(top5)

def pick_word(probabilities, int_to_vocab):
    """
    Pick the next word in the generated text
    :param probabilities: Probabilities of the next word
    :param int_to_vocab: Dictionary of word ids as the keys and words as the values
    :return: String of the predicted word
    """
    return int_to_vocab[pick_from_top_5(probabilities)]
tests.test_pick_word(pick_word)
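As an aside, the same top-5 weighted sampling can be done with numpy alone; a minimal sketch that does not mutate the probability array (pick_word_top5 is an illustrative name, not part of the project):
def pick_word_top5(probabilities, int_to_vocab, top_n=5):
    probs = np.asarray(probabilities, dtype=np.float64)
    top_ids = np.argsort(probs)[-top_n:]                # ids of the top_n most probable words
    top_probs = probs[top_ids] / probs[top_ids].sum()   # renormalise so they sum to 1
    return int_to_vocab[int(np.random.choice(top_ids, p=top_probs))]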
In [72]:
gen_length = 100
# homer_simpson, moe_szyslak, or Barney_Gumble
prime_word = 'homer_simpson'

loaded_graph = tf.Graph()
with tf.Session(graph=loaded_graph) as sess:
    # Load saved model
    loader = tf.train.import_meta_graph(load_dir + '.meta')
    loader.restore(sess, load_dir)
    # Get Tensors from loaded model
    input_text, initial_state, final_state, probs = get_tensors(loaded_graph)
    # Sentences generation setup
    gen_sentences = [prime_word + ':']
    prev_state = sess.run(initial_state, {input_text: np.array([[1]])})
    # Generate sentences
    for n in range(gen_length):
        # Dynamic Input
        dyn_input = [[vocab_to_int[word] for word in gen_sentences[-seq_length:]]]
        dyn_seq_length = len(dyn_input[0])
        # Get Prediction
        probabilities, prev_state = sess.run(
            [probs, final_state],
            {input_text: dyn_input, initial_state: prev_state})
        probabilities = probabilities[0]
        pred_word = pick_word(probabilities[dyn_seq_length-1], int_to_vocab)
        gen_sentences.append(pred_word)
    # Remove tokens
    tv_script = ' '.join(gen_sentences)
    for key, token in token_dict.items():
        ending = ' ' if key in ['\n', '(', '"'] else ''
        tv_script = tv_script.replace(' ' + token.lower(), key)
    tv_script = tv_script.replace('\n ', '\n')
    tv_script = tv_script.replace('( ', '(')
    print(tv_script)
In [ ]: