In [ ]:
import numpy as np
import theano
import theano.tensor as T
import lasagne
import os
#thanks @keskarnitish
In [ ]:
start_token = " "
with open("names") as f:
    names = f.read()[:-1].split('\n')
names = [start_token+name for name in names]
In [ ]:
print ('n samples = ',len(names))
for x in names[::1000]:
    print(x)
In [ ]:
#all unique characters go here
tokens = <all unique characters in the dataset>
tokens = list(tokens)
print ('n_tokens = ',len(tokens))
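In [ ]:
# One possible way to fill the blank above (a sketch, not the reference solution):
# take every distinct character across all names, including the whitespace start token.
tokens = list(set(''.join(names)))
print('n_tokens =', len(tokens))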
In [ ]:
#!token_to_id = <dictionary of symbol -> its identifier (index in tokens list)>
token_to_id = {t:i for i,t in enumerate(tokens) }
#!id_to_token = < dictionary of symbol identifier -> symbol itself>
id_to_token = {i:t for i,t in enumerate(tokens)}
In [ ]:
import matplotlib.pyplot as plt
%matplotlib inline
plt.hist(list(map(len,names)),bins=25);
# truncate names longer than roughly the 80th percentile of lengths
MAX_LEN = ?!
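In [ ]:
# A sketch of one way to choose MAX_LEN for the blank above: roughly the 80th
# percentile of name lengths (the exact cutoff is a judgment call).
MAX_LEN = int(np.percentile(list(map(len, names)), 80))
print('MAX_LEN =', MAX_LEN)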
In [ ]:
names_ix = list(map(lambda name: list(map(token_to_id.get,name)),names))
#crop long names and pad short ones
for i in range(len(names_ix)):
    names_ix[i] = names_ix[i][:MAX_LEN] #crop too long
    if len(names_ix[i]) < MAX_LEN:
        names_ix[i] += [token_to_id[" "]]*(MAX_LEN - len(names_ix[i])) #pad too short
assert len(set(map(len,names_ix)))==1
names_ix = np.array(names_ix)
In [ ]:
input_sequence = T.matrix('token sequence','int32')
target_values = T.matrix('actual next token','int32')
In [ ]:
from lasagne.layers import InputLayer,DenseLayer,EmbeddingLayer
from lasagne.layers import RecurrentLayer,LSTMLayer,GRULayer,CustomRecurrentLayer
In [ ]:
l_in = lasagne.layers.InputLayer(shape=(None, None),input_var=input_sequence)
#!<Your neural network>
l_emb = <embedding layer or one-hot encoding>
l_rnn = <some recurrent layer(or several such layers)>
#flatten batch and time to be compatible with feedforward layers (will un-flatten later)
l_rnn_flat = lasagne.layers.reshape(l_rnn, (-1,l_rnn.output_shape[-1]))
l_out = <last dense layer (or several layers), returning probabilities for all possible next tokens>
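In [ ]:
# A minimal sketch of one architecture that fits the template above; the layer
# sizes (16-dim embedding, 128 LSTM units) are arbitrary choices, not prescribed.
l_in = lasagne.layers.InputLayer(shape=(None, None), input_var=input_sequence)
l_emb = lasagne.layers.EmbeddingLayer(l_in, input_size=len(tokens), output_size=16)
l_rnn = lasagne.layers.LSTMLayer(l_emb, num_units=128)
# flatten (batch, time, units) -> (batch*time, units) for the dense softmax head
l_rnn_flat = lasagne.layers.reshape(l_rnn, (-1, l_rnn.output_shape[-1]))
l_out = lasagne.layers.DenseLayer(l_rnn_flat, num_units=len(tokens),
                                  nonlinearity=lasagne.nonlinearities.softmax)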
In [1]:
# Model weights
weights = lasagne.layers.get_all_params(l_out,trainable=True)
print( weights)
In [ ]:
network_output = <NN output via lasagne>
#If you use dropout do not forget to create deterministic version for evaluation
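In [ ]:
# Sketch for the blank above: the symbolic output of the network. If dropout
# were used, an evaluation version via get_output(l_out, deterministic=True)
# would be needed as well.
network_output = lasagne.layers.get_output(l_out)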
In [ ]:
predicted_probabilities_flat = network_output
correct_answers_flat = target_values.ravel()
loss = <Loss function - a simple categorical crossentropy will do, maybe add some regularizer>
updates = <your favorite optimizer>
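In [ ]:
# One possible loss/optimizer pair for the blanks above (a sketch; rmsprop,
# adagrad or plain sgd would work too).
loss = lasagne.objectives.categorical_crossentropy(
    predicted_probabilities_flat, correct_answers_flat).mean()
updates = lasagne.updates.adam(loss, weights)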
In [ ]:
#training
train = theano.function([input_sequence, target_values], loss, updates=updates, allow_input_downcast=True)
#computing loss without training
compute_cost = theano.function([input_sequence, target_values], loss, allow_input_downcast=True)
In [ ]:
#compile the function that computes probabilities for next token given previous text.
#reshape back into original shape
next_word_probas = network_output.reshape((input_sequence.shape[0],input_sequence.shape[1],len(tokens)))
#predictions for next tokens (after sequence end)
last_word_probas = next_word_probas[:,-1]
probs = theano.function([input_sequence],last_word_probas,allow_input_downcast=True)
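In [ ]:
# Quick sanity check (optional sketch): feed just the start token and make sure
# probs returns one probability per token in the vocabulary.
test_probs = probs(np.array([[token_to_id[start_token]]]))
assert test_probs.shape == (1, len(tokens))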
In [ ]:
def generate_sample(seed_phrase=None,N=MAX_LEN,t=1,n_snippets=1):
    '''
    Generates several text snippets starting from seed_phrase.
    parameters:
        seed_phrase - the initial phrase; defaults to start_token
        N - number of characters to generate after the seed
        t - temperature exponent applied to the predicted probabilities:
            higher t makes sampling greedier, lower t makes it more random
        n_snippets - how many independent samples to generate
    '''
    if seed_phrase is None:
        seed_phrase = start_token
    if len(seed_phrase) > MAX_LEN:
        seed_phrase = seed_phrase[-MAX_LEN:]
    assert type(seed_phrase) is str

    snippets = []
    for _ in range(n_snippets):
        sample_ix = []
        x = [token_to_id.get(c,0) for c in seed_phrase]
        x = np.array([x])

        for i in range(N):
            # Sample the next character from the predicted distribution,
            # sharpened (or flattened) by the temperature exponent t
            p = probs(x).ravel()
            p = p**t / np.sum(p**t)
            ix = np.random.choice(np.arange(len(tokens)),p=p)
            sample_ix.append(ix)
            # keep only the last MAX_LEN-1 tokens and append the new one
            x = np.hstack((x[:,-MAX_LEN+1:],[[ix]]))

        random_snippet = seed_phrase + ''.join(id_to_token[ix] for ix in sample_ix)
        snippets.append(random_snippet)

    print("----\n %s \n----" % '; '.join(snippets))
In [ ]:
def sample_batch(data, batch_size):
    # pick batch_size random names; the network input is each sequence without
    # its last token, the target is the same sequence shifted one step ahead
    rows = data[np.random.randint(0,len(data),size=batch_size)]
    return rows[:,:-1],rows[:,1:]
In [ ]:
print("Training ...")
#total N iterations
n_epochs=100
# how many minibatches are there in the epoch
batches_per_epoch = 500
#how many training sequences are processed in a single function call
batch_size=10
for epoch in range(n_epochs):
    print("Generated names")
    generate_sample(n_snippets=10)

    avg_cost = 0
    for _ in range(batches_per_epoch):
        x,y = sample_batch(names_ix,batch_size)
        avg_cost += train(x, y)

    print("Epoch {} average loss = {}".format(epoch, avg_cost / batches_per_epoch))
In [ ]:
generate_sample(n_snippets=100)
In [ ]:
generate_sample(seed_phrase=" A")