This example trains an RNN to predict the next character in a sequence. Sampling from the trained model produces somewhat intelligible text, with vocabulary and style resembling the training corpus.
The data used for training is a collection of patent claims obtained from http://www.cl.uni-heidelberg.de/statnlpgroup/pattr/
In [1]:
import numpy as np
import theano
import theano.tensor as T
import lasagne
from lasagne.utils import floatX
import pickle
import gzip
import random
from collections import Counter
In [2]:
# Load the corpus and look at an example
corpus = gzip.open('claims.txt.gz').read()
In [3]:
corpus.split('\n')[0]
Out[3]:
In [4]:
# Find the set of characters used in the corpus and construct mappings between characters,
# integer indices, and one hot encodings
VOCABULARY = set(corpus)
VOCAB_SIZE = len(VOCABULARY)
CHAR_TO_IX = {c: i for i, c in enumerate(VOCABULARY)}
IX_TO_CHAR = {i: c for i, c in enumerate(VOCABULARY)}
CHAR_TO_ONEHOT = {c: np.eye(VOCAB_SIZE)[i] for i, c in enumerate(VOCABULARY)}
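As a quick sanity check (not part of the original notebook), the three mappings should round-trip: a character maps to an index, that index maps back to the same character, and the one-hot row for the character has a single 1 in the matching position.

# Hypothetical round-trip check of the mappings defined above
c = next(iter(VOCABULARY))
i = CHAR_TO_IX[c]
assert IX_TO_CHAR[i] == c
assert CHAR_TO_ONEHOT[c].argmax() == i and CHAR_TO_ONEHOT[c].sum() == 1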
In [5]:
SEQUENCE_LENGTH = 50
BATCH_SIZE = 50
RNN_HIDDEN_SIZE = 200
In [6]:
# Reserve 10% of the data for validation
train_corpus = corpus[:(len(corpus) * 9 // 10)]
val_corpus = corpus[(len(corpus) * 9 // 10):]
In [7]:
# Our batch generator will yield sequential portions of the corpus of size SEQUENCE_LENGTH,
# starting from random locations and wrapping around the end of the data.
def data_batch_generator(corpus, size=BATCH_SIZE):
    startidx = np.random.randint(0, len(corpus) - SEQUENCE_LENGTH - 1, size=size)

    while True:
        items = np.array([corpus[start:start + SEQUENCE_LENGTH + 1] for start in startidx])
        startidx = (startidx + SEQUENCE_LENGTH) % (len(corpus) - SEQUENCE_LENGTH - 1)
        yield items
In [8]:
# Test it out
gen = data_batch_generator(corpus, size=1)
print(next(gen))
print(next(gen))
print(next(gen))
In [9]:
# After sampling a data batch, we transform it into a one hot feature representation
# and create a target sequence by shifting by one character
def prep_batch_for_network(batch):
    x_seq = np.zeros((len(batch), SEQUENCE_LENGTH, VOCAB_SIZE), dtype='float32')
    y_seq = np.zeros((len(batch), SEQUENCE_LENGTH), dtype='int32')

    for i, item in enumerate(batch):
        for j in range(SEQUENCE_LENGTH):
            x_seq[i, j] = CHAR_TO_ONEHOT[item[j]]
            y_seq[i, j] = CHAR_TO_IX[item[j + 1]]

    return x_seq, y_seq
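A quick shape check (an illustrative sketch, not in the original notebook) makes the expected layout explicit: float32 inputs of shape (batch, sequence, vocabulary) and int32 targets of shape (batch, sequence).

# Illustrative shape check, assuming the cells above have been run
x_check, y_check = prep_batch_for_network(next(data_batch_generator(corpus)))
print(x_check.shape)  # (BATCH_SIZE, SEQUENCE_LENGTH, VOCAB_SIZE)
print(y_check.shape)  # (BATCH_SIZE, SEQUENCE_LENGTH)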
In [10]:
# Symbolic variables for the inputs. In addition to the usual features and targets,
# we need initial values for each RNN layer's hidden state
x_sym = T.tensor3()
y_sym = T.imatrix()
hid_init_sym = T.matrix()
hid2_init_sym = T.matrix()
In [11]:
l_input = lasagne.layers.InputLayer((None, SEQUENCE_LENGTH, VOCAB_SIZE))
l_input_hid = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))
l_input_hid2 = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))
# Our network has two stacked GRU layers processing the input sequence.
l_rnn = lasagne.layers.GRULayer(l_input,
num_units=RNN_HIDDEN_SIZE,
grad_clipping=5.,
hid_init=l_input_hid,
)
l_rnn2 = lasagne.layers.GRULayer(l_rnn,
num_units=RNN_HIDDEN_SIZE,
grad_clipping=5.,
hid_init=l_input_hid2,
)
# Before the decoder layer, we need to fold the sequence dimension into the batch dimension,
# so that timesteps are decoded independently.
l_shp = lasagne.layers.ReshapeLayer(l_rnn2, (-1, RNN_HIDDEN_SIZE))
l_decoder = lasagne.layers.DenseLayer(l_shp,
num_units=VOCAB_SIZE,
nonlinearity=lasagne.nonlinearities.softmax)
l_out = lasagne.layers.ReshapeLayer(l_decoder, (-1, SEQUENCE_LENGTH, VOCAB_SIZE))
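To get a feel for the size of the model (an optional check, not in the original notebook), Lasagne can report the number of trainable parameters and the symbolic output shape of the stack defined above.

# Optional inspection of the network defined above
print(lasagne.layers.count_params(l_out, trainable=True))
print(lasagne.layers.get_output_shape(l_out))  # (None, SEQUENCE_LENGTH, VOCAB_SIZE)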
In [12]:
# We extract the hidden state of each GRU layer as well as the output of the decoder.
# Only the hidden state at the last timestep is needed, since that is what we carry over to the next batch.
hid_out, hid2_out, prob_out = lasagne.layers.get_output([l_rnn, l_rnn2, l_out],
{l_input: x_sym,
l_input_hid: hid_init_sym,
l_input_hid2: hid2_init_sym,
})
hid_out = hid_out[:, -1]
hid2_out = hid2_out[:, -1]
In [13]:
# We flatten the sequence into the batch dimension before calculating the loss
def calc_cross_ent(net_output, targets):
    preds = T.reshape(net_output, (-1, VOCAB_SIZE))
    targets = T.flatten(targets)
    cost = T.nnet.categorical_crossentropy(preds, targets)
    return cost
loss = T.mean(calc_cross_ent(prob_out, y_sym))
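For intuition (an illustrative sketch with made-up numbers, not from the original notebook): categorical cross-entropy with integer targets is simply the negative log-probability the model assigns to the correct next character at each position. In plain numpy:

# Toy numpy illustration with hypothetical values: two positions, three "characters"
preds_np = np.array([[0.7, 0.2, 0.1],
                     [0.1, 0.8, 0.1]])
targets_np = np.array([0, 1])
print(-np.log(preds_np[np.arange(2), targets_np]))  # per-position losses: [0.357, 0.223]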
In [14]:
# For stability during training, gradients are clipped and a total gradient norm constraint is also used
MAX_GRAD_NORM = 15
all_params = lasagne.layers.get_all_params(l_out, trainable=True)
all_grads = T.grad(loss, all_params)
all_grads = [T.clip(g, -5, 5) for g in all_grads]
all_grads, norm = lasagne.updates.total_norm_constraint(
all_grads, MAX_GRAD_NORM, return_norm=True)
updates = lasagne.updates.adam(all_grads, all_params, learning_rate=0.002)
f_train = theano.function([x_sym, y_sym, hid_init_sym, hid2_init_sym],
[loss, norm, hid_out, hid2_out],
updates=updates
)
f_val = theano.function([x_sym, y_sym, hid_init_sym, hid2_init_sym], [loss, hid_out, hid2_out])
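The total-norm constraint rescales all gradients jointly: if their combined L2 norm exceeds MAX_GRAD_NORM, each gradient is multiplied by MAX_GRAD_NORM / norm so the combined norm is brought back to the threshold. A plain-numpy sketch of that rescaling (illustrative values, not from the original notebook):

# Numpy sketch of the rescaling performed by total_norm_constraint
grads_np = [np.array([6.0, 8.0]), np.array([24.0])]
norm_np = np.sqrt(sum((g ** 2).sum() for g in grads_np))   # 26.0
scale = min(1.0, MAX_GRAD_NORM / norm_np)                  # 15 / 26
grads_np = [g * scale for g in grads_np]                   # combined norm is now 15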
In [ ]:
# Training takes a while - you may want to skip this and the next cell, and load the pretrained weights instead
hid = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')
hid2 = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')
train_batch_gen = data_batch_generator(train_corpus)
# The hidden states returned by f_train are fed back in, so state persists across batches
for iteration in range(20000):
    x, y = prep_batch_for_network(next(train_batch_gen))
    loss_train, norm, hid, hid2 = f_train(x, y, hid, hid2)

    if iteration % 250 == 0:
        print('Iteration {}, loss_train: {}, norm: {}'.format(iteration, loss_train, norm))
In [15]:
param_values = lasagne.layers.get_all_param_values(l_out)
d = {'param values': param_values,
'VOCABULARY': VOCABULARY,
'CHAR_TO_IX': CHAR_TO_IX,
'IX_TO_CHAR': IX_TO_CHAR,
}
#pickle.dump(d, open('gru_2layer_trained.pkl', 'wb'), protocol=pickle.HIGHEST_PROTOCOL)
In [16]:
# Load pretrained weights into network
d = pickle.load(open('gru_2layer_trained.pkl', 'rb'))
lasagne.layers.set_all_param_values(l_out, d['param values'])
In [17]:
predict_fn = theano.function([x_sym, hid_init_sym, hid2_init_sym], [prob_out, hid_out, hid2_out])
In [18]:
# Calculate validation loss
hid = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')
hid2 = np.zeros((BATCH_SIZE, RNN_HIDDEN_SIZE), dtype='float32')
val_batch_gen = data_batch_generator(val_corpus)
losses = []
for iteration in range(50):
    x, y = prep_batch_for_network(next(val_batch_gen))
    loss_val, hid, hid2 = f_val(x, y, hid, hid2)
    losses.append(loss_val)
print(np.mean(losses))
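Since the loss is a mean cross-entropy measured in nats per character, it can be converted into more familiar units (a small addition, not in the original notebook): dividing by ln 2 gives bits per character, and exponentiating gives per-character perplexity.

# Optional: express the validation loss in other units
mean_loss = np.mean(losses)
print('bits per character: {}'.format(mean_loss / np.log(2)))
print('perplexity per character: {}'.format(np.exp(mean_loss)))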
In [19]:
# For faster sampling, we rebuild the network with a sequence length of 1
l_input = lasagne.layers.InputLayer((None, 1, VOCAB_SIZE))
l_input_hid = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))
l_input_hid2 = lasagne.layers.InputLayer((None, RNN_HIDDEN_SIZE))
# Our network has two stacked GRU layers processing the input sequence.
l_rnn = lasagne.layers.GRULayer(l_input,
num_units=RNN_HIDDEN_SIZE,
grad_clipping=5.,
hid_init=l_input_hid,
)
l_rnn2 = lasagne.layers.GRULayer(l_rnn,
num_units=RNN_HIDDEN_SIZE,
grad_clipping=5.,
hid_init=l_input_hid2,
)
l_shp = lasagne.layers.ReshapeLayer(l_rnn2, (-1, RNN_HIDDEN_SIZE))
l_decoder = lasagne.layers.DenseLayer(l_shp,
num_units=VOCAB_SIZE,
nonlinearity=lasagne.nonlinearities.softmax)
l_out = lasagne.layers.ReshapeLayer(l_decoder, (-1, 1, VOCAB_SIZE))
hid_out, hid2_out, prob_out = lasagne.layers.get_output([l_rnn, l_rnn2, l_out],
{l_input: x_sym,
l_input_hid: hid_init_sym,
l_input_hid2: hid2_init_sym,
})
hid_out = hid_out[:, -1]
hid2_out = hid2_out[:, -1]
prob_out = prob_out[0, -1]
In [20]:
lasagne.layers.set_all_param_values(l_out, d['param values'])
In [21]:
predict_fn = theano.function([x_sym, hid_init_sym, hid2_init_sym], [prob_out, hid_out, hid2_out])
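As a quick illustration of the single-step network (a sketch, not in the original notebook; it assumes the pretrained weights are loaded and that 't' occurs in the vocabulary), we can feed one character with a fresh hidden state and list the five most probable next characters.

# Hypothetical one-step query: most likely characters to follow 't' from a blank state
hid = np.zeros((1, RNN_HIDDEN_SIZE), dtype='float32')
hid2 = np.zeros((1, RNN_HIDDEN_SIZE), dtype='float32')
x = np.zeros((1, 1, VOCAB_SIZE), dtype='float32')
x[0, 0, :] = CHAR_TO_ONEHOT['t']
p, hid, hid2 = predict_fn(x, hid, hid2)
print([IX_TO_CHAR[ix] for ix in p.argsort()[-5:][::-1]])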
In [22]:
# We will use random sentences from the validation corpus to 'prime' the network
primers = val_corpus.split('\n')
In [23]:
# We feed characters from the priming sequence into the network one at a time.
# To obtain a sample string, at each timestep we sample from the output probability distribution
# and feed the chosen character back into the network. We terminate after the first linebreak.
sentence = ''
hid = np.zeros((1, RNN_HIDDEN_SIZE), dtype='float32')
hid2 = np.zeros((1, RNN_HIDDEN_SIZE), dtype='float32')
x = np.zeros((1, 1, VOCAB_SIZE), dtype='float32')
primer = np.random.choice(primers) + '\n'
for c in primer:
    p, hid, hid2 = predict_fn(x, hid, hid2)
    x[0, 0, :] = CHAR_TO_ONEHOT[c]

for _ in range(500):
    p, hid, hid2 = predict_fn(x, hid, hid2)
    # Guard against float rounding that could make the probabilities sum to slightly more than 1
    p = p / (1 + 1e-6)
    s = np.random.multinomial(1, p)
    sentence += IX_TO_CHAR[s.argmax(-1)]
    x[0, 0, :] = s

    if sentence[-1] == '\n':
        break
print('PRIMER: ' + primer)
print('GENERATED: ' + sentence)
In [24]:
# Uncomment and run this cell for a solution
#%load spoilers/tempsoftmax.py