Lab 2: Sequence Models

In this lab, you will implement a Hidden Markov Model (HMM). HMMs specify a joint probability over observations and hidden states.

This is what we will do today:

  1. Estimate a simple HMM model from training data (supervised learning)
  2. Find the best sequence of hidden states for a given sequence of observations (we define "best" in 2 different ways!)

Rules

  • The lab exercises should be made in groups of two people.

  • The deadline is Tuesday 14 nov 16:59.

  • The assignment should submitted to Blackboard as .ipynb. Only one submission per group.

  • The filename should be lab2_lastname1_lastname2.ipynb, so for example lab2_Manning_Schuetze.ipynb.

  • The notebook is graded on a scale of 0-50. The number of points for each question is indicated in parantheses.

  • The questions marked BONUS give you bonus points; try them if you want an extra challenge

Notes on implementation:

  • You should write your code and answers in this iPython Notebook (see http://ipython.org/notebook.html for reference material). If you have problems, please contact your teaching assistant.

  • Use only one cell for code and one cell for markdown answers!

    • Put all code in the cell with the # YOUR CODE HERE comment.

    • For theoretical question, put your solution in the YOUR ANSWER HERE cell.

  • Test your code and make sure we can run your notebook


Notation

$ \Sigma := \{ o_1, \dots, o_J \} $ is our set of observations

$\Lambda := \{c_1, \dots, c_K \}$ is our set of state labels

$\Sigma^*$ are all possible sequences of observations (including empty string $\epsilon$)

$\Lambda^*$ all possible sequences of hidden states (including empty string $\epsilon$)

Extra info: we can say that $\Sigma^*$ is the Kleene-closure of $\Sigma$, and $\Lambda^*$ the Kleene-closure of $\Lambda$.

A simple example: The Baby HMM

We start with a simple example, so that we can easily verify that our code is correct.

Consider that we are modeling how a baby behaves. We observe the baby doing the following things: laughing (laugh), crying (cry), and sleeping (sleep). This is our set $\Sigma$ of observations.

We presume that, at any moment, the baby can be either hungry, bored, or happy. Since babies cannot talk, each of these states is hidden. This is our set $\Lambda$ of hidden states.

Now the question is: if we have a series of observations, can we predict what hidden states the baby went through?

To tackle this problem, we assume that the baby behaves like a 1st order discrete Markov chain: the baby's current state only depends on its previous hidden state. The baby can be described as an HMM. (Yay!)

For example, assume we observed the baby doing the following:

sleep cry laugh cry
cry cry laugh sleep

We will use these sequences as our test set. We can try to find out the states of the baby for those observations.

Now, to train our model, we will need some examples of observations and states; this is our training set:

laugh/happy cry/bored cry/hungry sleep/happy
cry/bored laugh/happy cry/happy sleep/happy
cry/hungry laugh/happy cry/bored sleep/happy

So we have pairs observation/state.


In [ ]:
%matplotlib inline
import numpy as np
from collections import defaultdict
from collections import namedtuple

In [ ]:
# read in test data
test_data = """sleep cry laugh cry
cry cry laugh sleep"""

def test_reader(test_lines):
    for line in test_lines.splitlines():
        yield line.split()

test_set = list(test_reader(test_data))

# read in train data
train_data = """laugh/happy cry/bored cry/hungry sleep/happy
cry/bored laugh/happy cry/happy sleep/bored
cry/hungry cry/bored sleep/happy"""

# for convenience, we define a Observation-State pair class
Pair = namedtuple("Pair", ["obs", "state"])
Pair.__repr__ = lambda x: x.obs + "/" + x.state

def train_reader(train_lines):
    for line in train_data.splitlines():
        # a pair is a string "observation/state" so we need to split on the "/"
        yield [Pair(*pair.split("/")) for pair in line.split()]

training_set = list(train_reader(train_data))

# print the results
print("test set (observations):")
for seq in test_set:
    print(seq)
print("\ntraining set (observation/state pairs):")
for seq in training_set:
    print(seq)

Vocabularies

It's going to be very useful if we can map states and observations to integers, so that we can identify them by a number. If we don't do this, then our implementation will be much slower.

Make sure you understand what is going on here: every time we look up a observation or state, the defaultdict will create a new key (index) if it has not seen that key (state or observation) before.


In [ ]:
# create mappings from state/obs to an ID
state2i = defaultdict(lambda: len(state2i))
obs2i = defaultdict(lambda: len(obs2i))

for seq in training_set:
    for example in seq:
        state_id = state2i[example.state]
        obs_id = obs2i[example.obs]
        
print("\nOur vocabularies:")
print(state2i)
print(obs2i)

The HMM for the first training set sequence looks like this:

Now, we will estimate the following probability distributions:

  • initial: $P( c_k \,|\, \texttt{start})$
  • transition: $P( c_k \,|\, c_l )$
  • final: $P(\texttt{stop} \,|\, c_k )$
  • emission: $P( o_l \,|\, c_k)$

These distributions are all we need. Remember that:

  • the probability of transitioning to a state $c_k$ only depends on one previous state $c_l$ (1st order Markov assumption).
  • emitting an observation $o_l$ only depends on the state $c_k$.

Finding the Maximum Likelihood Parameters

Now we would like to know what those distributions look like. This is called estimation. Given our training data, we count how many times each event occurs and normalize the counts to form proper probability distributions.

Let's first do counts for the start probabilities:


In [ ]:
# we can get the number of states and observations from our dictionaries
num_states = len(state2i)
num_observations = len(obs2i)

# this creates a vector of length `num_states` filled with zeros
counts_start = np.zeros(num_states)

# now we count 1 every time a sequence starts with a certain state
# we look up the index for the state that we want to count using the `state2i` dictionary
for seq in training_set:
    counts_start[state2i[seq[0].state]] += 1.

print(counts_start)

We see each state once at the beginning of a sequence in the training set, so that is why we have a count of 1 for each of them.

We now normalize those counts, so that we obtain a probability distribution:


In [ ]:
# since p_start is a numpy object, we can call sum on it; easy!
total = counts_start.sum()

# normalize: divide each count by the total
p_start = counts_start / total  
print('start', '-->', p_start)

We now turn to the transition probabilities and stop probabilities. We count, and then we normalize:


In [ ]:
# we can transition from any state to any other state in principle,
# so we create a matrix filled with zeros so that we can count any pair of states
# in practice, some transitions might not occur in the training data
counts_trans = np.zeros([num_states, num_states])

# for the final/stop probabilities, we only need to store `num_states` values.
# so we use a vector
counts_stop = np.zeros(num_states)

# now we count transitions, one sequence at a time
for seq in training_set:
    for i in range(1, len(seq)):
        
        # convert the states to indexes
        prev_state = state2i[seq[i-1].state]
        current_state = state2i[seq[i].state]
        
        # count
        counts_trans[current_state, prev_state] += 1.

# count final states
for seq in training_set:
    state = state2i[seq[-1].state]
    counts_stop[state] += 1.

# print the counts
print("Transition counts:")
print(counts_trans)

print("Final counts:")
print(counts_stop)

Now we can normalize again. We will need to collect the total counts per state. Take some time to understand that the totals consist of the transition counts AND the final counts.


In [ ]:
# Useful trick: np.sum(m, 0) sums matrix m along the first dimension:
print(counts_trans.sum(0))

In [ ]:
total_per_state = counts_trans.sum(0) + counts_stop
print("Total counts per state:\n", total_per_state, "\n")

# now we normalize
# here '/' works one column at a time in the matrix
p_trans = counts_trans / total_per_state
print("Transition probabilities:\n", p_trans)

# here '/' divides the values in each corresponding index in the 2 vectors
p_stop = counts_stop / total_per_state
print("Final probabilities:\n", p_stop, "\n")

So far so good! Now let's take care of emission probabilities.


In [ ]:
# now we create a matrix to keep track of emission counts
# in principle any states can emit any observation
# so we need a matrix again
counts_emiss = np.zeros([num_observations, num_states])

# count
for seq in training_set:
    for obs, state in seq:
        obs = obs2i[obs]
        state = state2i[state]
        counts_emiss[obs][state] += 1.

# normalize
p_emiss = counts_emiss / counts_emiss.sum(0)

print("emission counts:\n", counts_emiss)
print("p_emiss:\n", p_emiss)

This is a good moment for a sanity check. First, take a look at the training set to see if these probabilities are correct, i.e. check if for each state $s_k$: $$\sum_l P(s_l \,|\, s_k) = 1.0$$ Note that this includes transitions to "stop" state, so you have to take those into account.

Exercise 1: Sanity check (2.5 points)

Write a function sanity_check(...) that checks if all distributions are correct. If (and only if) it discovers an incorrect distribution, it should throw an AssertionError.

If you want, you can include some print statements to see what is going on.

[Python hint]: use python assert statements.


In [ ]:
def almost_one(p, eps=1e-3):
    return (1.-eps) < p < (1. + eps)

def sanity_check(p_start=None, p_trans=None, p_stop=None, p_emiss=None):
    # YOUR CODE HERE
    raise NotImplementedError()

In [ ]:
# you can try your function out like this
# (do not use this cell for your solution)
try:
    sanity_check(p_start=p_start, p_trans=p_trans, p_stop=p_stop, p_emiss=p_emiss)
    print("All good!")
except AssertionError as e:
    print("There was a problem: %s" % str(e))

Decoding a sequence

Ok, so we have estimated a model. Great. Now what? Well, now we can decode!

Given an observation sequence $o_1, o_2, \dots, o_N$, we want to find the sequence of hidden states $s^* = s^*_1, s^*_2, \dots, s^*_N$ that best explains those observations.

But what does "best" mean?

  1. If we are interested in the best global assignment of states to the sequence as a whole, we can use the Viterbi algorithm.
  2. BONUS If we care more about minimizing the local error of getting each $s_i$ right, we can use posterior decoding (also called max marginal decoding). (This is a bonus exercise at the end!)

Decoding: The Viterbi Algorithm

Viterbi gives us the best global hidden state sequence, i.e.:

$$ \begin{array}{lll} s^* &=& \arg\max_{s = s_1, s_2, \dots, s_N} P(s_1, s_2, \dots, s_N \,|\, o_1, o_2, \dots, o_N ) \\ &=& \arg\max_{s = s_1, s_2, \dots, s_N} P(s_1, s_2, \dots, s_N, o_1, o_2, \dots, o_N ) \end{array}$$

To explain Viterbi we will make use of a trellis, a kind of graph that shows us the possible states for each time step.

For our earlier example, the trellis looks like this:

Note, that we can now label the edges of this trellis with the following probabilities:

  • $P_{\text start}(c_k \,|\, \text{start})$ on the three edges from "start"
  • $P_{\text stop}(\text{stop} \,|\, c_k)$ on the three edges leading to "stop"
  • $P_{\text trans}(c_k \,|\, c_{l})$ on each remaining edge from state $c_l$ to $c_k$
  • $P_{\text emiss}(o \,|\, c_k)$ from each state $c_k$, to an observation $o$ made from that state (not shown here)

Do you see that our trellis nicely shows the independence assumptions of the HMM?

A Naive approach to get the best sequence

To see why the Viterbi algorithm is so useful, we can consider another way to calculate $s^*$:

  • Iterate over all possible state sequences (all ways to go from start to stop)
    • Calculate the probability for that sequence
    • Store the highest probability seen so far and its sequence
  • Return the sequence that had the maximum probability

The problem with this approach is that there are a lot of possible sequences!

Exercise 2: How many possible state sequences are there? (5 points)

This is a theoretical question. Assume that you have a set $\Lambda$ of possible states (so there are $|\Lambda|$ states), and that the observation sequence is of length $N$. Assume there is a transition from any state to any other state. Write down the formula that gives the number of sequences.

YOUR ANSWER HERE

The Viterbi algorithm

We use a slightly different notation here compared to the lecture.

So how do we find the path with the highest score? The idea is that we can use our trellis to represent an exponential number of paths. Since we are only interested in the highest-scoring path, for every state at every time step, we only need to keep track of the highest probability that can lead us to that state. We can disregard any other paths that lead to that state, since they will for sure not be part of the highest-scoring path.

Viterbi uses dynamic programming. Here, that means that we will re-use probabilities that we have already computed, so we never have to compute the score for the same sub-problem multiple times.

Let's start at the beginning.

For the first time step, the viterbi score is the transition probability of reaching a state $c_k$ from "start", times the probability of emitting the first observation $o_1$ from that state:

$$\text{viterbi}(1, c_k) = P_{\text start}( c_k \,|\, \text{start}) \times P_{\text emiss}(o_1 \,|\, c_k)$$

So, the Viterbi trellis represents the path with maximum probability in position $i$ when we are in state $y_i$ having observed $o_1, o_2, \dots, o_i$, the observations up to and including that point.

Now that we have the viterbi scores for all states of the first time step in our trellis, we can use the following recursive formula to get the scores for all other states, one time step at a time:

$$\text{viterbi}(i, c_k) = \big( \max_{c_l \in \Lambda} P_{\text trans}(c_k | c_l) \times \text{viterbi}(i-1, c_l) \big) \times P_{\text emiss}(o_i \,|\, c_k)$$

Finally, for our final state "stop" we need to do something special, since there is no observation there:

$$\text{viterbi}(N+1, \text{stop}) = \max_{c_l \in \Lambda} P_{\text stop}(\text{stop} \,|\, c_l) \times \text{viterbi}(i-1, c_l) $$

This is all we need to know what probability the highest scoring path has! Do you see how the dynamic programming helps us to solve this task efficiently?

How did we get here?

Once we reach the "stop" state we know the maximum probability, but we forgot how we got there! If you don't see this immediately, remember that, whenever we computed the viterbi score for a state, we took the maximum over all previous states' viterbi scores, times the transition from those states. But we didn't keep track of which state was actually selected in that "max" operation. So now that we are in "stop", we don't know how we got there.

To solve this, we will use backpointers. Whenever we do a $\max$, we store what state was selected by that max (i.e. the $\arg\max$):

$$\text{backtrack}(i, c_k) = \arg\max_{c_l \in \Lambda} P_{\text trans}(c_k | c_l) \times \text{viterbi}(i-1, c_l)$$

Log probabilities

Now you know enough to implement Viterbi! But before we start.. Because probabilities tend to get rather small when multiplying, causing numerical instabilities, we will use log probabilities. This means that, instead of multiplying, we can now sum probabilities, because:

$$ \log(uv) = \log u + \log v$$

To get the probability of a path trough our trellis from "start" to "stop", we can just sum the log-probabilities that we encounter. So, finding the best ("Viterbi") path means finding the path with the highest score.

Exercise 3: convert all probabilities to log-probabilities (2.5 points)


In [ ]:
# Note: only run this cell once; otherwise you get NaN values.
# If you get NaN values, try running all cells above first.

def convert_to_log(p_start=None, p_trans=None, p_stop=None, p_emiss=None):
    """
    Convert all probabilities to log-probabilities
    
    Important: only run this function with normal probabilities as input! 
    If you run this twice, things will break.
    """

    # YOUR CODE HERE
    raise NotImplementedError()
    return p_start, p_trans, p_stop, p_emiss


print("Before:\n", p_start, p_trans, p_stop, p_emiss)

# do the conversion using your function
p_start, p_trans, p_stop, p_emiss = convert_to_log(p_start=p_start, p_trans=p_trans, p_stop=p_stop, p_emiss=p_emiss)

print("After:\n", p_start, p_trans, p_stop, p_emiss)

Smoothing

Oops! We got a big red warning! What happened?

Some probabilities were 0.0, and the log function is not defined for zero, resulting in a warning.

To prevent the error, we can add a small smoothing value to our counts, so that we never have a probability of zero.

To make things easier, we define a normalize_all function below that does all the normalization again, but now adding a small value to all the counts.


In [ ]:
def normalize(x, smoothing=0.1, axis=0):
    smoothed = x + smoothing
    return smoothed / smoothed.sum(axis)

def normalize_all(counts_start, counts_trans, counts_stop, counts_emiss, smoothing=0.1):
    """Normalize all counts to probabilities, optionally with smoothing."""
    p_start = normalize(counts_start, smoothing=smoothing)
    p_emiss = normalize(counts_emiss, smoothing=smoothing)
    
    counts_trans_smoothed = counts_trans + smoothing
    counts_stop_smoothed = counts_stop + smoothing
    total_trans_stop = counts_trans_smoothed.sum(0) + counts_stop_smoothed
    p_trans = counts_trans_smoothed / total_trans_stop
    p_stop = counts_stop_smoothed / total_trans_stop
    
    return p_start, p_trans, p_stop, p_emiss


# normalize with smoothing
smoothing = 0.1
p_start, p_trans, p_stop, p_emiss = normalize_all(
    counts_start, counts_trans, counts_stop, counts_emiss, smoothing=smoothing)

# convert to log-probabilities
print("Smoothed probabilities:\n", p_start, p_trans, p_stop, p_emiss)
p_start, p_trans, p_stop, p_emiss = convert_to_log(p_start=p_start, p_trans=p_trans, p_stop=p_stop, p_emiss=p_emiss)
print("Smoothed log-probabilities:\n", p_start, p_trans, p_stop, p_emiss)

In [ ]:

Exercise 4: implement the Viterbi algorithm (40 points)

You will now implement the Viterbi algorithm. Complete the function viterbi(sequence, p_start, p_trans, p_emiss, p_stop) below.

Input: sequence ($o_1, ..., o_N$), $P_\text{start}$, $P_\text{trans}$, $P_\text{emiss}$, $P_\text{stop}$

Forward pass: compute the best path for every end state

  • set $\text{viterbi}(1, c_k)$ for each $c_k$
  • for $i=2$ to $N$, and for each $c_k$, set $\text{viterbi}(i, c_k$) and $\text{backtrack}(i, c_k)$
  • $\text{max_prob} = \max_{c_l} P_{\text{stop}}(\text{stop} \,|\, c_l) \times viterbi(N, c_l)$

Backward pass: backtrack to get most likely path

  • $\hat{s}_N = \arg\max_{c_l} P_\text{stop}(\text{stop} \,|\, c_l) \times viterbi(N, c_l)$
  • for $i = N-1$ to $1$: $\hat{s}_i = \text{backtrack}(i+1, \hat{s}_{i+1})$

Output: max_prob, Viterbi path $\hat{s}_1, \hat{s}_2, \dots, \hat{s}_N$


In [ ]:
def viterbi(sequence, p_start=None, p_trans=None, p_stop=None, p_emiss=None):
    """
    Compute the Viterbi sequence. 
    Note: you have to use log-probabilities!
    
    The return value should be a tuple (max_prob, list_of_viterbi_states)
    Return:
      - best_score (float) the log-probability of the best path
      - best_path (int list) the best path as a list of state IDs
    """
    
    length = len(sequence)
    num_states = len(p_start)
    
    # trellis to store Viterbi scores
    # we store -inf as our initial scores since log(0)=-inf
    trellis = np.full([length, num_states], -np.inf)

    # backpointers to backtrack (to remember what prev. state caused the maximum score)
    # we initialize with -1 values, to represent a non-existing index
    backpointers = -np.ones([length, num_states], dtype=int)

    # YOUR CODE HERE
    raise NotImplementedError()

    return best_score, best_path

Trying out Viterbi

Once you have implemented the Viterbi algorithm, try it out on the following sequence.

Note: to get all points, make sure that the cell below runs.


In [ ]:
"""Test out your Viterbi-algorithm here."""

test_sequence = test_set[0]
best_score, best_path = viterbi(test_sequence, p_start=p_start, p_trans=p_trans, p_stop=p_stop, p_emiss=p_emiss)

print(best_score)
print(best_path)

i2state = {v : k for k, v in state2i.items()}
print([i2state[i] for i in best_path])

Congratulations!

You have reached the end of lab 2.

If you want an additional challenge, and to attempt 10 additional (bonus) points, go ahead and try the following sections. Otherwise, you're done!

Bonus: Posterior Decoding

What if we don't care about the global sequence of hidden states, but more about getting individual states right? We now aim to find the state with the highest state posterior for each position. So what is a state posterior? It is defined as:

$$ P( s_i \,|\, o_1, o_2, \dots, o_N) $$

It gives the probability, with observation sequence $o_1, o_2, \dots, o_N$, that the i'th hidden state was $s_i$.

The best state $s_i^*$ for that position is then:

$$ s_i^* = \arg\max_{s_i \in \Lambda} P( s_i \,|\, o_1, o_2, \dots, o_N) $$

If we calculate this for each position, we are performing posterior decoding. Do you see the difference with Viterbi? Now we choose the hidden states independenly, based on the observations $o = o_1, o_2, \dots, o_N$, one position at a time.

How to calculate a state posterior?

Good question. Calculating the state posterior is not so easy. Remember that we are interested in one specific time step. To choose the best state for this time step, we need to take into account all paths that lead there. You will soon see why.

First, let's define a few useful terms.

Sequence posterior $$ P( s = s_1, s_2, \dots, s_N \,|\, o = o_1, o_2, \dots, o_N) = \frac{P(s, o)}{P(o)}$$

To compute this, we need the likelihood:

$$ P( o = o_1, o_2, \dots, o_N ) = \sum_{s} P(o, s)$$

To calculate the likelihood, we thus need to sum over all possible state sequences s!

Since the number of state sequences can grow so quickly, we will do something smarter than summing over all of them. (We used a similar trick with the Viterbi-algorithm!)

Forward and Backward probabilities

We can use compute forward and backward probabilities, which help us compute the likelihood in linear time: O(N).

Let's first see how they are computed, before we will use them to calculate the likelihood (and finally the state posterior).

The forward probability for a position i and a state $c_k$, gives us the probability of being in that state and that position, having observed $o_1, o_2, \dots, o_i$:

$$\text{forward}(i, c_k) = P(c_k, o_1, o_2, \dots, o_i)$$

Because of the independence assumptions in the HMM, we can calculate the forward probability of position $i$ using the forward probabilities of each state in position $i-1$:

$$\text{forward}(i, c_k) = \Big( \sum_{c_l \in \Lambda} P_{\text trans}(c_k \,|\, c_l) \times \text{forward}(i-1, c_l) \Big) \times P_{\text emiss}(o_i \,|\, c_k) $$

And our special cases:

$$\text{forward}(1, c_k) =P_{\text start}(c_k \,|\, \text{start}) \times P_{\text emiss} (o_1 \, |\, c_k)$$$$\text{forward}(N+1, \text{stop}) = \sum_{c_l \in \Lambda} P_{\text trans}(\text{stop} \,|\, c_l) \times \text{forward}(N, c_l) $$

Did you notice that this is almost exactly the same as the Viterbi scores? Instead of taking the maximum, we are now summing!

Warning: you cannot "just sum" two probabilities in the log-domain! A brute-force way to do this correctly would be to do $\log(\exp(a) + \exp(b)$, i.e. convert the probabilities back and then sum, and then convert them to the log domain again. But this exposes us to numeric instabilities again! So a better way to do it is using $$\log(\exp(a) + \exp(b)) = a + \log(1 + \exp(b − a)) \qquad \text{for } a < b$$

We are providing a function logsum() for you that sums a list of values in the log-domain (using the above strategy).

Now that we can compute forward probabilities: did you notice that $\text{forward}(N+1, \text{stop})$ gives us the likelihood, i.e. $P(o_1, o_2, \dots, o_N)$? Take a moment to see why.

Sadly, this is still not enough to calculate the state posteror. (You probably guessed, since this is calleed the Forward-Backward algorithm!). Right now we know the probability of being in state $s_i$ having observed $o_1, o_2, \dots, o_i$, but we do not know the probability of being in that state knowing $o_{i+1}, \dots, o_N$. This is what the backward probability gives us:

$$\text{backward}(i, c_l) = \sum_{c_k \in \Lambda} P_\text{trans}(c_k \,|\, c_l) \times \text{backward}(i+1,c_k) \times P_\text{emiss}(o_{i+1} \,|\, c_k)$$

And our special cases:

$$\text{backward}(N, c_l) = P_\text{stop}( \text{stop} \,|\, c_l)$$$$\text{backward}(0, \text{start}) = \sum_{c_k \in \Lambda} P_{\text start}(c_k| \text{start}) \times \text{backward}(1, c_k) \times P_{\text emiss}(o_1 \,|\, c_k)$$

Can you see that we can also calculate the likelihood using just backward probabilities? You can find it in $\text{backward}(0, \text{start})$.

Now, implement the forward and backward algorithms.

The forward function should return two values:

  • all forward scores for the trellis (length x num_states)
  • the forward score of the stop state (not part of the trellis)

The backward function should similarly return two values.


In [ ]:
def logsum(logv):
    """Sum of probabilities in log-domain."""
    res = -np.inf
    for val in logv:
        res = logsum_pair(res, val)
    return res

def logsum_pair(logx, logy):
    """
    Return log(x+y), avoiding arithmetic underflow/overflow.
    """
    if logx == -np.inf:
        return logy
    elif logx > logy:
        return logx + np.log1p(np.exp(logy-logx))
    else:
        return logy + np.log1p(np.exp(logx-logy))


def forward(sequence, p_start=None, p_trans=None, p_stop=None, p_emiss=None):
    """
    Compute Forward probabilities.
    Note: all probabilities should be log-probabilities.
    
    Return:
      - trellis with forward probabilities, excluding the "stop" cell
      - the forward probability of the stop cell (this is the log-likelihood!)
    """
    
    length = len(sequence)
    num_states = len(p_start)
    
    # trellis to store forward probabilities
    trellis = np.full([length, num_states], -np.inf)
    
    # YOUR CODE HERE
    raise NotImplementedError()
    
    return trellis, log_likelihood


def backward(sequence, p_start=None, p_trans=None, p_stop=None, p_emiss=None):
    """
    Compute Backward probabilities.
    Note: all probabilities should be log-probabilities.
    
    Return:
      - trellis with backward probabilities, excluding the "start" cell
      - the forward probability of the start cell (this is ALSO the log-likelihood!)
    """
    
    length = len(sequence)
    num_states = len(p_start)
    
    # trellis to store forward probabilities
    trellis = np.full([length, num_states], -np.inf)
    
    # YOUR CODE HERE
    raise NotImplementedError()
    
    return trellis, log_likelihood


def forward_backward(sequence):
    """
    Compute forward and backward probabilities.
    Return:
    - fw_trellis
    - fw_log_likelihood (the value of the "stop" cell, not part of the trellis)
    - bw_trellis
    - bw_log_likelihood (the value of the "start" cell, not part of the trellis)
    """
    fw_trellis, fw_ll = forward(sequence, p_start=p_start, p_trans=p_trans, p_stop=p_stop, p_emiss=p_emiss)
    bw_trellis, bw_ll = backward(sequence, p_start=p_start, p_trans=p_trans, p_stop=p_stop, p_emiss=p_emiss)
    return fw_trellis, fw_ll, bw_trellis, bw_ll

In [ ]:
test_sequence = test_set[0]

fw_trellis, fw_ll, bw_trellis, bw_ll = forward_backward(test_sequence)

print(test_sequence)
print(fw_trellis)
print(fw_ll)
print(bw_trellis)
print(bw_ll)

Posterior Decoding

Implement a function that, given the forward and backward probabilities, and a sequence of observations $o_1, o_2, \dots, o_N$, returns the best hidden state sequence, according to posterior decoding.


In [ ]:
def posterior_decode(sequence, fw_trellis, bw_trellis, ll, p_trans, p_emiss):
    """
    Return best hidden state sequence according to Posterior decoding
    """

    length = len(sequence)
    num_states = fw_trellis.shape[1]
        
    # calculate the state posteriors
    state_posteriors = np.zeros([length, num_states])
    
    for i in range(length):
        
        # YOUR CODE HERE
        raise NotImplementedError()

    state_posteriors = np.exp(state_posteriors)
    
    # the best states are simply the arg max of the state posteriors
    best_sequence = np.argmax(state_posteriors, axis=1)

    return state_posteriors, best_sequence

In [ ]:
state_posteriors, best_sequence = posterior_decode(test_sequence, fw_trellis, bw_trellis, fw_ll, p_trans, p_emiss)

print(state_posteriors)
print(best_sequence)