This Jupyter Notebook implements a character-level RNN and is inspired by the Minimal character-level Vanilla RNN model written by Andrej Karpathy.
Decoding is based on this code from Sherjil Ozair.
I made some modifications to the original code to accommodate Jupyter; for instance, the original code is split across several files and is optimized to run with parameters from a shell command line. I added comments and some code to test parts line by line.
I also removed the ability to use LSTM or GRU cells and the embeddings. The results are less impressive than with the original code, but closer to Karpathy's Minimal character-level Vanilla RNN model.
Let's dive in :)
In [1]:
import numpy as np
import tensorflow as tf
In [2]:
%matplotlib notebook
import matplotlib
import matplotlib.pyplot as plt
In [3]:
from __future__ import print_function
import codecs
import os
import collections
from six.moves import cPickle
from six import text_type
import time
In [30]:
class Args():
def __init__(self):
'''data directory containing input.txt'''
self.data_dir = 'data_rnn/tinyshakespeare'
'''directory to store checkpointed models'''
self.save_dir = 'save_vec'
'''size of RNN hidden state'''
self.rnn_size = 128
'''minibatch size'''
self.batch_size = 1 #was 40
'''RNN sequence length'''
self.seq_length = 50
'''number of epochs'''
self.num_epochs = 1 # was 5
'''save frequency'''
self.save_every = 500 # was 500
'''Print frequency'''
self.print_every = 100 # was 100
'''clip gradients at this value'''
self.grad_clip = 5.
'''learning rate'''
self.learning_rate = 0.002 # was ?
'''decay rate for rmsprop'''
self.decay_rate = 0.98 # was 0.97?
"""continue training from saved model at this path. Path must contain files saved by previous training process:
'config.pkl' : configuration;
'chars_vocab.pkl' : vocabulary definitions;
'checkpoint' : paths to model file(s) (created by tf).
Note: this file contains absolute paths, be careful when moving files around;
'model.ckpt-*' : file(s) with model definition (created by tf)
"""
self.init_from = 'save_vec'
#self.init_from = None
'''number of characters to sample'''
self.n = 500
'''prime text'''
self.prime = u' '
Transforming the original dataset into vectors that a neural network can consume is always necessary.
This class needs to be replaced if you want to deal with other kinds of data.
It is able to cache the preprocessed data:
In [31]:
class TextLoader():
def __init__(self, data_dir, batch_size, seq_length, encoding='utf-8'):
self.data_dir = data_dir
self.batch_size = batch_size
self.seq_length = seq_length
self.encoding = encoding
input_file = os.path.join(data_dir, "input.txt")
vocab_file = os.path.join(data_dir, "vocab.pkl")
tensor_file = os.path.join(data_dir, "data.npy")
if not (os.path.exists(vocab_file) and os.path.exists(tensor_file)):
print("reading text file")
self.preprocess(input_file, vocab_file, tensor_file)
else:
print("loading preprocessed files")
self.load_preprocessed(vocab_file, tensor_file)
self.create_batches()
self.reset_batch_pointer()
def preprocess(self, input_file, vocab_file, tensor_file):
with codecs.open(input_file, "r", encoding=self.encoding) as f:
data = f.read()
counter = collections.Counter(data)
count_pairs = sorted(counter.items(), key=lambda x: -x[1])
self.chars, _ = zip(*count_pairs)
self.vocab_size = len(self.chars)
self.vocab = dict(zip(self.chars, range(len(self.chars))))
with open(vocab_file, 'wb') as f:
cPickle.dump(self.chars, f)
self.tensor = np.array(list(map(self.vocab.get, data)))
np.save(tensor_file, self.tensor)
def load_preprocessed(self, vocab_file, tensor_file):
with open(vocab_file, 'rb') as f:
self.chars = cPickle.load(f)
self.vocab_size = len(self.chars)
self.vocab = dict(zip(self.chars, range(len(self.chars))))
self.tensor = np.load(tensor_file)
self.num_batches = int(self.tensor.size / (self.batch_size *
self.seq_length))
def create_batches(self):
self.num_batches = int(self.tensor.size / (self.batch_size *
self.seq_length))
# When the data (tensor) is too small, let's give them a better error message
if self.num_batches==0:
assert False, "Not enough data. Make seq_length and batch_size small."
self.tensor = self.tensor[:self.num_batches * self.batch_size * self.seq_length]
xdata = self.tensor
ydata = np.copy(self.tensor)
ydata[:-1] = xdata[1:]
ydata[-1] = xdata[0]
self.x_batches = np.split(xdata.reshape(self.batch_size, -1), self.num_batches, 1)
self.y_batches = np.split(ydata.reshape(self.batch_size, -1), self.num_batches, 1)
def vectorize(self, x):
vectorized = np.zeros((len(x), len(x[0]), self.vocab_size))
for i in range(0, len(x)):
for j in range(0, len(x[0])):
vectorized[i][j][x[i][j]] = 1
return vectorized
def next_batch(self):
x, y = self.x_batches[self.pointer], self.y_batches[self.pointer]
self.pointer += 1
x_vectorized = self.vectorize(x)
y_vectorized = self.vectorize(y)
return x_vectorized, y_vectorized
def reset_batch_pointer(self):
self.pointer = 0
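To make the vectorize step concrete, here is a minimal standalone sketch of the one-hot encoding it produces, using a tiny made-up vocabulary (not the Shakespeare data): each character index becomes a vector with a single 1 at that index.
import numpy as np  # sketch only: a made-up 4-character vocabulary
vocab = {'a': 0, 'b': 1, 'c': 2, ' ': 3}
indices = [[0, 1, 3, 2]]            # one batch of one sequence: 'ab c'
one_hot = np.zeros((len(indices), len(indices[0]), len(vocab)))
for i in range(len(indices)):
    for j in range(len(indices[0])):
        one_hot[i][j][indices[i][j]] = 1
print(one_hot[0])                   # each row has a single 1 at the character's index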
In [32]:
## First we open the file
args = Args()
input_file = os.path.join(args.data_dir, "input.txt")
f = codecs.open(input_file, "r", 'utf-8')
data = f.read()
print (data[0:300])
Then we have:
counter = collections.Counter(data)
count_pairs = sorted(counter.items(), key=lambda x: -x[1])
chars, _ = zip(*count_pairs)
vocab_size = len(chars)
vocab = dict(zip(chars, range(len(chars))))
Which does the same as this:
chars = list(set(data))
data_size, vocab_size = len(data), len(chars)
vocab = { ch:i for i,ch in enumerate(chars) }
Let's see the details here:
In [33]:
counter = collections.Counter(data)
print ('histogram of char from the input data file:', counter)
In [34]:
count_pairs = sorted(counter.items(), key=lambda x: -x[1])
print (count_pairs)
In [35]:
chars, _ = zip(*count_pairs)
print ('chars', chars)
In [36]:
vocab_size = len(chars)
print (vocab_size)
In [37]:
vocab = dict(zip(chars, range(len(chars))))
print (vocab)
It can be used to look up the ID of a character in vocab:
In [38]:
print (vocab['a'])
This is equivalent to the following code by Karpathy: it associates a unique integer with every character used in the file.
In [39]:
# Karpathy's original code does the same:
chars = list(set(data))
data_size, vocab_size = len(data), len(chars)
vocab = { ch:i for i,ch in enumerate(chars) }
print (vocab)
Now we have to make a tensor out of the data.
The tensor is built with this line:
tensor = np.array(list(map(vocab.get, data)))
Let's split the line to see in detail how it works:
In [40]:
data_in_array = list(map(vocab.get, data))
print (len(data_in_array))
print (data_in_array[0:200])
In [41]:
print (data_in_array[0], 'means', data[0], 'which is the first letter in data')
Then we create a numpy array out of it!
In [42]:
tensor = np.array(data_in_array)
Here is a reminder of the create_batches function:
def create_batches(self):
self.num_batches = int(self.tensor.size / (self.batch_size *
self.seq_length))
# When the data (tensor) is too small, let's give them a better error message
if self.num_batches==0:
assert False, "Not enough data. Make seq_length and batch_size small."
self.tensor = self.tensor[:self.num_batches * self.batch_size * self.seq_length]
xdata = self.tensor
ydata = np.copy(self.tensor)
ydata[:-1] = xdata[1:]
ydata[-1] = xdata[0]
self.x_batches = np.split(xdata.reshape(self.batch_size, -1), self.num_batches, 1)
self.y_batches = np.split(ydata.reshape(self.batch_size, -1), self.num_batches, 1)
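A tiny toy example (with made-up character indices, not the real tensor) shows what the ydata shift does: every position of ydata holds the character that follows the same position in xdata, and the last one wraps around.
import numpy as np  # toy illustration of the x/y shift in create_batches
xdata = np.array([10, 11, 12, 13, 14])
ydata = np.copy(xdata)
ydata[:-1] = xdata[1:]   # each target is the next character...
ydata[-1] = xdata[0]     # ...and the last one wraps around to the first
print(xdata)             # [10 11 12 13 14]
print(ydata)             # [11 12 13 14 10]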
In [43]:
data_loader = TextLoader(args.data_dir, args.batch_size, args.seq_length)
data_loader.create_batches()
x, y = data_loader.next_batch()
print ('x and y are arrays of shape', len(x), 'x', len(x[0]), 'x', len(x[0][0]))
print ('there are', len(x), 'batch(es), each containing', len(x[0]), 'one-hot vectors of size', len(x[0][0]))
In [44]:
print ('x[0] is the first batch of input:')
print (x[0])
print ('x[0][0] is the first char:')
print (x[0][0])
print ('y[0][0] is the first expected char:')
print (y[0][0])
In [45]:
print ('y[0] is x[0] shifted by one, in other words: y[0][i] == x[0][i+1]')
print ('y[0][10] ==', y[0][10])
print ('x[0][11] ==', x[0][11])
In [46]:
class Model():
def __init__(self, args, infer=False):
self.args = args
if infer:
'''Infer is true when the model is used for sampling'''
args.seq_length = 1
hidden_size = args.rnn_size
vocab_size = args.vocab_size
# define placeholders for the input data and the target.
self.input_data = tf.placeholder(tf.float32, [args.batch_size, args.seq_length, vocab_size], name='input_data')
self.target_data = tf.placeholder(tf.float32, [args.batch_size, args.seq_length, vocab_size], name='target_data')
# define the input xs
one_batch_input = tf.squeeze(tf.slice(self.input_data, [0, 0, 0], [1, args.seq_length, vocab_size]),[0])
xs = tf.split(0, args.seq_length, one_batch_input)
# define the target
one_batch_target = tf.squeeze(tf.slice(self.target_data, [0, 0, 0], [1, args.seq_length, vocab_size]),[0])
targets = tf.split(0, args.seq_length, one_batch_target)
#initial_state
self.initial_state = tf.zeros((hidden_size,1))
#last_state = tf.placeholder(tf.float32, (hidden_size, 1))
# model parameters
Wxh = tf.Variable(tf.random_uniform((hidden_size, vocab_size))*0.01, name='Wxh') # input to hidden
Whh = tf.Variable(tf.random_uniform((hidden_size, hidden_size))*0.01, name='Whh') # hidden to hidden
Why = tf.Variable(tf.random_uniform((vocab_size, hidden_size))*0.01, name='Why') # hidden to output
bh = tf.Variable(tf.zeros((hidden_size, 1)), name='bh') # hidden bias
by = tf.Variable(tf.zeros((vocab_size, 1)), name='by') # output bias
loss = tf.zeros([1], name='loss')
hs, ys, ps = {}, {}, {}
hs[-1] = self.initial_state
# forward pass
for t in xrange(args.seq_length):
xs_t = tf.transpose(xs[t])
targets_t = tf.transpose(targets[t])
hs[t] = tf.tanh(tf.matmul(Wxh, xs_t) + tf.matmul(Whh, hs[t-1]) + bh) # hidden state
ys[t] = tf.matmul(Why, hs[t]) + by # unnormalized log probabilities for next chars
ps[t] = tf.exp(ys[t]) / tf.reduce_sum(tf.exp(ys[t])) # probabilities for next chars
loss += -tf.log(tf.reduce_sum(tf.mul(ps[t], targets_t))) # softmax (cross-entropy loss)
self.probs = ps[t] # probabilities at the last time step, needed by sample()
self.cost = loss / args.batch_size / args.seq_length
self.final_state = hs[args.seq_length-1]
self.lr = tf.Variable(0.0, trainable=False, name='learning_rate')
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars),
args.grad_clip)
optimizer = tf.train.AdamOptimizer(self.lr)
self.train_op = optimizer.apply_gradients(zip(grads, tvars))
def sample(self, sess, chars, vocab, num=200, prime='The '):
state = self.initial_state.eval()
for char in prime[:-1]:
x = np.zeros((1, 1, self.args.vocab_size)) # vocab_size instead of the hard-coded 65
x[0, 0, vocab[char]] = 1
feed = {self.input_data: x, self.initial_state:state}
[state] = sess.run([self.final_state], feed)
def weighted_pick(weights):
t = np.cumsum(weights)
s = np.sum(weights)
return(int(np.searchsorted(t, np.random.rand(1)*s)))
ret = prime
char = prime[-1]
for n in range(num):
x = np.zeros((1, 1, self.args.vocab_size))
x[0, 0, vocab[char]] = 1
feed = {self.input_data: x, self.initial_state:state}
[probs, state] = sess.run([self.probs, self.final_state], feed)
#print ('p', probs.ravel())
#print ('state', state.ravel())
sample = weighted_pick(probs)
#print ('sample', sample)
pred = chars[sample]
ret += pred
char = pred
return ret
def inspect(self, draw=False):
for var in tf.all_variables():
if var in tf.trainable_variables():
print ('t', var.name, var.eval().shape)
if draw:
plt.figure(figsize=(1,1))
plt.figimage(var.eval())
plt.show()
else:
print ('nt', var.name, var.eval().shape)
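For reference, the forward pass in the loop above is exactly the vanilla RNN recurrence, written here with the same variable names as the code ($x_t$ and $h_t$ are column vectors):
$$h_t = \tanh(\text{Wxh}\,x_t + \text{Whh}\,h_{t-1} + \text{bh})$$
$$p_t = \mathrm{softmax}(\text{Why}\,h_t + \text{by})$$
$$\text{loss} = -\sum_t \log p_t[\text{target}_t]$$
The cost is this loss averaged over batch_size and seq_length.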
In [47]:
tf.reset_default_graph()
args = Args()
data_loader = TextLoader(args.data_dir, args.batch_size, args.seq_length)
args.vocab_size = data_loader.vocab_size
print (args.vocab_size)
model = Model(args)
print ("model created")
# Open a session to inspect the model
with tf.Session() as sess:
tf.initialize_all_variables().run()
print('All variable initialized')
model.inspect()
'''
saver = tf.train.Saver(tf.all_variables())
ckpt = tf.train.get_checkpoint_state(args.save_dir)
print (ckpt)
if ckpt and ckpt.model_checkpoint_path:
saver.restore(sess, ckpt.model_checkpoint_path)
model.inspect()
plt.figure(figsize=(1,1))
plt.figimage(model.vectorize.eval())
plt.show()'''
The following code comes from the DeepDream Jupyter tutorial.
It allows drawing the graph directly in Jupyter. It looks cool, but I'm not sure it is useful.
In [48]:
# this code from:
# https://github.com/tensorflow/tensorflow/blob/master/tensorflow/examples/tutorials/deepdream/deepdream.ipynb
from IPython.display import clear_output, Image, display, HTML
def strip_consts(graph_def, max_const_size=32):
"""Strip large constant values from graph_def."""
strip_def = tf.GraphDef()
for n0 in graph_def.node:
n = strip_def.node.add()
n.MergeFrom(n0)
if n.op == 'Const':
tensor = n.attr['value'].tensor
size = len(tensor.tensor_content)
if size > max_const_size:
tensor.tensor_content = "<stripped %d bytes>"%size
return strip_def
def rename_nodes(graph_def, rename_func):
res_def = tf.GraphDef()
for n0 in graph_def.node:
n = res_def.node.add()
n.MergeFrom(n0)
n.name = rename_func(n.name)
for i, s in enumerate(n.input):
n.input[i] = rename_func(s) if s[0]!='^' else '^'+rename_func(s[1:])
return res_def
def show_graph(graph_def, max_const_size=32):
"""Visualize TensorFlow graph."""
if hasattr(graph_def, 'as_graph_def'):
graph_def = graph_def.as_graph_def()
strip_def = strip_consts(graph_def, max_const_size=max_const_size)
code = """
<script>
function load() {{
document.getElementById("{id}").pbtxt = {data};
}}
</script>
<link rel="import" href="https://tensorboard.appspot.com/tf-graph-basic.build.html" onload=load()>
<div style="height:600px">
<tf-graph-basic id="{id}"></tf-graph-basic>
</div>
""".format(data=repr(str(strip_def)), id='graph'+str(np.random.rand()))
iframe = """
<iframe seamless style="width:800px;height:620px;border:0" srcdoc="{}"></iframe>
""".format(code.replace('"', '"'))
display(HTML(iframe))
In [49]:
# write the graph to help visualizing it
model_fn = 'model.pb'
tf.train.write_graph(sess.graph.as_graph_def(),'.', model_fn, as_text=False)
# Visualizing the network graph (expand the nodes to see their internal structure).
with tf.gfile.FastGFile(model_fn, 'rb') as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
tmp_def = rename_nodes(graph_def, lambda s:"/".join(s.split('_',1)))
#show_graph(tmp_def)
In [53]:
args = Args()
data_loader = TextLoader(args.data_dir, args.batch_size, args.seq_length)
args.vocab_size = data_loader.vocab_size
# check compatibility if training is continued from previously saved model
if args.init_from is not None:
print ("need to load file from", args.init_from)
# check if all necessary files exist
assert os.path.isdir(args.init_from)," %s must be a path" % args.init_from
assert os.path.isfile(os.path.join(args.init_from,"config.pkl")),"config.pkl file does not exist in path %s"%args.init_from
assert os.path.isfile(os.path.join(args.init_from,"chars_vocab.pkl")),"chars_vocab.pkl.pkl file does not exist in path %s" % args.init_from
ckpt = tf.train.get_checkpoint_state(args.init_from)
assert ckpt,"No checkpoint found"
assert ckpt.model_checkpoint_path,"No model path found in checkpoint"
# open old config and check if models are compatible
with open(os.path.join(args.init_from, 'config.pkl'), 'rb') as f:
saved_model_args = cPickle.load(f)
print (saved_model_args)
need_be_same = ["rnn_size", "seq_length"] # unlike the original code, this simplified Args has no 'model' attribute
for checkme in need_be_same:
assert vars(saved_model_args)[checkme]==vars(args)[checkme],"Command line argument and saved model disagree on '%s' "%checkme
# open saved vocab/dict and check if vocabs/dicts are compatible
with open(os.path.join(args.init_from, 'chars_vocab.pkl'), 'rb') as f:
saved_chars, saved_vocab = cPickle.load(f)
assert saved_chars==data_loader.chars, "Data and loaded model disagree on character set!"
assert saved_vocab==data_loader.vocab, "Data and loaded model disagree on dictionary mappings!"
print ("config loaded")
with open(os.path.join(args.save_dir, 'config.pkl'), 'wb') as f:
cPickle.dump(args, f)
with open(os.path.join(args.save_dir, 'chars_vocab.pkl'), 'wb') as f:
cPickle.dump((data_loader.chars, data_loader.vocab), f)
In [ ]:
print (args.print_every)
In [52]:
tf.reset_default_graph()
model = Model(args)
print ("model created")
cost_optimisation = []
with tf.Session() as sess:
tf.initialize_all_variables().run()
print ("variable initialized")
saver = tf.train.Saver(tf.all_variables())
# restore model
if args.init_from is not None:
saver.restore(sess, ckpt.model_checkpoint_path)
print ("model restored")
for e in range(args.num_epochs):
sess.run(tf.assign(model.lr, args.learning_rate * (args.decay_rate ** e)))
data_loader.reset_batch_pointer()
state = model.initial_state.eval()
for b in range(data_loader.num_batches):
start = time.time()
# Get learning data
x, y = data_loader.next_batch()
# Create the structure for the learning data
feed = {model.input_data: x, model.target_data: y, model.initial_state: state}
# Run a session using train_op
[train_loss], state, _ = sess.run([model.cost, model.final_state, model.train_op], feed)
end = time.time()
if (e * data_loader.num_batches + b) % args.print_every == 0:
cost_optimisation.append(train_loss)
print("{}/{} (epoch {}), train_loss = {:.6f}, time/batch = {:.3f}" \
.format(e * data_loader.num_batches + b,
args.num_epochs * data_loader.num_batches,
e, train_loss, end - start))
if (e * data_loader.num_batches + b) % args.save_every == 0\
or (e==args.num_epochs-1 and b == data_loader.num_batches-1): # save for the last result
checkpoint_path = os.path.join(args.save_dir, 'model.ckpt')
saver.save(sess, checkpoint_path, global_step = e * data_loader.num_batches + b)
print("model saved to {}".format(checkpoint_path))
In [27]:
plt.figure(figsize=(12,5))
plt.plot(range(len(cost_optimisation)), cost_optimisation, label='cost')
plt.legend()
plt.show()
In [28]:
tf.reset_default_graph()
model_fn = 'model.pb'
with open(os.path.join(args.save_dir, 'config.pkl'), 'rb') as f:
saved_args = cPickle.load(f)
with open(os.path.join(args.save_dir, 'chars_vocab.pkl'), 'rb') as f:
chars, vocab = cPickle.load(f)
model = Model(saved_args, True) # True to generate the model in sampling mode
with tf.Session() as sess:
tf.initialize_all_variables().run()
saver = tf.train.Saver(tf.all_variables())
ckpt = tf.train.get_checkpoint_state(args.save_dir)
print (ckpt)
model.inspect(draw=True)
In [29]:
with tf.Session() as sess:
tf.initialize_all_variables().run()
saver = tf.train.Saver(tf.all_variables())
ckpt = tf.train.get_checkpoint_state(args.save_dir)
print (ckpt)
if ckpt and ckpt.model_checkpoint_path:
saver.restore(sess, ckpt.model_checkpoint_path)
print(model.sample(sess, chars, vocab, args.n, args.prime))
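The weighted_pick helper inside sample draws the next character index from the predicted distribution by inverting its cumulative sum. A standalone sketch with a made-up probability vector:
import numpy as np  # sketch of the cumulative-sum sampling used by weighted_pick

def weighted_pick(weights):
    t = np.cumsum(weights)   # e.g. [0.1, 0.4, 1.0]
    s = np.sum(weights)      # guards against vectors that don't sum exactly to 1
    return int(np.searchsorted(t, np.random.rand(1) * s))

probs = np.array([0.1, 0.3, 0.6])
picks = [weighted_pick(probs) for _ in range(10000)]
print([picks.count(i) / 10000.0 for i in range(3)])  # roughly [0.1, 0.3, 0.6]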
That's it!
If you want to achieve better results, you can switch to an LSTM with 2 layers and add an embedding space. All of this is implemented in the original code.
Feedback welcome @dh7net