This example trains an RNN to predict the next character in a sequence. It was created by Eben Olson (see https://github.com/ebenolson/pydata2015). Sampling from the trained model produces somewhat intelligible text, with vocabulary and style resembling the training corpus. For more background and details, see the original repository.
The data used for training is one of the corpora loaded below (claims, Shakespeare plays, or Shakespeare poetry).
For an alternative implementation in Keras, see https://github.com/fchollet/keras/blob/master/examples/lstm_text_generation.py
In [ ]:
import numpy as np
import theano
import theano.tensor as T
import lasagne
from lasagne.utils import floatX
import pickle
import gzip
import random
from collections import Counter
In [ ]:
# Load an interesting corpus
# (under Python 3, add .decode() so that the corpus is a string of characters rather than bytes):
corpus = gzip.open('../data/RNN/claims.txt.gz').read()
#corpus = gzip.open('../data/RNN/Shakespeare.plays.txt.gz').read()
#corpus = gzip.open('../data/RNN/Shakespeare.poetry.txt.gz').read()
In [ ]:
corpus.split('\n')[0]
In [ ]:
# Build the character vocabulary and the lookup tables used for one-hot encoding.
# Note that set() ordering can differ between runs, which is why these mappings are
# saved alongside the trained weights further below.
VOCABULARY = set(corpus)
VOCAB_SIZE = len(VOCABULARY)

CHAR_TO_IX = {c: i for i, c in enumerate(VOCABULARY)}
IX_TO_CHAR = {i: c for i, c in enumerate(VOCABULARY)}
CHAR_TO_ONEHOT = {c: np.eye(VOCAB_SIZE)[i] for i, c in enumerate(VOCABULARY)}
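As a quick, illustrative sanity check, we can verify that the lookup tables round-trip correctly and that each one-hot vector has exactly one non-zero entry:
In [ ]:
# Illustrative sanity check of the lookup tables
c = next(iter(VOCABULARY))
assert IX_TO_CHAR[CHAR_TO_IX[c]] == c
assert CHAR_TO_ONEHOT[c].sum() == 1.0
print('{} distinct characters in the corpus'.format(VOCAB_SIZE))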
In [ ]:
SEQUENCE_LENGTH = 50
BATCH_SIZE = 64
RNN_HIDDEN_SIZE = 200
In [ ]:
# Reserve 10% of the data for validation
train_corpus = corpus[:(len(corpus) * 9 // 10)]
val_corpus = corpus[(len(corpus) * 9 // 10):]
In [ ]:
# Our batch generator will yield sequential portions of the corpus of size SEQUENCE_LENGTH + 1
# (one extra character for the shifted target), starting from random locations and
# wrapping around the end of the data.
def data_batch_generator(corpus, size=BATCH_SIZE):
    startidx = np.random.randint(0, len(corpus) - SEQUENCE_LENGTH - 1, size=size)

    while True:
        items = np.array([corpus[start:start + SEQUENCE_LENGTH + 1] for start in startidx])
        startidx = (startidx + SEQUENCE_LENGTH) % (len(corpus) - SEQUENCE_LENGTH - 1)
        yield items
In [ ]:
# Test it out
gen = data_batch_generator(corpus, size=1)
print(next(gen))
print(next(gen))
print(next(gen))
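Note that the start indices advance by SEQUENCE_LENGTH while each item is SEQUENCE_LENGTH + 1 characters long, so consecutive items from the same stream overlap by exactly one character; this is what makes it sensible to carry the RNN hidden state from one batch to the next. A quick, illustrative check:
In [ ]:
# Illustrative check: consecutive items from the same stream overlap by one character
# (ignoring the rare wrap-around case at the end of the corpus)
a = next(gen)[0]
b = next(gen)[0]
assert b[0] == a[-1]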
In [ ]:
# After sampling a data batch, we transform it into a one-hot feature representation
# and create a target sequence by shifting it by one character.
def prep_batch_for_network(batch):
    x_seq = np.zeros((len(batch), SEQUENCE_LENGTH, VOCAB_SIZE), dtype='float32')
    y_seq = np.zeros((len(batch), SEQUENCE_LENGTH), dtype='int32')

    for i, item in enumerate(batch):
        for j in range(SEQUENCE_LENGTH):
            x_seq[i, j] = CHAR_TO_ONEHOT[item[j]]
            y_seq[i, j] = CHAR_TO_IX[item[j + 1]]

    return x_seq, y_seq
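To see the one-character shift concretely, we can prepare a single batch from the test generator above and compare shapes, and check that the target at one timestep is the input character at the next (a minimal, illustrative check):
In [ ]:
# Illustrative check of prep_batch_for_network: shapes and the one-character shift
x_chk, y_chk = prep_batch_for_network(next(gen))
print(x_chk.shape, y_chk.shape)          # (1, SEQUENCE_LENGTH, VOCAB_SIZE), (1, SEQUENCE_LENGTH)
assert y_chk[0, 0] == x_chk[0, 1].argmax()   # target at t is the input character at t + 1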
In [ ]:
# Symbolic variables for the input. In addition to the usual features and target,
# we need initial values for the two RNN layers' hidden states.
x_sym = T.tensor3()
y_sym = T.imatrix()
hid_init_sym = T.matrix()
hid2_init_sym = T.matrix()
In [ ]:
# Our network has two stacked GRU layers processing the input sequence.
l_input = lasagne.layers.InputLayer((None, SEQUENCE_LENGTH, VOCAB_SIZE))
l_input_hid = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))
l_input_hid2 = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))

l_rnn = lasagne.layers.GRULayer(l_input,
                                num_units=RNN_HIDDEN_SIZE,
                                grad_clipping=5.,
                                hid_init=l_input_hid,
                                #learn_init=True,
                                )

l_rnn2 = lasagne.layers.GRULayer(l_rnn,
                                 num_units=RNN_HIDDEN_SIZE,
                                 grad_clipping=5.,
                                 hid_init=l_input_hid2,
                                 #learn_init=True,
                                 )

# Before the decoder layer, we need to reshape the sequence into the batch dimension,
# so that timesteps are decoded independently.
l_shp = lasagne.layers.ReshapeLayer(l_rnn2, (-1, RNN_HIDDEN_SIZE))

l_decoder = lasagne.layers.DenseLayer(l_shp,
                                      num_units=VOCAB_SIZE,
                                      nonlinearity=lasagne.nonlinearities.softmax)

l_out = lasagne.layers.ReshapeLayer(l_decoder, (-1, SEQUENCE_LENGTH, VOCAB_SIZE))
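The reshape trick is easy to see in plain NumPy: flattening (batch, time, hidden) into (batch * time, hidden) keeps each timestep's vector intact as its own row, and the final reshape restores the sequence layout. A small illustrative example with made-up sizes:
In [ ]:
# Illustrative NumPy view of the reshape trick used around the decoder
demo = np.arange(2 * 3 * 4).reshape(2, 3, 4)   # (batch, time, hidden)
flat = demo.reshape(-1, 4)                     # (batch * time, hidden): each row is one timestep
back = flat.reshape(-1, 3, 4)                  # restore (batch, time, hidden)
assert np.array_equal(demo, back)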
In [ ]:
# We extract the hidden state of each GRU layer as well as the output of the decoder.
# Only the hidden state at the last timestep is needed: it is fed back in as the
# initial state for the next batch, so state carries over between batches.
hid_out, hid2_out, prob_out = lasagne.layers.get_output([l_rnn, l_rnn2, l_out],
                                                        {l_input: x_sym,
                                                         l_input_hid: hid_init_sym,
                                                         l_input_hid2: hid2_init_sym,
                                                         })

hid_out_last = hid_out[:, -1]
hid2_out_last = hid2_out[:, -1]
In [ ]:
# We flatten the sequence into the batch dimension before calculating the loss.
def calc_cross_ent(net_output, targets):
    preds = T.reshape(net_output, (-1, VOCAB_SIZE))
    targets = T.flatten(targets)
    cost = T.nnet.categorical_crossentropy(preds, targets)
    return cost

loss = T.mean(calc_cross_ent(prob_out, y_sym))
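For intuition, an equivalent of this flatten-then-cross-entropy computation in plain NumPy might look like the sketch below (illustrative only; the real loss above is built symbolically by Theano):
In [ ]:
# Illustrative NumPy sketch of the loss: mean negative log-probability of the target characters
def np_cross_ent(probs, targets):
    flat_p = probs.reshape(-1, probs.shape[-1])   # (batch * time, vocab)
    flat_t = targets.reshape(-1)                  # (batch * time,)
    return -np.log(flat_p[np.arange(len(flat_t)), flat_t]).mean()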
In [ ]:
# For stability during training, gradients are clipped and a total gradient norm constraint is also used.
MAX_GRAD_NORM = 15

all_params = lasagne.layers.get_all_params(l_out, trainable=True)

# Everything we will want to pickle after training; the trained parameter values
# are added just before saving (see the commented-out cell below).
param_dictionary = {'VOCABULARY': VOCABULARY,
                    'CHAR_TO_IX': CHAR_TO_IX,
                    'IX_TO_CHAR': IX_TO_CHAR,
                    }

all_grads = T.grad(loss, all_params)
all_grads = [T.clip(g, -5, 5) for g in all_grads]
all_grads, norm = lasagne.updates.total_norm_constraint(
    all_grads, MAX_GRAD_NORM, return_norm=True)

updates = lasagne.updates.adam(all_grads, all_params, learning_rate=0.002)

f_train = theano.function([x_sym, y_sym, hid_init_sym, hid2_init_sym],
                          [loss, norm, hid_out_last, hid2_out_last],
                          updates=updates
                          )

f_val = theano.function([x_sym, y_sym, hid_init_sym, hid2_init_sym], [loss, hid_out_last, hid2_out_last])
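The total-norm constraint rescales all gradients together whenever their combined L2 norm exceeds MAX_GRAD_NORM. A rough NumPy sketch of the idea (illustrative, not the Lasagne implementation):
In [ ]:
# Illustrative sketch of a total gradient norm constraint
def clip_by_total_norm(grads, max_norm):
    total_norm = np.sqrt(sum((g ** 2).sum() for g in grads))
    scale = min(1.0, max_norm / (total_norm + 1e-7))   # only shrink, never amplify
    return [g * scale for g in grads], total_norm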
In [ ]:
# Initialise the hidden states to zeros for the first batch; after that, the final hidden
# state of each batch is carried over as the initial state for the next one.
hid = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')
hid2 = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')

train_batch_gen = data_batch_generator(train_corpus)

for iteration in range(2 * 100 * 100):
    x, y = prep_batch_for_network(next(train_batch_gen))
    #print(iteration, np.shape(x), np.shape(y), np.shape(hid), np.shape(hid2))
    loss_train, norm, hid, hid2 = f_train(x, y, hid, hid2)

    if iteration % 100 == 0:
        print('Iteration {}, loss_train: {}, norm: {}'.format(iteration, loss_train, norm))
In [ ]:
# Fetch the parameter values after training, then pickle everything needed to reload the model:
#param_dictionary['param values'] = lasagne.layers.get_all_param_values(l_out)
#pickle.dump(param_dictionary, open('./data/RNN/gru_2layer_trained.pkl', 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
In [ ]:
param_dictionary = pickle.load(open('./data/RNN/gru_2layer_trained_claims.pkl', 'rb'))
lasagne.layers.set_all_param_values(l_out, param_dictionary['param values'])

# Restore the training-time character mappings so that indices match the loaded weights.
CHAR_TO_IX, IX_TO_CHAR = param_dictionary['CHAR_TO_IX'], param_dictionary['IX_TO_CHAR']
CHAR_TO_ONEHOT = {c: np.eye(VOCAB_SIZE)[i] for c, i in CHAR_TO_IX.items()}
In [ ]:
predict_fn = theano.function([x_sym, hid_init_sym, hid2_init_sym], [prob_out, hid_out_last, hid2_out_last])
In [ ]:
# Calculate the validation loss (this takes a minute or so on a CPU)
hid = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')
hid2 = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')

val_batch_gen = data_batch_generator(val_corpus)

losses = []

for iteration in range(50):
    x, y = prep_batch_for_network(next(val_batch_gen))
    #print(iteration, np.shape(x), np.shape(y), np.shape(hid), np.shape(hid2))
    loss_val, hid, hid2 = f_val(x, y, hid, hid2)
    losses.append(loss_val)

print(np.mean(losses))  # Preloaded data gives a result of 0.89385
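The mean character-level cross-entropy can be converted into more interpretable numbers such as perplexity or bits per character (a small, illustrative conversion):
In [ ]:
# Illustrative conversion of the validation loss to perplexity and bits per character
mean_loss = np.mean(losses)
print('perplexity:         {:.3f}'.format(np.exp(mean_loss)))
print('bits per character: {:.3f}'.format(mean_loss / np.log(2)))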
In [ ]:
# For sampling, we rebuild the same architecture with a sequence length of one,
# so that characters can be fed in (and hidden state carried over) one step at a time.
# The trained parameter values are copied into this network below.
l_input = lasagne.layers.InputLayer((None, 1, VOCAB_SIZE))
l_input_hid = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))
l_input_hid2 = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))

l_rnn = lasagne.layers.GRULayer(l_input,
                                num_units=RNN_HIDDEN_SIZE,
                                grad_clipping=5.,
                                hid_init=l_input_hid,
                                )

l_rnn2 = lasagne.layers.GRULayer(l_rnn,
                                 num_units=RNN_HIDDEN_SIZE,
                                 grad_clipping=5.,
                                 hid_init=l_input_hid2,
                                 )

l_shp = lasagne.layers.ReshapeLayer(l_rnn2, (-1, RNN_HIDDEN_SIZE))
l_decoder = lasagne.layers.DenseLayer(l_shp,
                                      num_units=VOCAB_SIZE,
                                      nonlinearity=lasagne.nonlinearities.softmax)
l_out = lasagne.layers.ReshapeLayer(l_decoder, (-1, 1, VOCAB_SIZE))

hid_out, hid2_out, prob_out = lasagne.layers.get_output([l_rnn, l_rnn2, l_out], {
    l_input: x_sym,
    l_input_hid: hid_init_sym,
    l_input_hid2: hid2_init_sym,
})

hid_out_last = hid_out[:, -1]
hid2_out_last = hid2_out[:, -1]
prob_out_last = prob_out[0, -1]
In [ ]:
lasagne.layers.set_all_param_values(l_out, param_dictionary['param values'])
In [ ]:
predict_fn = theano.function([x_sym, hid_init_sym, hid2_init_sym], [prob_out_last, hid_out_last, hid2_out_last])
In [ ]:
primers = val_corpus.split('\n')  # lines from the validation corpus, used to prime the generator
We feed the characters of the priming sequence into the network one at a time to build up its hidden state.
To obtain a sample string, at each timestep we then sample from the output probability distribution and feed the chosen character back into the network. We terminate after the first line break.
In [ ]:
sentence = ''
hid = np.zeros((1, RNN_HIDDEN_SIZE), dtype='float32')
hid2 = np.zeros((1, RNN_HIDDEN_SIZE), dtype='float32')
x = np.zeros((1, 1, VOCAB_SIZE), dtype='float32')

primer = np.random.choice(primers) + '\n'

# Run the priming sequence through the network to build up its hidden state.
for c in primer:
    p, hid, hid2 = predict_fn(x, hid, hid2)
    x[0, 0, :] = CHAR_TO_ONEHOT[c]

for _ in range(500):
    p, hid, hid2 = predict_fn(x, hid, hid2)
    # Rescale slightly so that float32 rounding cannot make the probabilities sum to > 1,
    # which np.random.multinomial would reject.
    p = p / (1 + 1e-6)
    s = np.random.multinomial(1, p)
    sentence += IX_TO_CHAR[s.argmax(-1)]
    x[0, 0, :] = s
    if sentence[-1] == '\n':
        break

print('PRIMER: ' + primer)
print('GENERATED: ' + sentence)
In [ ]:
# Uncomment and run this cell for a solution
#%load model/spoilers/tempsoftmax.py
In [ ]: