In [ ]:
import numpy as np
import theano
import lasagne
import os
import pickle
import time
SENTENCE_LENGTH_MAX = 32
EMBEDDING_DIM=50
In [ ]:
import nltk
from nltk.tokenize import TreebankWordTokenizer

nltk.download('punkt')                       # Punkt sentence-splitter model (a no-op if already present)
nltk.download('averaged_perceptron_tagger')  # PoS tagger model used below

sentence_splitter = nltk.data.load('tokenizers/punkt/english.pickle')
tokenizer = TreebankWordTokenizer()
The corpus text file comes from the corpus download page : http://wortschatz.uni-leipzig.de/en/download/
Here's the paper that explains how the corpus was constructed :
In [ ]:
corpus_dir = './data/RNN/'
corpus_text_file = os.path.join(corpus_dir, 'en.wikipedia.2010.100K.txt')
In [ ]:
if not os.path.isfile( corpus_text_file ):
    raise RuntimeError("You need to download the corpus file : Use the downloader in 5-Text-Corpus-and-Embeddings.ipynb")
else:
    print("Corpus available locally")
In [ ]:
def corpus_sentence_tokens(corpus_text_file=corpus_text_file):
    while True:
        with open(corpus_text_file, encoding='utf-8') as f:
            for line in f.readlines():
                # print(line)
                # .decode("utf-8")
                n, l = line.split('\t')                  # Strip off the initial line numbers
                for s in sentence_splitter.tokenize(l):  # Split the line into sentences (~1 each)
                    tree_banked = tokenizer.tokenize(s)
                    if len(tree_banked) < SENTENCE_LENGTH_MAX:
                        yield tree_banked
        print("Corpus : Looping")
corpus_sentence_tokens_gen = corpus_sentence_tokens()
In [ ]:
' | '.join(next(corpus_sentence_tokens_gen))
In [ ]:
from nltk.tag.perceptron import PerceptronTagger
pos_tagger = PerceptronTagger(load=True)
' | '.join(list(pos_tagger.classes))
In [ ]:
s = "Let 's see what part of speech analysis on this sample text looks like .".split(' ')
#s = next(corpus_sentence_tokens_gen)
pos_tagger.tag(s)
In [ ]:
tag_list = 'O E'.split(' ')                     # 'O'ther vs (named) 'E'ntity
pos_tagger_entity_tags = set('NNP'.split(' '))  # Proper nouns are treated as entities
pos_tagger_to_idx = { t:(1 if t in pos_tagger_entity_tags else 0) for t in pos_tagger.classes }
TAG_SET_SIZE = len(tag_list)
pos_tagger_to_idx['NNP'], pos_tagger_to_idx['VBP']
In [ ]:
glove_dir = './data/RNN/'
glove_100k_50d = 'glove.first-100k.6B.50d.txt'
glove_100k_50d_path = os.path.join(glove_dir, glove_100k_50d)
if not os.path.isfile( glove_100k_50d_path ):
    raise RuntimeError("You need to download the GloVe embeddings : Use the downloader in 5-Text-Corpus-and-Embeddings.ipynb")
else:
    print("GloVe available locally")
In [ ]:
# Due to size constraints, only use the first 100k vectors (i.e. 100k most frequently used words)
import glove
word_embedding = glove.Glove.load_stanford( glove_100k_50d_path )
In [ ]:
def get_embedding_vec(word):
    idx = word_embedding.dictionary.get(word.lower(), -1)
    if idx < 0:
        #print("Missing word : '%s'" % (word,))
        return np.zeros( (EMBEDDING_DIM, ), dtype='float32')  # UNK
    return word_embedding.word_vectors[idx]
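As a quick sanity check (an illustrative cell added here; it assumes 'the' is within the first 100k GloVe tokens and that the nonsense string below is not), a known word should come back as a 50-dimensional vector and an out-of-vocabulary word as the all-zero vector :
In [ ]:
# Quick check : known word -> 50-d vector, unknown word -> zero vector (UNK)
print( get_embedding_vec('the').shape )                        # expect (50,)
print( np.all( get_embedding_vec('Qwertyuiopasdf') == 0. ) )   # expect True for an OOV token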
In [ ]:
BATCH_SIZE = 64
RNN_HIDDEN_SIZE = EMBEDDING_DIM # ?+1 for capitalisation flag
GRAD_CLIP_BOUND = 5.0
In [ ]:
def batch_sentences(size=BATCH_SIZE):
    return [ next(corpus_sentence_tokens_gen) for i in range(size) ]
In [ ]:
# Test it out
batch_test = lambda : batch_sentences(size=4)
print([ ' '.join(s) for s in batch_test()])
In [ ]:
# After sampling a data batch, we transform it into a matrix of embedding features, with a mask
def prep_batch_for_network(batch_of_sentences, include_targets=False):
    sentence_max_length = np.array([ len(w) for w in batch_of_sentences ]).max()

    # translate into embedding vectors, mask values and (optionally) targets
    input_values = np.zeros((len(batch_of_sentences), sentence_max_length, EMBEDDING_DIM), dtype='float32')
    mask_values  = np.zeros((len(batch_of_sentences), sentence_max_length), dtype='float32')

    for i, sent in enumerate(batch_of_sentences):
        for j, word in enumerate(sent):
            input_values[i,j] = get_embedding_vec(word)  # the dictionary look-up uses word.lower()
        mask_values[i, 0:len(sent) ] = 1.

    if not include_targets:
        return input_values, mask_values

    target_values = np.zeros((len(batch_of_sentences), sentence_max_length), dtype='int32')
    for i, sent in enumerate(batch_of_sentences):
        sentence_tags = pos_tagger.tag(sent)
        for j, word_tag in enumerate(sentence_tags):
            target_values[i,j] = pos_tagger_to_idx[word_tag[1]]  # tags are returned as (word, tag) tuples

    return input_values, mask_values, target_values
In [ ]:
prep_batch_for_network(["Mr. Smith works at Red Cat Labs .".split(' ')], include_targets=True)
In [ ]:
# Symbolic variables for input. In addition to the usual features and target, we need a mask
rnn_input_sym = theano.tensor.tensor3()
rnn_mask_sym = theano.tensor.matrix()
rnn_words_target_sym = theano.tensor.imatrix() # part-of-speech generated
In [ ]:
rnn_input = lasagne.layers.InputLayer( (None, None, EMBEDDING_DIM) )  # batch_size, sequence_len, embedding_dim
rnn_mask  = lasagne.layers.InputLayer( (None, None) )                 # batch_size, sequence_len (the mask is 2-d)
n_batch, n_time_steps, n_features = rnn_input_sym.shape
rnn_layer_f = lasagne.layers.GRULayer(rnn_input,
num_units=RNN_HIDDEN_SIZE,
gradient_steps=-1,
grad_clipping=GRAD_CLIP_BOUND,
hid_init=lasagne.init.Normal(),
learn_init=True,
mask_input=rnn_mask,
only_return_final=False, # Need all of the output states
)
rnn_layer_b = lasagne.layers.GRULayer(rnn_input,
num_units=RNN_HIDDEN_SIZE,
gradient_steps=-1,
grad_clipping=GRAD_CLIP_BOUND,
hid_init=lasagne.init.Normal(),
learn_init=True,
mask_input=rnn_mask,
only_return_final=False, # Need all of the output states
backwards=True,
)
# Before the decoder layer, we need to reshape the sequence into the batch dimension,
# so that timesteps are decoded independently.
rnn_reshape_f = lasagne.layers.ReshapeLayer(rnn_layer_f, (-1, RNN_HIDDEN_SIZE) )
rnn_reshape_b = lasagne.layers.ReshapeLayer(rnn_layer_b, (-1, RNN_HIDDEN_SIZE) )
# Now concatenate them
rnn_concat = lasagne.layers.ConcatLayer([rnn_reshape_f, rnn_reshape_b])
# Convert them into softmax outputs
rnn_tag_val = lasagne.layers.DenseLayer( rnn_concat, num_units=TAG_SET_SIZE, nonlinearity=lasagne.nonlinearities.softmax)
# And reshape them, so that they are in the original batches-of-sentences shape
rnn_out = lasagne.layers.ReshapeLayer(rnn_tag_val, (-1, n_time_steps, TAG_SET_SIZE))
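As an optional sanity check on the architecture just defined (this cell is an addition; count_params simply sums the sizes of the trainable parameter tensors), we can look at how many weights the bidirectional tagger contains :
In [ ]:
# Optional check : total number of trainable parameters in the bidirectional GRU tagger
print( lasagne.layers.count_params(rnn_out, trainable=True) )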
In [ ]:
# Finally, the output stage - this is for the training (over all the words in the sentences)
rnn_output = lasagne.layers.get_output(rnn_out,
{
rnn_input: rnn_input_sym,
rnn_mask: rnn_mask_sym,
}
)
# We flatten the sequence into the batch dimension before calculating the loss
def rnn_word_cross_ent(net_output, targets):
    preds = theano.tensor.reshape(net_output, (-1, TAG_SET_SIZE))
    targets_flat = theano.tensor.flatten(targets)
    cost = theano.tensor.nnet.categorical_crossentropy(preds, targets_flat)
    return cost
rnn_loss = rnn_word_cross_ent(rnn_output, rnn_words_target_sym).mean()
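Note that the .mean() above averages over every position in the padded batch, including the masked-out padding slots (whose targets are all zero). A mask-aware variant would weight each word's cross-entropy by the mask before averaging - shown below only as a sketch, it is not wired into the training function that follows :
In [ ]:
# Sketch only : a mask-aware version of the loss (not used by rnn_train below)
def rnn_word_cross_ent_masked(net_output, targets, mask):
    preds = theano.tensor.reshape(net_output, (-1, TAG_SET_SIZE))
    targets_flat = theano.tensor.flatten(targets)
    mask_flat = theano.tensor.flatten(mask)
    cost = theano.tensor.nnet.categorical_crossentropy(preds, targets_flat)
    return (cost * mask_flat).sum() / mask_flat.sum()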
In [ ]:
# For stability during training, gradients are clipped, and a total gradient norm constraint can also be applied
#MAX_GRAD_NORM = 15
MAX_GRAD_NORM = None
rnn_params = lasagne.layers.get_all_params(rnn_out, trainable=True)
rnn_grads = theano.tensor.grad(rnn_loss, rnn_params)
rnn_grads = [theano.tensor.clip(g, -GRAD_CLIP_BOUND, GRAD_CLIP_BOUND) for g in rnn_grads]
if MAX_GRAD_NORM is not None:
    rnn_grads, rnn_norm = lasagne.updates.total_norm_constraint( rnn_grads, MAX_GRAD_NORM, return_norm=True)
rnn_updates = lasagne.updates.adam(rnn_grads, rnn_params)
rnn_train = theano.function([rnn_input_sym, rnn_words_target_sym, rnn_mask_sym],
[rnn_loss],
updates=rnn_updates,
)
rnn_predict = theano.function([rnn_input_sym, rnn_mask_sym], [rnn_output])
print("Defined the RNN model")
In [ ]:
t0, iterations_complete = time.time(), 0
In [ ]:
t1, iterations_recent = time.time(), iterations_complete
epochs=1000*1
for epoch_i in range(epochs):
    sentences = batch_sentences()
    rnn_input_values, rnn_mask_values, rnn_target_values_int = prep_batch_for_network(sentences, include_targets=True)

    # Now train the RNN
    rnn_loss_, = rnn_train(rnn_input_values, rnn_target_values_int, rnn_mask_values)
    iterations_complete += 1

    if iterations_complete % 10 == 0:
        secs_per_batch = float(time.time() - t1)/ (iterations_complete - iterations_recent)
        eta_in_secs = secs_per_batch*(epochs-epoch_i)
        print("Iteration {:5d}, loss_train: {:.4f} ({:.1f}s per 1000 batches) eta: {:.0f}m{:02.0f}s".format(
                iterations_complete, float(rnn_loss_),
                secs_per_batch*1000., np.floor(eta_in_secs/60), np.floor(eta_in_secs % 60), )
             )
        #print('Iteration {}, output: {}'.format(iteration, disc_output_, )) # , output: {}
        t1, iterations_recent = time.time(), iterations_complete

print('Iteration {}, ran in {:.1f}sec'.format(iterations_complete, float(time.time() - t0)))
In [ ]:
rnn_param_values = lasagne.layers.get_all_param_values(rnn_out)
rnn_param_dictionary = dict(
params = rnn_param_values,
iterations_complete=iterations_complete,
)
pickle.dump(rnn_param_dictionary, open('./data/RNN/tagger_rnn_trained.pkl','wb'), protocol=pickle.HIGHEST_PROTOCOL)
In [ ]:
rnn_param_dictionary = pickle.load(open('./data/RNN/tagger_rnn_trained.pkl', 'rb'))
lasagne.layers.set_all_param_values(rnn_out, rnn_param_dictionary['params'])
In [ ]:
def tag_results_for(test_sentences):
    input_values, mask_values, target_values_int = prep_batch_for_network(test_sentences, include_targets=True)
    rnn_output_, = rnn_predict(input_values, mask_values)
    # rnn_output_ here is a softmax-vector at every word location
    for i, sent in enumerate(test_sentences[0:5]):
        annotated = [
            "%s-%d-%d" % (word, target_values_int[i,j], np.argmax(rnn_output_[i,j]), )
            for j, word in enumerate(sent)
        ]
        print(' '.join(annotated))
In [ ]:
sentences=[
"Dr. Andrews works at Red Cat Labs .",
"Let 's see what part of speech analysis on this sample text looks like .",
"When are you off to New York , Chaitanya ?",
]
# Uncomment this to test on a batch of sentences taken directly from the corpus
#test_sentences = batch_sentences()
test_sentences_mixed = [ s.split(' ') for s in sentences ]
test_sentences_title = [ s.title().split(' ') for s in sentences ]
test_sentences_single = [ s.lower().split(' ') for s in sentences ]
#test_sentences_single = [ s.upper().split(' ') for s in sentences ]
print("Format : WORD-NLTK-RNN\n")
tag_results_for(test_sentences_mixed)
print()
tag_results_for(test_sentences_title)
print()
tag_results_for(test_sentences_single)
In [ ]:
Make the tagger identify a different PoS (say : 'verbs')
Make the tagger return several different tags, instead of just the binary entity/other distinction
See whether more advanced 'LSTM' nodes would improve the scores
Add a special 'is_uppercase' element to the embedding vector (or, more simply, just replace one of the elements with an indicator - see the sketch below). Does this help the NNP accuracy?
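For the last exercise, here is a minimal sketch of one possible starting point (get_embedding_vec_with_case is a hypothetical helper, not part of the model trained above; to use it, prep_batch_for_network would need to call it in place of get_embedding_vec) :
In [ ]:
# Sketch only : overwrite the last embedding element with an 'is_uppercase' indicator
def get_embedding_vec_with_case(word):
    vec = np.array( get_embedding_vec(word), dtype='float32' )  # copy, so the GloVe vectors themselves stay untouched
    vec[-1] = 1. if word[:1].isupper() else 0.
    return vec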
In [ ]: