In [1]:
%load_ext autoreload
%autoreload 2
import theano, theano.tensor as T
import numpy as np
import theano_lstm
import random
Today we will train a nonsensical language model!
We will first collect some language data, convert it to numbers, and then feed it to a recurrent neural network, asking it to predict upcoming words. When we are done we will have a machine that can generate sentences from our made-up language ad infinitum!
The first step here is to get some data. Since we are basing our language on nonsense, we need to generate good nonsense using a sampler.
Our sampler will take a probability table as input, e.g. a language where people are equally likely to say "a" or "b" would be written as follows:
nonsense = Sampler({"a": 0.5, "b": 0.5})
We get samples from this language like this:
word = nonsense()
We overloaded the __call__ method to get this syntactic sugar.
In [192]:
## Fake dataset:
class Sampler:
    def __init__(self, prob_table):
        total_prob = 0.0
        if type(prob_table) is dict:
            for key, value in prob_table.items():
                total_prob += value
        elif type(prob_table) is list:
            prob_table_gen = {}
            for key in prob_table:
                prob_table_gen[key] = 1.0 / (float(len(prob_table)))
            total_prob = 1.0
            prob_table = prob_table_gen
        else:
            raise TypeError("__init__ takes either a dict or a list as its first argument")
        if total_prob <= 0.0:
            raise ValueError("Probability is not strictly positive.")
        self._keys = []
        self._probs = []
        for key in prob_table:
            self._keys.append(key)
            self._probs.append(prob_table[key] / total_prob)

    def __call__(self):
        sample = random.random()
        seen_prob = 0.0
        for key, prob in zip(self._keys, self._probs):
            if (seen_prob + prob) >= sample:
                return key
            else:
                seen_prob += prob
        return key
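As a quick sanity check (not from the original notebook), we can draw many samples and verify that the empirical frequencies roughly match the table we passed in:
# sanity check: empirical frequencies approach the probability table
coin = Sampler({"a": 0.5, "b": 0.5})
draws = [coin() for _ in range(10000)]
print(draws.count("a") / 10000.0)  # hovers around 0.5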
In [ ]:
samplers = {
    "punctuation": Sampler({".": 0.49, ",": 0.5, ";": 0.03, "?": 0.05, "!": 0.05}),
    "stop": Sampler({"the": 10, "from": 5, "a": 9, "they": 3, "he": 3, "it": 2.5, "she": 2.7, "in": 4.5}),
    "noun": Sampler(["cat", "broom", "boat", "dog", "car", "wrangler", "mexico", "lantern", "book", "paper", "joke", "calendar", "ship", "event"]),
    "verb": Sampler(["ran", "stole", "carried", "could", "would", "do", "can", "carry", "catapult", "jump", "duck"]),
    "adverb": Sampler(["rapidly", "calmly", "cooly", "in jest", "fantastically", "angrily", "dazily"])
}
To create sentences from our language we use a simple recursion:
In [193]:
def generate_nonsense(word = ""):
    if word.endswith("."):
        return word
    else:
        if len(word) > 0:
            word += " "
        word += samplers["stop"]()
        word += " " + samplers["noun"]()
        if random.random() > 0.7:
            word += " " + samplers["adverb"]()
            if random.random() > 0.7:
                word += " " + samplers["adverb"]()
        word += " " + samplers["verb"]()
        if random.random() > 0.8:
            word += " " + samplers["noun"]()
            if random.random() > 0.9:
                word += "-" + samplers["noun"]()
        if len(word) > 500:
            word += "."
        else:
            word += " " + samplers["punctuation"]()
        return generate_nonsense(word)

def generate_dataset(total_size):
    sentences = []
    for i in range(total_size):
        sentences.append(generate_nonsense())
    return sentences

# generate dataset
lines = generate_dataset(100)
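To get a feel for the data, we can print a couple of the generated sentences (the exact output varies from run to run):
# peek at the nonsense corpus:
print(lines[0])
print(lines[1])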
In [ ]:
### Utilities:
class Vocab:
    __slots__ = ["word2index", "index2word", "unknown"]

    def __init__(self, index2word = None):
        self.word2index = {}
        self.index2word = []

        # add unknown word:
        self.add_words(["**UNKNOWN**"])
        self.unknown = 0

        if index2word is not None:
            self.add_words(index2word)

    def add_words(self, words):
        for word in words:
            if word not in self.word2index:
                self.word2index[word] = len(self.word2index)
                self.index2word.append(word)

    def __call__(self, line):
        """
        Convert from numerical representation to words
        and vice-versa.
        """
        if type(line) is np.ndarray:
            return " ".join([self.index2word[word] for word in line])
        if type(line) is list:
            if len(line) > 0:
                if isinstance(line[0], int):
                    return " ".join([self.index2word[word] for word in line])
            indices = np.zeros(len(line), dtype=np.int32)
        else:
            line = line.split(" ")
            indices = np.zeros(len(line), dtype=np.int32)
        for i, word in enumerate(line):
            indices[i] = self.word2index.get(word, self.unknown)
        return indices

    @property
    def size(self):
        return len(self.index2word)

    def __len__(self):
        return len(self.index2word)
In [ ]:
vocab = Vocab()
for line in lines:
    vocab.add_words(line.split(" "))
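As a quick round-trip example (not from the original notebook), converting a sentence to indices and back recovers the words, while words the vocabulary has never seen map to index 0, the **UNKNOWN** token:
# assuming "the", "cat", "ran" and "." all appeared in the generated corpus:
indices = vocab("the cat ran .")   # numpy array of word indices
print(indices)
print(vocab(indices))              # back to "the cat ran ."
print(vocab("the zebra ran ."))    # "zebra" is unknown, so its slot holds index 0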
To send our sentences to the neural network in one big chunk, we transform each sentence into a row of indices and stack these rows into a bigger matrix. Not all sentences have the same length, so we pad the shorter ones with 0s in pad_into_matrix:
In [168]:
def pad_into_matrix(rows, padding = 0):
    if len(rows) == 0:
        return np.empty([0, 0], dtype=np.int32), []
    lengths = [len(row) for row in rows]
    width = max(lengths)
    height = len(rows)
    mat = np.empty([height, width], dtype=rows[0].dtype)
    mat.fill(padding)
    for i, row in enumerate(rows):
        mat[i, 0:len(row)] = row
    return mat, lengths

# transform into big numerical matrix of sentences:
numerical_lines = []
for line in lines:
    numerical_lines.append(vocab(line))
numerical_lines, numerical_lengths = pad_into_matrix(numerical_lines)
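As a small illustration (not from the original notebook), two index rows of different lengths get stacked into one matrix, with the shorter row right-padded with zeros:
rows = [np.array([1, 2, 3], dtype=np.int32), np.array([4, 5], dtype=np.int32)]
mat, lengths = pad_into_matrix(rows)
print(mat)      # [[1 2 3]
                #  [4 5 0]]
print(lengths)  # [3, 2]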
Now the real work is upon us! Thank goodness we have our language data ready. We now create a recurrent neural network by connecting an Embedding $E$ that holds a vector for each word in our corpus, and stacking some special cells together to form a prediction function. Mathematically we want:
$$\underset{E, \Phi}{\mathrm{argmax}}\; {\bf P}(w_{k+1}| w_{k}, \dots, w_{0}; E, \Phi) = f(x, h)$$
with $f(\cdot, \cdot)$ the function our recurrent neural network computes at each timestep. It takes as inputs $x = E[w_{k}]$, the embedding of the word observed at time $k$, and $h$, the internal state of the network, and it outputs a probability distribution $\hat{p}$ over the next word. $\Phi$ is the set of parameters used by our classifier and recurrent neural network, and $E$ is the embedding matrix for our words.
In practice we will obtain $E$ and $\Phi$ iteratively using gradient descent on the error our network makes in its predictions. To do this we define our error as the Kullback-Leibler divergence (a measure of discrepancy between probability distributions) between our estimate $\hat{p} = {\bf P}(w_{k+1}| w_{k}, \dots, w_{0}; E, \Phi)$ and the actual value of ${\bf P}(w_{k+1}| w_{k}, \dots, w_{0})$ from the data (i.e. a probability distribution that is 1 for the observed word $w_{k+1}$ and 0 elsewhere).
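To make this loss concrete, here is a tiny numpy sketch (an illustration, not part of the model code) showing that the KL divergence to a one-hot target reduces to the negative log-probability the network assigned to the observed next word:
p_hat  = np.array([0.1, 0.7, 0.2])   # predicted distribution over a 3-word vocabulary
target = np.array([0.0, 1.0, 0.0])   # one-hot: the observed next word was word 1
support = target > 0                 # terms with target = 0 contribute nothing
kl = np.sum(target[support] * np.log(target[support] / p_hat[support]))
print(kl, -np.log(p_hat[1]))         # both are ~0.357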
To build this predictive model we make use of theano_lstm, a Python module for building recurrent neural networks with Theano. The first step is to declare what kind of cells we want to use by choosing a celltype. There are many different celltypes to choose from, but the most common these days (and incidentally the most effective) are RNN and LSTM. For a more in-depth discussion of how these work I suggest checking out Arxiv, Alex Graves' website, or Wikipedia. Here we use celltype = LSTM.
self.model = StackedCells(input_size, celltype=celltype, layers=[hidden_size] * stack_size)
Once we've declared what kind of cells we want to use, we can add an Embedding to map integers (indices) to vectors (in our case mapping words to their indices, then indices to the word vectors we wish to train). Intuitively this lets the network separate and recognize what it is "seeing" or "receiving" at each timestep. To add an Embedding we create Embedding(vocabulary_size, size_of_embedding_vectors) and insert it at the beginning of the StackedCells's layers list (thereby telling StackedCells that this Embedding layer needs to be activated before the other ones):
# add an embedding
self.model.layers.insert(0, Embedding(vocab_size, input_size))
The final output of our network needs to be a probability distribution over the next word (in other application areas this could be a sentiment classification, a decision, a topic, etc.), so we add another layer that maps the internal state of the LSTMs to a probability distribution over all the words in our language. To ensure that our prediction is indeed a probability distribution we "activate" the layer with a Softmax: we exponentiate every value of the output, $q_i = e^{x_i}$, so that all values are positive, and then divide by their sum so that the output sums to 1:
$$p_i = \frac{q_i}{\sum_j q_j}\text{, and }\sum_i p_i = 1.$$
# add a classifier:
self.model.layers.append(Layer(hidden_size, vocab_size, activation = softmax))
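A minimal numpy sketch of that normalization (illustrative only; the model itself uses Theano's T.nnet.softmax):
x = np.array([1.0, 2.0, 0.5])
q = np.exp(x)          # exponentiate: every value becomes positive
p = q / q.sum()        # normalize: the entries now sum to 1
print(p, p.sum())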
For convenience we wrap this all in one class below.
We have now defined our network. At each timestep we can produce a probability distribution for each input index:
def create_prediction(self, greedy=False):
    def step(idx, *states):
        # new hiddens are the states we need to pass to LSTMs
        # from the past. Because the StackedCells also include
        # the embeddings, and those have no state, we pass
        # a "None" instead:
        new_hiddens = [None] + list(states)
        new_states = self.model.forward(idx, prev_hiddens = new_hiddens)
        return new_states[1:]
    ...
Our inputs are a Theano symbolic variable holding an integer matrix:
    ...
    # in a sequence forecasting scenario we take everything
    # up to the second-to-last step and predict the
    # subsequent steps, i.e. indices 0 ... n - 1, hence:
    inputs = self.input_mat[:, 0:-1]
    num_examples = inputs.shape[0]
    # pass this to Theano's recurrence relation function:
    ....
Scan receives our recurrence relation step from above, and also needs to know what will be outputted at each step via outputs_info. We give outputs_info a set of variables corresponding to the hidden states of our StackedCells. Some of the layers have no hidden state, and for those we simply pass a None to Theano, while others do require some initial state. In those cases we wrap their initial state inside a dictionary:
def has_hidden(layer):
    """
    Whether a layer has a trainable
    initial hidden state.
    """
    return hasattr(layer, 'initial_hidden_state')

def matrixify(vector, n):
    return T.repeat(T.shape_padleft(vector), n, axis=0)

def initial_state(layer, dimensions = None):
    """
    Initializes the recurrence relation with an initial hidden state
    if needed, else replaces with a "None" to tell Theano that
    the network **will** return something, but it does not need
    to send it to the next step of the recurrence.
    """
    if dimensions is None:
        return layer.initial_hidden_state if has_hidden(layer) else None
    else:
        return matrixify(layer.initial_hidden_state, dimensions) if has_hidden(layer) else None

def initial_state_with_taps(layer, dimensions = None):
    """Optionally wrap a tensor variable into a dict with taps=[-1]."""
    state = initial_state(layer, dimensions)
    if state is not None:
        return dict(initial=state, taps=[-1])
    else:
        return None
Let's now create these initial states (note how we skip the first layer, the Embedding, by doing self.model.layers[1:] in the iteration; there is no point in passing the embedding along in our recurrence, because word vectors are only seen at the timestep they are received in this network):
    # choose what gets outputted at each timestep:
    outputs_info = [initial_state_with_taps(layer, num_examples) for layer in self.model.layers[1:]]
    result, _ = theano.scan(fn=step,
                            sequences=[inputs.T],
                            outputs_info=outputs_info)

    if greedy:
        return result[0]
    # softmaxes are the last layer of our network,
    # and are at the end of our results list:
    return result[-1].transpose((2,0,1))
    # we reorder the predictions to be:
    # 1. what row / example
    # 2. what timestep
    # 3. softmax dimension
Our error function uses theano_lstm's masked_loss method. This method lets us define ranges over which a probability distribution should obey a particular target distribution. We control it by setting start and end points for these ranges; in doing so we mask the areas where we do not care what the network predicted.
In our case the network predicts words we care about inside each sentence, but when we pad short sentences with 0s to fill out the matrix we do not care what the network does there, because that happens outside the sentence we collected:
def create_cost_fun(self):
    # create a cost function that
    # takes each prediction at every timestep
    # and guesses the next timestep's value:
    what_to_predict = self.input_mat[:, 1:]
    # because some sentences are shorter, we
    # place masks where the sentences end
    # (for_how_long is zero indexed, e.g. an example
    # covering `[2, 3)` has this value set to 0,
    # hence we subtract 1):
    for_how_long = self.for_how_long - 1
    # all sentences start at T=0:
    starting_when = T.zeros_like(self.for_how_long)

    self.cost = masked_loss(self.predictions,
                            what_to_predict,
                            for_how_long,
                            starting_when).sum()
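To see what the mask accomplishes, here is a minimal numpy sketch (an illustration with made-up numbers and assumed shapes, not the actual masked_loss implementation): per-timestep losses outside each example's [start, start + length) window are zeroed out before summing:
losses  = np.array([[0.2, 0.1, 0.3, 0.4, 0.5],
                    [0.6, 0.2, 0.1, 0.3, 0.2]])  # 2 examples, 5 timesteps
lengths = np.array([5, 3])   # the second sentence only has 3 real timesteps
starts  = np.array([0, 0])   # all sentences start at T=0
steps   = np.arange(losses.shape[1])
mask    = (steps >= starts[:, None]) & (steps < (starts + lengths)[:, None])
print((losses * mask).sum())  # padded positions contribute nothing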
We now have a cost function. To perform gradient descent we need to tell Theano how each parameter must be updated at every training epoch. We use theano_lstm's create_optimization_updates method to generate a dictionary of updates and to apply special gradient descent rules that accelerate and facilitate training (for instance scaling the gradients when they are too large or too small, and preventing them from becoming so big that the model turns numerically unstable). In this example we use Adadelta:
def create_training_function(self):
    updates, _, _, _, _ = create_optimization_updates(self.cost, self.params, method="adadelta")
    self.update_fun = theano.function(
        inputs=[self.input_mat, self.for_how_long],
        outputs=self.cost,
        updates=updates,
        allow_input_downcast=True)
PS: our parameters are obtained by calling self.model.params:
@property
def params(self):
    return self.model.params
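For intuition, here is a minimal numpy sketch of the per-parameter Adadelta rule such an optimizer applies (the rho and eps values are illustrative assumptions, not values read from theano_lstm):
def adadelta_step(param, grad, acc_grad, acc_delta, rho=0.95, eps=1e-6):
    # running average of squared gradients:
    acc_grad  = rho * acc_grad + (1 - rho) * grad ** 2
    # scale the step by the ratio of past-update RMS to gradient RMS:
    delta     = -np.sqrt(acc_delta + eps) / np.sqrt(acc_grad + eps) * grad
    # running average of squared updates:
    acc_delta = rho * acc_delta + (1 - rho) * delta ** 2
    return param + delta, acc_grad, acc_delta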
In [189]:
from theano_lstm import Embedding, LSTM, RNN, StackedCells, Layer, create_optimization_updates, masked_loss
def softmax(x):
    """
    Wrapper for softmax, helps with
    pickling, and removing one extra
    dimension that Theano adds during
    its exponential normalization.
    """
    return T.nnet.softmax(x.T)

def has_hidden(layer):
    """
    Whether a layer has a trainable
    initial hidden state.
    """
    return hasattr(layer, 'initial_hidden_state')

def matrixify(vector, n):
    return T.repeat(T.shape_padleft(vector), n, axis=0)

def initial_state(layer, dimensions = None):
    """
    Initializes the recurrence relation with an initial hidden state
    if needed, else replaces with a "None" to tell Theano that
    the network **will** return something, but it does not need
    to send it to the next step of the recurrence.
    """
    if dimensions is None:
        return layer.initial_hidden_state if has_hidden(layer) else None
    else:
        return matrixify(layer.initial_hidden_state, dimensions) if has_hidden(layer) else None

def initial_state_with_taps(layer, dimensions = None):
    """Optionally wrap a tensor variable into a dict with taps=[-1]."""
    state = initial_state(layer, dimensions)
    if state is not None:
        return dict(initial=state, taps=[-1])
    else:
        return None

class Model:
    """
    Simple predictive model for forecasting words from
    a sequence using LSTMs. Choose how many LSTMs to stack,
    what size their memory should be, and how many
    words can be predicted.
    """
    def __init__(self, hidden_size, input_size, vocab_size, stack_size=1, celltype=LSTM):
        # declare model
        self.model = StackedCells(input_size, celltype=celltype, layers=[hidden_size] * stack_size)
        # add an embedding
        self.model.layers.insert(0, Embedding(vocab_size, input_size))
        # add a classifier:
        self.model.layers.append(Layer(hidden_size, vocab_size, activation=softmax))
        # inputs are matrices of indices,
        # each row is a sentence, each column a timestep
        self._stop_word = theano.shared(np.int32(999999999), name="stop word")
        self.for_how_long = T.ivector()
        self.input_mat = T.imatrix()
        self.priming_word = T.iscalar()
        self.srng = T.shared_randomstreams.RandomStreams(np.random.randint(0, 1024))
        # create symbolic variables for prediction:
        self.predictions = self.create_prediction()
        # create symbolic variable for greedy search:
        self.greedy_predictions = self.create_prediction(greedy=True)
        # create gradient training functions:
        self.create_cost_fun()
        self.create_training_function()
        self.create_predict_function()

    def stop_on(self, idx):
        self._stop_word.set_value(idx)

    @property
    def params(self):
        return self.model.params

    def create_prediction(self, greedy=False):
        def step(idx, *states):
            # new hiddens are the states we need to pass to LSTMs
            # from the past. Because the StackedCells also include
            # the embeddings, and those have no state, we pass
            # a "None" instead:
            new_hiddens = [None] + list(states)
            new_states = self.model.forward(idx, prev_hiddens=new_hiddens)
            if greedy:
                new_idxes = new_states[-1]
                new_idx = new_idxes.argmax()
                # provide a stopping condition for greedy search:
                return ([new_idx.astype(self.priming_word.dtype)] + new_states[1:-1]), theano.scan_module.until(T.eq(new_idx, self._stop_word))
            else:
                return new_states[1:]
        # in a sequence forecasting scenario we take everything
        # up to the second-to-last step and predict the
        # subsequent steps, i.e. indices 0 ... n - 1, hence:
        inputs = self.input_mat[:, 0:-1]
        num_examples = inputs.shape[0]
        # pass this to Theano's recurrence relation function:

        # choose what gets outputted at each timestep:
        if greedy:
            outputs_info = [dict(initial=self.priming_word, taps=[-1])] + [initial_state_with_taps(layer) for layer in self.model.layers[1:-1]]
            result, _ = theano.scan(fn=step,
                                    n_steps=200,
                                    outputs_info=outputs_info)
        else:
            outputs_info = [initial_state_with_taps(layer, num_examples) for layer in self.model.layers[1:]]
            result, _ = theano.scan(fn=step,
                                    sequences=[inputs.T],
                                    outputs_info=outputs_info)

        if greedy:
            return result[0]
        # softmaxes are the last layer of our network,
        # and are at the end of our results list:
        return result[-1].transpose((2, 0, 1))
        # we reorder the predictions to be:
        # 1. what row / example
        # 2. what timestep
        # 3. softmax dimension

    def create_cost_fun(self):
        # create a cost function that
        # takes each prediction at every timestep
        # and guesses the next timestep's value:
        what_to_predict = self.input_mat[:, 1:]
        # because some sentences are shorter, we
        # place masks where the sentences end
        # (for_how_long is zero indexed, e.g. an example
        # covering `[2, 3)` has this value set to 0,
        # hence we subtract 1):
        for_how_long = self.for_how_long - 1
        # all sentences start at T=0:
        starting_when = T.zeros_like(self.for_how_long)

        self.cost = masked_loss(self.predictions,
                                what_to_predict,
                                for_how_long,
                                starting_when).sum()

    def create_predict_function(self):
        self.pred_fun = theano.function(
            inputs=[self.input_mat],
            outputs=self.predictions,
            allow_input_downcast=True
        )
        self.greedy_fun = theano.function(
            inputs=[self.priming_word],
            outputs=T.concatenate([T.shape_padleft(self.priming_word), self.greedy_predictions]),
            allow_input_downcast=True
        )

    def create_training_function(self):
        updates, _, _, _, _ = create_optimization_updates(self.cost, self.params, method="adadelta")
        self.update_fun = theano.function(
            inputs=[self.input_mat, self.for_how_long],
            outputs=self.cost,
            updates=updates,
            allow_input_downcast=True)

    def __call__(self, x):
        return self.pred_fun(x)
In [ ]:
# construct model & theano functions:
model = Model(
    input_size=10,
    hidden_size=10,
    vocab_size=len(vocab),
    stack_size=1, # make this bigger, but makes compilation slow
    celltype=RNN  # use RNN or LSTM
)
model.stop_on(vocab.word2index["."])
In [191]:
# train:
for i in range(10000):
    error = model.update_fun(numerical_lines, numerical_lengths)
    if i % 100 == 0:
        print("epoch %(epoch)d, error=%(error).2f" % ({"epoch": i, "error": error}))
    if i % 500 == 0:
        print(vocab(model.greedy_fun(vocab.word2index["the"])))
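Once the error has come down, we can prime the greedy decoder with any word in our vocabulary and decode the result back into text (a usage sketch; the sentences depend on the randomly generated dataset and on how the training run went):
for priming_word in ["the", "a", "he"]:
    sampled_indices = model.greedy_fun(vocab.word2index[priming_word])
    print(vocab(sampled_indices))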