Viterbi algorithm example

Wikipedia says:

The Viterbi algorithm is a dynamic programming algorithm for finding the most likely sequence of hidden states – called the Viterbi path – that results in a sequence of observed events, especially in the context of Markov information sources and hidden Markov models.

[...]

It is now also commonly used in speech recognition, speech synthesis, diarization, keyword spotting, computational linguistics, and bioinformatics. For example, in speech-to-text (speech recognition), the acoustic signal is treated as the observed sequence of events, and a string of text is considered to be the "hidden cause" of the acoustic signal. The Viterbi algorithm finds the most likely string of text given the acoustic signal.

Let's start with a pure Python implementation of the example given on Wikipedia:


In [1]:
states = ('Healthy', 'Fever')
 
observations = ('normal', 'cold', 'dizzy')
 
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
 
transition_probability = {
   'Healthy' : {'Healthy': 0.7, 'Fever': 0.3},
   'Fever' : {'Healthy': 0.4, 'Fever': 0.6}
   }
 
emission_probability = {
   'Healthy' : {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
   'Fever' : {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6}
   }

def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    path = {}
 
    # Initialize base cases (t == 0)
    for y in states:
        V[0][y] = start_p[y] * emit_p[y][obs[0]]
        path[y] = [y]
 
    # Run Viterbi for t > 0
    for t in range(1, len(obs)):
        V.append({})
        newpath = {}
 
        for y in states:
            (prob, state) = max((V[t-1][y0] * trans_p[y0][y] * emit_p[y][obs[t]], y0) for y0 in states)
            V[t][y] = prob
            newpath[y] = path[state] + [y]
 
        # Don't need to remember the old paths
        path = newpath
    n = 0           # if only one element is observed, the max is sought in the initialization values
    if len(obs) != 1:
        n = t
    # print_dptable(V)
    (prob, state) = max((V[n][y], y) for y in states)
    return (prob, path[state])
 
# Don't study this, it just prints a table of the steps.
def print_dptable(V):
    s = "    " + " ".join(("%7d" % i) for i in range(len(V))) + "\n"
    for y in V[0]:
        s += "%.5s: " % y
        s += " ".join("%.7s" % ("%f" % v[y]) for v in V)
        s += "\n"
    print(s)

If we run the Viterbi algorithm on the observations, it tells us the most probable state sequence for these observations, together with its probability.


In [2]:
viterbi(observations, states, start_probability, transition_probability, emission_probability)


Out[2]:
(0.01512, ['Healthy', 'Healthy', 'Fever'])
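
As a sanity check, 0.01512 is exactly the probability of the path Healthy -> Healthy -> Fever, i.e. the product of the start probability of 'Healthy', the emission of 'normal', the transition to 'Healthy', the emission of 'cold', the transition to 'Fever' and the emission of 'dizzy':

0.6 * 0.5 * 0.7 * 0.4 * 0.3 * 0.6 = 0.01512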

Let's time this implementation with a very short observation sequence (length 3) and a long one (length 30,000).


In [3]:
# create a long version of the observations by simply repeating them 10000 times 
long_observations = list(observations) * 10000

In [4]:
%timeit viterbi(observations, states, start_probability, transition_probability, emission_probability)


100000 loops, best of 3: 9.76 µs per loop

In [5]:
%timeit viterbi(long_observations, states, start_probability, transition_probability, emission_probability)


1 loops, best of 3: 3.99 s per loop

The computation takes quite a while for the long sequence, and it scales badly: the long sequence is 10,000 times longer but roughly 400,000 times slower to decode. The line newpath[y] = path[state] + [y] copies the entire path at every time step, which makes the running time effectively quadratic in the sequence length. We should speed up the whole thing.

A Cython implementation of the same algorithm

The schimmel package provides a Cython HMM implementation.

In this implementation, transitions and observations are decoupled into individual classes, the TransitionModel and the ObservationModel.

The goal of both is to reduce the amount of information that needs to be stored or computed, and to allow the Viterbi algorithm to be executed in parallel.


In [6]:
import numpy as np
import schimmel

Create a TransitionModel

We subclass schimmel.TransitionModel and override the __init__() method to accept literal states and a dictionary with the transition probabilities, transforming them into an integer representation.


In [7]:
class HealthTransitionModel(schimmel.TransitionModel):
    def __init__(self, literal_states, transition_probability):
        # save the literal names of the states
        self.literal_states = literal_states
        # convert the dictionary with transitions to dense arrays
        states = []
        prev_states = []
        probabilities = []
        for from_state in transition_probability:
            for to_state in transition_probability[from_state]:
                states.append(self.literal_states.index(to_state))
                prev_states.append(self.literal_states.index(from_state))
                probabilities.append(transition_probability[from_state][to_state])
        # make the lists sparse
        sparse_arrays = self.make_sparse(states, prev_states, probabilities)
        # call the parent TransitionModel constructor with the sparse arrays
        super(HealthTransitionModel, self).__init__(*sparse_arrays)

In [8]:
htm = HealthTransitionModel(states, transition_probability)

print(htm.pointers)
print(htm.states)
print(htm.probabilities)
print(htm.log_probabilities)


[0 2 4]
[0 1 0 1]
[ 0.7  0.4  0.3  0.6]
[-0.35667494 -0.91629073 -1.2039728  -0.51082562]
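
The three arrays form a compact column-oriented representation: for each state y, the slice pointers[y]:pointers[y+1] selects the entries of states (the possible previous states) and probabilities (the corresponding transition probabilities) of all transitions leading to y. A minimal sketch of how to read the arrays back, based on the output above:

# enumerate all transitions encoded in the sparse arrays
for y, name in enumerate(htm.literal_states):
    for i in range(htm.pointers[y], htm.pointers[y + 1]):
        prev_name = htm.literal_states[htm.states[i]]
        print('%s -> %s: %.1f' % (prev_name, name, htm.probabilities[i]))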

This needs to be done only once, and the transition model can be stored/pickled, so there's no real need to optimise it.

Create an ObservationModel

We subclass schimmel.ObservationModel and also define the compute_log_densities() method, which is called later on.


In [9]:
class HealthObservationModel(schimmel.ObservationModel):
    def __init__(self, transition_model, literal_observations, emission_probability):
        """
        Instantiate an ObservationModel.
        
        """
        # create the pointers array, saving the observation probability
        # for 'Healthy' in the first column and 'Fever' in the second
        self.pointers = np.arange(transition_model.num_states, dtype=np.uint32)
        # save the literal names of the observations
        self.literal_observations = literal_observations
        # save the emission probabilities in a suitable format:
        # one row per state, one column per observation
        emission_probabilities = np.empty((transition_model.num_states,
                                           len(literal_observations)))
        for state in emission_probability:
            tm_idx = transition_model.literal_states.index(state)
            for obs_state in emission_probability[state]:
                obs_idx = self.literal_observations.index(obs_state)
                emission_probabilities[tm_idx, obs_idx] = \
                    emission_probability[state][obs_state]
        # transpose so that rows correspond to observations
        self.emission_probabilities = emission_probabilities.T
        
    def compute_log_densities_list(self, observations):
        """
        Compute the log densities of the observations for each state.

        :param observations: observations list
        :return:             log densities [2D numpy array, np.float]
        
        """
        densities = np.empty((len(observations), len(self.pointers)))
        for i, obs in enumerate(observations):
            densities[i] = self.emission_probabilities[self.literal_observations.index(obs)]
        return np.log(densities)
    
    def compute_log_densities(self, observations):
        """
        Compute the log densities of the observations for each state.

        :param observations: observations [numpy array]
        :return:             log densities [2D numpy array, np.float]
        """
        densities = np.empty((len(observations), len(self.pointers)))
        densities[:] = self.emission_probabilities[observations]
        return np.log(densities)

In [10]:
hom = HealthObservationModel(htm, observations, emission_probability)
print(hom.emission_probabilities)
print(hom.compute_log_densities_list(observations))


[[ 0.5  0.1]
 [ 0.4  0.3]
 [ 0.1  0.6]]
[[-0.69314718 -2.30258509]
 [-0.91629073 -1.2039728 ]
 [-2.30258509 -0.51082562]]
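
Each row of the returned array is simply the logarithm of the emission probabilities of the corresponding observation, e.g. the first row is (log 0.5, log 0.1) for 'normal'. A quick check against the values defined at the beginning:

# the log densities are just the logs of the emission probabilities
print(np.allclose(hom.compute_log_densities_list(observations),
                  np.log([[0.5, 0.1], [0.4, 0.3], [0.1, 0.6]])))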

Let's time the creation of the observation model:


In [11]:
%timeit hom.compute_log_densities_list(long_observations)


10 loops, best of 3: 23.2 ms per loop

Relatively fast, but if we use integer indices instead of literal observation values, Cython does not need to call back into Python any more and is thus much faster:


In [12]:
observation_indices = np.empty(len(observations), dtype=int)
for i, obs in enumerate(observations):
    observation_indices[i] = observations.index(obs)
long_observation_indices = np.tile(observation_indices, 10000)

print(np.allclose(hom.compute_log_densities_list(observations),
                  hom.compute_log_densities(observation_indices)))

%timeit hom.compute_log_densities(long_observation_indices)


True
1000 loops, best of 3: 834 µs per loop
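
The index conversion above works because each literal occurs exactly once in observations. For arbitrary sequences, a dictionary lookup avoids the repeated list.index() calls; a minimal sketch (the obs_to_idx name is ours):

# map literal observations to integer indices with a dictionary
obs_to_idx = {obs: i for i, obs in enumerate(observations)}
indices = np.fromiter((obs_to_idx[o] for o in long_observations),
                      dtype=int, count=len(long_observations))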

The creation of the observation model takes a reasonable amount of time, so let's continue with the HMM and actually run the Viterbi algorithm.

Create an HMM

No need to subclass anything; the HMM class depends only on the TransitionModel and the ObservationModel.


In [13]:
hmm = schimmel.HMM(htm, hom, initial_distribution=np.asarray([0.6, 0.4]), num_threads=1)

Viterbi decoding


In [14]:
hmm.viterbi(observation_indices)


Out[14]:
(array([0, 0, 1], dtype=uint32), -3.437586228403492)
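
The first element of the result is the decoded path as state indices, the second the log probability of that path. Mapping the indices back through the transition model's literal names recovers the same state sequence as the pure Python implementation:

path, log_prob = hmm.viterbi(observation_indices)
print([htm.literal_states[state] for state in path])
# ['Healthy', 'Healthy', 'Fever']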

In [15]:
%timeit hmm.viterbi(long_observation_indices)


100 loops, best of 3: 3.73 ms per loop

This is much faster: ~4 ms instead of ~4 s, a speedup of roughly 1000x.

Parallel Viterbi decoding

Let's create an HMM which uses 2 threads in parallel.


In [16]:
parallel_hmm = schimmel.HMM(htm, hom, np.asarray([0.6, 0.4]), num_threads=2)

In [17]:
%timeit parallel_hmm.viterbi(long_observation_indices)


1 loops, best of 3: 1.11 s per loop

Oops, using more threads really kills the performance (1.1 s vs. 4 ms). Parallel decoding is not useful at all if we have only 2 states and compute the best previous state for them in parallel. Additionally, since each of these 2 states has only 2 transitions leading to it, there is not much calculation needed to find the best predecessor, so the threading overhead dominates.

Let's design a more demanding scenario. Assume we have 1000 states with 1000 transitions each:

Create an appropriate transition model:

For simplicity, we create a transition model with random transition probabilities between the states:


In [18]:
import itertools as it

num_states = 1000

# define state indices
from_states = []
to_states = []
# define transitions
for from_state, to_state in it.product(range(num_states), range(num_states)):
    from_states.append(from_state)
    to_states.append(to_state)
# define a probability distribution
density = np.random.uniform(low=0.0, high=1.0, size=num_states)
density /= np.sum(density)
# define transition probabilities
transition_probabilities = np.tile(density, num_states)

tm = schimmel.TransitionModel.from_dense(to_states, from_states, transition_probabilities)
tm.num_transitions


Out[18]:
1000000
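
Building the one million state pairs in a Python loop is itself slow; the same arrays can be produced directly with NumPy (a sketch equivalent to the loop above):

# from_state varies slowest in it.product, to_state fastest
from_states = np.repeat(np.arange(num_states), num_states)
to_states = np.tile(np.arange(num_states), num_states)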

Create an appropriate observation model:

For simplicity, we create an observation model which returns random numbers as observation log densities:


In [19]:
class RandomObservationModel(schimmel.ObservationModel):
    def compute_log_densities(self, observations):
        """
        Return random observation densities.
 
        :param observations: observations
        :return:             log densities as a 2D numpy array with the number
                             of rows being equal to the number of observations
                             and the columns representing the different
                             observation log probability densities. The type
                             must be np.float.
        """
        # random values are sufficient for benchmarking; they are not
        # normalised log probabilities
        return np.random.rand(len(observations), len(self.pointers))

om = RandomObservationModel(np.arange(tm.num_states, dtype=np.uint32))

In [20]:
%timeit om.compute_log_densities(np.zeros(2000))


10 loops, best of 3: 17.4 ms per loop

Create two HMMs, one for sequential decoding, one for parallel decoding:


In [21]:
initial = np.random.rand(num_states)

In [22]:
random_hmm = schimmel.HMM(tm, om, initial, num_threads=1)
random_parallel_hmm = schimmel.HMM(tm, om, initial, num_threads=4)

In [23]:
%timeit random_hmm.viterbi(np.zeros(2000))
%timeit random_parallel_hmm.viterbi(np.zeros(2000))


1 loops, best of 3: 2.11 s per loop
1 loops, best of 3: 1.2 s per loop

If the model has enough states and transitions, parallel processing does help: with 4 threads, decoding is ~1.75x faster than with a single thread.