In [ ]:
%load_ext autoreload
%autoreload 2
In [ ]:
# Other imports.
from lxmls import DATA_PATH
import lxmls.readers.simple_sequence as ssr
import lxmls.sequences.hmm as hmmc
import lxmls.readers.pos_corpus as pcc
import lxmls.sequences.confusion_matrix as cm
In [ ]:
simple = ssr.SimpleSequence()
print(simple.train)
print(simple.test)
In [ ]:
for sequence in simple.train.seq_list:
print(sequence)
In [ ]:
for sequence in simple.train.seq_list:
print(sequence.x)
In [ ]:
for sequence in simple.train.seq_list:
print(sequence.y)
The provided function train_supervised in hmm.py implements the parameter estimates above. Run this function on the simple dataset and look at the estimated probabilities. Are they correct? You can also check the variables ending in _counts instead of _probs to see the raw counts (for example, typing hmm.initial_counts will show you the raw counts of initial states). How are the counts related to the probabilities?
In [ ]:
hmm = hmmc.HMM(simple.x_dict, simple.y_dict)
hmm.train_supervised(simple.train)
print("Initial Probabilities:")
print(hmm.initial_probs)
In [ ]:
print("Transition Probabilities:")
print(hmm.transition_probs)
In [ ]:
print("Final Probabilities:")
print(hmm.final_probs)
In [ ]:
print("Emission Probabilities")
print(hmm.emission_probs)
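To see how the counts relate to the probabilities, you can normalize the counts yourself and compare. A minimal check (a sketch; it assumes the count tables are stored in attributes such as hmm.initial_counts and hmm.emission_counts, mirroring the _probs attributes above):
In [ ]:
import numpy as np
# Normalizing the raw counts should reproduce the estimated probabilities.
print(hmm.initial_counts / np.sum(hmm.initial_counts))            # compare with hmm.initial_probs
print(hmm.emission_counts / np.sum(hmm.emission_counts, axis=0))  # compare with hmm.emission_probs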
Convince yourself that the score of a path in the trellis (the sum of the scores above) is equivalent to the log-probability $\log P(X = x, Y = y)$, as defined in Eq. 2.2. Use the provided function compute_scores on the first training sequence and confirm that the values are correct. You should get the same values as shown below.
In [ ]:
initial_scores, transition_scores, final_scores, emission_scores = \
hmm.compute_scores(simple.train.seq_list[0])
print(initial_scores)
In [ ]:
print(transition_scores)
In [ ]:
print(final_scores)
In [ ]:
print(emission_scores)
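As a sanity check, summing the scores along the gold state path of this sequence should give exactly the log-joint probability $\log P(X = x, Y = y)$. A minimal sketch (it assumes the indexing convention used later in the decoder code, i.e. transition_scores[pos-1, current_state, previous_state]):
In [ ]:
seq = simple.train.seq_list[0]
score = initial_scores[seq.y[0]] + emission_scores[0, seq.y[0]]
for pos in range(1, len(seq.x)):
    score += transition_scores[pos-1, seq.y[pos], seq.y[pos-1]]
    score += emission_scores[pos, seq.y[pos]]
score += final_scores[seq.y[-1]]
print(score)  # should equal log P(X = x, Y = y) for the gold path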
Look at the module sequences/log_domain.py. This module implements a function logsum_pair(logx, logy) that adds two numbers represented in the log domain and returns their sum, also in the log domain. The function logsum(logv) sums all components of an array represented in the log domain. This will be used later in our decoding algorithms. To see why this is important, run the following:
In [ ]:
import numpy as np
a = np.random.rand(10)
print(np.log(sum(np.exp(a))))
print(np.log(sum(np.exp(10*a))))
print(np.log(sum(np.exp(100*a))))
print(np.log(sum(np.exp(1000*a))))
In [ ]:
from lxmls.sequences.log_domain import logsum
print(logsum(a))
print(logsum(10*a))
print(logsum(100*a))
print(logsum(1000*a))
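The reason logsum stays finite is the log-sum-exp trick: the maximum is factored out before exponentiating, so the exponentials never overflow. The sketch below illustrates the idea; it is not the implementation in log_domain.py, just an equivalent formulation:

import numpy as np

def logsum_sketch(logv):
    # log(sum(exp(logv))) computed stably: subtract the maximum first.
    m = np.max(logv)
    return m + np.log(np.sum(np.exp(logv - m)))

def logsum_pair_sketch(logx, logy):
    # log(exp(logx) + exp(logy)), adding two log-domain numbers.
    if logx == -np.inf:
        return logy
    if logy == -np.inf:
        return logx
    m = max(logx, logy)
    return m + np.log1p(np.exp(min(logx, logy) - m))

Applied to 1000*a, logsum_sketch returns the same finite value as logsum above, whereas the direct computation overflows.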
In [ ]:
log_likelihood, forward = hmm.decoder.run_forward(initial_scores, transition_scores, final_scores, emission_scores)
print('Log-Likelihood = {}'.format(log_likelihood))
In [ ]:
log_likelihood, backward = hmm.decoder.run_backward(initial_scores, transition_scores, final_scores, emission_scores)
print('Log-Likelihood = {}'.format(log_likelihood))
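Both recursions compute the same quantity, $\log P(X = x)$, so the two log-likelihoods printed above must coincide; this is a useful sanity check. For reference, here is a sketch of the forward recursion in the log domain (the provided run_forward has this structure; the details here are illustrative, not the exact library code):

import numpy as np
from lxmls.sequences.log_domain import logsum

def run_forward_sketch(initial_scores, transition_scores, final_scores, emission_scores):
    length, num_states = emission_scores.shape
    forward = np.full([length, num_states], -np.inf)  # log(0)
    # Initialization: start in each state and emit the first observation.
    forward[0, :] = emission_scores[0, :] + initial_scores
    # Recursion: a logsum over previous states, where Viterbi would take a max.
    for pos in range(1, length):
        for state in range(num_states):
            forward[pos, state] = logsum(forward[pos-1, :] + transition_scores[pos-1, state, :])
            forward[pos, state] += emission_scores[pos, state]
    # Termination: account for the final (stopping) scores.
    log_likelihood = logsum(forward[-1, :] + final_scores)
    return log_likelihood, forward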
In [ ]:
initial_scores, transition_scores, final_scores, emission_scores = \
hmm.compute_scores(simple.train.seq_list[0])
state_posteriors, _, _ = hmm.compute_posteriors(initial_scores,
transition_scores,
final_scores,
emission_scores)
print(state_posteriors)
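The state posteriors come from combining the forward and backward variables. Under the usual convention (the forward variable includes the emission score at the current position and the backward variable does not), the posterior of each state is exp(forward + backward - log_likelihood). A quick check, assuming that convention holds for the variables computed above for the same sequence:
In [ ]:
import numpy as np
# Should reproduce the state_posteriors printed above (up to numerical precision).
print(np.exp(forward + backward - log_likelihood))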
In [ ]:
y_pred = hmm.posterior_decode(simple.test.seq_list[0])
print("Prediction test 0: {}".format(y_pred))
print("Truth test 0: {}".format(simple.test.seq_list[0]))
Do the same for the second test sequence:
In [ ]:
y_pred = hmm.posterior_decode(simple.test.seq_list[1])
print("Prediction test 1", y_pred)
print("Truth test 1", simple.test.seq_list[1])
What is wrong? Note the observations for the second test sequence: the observation tennis was never seen at training time, so the probability for it will be zero (no matter what state). This will make all possible state sequences have zero probability. As seen in the previous lecture, this is a problem with generative models, which can be corrected using smoothing (among other options).
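A common fix is add-$\alpha$ (Laplace) smoothing, which adds a small constant to every count before normalizing. For the emission probabilities, for example, in generic notation (a sketch; $C(x, y)$ is the number of times word $x$ was emitted from state $y$ and $|\Sigma|$ is the vocabulary size):

$$P(x \mid y) = \frac{C(x, y) + \alpha}{\sum_{x'} C(x', y) + \alpha\,|\Sigma|}$$

With $\alpha > 0$, unseen words such as tennis receive a small but non-zero probability in every state.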
Change the train_supervised method to add smoothing:

def train_supervised(self, dataset, smoothing=0):
    """ Train an HMM from a list of sequences containing observations
    and the gold states. This is just counting and normalizing."""
    # Set all counts to zeros (optionally, smooth).
    self.clear_counts(smoothing)
    # Count occurrences of events.
    self.collect_counts_from_corpus(dataset)
    # Normalize to get probabilities.
    self.compute_parameters()

For more details, see the class HMM in lxmls.sequences.hmm.
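In this implementation the smoothing enters through clear_counts: instead of starting every count table at zero, each entry is initialized to the smoothing value, so that normalization yields add-$\alpha$ estimates. A minimal sketch of that initialization (the count attribute names follow those used elsewhere in this exercise, but the get_num_observations helper is an assumption, not necessarily the exact API in hmm.py):

import numpy as np

def clear_counts_sketch(self, smoothing=0):
    # Initialize every count table to the smoothing value instead of zero.
    num_states = self.get_num_states()
    num_observations = self.get_num_observations()  # assumed helper
    self.initial_counts = np.full(num_states, smoothing, dtype=float)
    self.transition_counts = np.full([num_states, num_states], smoothing, dtype=float)
    self.final_counts = np.full(num_states, smoothing, dtype=float)
    self.emission_counts = np.full([num_observations, num_states], smoothing, dtype=float)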
In [ ]:
hmm.train_supervised(simple.train, smoothing=0.1)
y_pred = hmm.posterior_decode(simple.test.seq_list[0])
print("Prediction test 0 with smoothing")
print(y_pred)
print("Truth test 0")
print(simple.test.seq_list[0])
In [ ]:
y_pred = hmm.posterior_decode(simple.test.seq_list[1])
print("Prediction test 1 with smoothing")
print(y_pred)
print("Truth test 1")
print(simple.test.seq_list[1])
Implement a method for performing Viterbi decoding in the file sequence_classification_decoder.py.
Hint: look at the implementation of run_forward. Also check the help for the numpy functions max and argmax.
This method will be called by
def viterbi_decode(self, sequence)
in the module sequence_classifier.py.
def run_viterbi(self, initial_scores, transition_scores, final_scores, emission_scores):
    length = np.size(emission_scores, 0)  # Length of the sequence.
    num_states = np.size(initial_scores)  # Number of states.

    # Viterbi variables.
    viterbi = np.zeros([length, num_states]) + logzero()
    backtrack = np.zeros([length, num_states], dtype=int)

    # Initialization.
    viterbi[0, :] = emission_scores[0, :] + initial_scores

    # Viterbi loop. In the log domain, products become sums; where the forward
    # algorithm takes a logsum over previous states, Viterbi takes a max.
    for pos in range(1, length):
        for current_state in range(num_states):
            viterbi_score = viterbi[pos-1, :] + transition_scores[pos-1, current_state, :]
            viterbi[pos, current_state] = np.max(viterbi_score)
            viterbi[pos, current_state] += emission_scores[pos, current_state]
            backtrack[pos, current_state] = np.argmax(viterbi_score)

    # Termination and backtracking.
    best_score = np.max(viterbi[-1, :] + final_scores)
    best_path = np.zeros(length, dtype=int)
    best_path[-1] = np.argmax(viterbi[-1, :] + final_scores)
    for pos in range(length-2, -1, -1):
        best_path[pos] = backtrack[pos+1, best_path[pos+1]]

    return best_path, best_score
In [ ]:
y_pred, score = hmm.viterbi_decode(simple.test.seq_list[0])
print("Viterbi decoding Prediction test 0 with smoothing")
print(y_pred, score)
print("Truth test 0")
print(simple.test.seq_list[0])
In [ ]:
y_pred, score = hmm.viterbi_decode(simple.test.seq_list[1])
print("Viterbi decoding Prediction test 1 with smoothing")
print(y_pred, score)
print("Truth test 1")
print(simple.test.seq_list[1])
In [ ]:
%matplotlib inline
corpus = pcc.PostagCorpus()
train_seq = corpus.read_sequence_list_conll("%s/train-02-21.conll" % DATA_PATH, max_sent_len=15, max_nr_sent=1000)
test_seq = corpus.read_sequence_list_conll("%s/test-23.conll" % DATA_PATH, max_sent_len=15, max_nr_sent=1000)
dev_seq = corpus.read_sequence_list_conll("%s/dev-22.conll" % DATA_PATH, max_sent_len=15, max_nr_sent=1000)
hmm = hmmc.HMM(corpus.word_dict, corpus.tag_dict)
hmm.train_supervised(train_seq)
hmm.print_transition_matrix()
In [ ]:
viterbi_pred_train = hmm.viterbi_decode_corpus(train_seq)
posterior_pred_train = hmm.posterior_decode_corpus(train_seq)
eval_viterbi_train = hmm.evaluate_corpus(train_seq, viterbi_pred_train)
eval_posterior_train = hmm.evaluate_corpus(train_seq, posterior_pred_train)
print("Train Set Accuracy: Posterior Decode %.3f, Viterbi Decode: %.3f"%(
eval_posterior_train,eval_viterbi_train))
In [ ]:
viterbi_pred_test = hmm.viterbi_decode_corpus(test_seq)
posterior_pred_test = hmm.posterior_decode_corpus(test_seq)
eval_viterbi_test = hmm.evaluate_corpus(test_seq, viterbi_pred_test)
eval_posterior_test = hmm.evaluate_corpus(test_seq, posterior_pred_test)
print("Test Set Accuracy: Posterior Decode %.3f, Viterbi Decode: %.3f" % (eval_posterior_test, eval_viterbi_test))
In [ ]:
best_smoothing = hmm.pick_best_smoothing(train_seq, dev_seq, [10, 1, 0.1, 0])
hmm.train_supervised(train_seq, smoothing=best_smoothing)
viterbi_pred_test = hmm.viterbi_decode_corpus(test_seq)
posterior_pred_test = hmm.posterior_decode_corpus(test_seq)
eval_viterbi_test = hmm.evaluate_corpus(test_seq, viterbi_pred_test)
eval_posterior_test = hmm.evaluate_corpus(test_seq, posterior_pred_test)
print("Best Smoothing %f -- Test Set Accuracy: Posterior Decode %.3f, Viterbi Decode: %.3f" % (best_smothing, eval_posterior_test, eval_viterbi_test))
In [ ]:
confusion_matrix = cm.build_confusion_matrix(test_seq.seq_list, viterbi_pred_test,
len(corpus.tag_dict), hmm.get_num_states())
cm.plot_confusion_bar_graph(confusion_matrix, corpus.tag_dict,
range(hmm.get_num_states()), 'Confusion matrix')
def update_counts(self, sequence, state_posteriors, transition_posteriors):
    """ Used in the E-step in EM."""
    num_states = self.get_num_states()  # Number of states.
    length = len(sequence.x)  # Length of the sequence.

    # Take care of the initial counts.
    for y in range(num_states):
        self.initial_counts[y] += state_posteriors[0, y]

    # Emission and transition counts.
    for pos in range(length):
        x = sequence.x[pos]
        for y in range(num_states):
            self.emission_counts[x, y] += state_posteriors[pos, y]
            if pos > 0:
                for y_prev in range(num_states):
                    self.transition_counts[y, y_prev] += transition_posteriors[pos-1, y, y_prev]

    # Take care of the final counts.
    for y in range(num_states):
        self.final_counts[y] += state_posteriors[length-1, y]
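These expected counts play the role that the hard counts played in supervised training: each EM iteration accumulates them over the corpus in the E-step and renormalizes them in the M-step. A sketch of one iteration built from the pieces above (it assumes compute_posteriors also returns the transition posteriors and the log-likelihood, as the call earlier in this notebook suggests; the provided train_EM may differ in its details):

def em_iteration_sketch(hmm, dataset, smoothing=0.1):
    # E-step: accumulate expected counts over all sequences.
    hmm.clear_counts(smoothing)
    total_log_likelihood = 0.0
    for seq in dataset.seq_list:
        initial_scores, transition_scores, final_scores, emission_scores = hmm.compute_scores(seq)
        state_posteriors, transition_posteriors, log_likelihood = hmm.compute_posteriors(
            initial_scores, transition_scores, final_scores, emission_scores)
        hmm.update_counts(seq, state_posteriors, transition_posteriors)
        total_log_likelihood += log_likelihood
    # M-step: renormalize the accumulated counts into probabilities.
    hmm.compute_parameters()
    return total_log_likelihood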
In [ ]:
# Train with EM.
hmm.train_EM(train_seq, 0.1, 20, evaluate=True)
viterbi_pred_test = hmm.viterbi_decode_corpus(test_seq)
posterior_pred_test = hmm.posterior_decode_corpus(test_seq)
eval_viterbi_test = hmm.evaluate_corpus(test_seq, viterbi_pred_test)
eval_posterior_test = hmm.evaluate_corpus(test_seq, posterior_pred_test)
print("Test Set Accuracy after EM: Posterior Decode %.3f, Viterbi Decode: %.3f" % (eval_posterior_test, eval_viterbi_test))
In [ ]:
confusion_matrix = cm.build_confusion_matrix(test_seq.seq_list, viterbi_pred_test,
len(corpus.tag_dict), hmm.get_num_states())
cm.plot_confusion_bar_graph(confusion_matrix, corpus.tag_dict,
range(hmm.get_num_states()), 'Confusion matrix')