In [ ]:
import numpy as np
import theano
import theano.tensor as T
import lasagne
import os
#thanks Muammar
PAD_ix=-1
In [ ]:
with open("./train.csv") as fin:
    ids, words, transcripts = zip(*[line.split(',') for line in list(fin)[1:]])

# append an explicit end-of-word marker and wrap transcripts with START/END service tokens
words = [word + "@" for word in words]
transcripts = [["START"] + ts[:-2].split() + ["END"] for ts in transcripts]
In [ ]:
for word, trans in zip(words[:5], transcripts[:5]):
    print word, ':', trans
In [ ]:
phonemes = list(set([token for ts in transcripts for token in ts]))
phoneme_to_ix = {ph:i for i,ph in enumerate(phonemes)}
In [ ]:
letters = list(set([token for word in words for token in word]))
letter_to_ix = {l:i for i,l in enumerate(letters)}
In [ ]:
import matplotlib.pyplot as plt
%matplotlib inline
plt.hist(map(len,transcripts),bins=25);
# truncate transcripts longer than MAX_LEN phonemes.
MAX_LEN = min([60,max(list(map(len,transcripts)))])
#ADJUST IF YOU ARE UP TO SOMETHING SERIOUS
In [ ]:
def as_matrix(sequences, token_to_i, max_len=None, PAD_ix=PAD_ix):
    """Convert a list of token sequences into a matrix padded with PAD_ix."""
    max_len = max_len or max(map(len, sequences))
    matrix = np.zeros((len(sequences), max_len), dtype='int8') + PAD_ix
    for i, seq in enumerate(sequences):
        row_ix = map(token_to_i.get, seq)[:max_len]
        matrix[i, :len(row_ix)] = row_ix
    return matrix
In [ ]:
print as_matrix(words[:10],letter_to_ix)
In [ ]:
input_sequence = T.matrix('token sequence','int32')
target_phonemes = T.matrix('target phonemes','int32')
In [ ]:
from lasagne.layers import InputLayer,DenseLayer,EmbeddingLayer
from lasagne.layers import RecurrentLayer,LSTMLayer,GRULayer,CustomRecurrentLayer
In [ ]:
##ENCODER
l_in = lasagne.layers.InputLayer(shape=(None, None),input_var=input_sequence)
l_mask = lasagne.layers.InputLayer(shape=(None, None),input_var=T.neq(input_sequence,-1))
l_emb = lasagne.layers.EmbeddingLayer(l_in, len(letters), 40)
l_rnn = lasagne.layers.GRULayer(l_emb,256,only_return_final=True,mask_input=l_mask)
##DECODER
transc_in = lasagne.layers.InputLayer(shape=(None, None),input_var=target_phonemes)
transc_mask = lasagne.layers.InputLayer(shape=(None, None),input_var=T.neq(target_phonemes,-1))
transc_emb = lasagne.layers.EmbeddingLayer(transc_in, len(phonemes), 50)
transc_rnn = lasagne.layers.GRULayer(transc_emb,256,hid_init=l_rnn,mask_input=transc_mask)
#flatten batch and time to be compatible with feedforward layers (will un-flatten later)
transc_rnn_flat = lasagne.layers.reshape(transc_rnn, (-1,transc_rnn.output_shape[-1]))
l_out = lasagne.layers.DenseLayer(transc_rnn_flat,len(phonemes),nonlinearity=lasagne.nonlinearities.softmax)
In [ ]:
# Model weights
weights = lasagne.layers.get_all_params(l_out,trainable=True)
print weights
In [ ]:
network_output = lasagne.layers.get_output(l_out)
network_output = <reshape network_output to [batch_i, time_tick, n_phonemes] symbolically>
#If you use dropout, do not forget to create a deterministic version for evaluation
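In [ ]:
# One possible way to fill in the blank above -- a sketch, not the reference solution.
# The dense layer works on the flattened [batch*time, n_phonemes] matrix, so we
# restore the [batch_i, time_tick, n_phonemes] layout from the symbolic shape of
# target_phonemes.
network_output = lasagne.layers.get_output(l_out)
network_output = network_output.reshape((target_phonemes.shape[0],
                                         target_phonemes.shape[1],
                                         len(phonemes)))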
In [ ]:
predictions_flat = network_output[:,:-1,:].reshape([-1,len(phonemes)])
targets_flat = target_phonemes[:,1:].ravel()
#do not count loss for '-1' tokens
mask = T.nonzero(T.neq(targets_flat,-1))
loss = T.nnet.categorical_crossentropy(predictions_flat[mask],targets_flat[mask])
updates = lasagne.updates.adam(loss.mean(),weights)
In [ ]:
#training
train = theano.function([input_sequence, target_phonemes], loss, updates=updates, allow_input_downcast=True)
#computing loss without training
compute_cost = theano.function([input_sequence, target_phonemes], loss, allow_input_downcast=True)
In [ ]:
#compile a function that computes probabilities for the next phoneme given the word and the previous phonemes.
network_output = <network output reshaped to [batch, tick, phoneme] format>
last_word_probas = <a matrix [batch_i, n_phonemes]: probabilities of each phoneme at the last tick>
probs = <a theano function that returns the probabilities of the phoneme coming after the last token>
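In [ ]:
# A possible filling of the cell above -- a sketch under the same assumptions as before:
# reshape the softmax output back to [batch, tick, phoneme], take the distribution
# at the last tick and compile it into a theano function.
network_output = lasagne.layers.get_output(l_out)
network_output = network_output.reshape((target_phonemes.shape[0],
                                         target_phonemes.shape[1],
                                         len(phonemes)))
last_word_probas = network_output[:, -1]  # [batch_i, n_phonemes]
probs = theano.function([input_sequence, target_phonemes], last_word_probas,
                        allow_input_downcast=True)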
In [ ]:
def generate_transcript(word, transcript_prefix=("START",), END_phoneme="END",
                        temperature=1, sample=True):
    transcript = list(transcript_prefix)
    while True:
        next_phoneme_probs = <a vector of probabilities of the next token>
        next_phoneme_probs = <maybe apply temperature>
        if sample:
            next_phoneme = <phoneme sampled with these probabilities (a string token)>
        else:
            next_phoneme = <most likely phoneme>
        transcript.append(next_phoneme)
        if next_phoneme == END_phoneme:
            break
    return transcript
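In [ ]:
# One way to implement the template above -- a sketch that assumes the `probs`
# function from the previous cell, the as_matrix helper and the phonemes list.
def generate_transcript(word, transcript_prefix=("START",), END_phoneme="END",
                        temperature=1, sample=True):
    transcript = list(transcript_prefix)
    word_ix = as_matrix([word], letter_to_ix)
    while True:
        trans_ix = as_matrix([transcript], phoneme_to_ix)
        # probabilities of every phoneme coming after the current prefix
        next_phoneme_probs = probs(word_ix, trans_ix)[0].astype('float64')
        # apply temperature and renormalize
        next_phoneme_probs = next_phoneme_probs ** (1. / temperature)
        next_phoneme_probs /= next_phoneme_probs.sum()
        if sample:
            next_phoneme = np.random.choice(phonemes, p=next_phoneme_probs)
        else:
            next_phoneme = phonemes[next_phoneme_probs.argmax()]
        transcript.append(next_phoneme)
        # stop at END or after MAX_LEN tokens as a safety net
        if next_phoneme == END_phoneme or len(transcript) > MAX_LEN:
            break
    return transcript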
In [ ]:
words = np.array(words)
transcripts = np.array(transcripts)
In [ ]:
def sample_batch(words, transcripts, batch_size):
    <sample a random batch of words and transcripts>
    <convert both into network-edible format (as_matrix)>
    return words_batch, transcripts_batch
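In [ ]:
# A minimal sketch of sample_batch, assuming words and transcripts are the numpy
# arrays built in the cell above and reusing the as_matrix helper.
def sample_batch(words, transcripts, batch_size):
    batch_ix = np.random.randint(0, len(words), size=batch_size)
    words_batch = as_matrix(words[batch_ix], letter_to_ix)
    transcripts_batch = as_matrix(transcripts[batch_ix], phoneme_to_ix, max_len=MAX_LEN)
    return words_batch, transcripts_batch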
In [ ]:
from tqdm import tqdm
print("Training ...")

# total number of epochs
n_epochs = 100
# how many minibatches there are in one epoch
batches_per_epoch = 500
# how many training sequences are processed in a single function call
batch_size = 10

for epoch in range(n_epochs):
    avg_cost = 0
    for _ in tqdm(range(batches_per_epoch)):
        x, y = sample_batch(words, transcripts, batch_size)
        avg_cost += train(x, y).mean()
    print("Epoch {} average loss = {}".format(epoch, avg_cost / batches_per_epoch))
    for i in range(5):
        ind = np.random.randint(len(words))
        print words[ind], ':', ' '.join(generate_transcript(words[ind], sample=False)[1:-1])