In [ ]:
%load_ext autoreload
%autoreload 2

In [ ]:
# Other imports.
from lxmls import DATA_PATH
import lxmls.readers.simple_sequence as ssr
import lxmls.sequences.hmm as hmmc
import lxmls.readers.pos_corpus as pcc
import lxmls.sequences.confusion_matrix as cm

Exercise 2.1

Load the simple sequence dataset. From the IPython command line, create a simple sequence object and look at the training and test sets.


In [ ]:
simple = ssr.SimpleSequence()
print(simple.train)
print(simple.test)

In [ ]:
for sequence in simple.train.seq_list:
    print(sequence)

In [ ]:
for sequence in simple.train.seq_list:
    print(sequence.x)

In [ ]:
for sequence in simple.train.seq_list:
    print(sequence.y)

Exercise 2.2

The provided function train_supervised from the hmm.py file implements the above parameter estimates. Run this function on the simple dataset above and look at the estimated probabilities. Are they correct? You can also check the variables ending in _counts instead of _probs to see the raw counts (for example, typing hmm.initial_counts will show you the raw counts of initial states). How are the counts related to the probabilities?


In [ ]:
hmm = hmmc.HMM(simple.x_dict, simple.y_dict)
hmm.train_supervised(simple.train)

print("Initial Probabilities:")
print(hmm.initial_probs)

In [ ]:
print("Transition Probabilities:")
print(hmm.transition_probs)

In [ ]:
print("Final Probabilities:")
print(hmm.final_probs)

In [ ]:
print("Emission Probabilities")
print(hmm.emission_probs)
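
To see how the counts relate to the probabilities, normalize the raw counts yourself. This is a minimal sketch for the initial counts only, assuming (as the exercise text suggests) that the model exposes an initial_counts attribute as a NumPy array; dividing by its total should recover the probabilities printed above.


In [ ]:
print(hmm.initial_counts)
print(hmm.initial_counts / hmm.initial_counts.sum())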

Exercise 2.3

Convince yourself that the score of a path in the trellis (obtained by summing the scores along the path) is equivalent to the log-probability $\log P(X = x, Y = y)$, as defined in Eq. 2.2. Use the given function compute_scores on the first training sequence and confirm that the values are correct. You should get the same values as presented below.


In [ ]:
initial_scores, transition_scores, final_scores, emission_scores = \
    hmm.compute_scores(simple.train.seq_list[0])
print(initial_scores)

In [ ]:
print(transition_scores)

In [ ]:
print(final_scores)

In [ ]:
print(emission_scores)
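
As a sanity check, you can sum the scores along the gold path and compare the result with $\log P(X = x, Y = y)$ computed directly from the estimated probabilities. This is a sketch that assumes the indexing conventions used elsewhere in this notebook: transition_scores[pos-1, current_state, previous_state] and emission_scores[pos, state] (as in the Viterbi solution of Exercise 2.8), and emission_probs[observation, state] and transition_probs[current_state, previous_state] (as in the count updates of Exercise 2.10).


In [ ]:
import numpy as np

seq = simple.train.seq_list[0]

# Score of the gold path: sum of the scores along the trellis.
path_score = initial_scores[seq.y[0]] + final_scores[seq.y[-1]]
for pos in range(len(seq.y)):
    path_score += emission_scores[pos, seq.y[pos]]
    if pos > 0:
        path_score += transition_scores[pos - 1, seq.y[pos], seq.y[pos - 1]]

# log P(x, y) computed directly from the estimated probabilities.
log_prob = np.log(hmm.initial_probs[seq.y[0]]) + np.log(hmm.final_probs[seq.y[-1]])
for pos in range(len(seq.y)):
    log_prob += np.log(hmm.emission_probs[seq.x[pos], seq.y[pos]])
    if pos > 0:
        log_prob += np.log(hmm.transition_probs[seq.y[pos], seq.y[pos - 1]])

print(path_score, log_prob)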

Exercise 2.4

Look at the module sequences/log_domain.py. This module implements a function logsum_pair(logx, logy) to add two numbers represented in the log-domain; it returns their sum also represented in the log-domain. The function logsum(logv) sums all components of an array represented in the log-domain. This will be used later in our decoding algorithms. To observe why this is important, type the following:


In [ ]:
import numpy as np

a = np.random.rand(10)
print(np.log(sum(np.exp(a))))
print(np.log(sum(np.exp(10*a))))
print(np.log(sum(np.exp(100*a))))
print(np.log(sum(np.exp(1000*a))))

In [ ]:
from lxmls.sequences.log_domain import logsum

print(logsum(a))
print(logsum(10*a))
print(logsum(100*a))
print(logsum(1000*a))
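
At larger scales np.exp overflows and the naive computation breaks down, while logsum stays finite. The usual fix, which logsum relies on, is the log-sum-exp trick: factor out the maximum before exponentiating. Below is a minimal sketch of the idea (the actual implementation in lxmls.sequences.log_domain may differ in its details).


In [ ]:
def logsum_sketch(logv):
    # log(sum(exp(logv))) computed stably: exp never sees a value above 0.
    m = np.max(logv)
    return m + np.log(np.sum(np.exp(logv - m)))

print(logsum_sketch(1000 * a))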

Exercise 2.5

Run the provided forward-backward algorithm on the first training sequence. Observe that both the forward and the backward passes give the same log-likelihood.


In [ ]:
log_likelihood, forward = hmm.decoder.run_forward(initial_scores, transition_scores, final_scores, emission_scores)
print('Log-Likelihood = {}'.format(log_likelihood))

In [ ]:
log_likelihood, backward = hmm.decoder.run_backward(initial_scores, transition_scores, final_scores, emission_scores)
print('Log-Likelihood = {}'.format(log_likelihood))
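
As a quick check, the two values should agree up to floating-point error, since both are estimates of the same quantity, the log-likelihood of the observations obtained by marginalizing over all state sequences.


In [ ]:
ll_forward, forward = hmm.decoder.run_forward(initial_scores, transition_scores, final_scores, emission_scores)
ll_backward, backward = hmm.decoder.run_backward(initial_scores, transition_scores, final_scores, emission_scores)
print(np.isclose(ll_forward, ll_backward))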

Exercise 2.6

Compute the node posteriors for the first training sequence (use the provided compute_posteriors function), and look at the output. Note that the state posteriors are a proper probability distribution (the rows of the result sum to 1).


In [ ]:
initial_scores, transition_scores, final_scores, emission_scores = \
    hmm.compute_scores(simple.train.seq_list[0])
state_posteriors, _, _ = hmm.compute_posteriors(initial_scores,
                                                transition_scores,
                                                final_scores,
                                                emission_scores)
print(state_posteriors)
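
Since the posteriors are returned in the probability domain (as the printed values suggest), each row, one per position, should sum to 1. A quick check, assuming state_posteriors is a NumPy array:


In [ ]:
print(state_posteriors.sum(axis=1))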

Exercise 2.7

Run the posterior decode on the first test sequence, and evaluate it.


In [ ]:
y_pred = hmm.posterior_decode(simple.test.seq_list[0])
print("Prediction test 0: {}".format(y_pred))
print("Truth test 0: {}".format(simple.test.seq_list[0]))

Do the same for the second test sequence:


In [ ]:
y_pred = hmm.posterior_decode(simple.test.seq_list[1])
print("Prediction test 1", y_pred)
print("Truth test 1", simple.test.seq_list[1])

What is wrong? Note the observations for the second test sequence: the observation tennis was never seen at training time, so the probability for it will be zero (no matter what state). This will make all possible state sequences have zero probability. As seen in the previous lecture, this is a problem with generative models, which can be corrected using smoothing (among other options).

Change the train_supervised method to add smoothing:

def train_supervised(self, sequence_list, smoothing):

Solution

def train_supervised(self, dataset, smoothing=0):
    """ Train an HMM from a list of sequences containing observations
    and the gold states. This is just counting and normalizing."""
    # Set all counts to zeros (optionally, smooth).
    self.clear_counts(smoothing)
    # Count occurrences of events.
    self.collect_counts_from_corpus(dataset)
    # Normalize to get probabilities.
    self.compute_parameters()

For more details, see the class HMM in lxmls.sequences.hmm.
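
To build intuition for what the smoothing argument does, here is a toy sketch of add-alpha smoothing that is independent of the lxmls internals: starting the counts at the smoothing value instead of zero gives events that were never observed (such as "tennis" above) a small but non-zero probability after normalization.


In [ ]:
import numpy as np

counts = np.array([3.0, 1.0, 0.0])  # the last event was never observed
for alpha in [0.0, 0.1]:
    smoothed = counts + alpha
    print(alpha, smoothed / smoothed.sum())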


In [ ]:
hmm.train_supervised(simple.train, smoothing=0.1)

y_pred = hmm.posterior_decode(simple.test.seq_list[0])
print("Prediction test 0 with smoothing")
print(y_pred)
print("Truth test 0")
print(simple.test.seq_list[0])

In [ ]:
y_pred = hmm.posterior_decode(simple.test.seq_list[1])
print("Prediction test 1 with smoothing")
print(y_pred)
print("Truth test 1")
print(simple.test.seq_list[1])

Exercise 2.8

Implement a method for performing Viterbi decoding in the file sequence_classification_decoder.py.

Hint: look at the implementation of run_forward. Also check the help for the NumPy functions max and argmax. This method will be called by

def viterbi_decode(self, sequence)

in the module sequence_classifier.py.

Solution

def run_viterbi(self, initial_scores, transition_scores, final_scores, emission_scores):

    length = np.size(emission_scores, 0)  # Length of the sequence.
    num_states = np.size(initial_scores)  # Number of states.

    # Viterbi variables.
    viterbi = np.zeros([length, num_states]) + logzero()
    backtrack = np.zeros([length, num_states], dtype=int)

    # Initialization.
    viterbi[0, :] = emission_scores[0, :] + initial_scores

    # Viterbi loop.
    for pos in range(1, length):
        for current_state in range(num_states):
            # In the log domain, products become sums (and sums become logsums);
            # for Viterbi we take the max instead of the logsum.
            viterbi_score = viterbi[pos-1, :] + transition_scores[pos-1, current_state, :]
            viterbi[pos, current_state] = np.max(viterbi_score)
            viterbi[pos, current_state] += emission_scores[pos, current_state]
            backtrack[pos, current_state] = np.argmax(viterbi_score)

    best_score = np.max(viterbi[-1, :] + final_scores)

    # Backtrack to recover the best state sequence.
    best_path = np.zeros(length, dtype=int)
    best_path[-1] = np.argmax(viterbi[-1, :] + final_scores)
    for pos in range(length-2, -1, -1):
        best_path[pos] = backtrack[pos+1, best_path[pos+1]]

    return best_path, best_score

In [ ]:
y_pred, score = hmm.viterbi_decode(simple.test.seq_list[0])
print("Viterbi decoding Prediction test 0 with smoothing")
print(y_pred, score)
print("Truth test 0")
print(simple.test.seq_list[0])

In [ ]:
y_pred, score = hmm.viterbi_decode(simple.test.seq_list[1])
print("Viterbi decoding Prediction test 1 with smoothing")
print(y_pred, score)
print("Truth test 1")
print(simple.test.seq_list[1])
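
As a sanity check (a sketch, assuming the decoded sequence exposes its predicted state indices as y_pred.y), the score returned by viterbi_decode should equal the sum of the scores along the predicted path, computed exactly as in Exercise 2.3.


In [ ]:
initial_scores, transition_scores, final_scores, emission_scores = \
    hmm.compute_scores(simple.test.seq_list[1])
y_hat = y_pred.y  # assumption: the predicted state indices
path_score = initial_scores[y_hat[0]] + final_scores[y_hat[-1]]
for pos in range(len(y_hat)):
    path_score += emission_scores[pos, y_hat[pos]]
    if pos > 0:
        path_score += transition_scores[pos - 1, y_hat[pos], y_hat[pos - 1]]
print(path_score, score)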

Exercise 2.9

Test the model using both posterior decoding and Viterbi decoding, on both the training and test sets, using the methods in the class HMM.


In [ ]:
%matplotlib inline

corpus = pcc.PostagCorpus()
train_seq = corpus.read_sequence_list_conll("%s/train-02-21.conll" % DATA_PATH, max_sent_len=15, max_nr_sent=1000)
test_seq = corpus.read_sequence_list_conll("%s/test-23.conll" % DATA_PATH, max_sent_len=15, max_nr_sent=1000)
dev_seq = corpus.read_sequence_list_conll("%s/dev-22.conll" % DATA_PATH, max_sent_len=15, max_nr_sent=1000)
hmm = hmmc.HMM(corpus.word_dict, corpus.tag_dict)
hmm.train_supervised(train_seq)
hmm.print_transition_matrix()

In [ ]:
viterbi_pred_train = hmm.viterbi_decode_corpus(train_seq)
posterior_pred_train = hmm.posterior_decode_corpus(train_seq)
eval_viterbi_train = hmm.evaluate_corpus(train_seq, viterbi_pred_train)
eval_posterior_train = hmm.evaluate_corpus(train_seq, posterior_pred_train)
print("Train Set Accuracy: Posterior Decode %.3f, Viterbi Decode: %.3f"%(
eval_posterior_train,eval_viterbi_train))

In [ ]:
viterbi_pred_test = hmm.viterbi_decode_corpus(test_seq)
posterior_pred_test = hmm.posterior_decode_corpus(test_seq)
eval_viterbi_test = hmm.evaluate_corpus(test_seq, viterbi_pred_test)
eval_posterior_test = hmm.evaluate_corpus(test_seq, posterior_pred_test)
print("Test Set Accuracy: Posterior Decode %.3f, Viterbi Decode: %.3f" % (eval_posterior_test, eval_viterbi_test))

In [ ]:
best_smoothing = hmm.pick_best_smoothing(train_seq, dev_seq, [10, 1, 0.1, 0])

hmm.train_supervised(train_seq, smoothing=best_smoothing)
viterbi_pred_test = hmm.viterbi_decode_corpus(test_seq)
posterior_pred_test = hmm.posterior_decode_corpus(test_seq)
eval_viterbi_test = hmm.evaluate_corpus(test_seq, viterbi_pred_test)
eval_posterior_test = hmm.evaluate_corpus(test_seq, posterior_pred_test)
print("Best Smoothing %f -- Test Set Accuracy: Posterior Decode %.3f, Viterbi Decode: %.3f" % (best_smoothing, eval_posterior_test, eval_viterbi_test))

In [ ]:
confusion_matrix = cm.build_confusion_matrix(test_seq.seq_list, viterbi_pred_test,
                                             len(corpus.tag_dict), hmm.get_num_states())

cm.plot_confusion_bar_graph(confusion_matrix, corpus.tag_dict,
                            range(hmm.get_num_states()), 'Confusion matrix')

Exercise 2.10

Implement the method to update the counts given the state and transition posteriors:

def update_counts(self, sequence, state_posteriors, transition_posteriors):

Look at the code for the EM algorithm in the file sequences/hmm.py and check it for yourself.

Solution

def update_counts(self, sequence, state_posteriors, transition_posteriors):
    """ Used in the E-step in EM."""

    num_states = self.get_num_states()  # Number of states.
    length = len(sequence.x)  # Length of the sequence.

    # Take care of initial probs
    for y in range(num_states):
        self.initial_counts[y] += state_posteriors[0, y]
    for pos in range(length):
        x = sequence.x[pos]
        for y in range(num_states):
            self.emission_counts[x, y] += state_posteriors[pos, y]
            if pos > 0:
                for y_prev in range(num_states):
                    self.transition_counts[y, y_prev] += transition_posteriors[pos-1, y, y_prev]

    # Final position
    for y in range(num_states):
        self.final_counts[y] += state_posteriors[length-1, y]
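
A small NumPy sketch, independent of the HMM class, of what these expected counts mean: when the posteriors are one-hot, the updates above reduce to the hard counts of supervised training, while soft posteriors spread fractional counts across states, with each position still contributing a total mass of 1.


In [ ]:
import numpy as np

state_posteriors = np.array([[1.0, 0.0],   # hard (one-hot) assignment
                             [0.3, 0.7]])  # soft assignment
print(state_posteriors.sum(axis=1))  # each position contributes total mass 1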

Exercise 2.11

Run 20 epochs of the EM algorithm for part-of-speech induction:


In [ ]:
# Train with EM.
hmm.train_EM(train_seq, 0.1, 20, evaluate=True)
viterbi_pred_test = hmm.viterbi_decode_corpus(test_seq)
posterior_pred_test = hmm.posterior_decode_corpus(test_seq)
eval_viterbi_test = hmm.evaluate_corpus(test_seq, viterbi_pred_test)
eval_posterior_test = hmm.evaluate_corpus(test_seq, posterior_pred_test)
print("Test Set Accuracy: Posterior Decode %.3f, Viterbi Decode: %.3f" % (eval_posterior_test, eval_viterbi_test))

In [ ]:
confusion_matrix = cm.build_confusion_matrix(test_seq.seq_list, viterbi_pred_test,
                                             len(corpus.tag_dict), hmm.get_num_states())
cm.plot_confusion_bar_graph(confusion_matrix, corpus.tag_dict,
                            range(hmm.get_num_states()), 'Confusion matrix')