The Viterbi algorithm is a dynamic programming algorithm for finding the most likely sequence of hidden states – called the Viterbi path – that results in a sequence of observed events, especially in the context of Markov information sources and hidden Markov models.
[...]
It is now also commonly used in speech recognition, speech synthesis, diarization, keyword spotting, computational linguistics, and bioinformatics. For example, in speech-to-text (speech recognition), the acoustic signal is treated as the observed sequence of events, and a string of text is considered to be the "hidden cause" of the acoustic signal. The Viterbi algorithm finds the most likely string of text given the acoustic signal.
Let's start with a pure Python implementation of the example given on Wikipedia:
In [1]:
states = ('Healthy', 'Fever')
observations = ('normal', 'cold', 'dizzy')
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
transition_probability = {
    'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
    'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}
emission_probability = {
    'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}
def viterbi(obs, states, start_p, trans_p, emit_p):
    V = [{}]
    path = {}
    # initialize the base cases (t == 0)
    for y in states:
        V[0][y] = start_p[y] * emit_p[y][obs[0]]
        path[y] = [y]
    # run Viterbi for t > 0
    for t in range(1, len(obs)):
        V.append({})
        newpath = {}
        for y in states:
            # choose the best previous state leading to state y
            (prob, state) = max((V[t - 1][y0] * trans_p[y0][y] * emit_p[y][obs[t]], y0)
                                for y0 in states)
            V[t][y] = prob
            newpath[y] = path[state] + [y]
        # don't need to remember the old paths
        path = newpath
    # if only one element is observed, the maximum is sought in the
    # initialisation values (t == 0)
    n = 0
    if len(obs) != 1:
        n = t
    # print_dptable(V)
    (prob, state) = max((V[n][y], y) for y in states)
    return (prob, path[state])

# Don't study this, it just prints a table of the steps.
def print_dptable(V):
    s = "    " + " ".join(("%7d" % i) for i in range(len(V))) + "\n"
    for y in V[0]:
        s += "%.5s: " % y
        s += " ".join("%.7s" % ("%f" % v[y]) for v in V)
        s += "\n"
    print(s)
If we run the Viterbi algorithm on the observations, it returns the most probable state sequence together with its probability.
In [2]:
viterbi(observations, states, start_probability, transition_probability, emission_probability)
Out[2]:
(0.01512, ['Healthy', 'Healthy', 'Fever'])
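We can verify this result by hand: at t=0, V[Healthy] = 0.6 * 0.5 = 0.3 and V[Fever] = 0.4 * 0.1 = 0.04. At t=1 ('cold'), V[Healthy] = max(0.3 * 0.7, 0.04 * 0.4) * 0.4 = 0.084 and V[Fever] = max(0.3 * 0.3, 0.04 * 0.6) * 0.3 = 0.027. At t=2 ('dizzy'), the best final state is Fever with V[Fever] = max(0.084 * 0.3, 0.027 * 0.6) * 0.6 = 0.01512, reached via Healthy at both previous steps.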
Let's time this implementation with very short observations (length 3) and with long observations (length 30000):
In [3]:
# create a long version of the observations by simply repeating them 10000 times
long_observations = list(observations) * 10000
In [4]:
%timeit viterbi(observations, states, start_probability, transition_probability, emission_probability)
In [5]:
%timeit viterbi(long_observations, states, start_probability, transition_probability, emission_probability)
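Besides being slow, the pure Python implementation multiplies probabilities along the whole sequence, which underflows to 0.0 long before reaching 30000 observations, so its result for the long sequence is numerically meaningless. The usual remedy is to work with log probabilities and sums instead of products, which is also what the schimmel implementation below does. A minimal sketch of the recursion in log space (reusing the model defined above; this is an illustration, not schimmel's implementation):

import math

def viterbi_log(obs, states, start_p, trans_p, emit_p):
    # log probability of the best path ending in each state at t == 0
    V = {y: math.log(start_p[y]) + math.log(emit_p[y][obs[0]]) for y in states}
    path = {y: [y] for y in states}
    for t in range(1, len(obs)):
        V_new, path_new = {}, {}
        for y in states:
            # sums of logs replace products of probabilities
            log_prob, prev = max((V[y0] + math.log(trans_p[y0][y]), y0)
                                 for y0 in states)
            V_new[y] = log_prob + math.log(emit_p[y][obs[t]])
            path_new[y] = path[prev] + [y]
        V, path = V_new, path_new
    log_prob, best = max((V[y], y) for y in states)
    return log_prob, path[best]

For the short sequence, math.exp() of the returned log probability recovers the 0.01512 from above.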
Since the computation takes quite a while for long sequences, let's speed up the whole thing. The schimmel package provides a Cython HMM implementation. In this implementation, the transitions and the observations are decoupled into individual classes, the TransitionModel and the ObservationModel. Both aim to reduce the amount of information that needs to be stored or computed, and this design allows the Viterbi algorithm to be executed in parallel.
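To get an intuition for the transition model, the transitions can be stored in a compressed sparse format similar to scipy's CSR matrices: a pointers array delimits, for each state, the slice of two further arrays listing its possible previous states and the corresponding transition probabilities. A minimal sketch of this idea with scipy (the attribute names mirror the ones printed below, but schimmel's exact layout may differ):

import numpy as np
from scipy.sparse import csr_matrix

# transition probabilities of the health example, indexed as
# [to_state, from_state] with 0 = 'Healthy' and 1 = 'Fever'
dense = np.array([[0.7, 0.4],
                  [0.3, 0.6]])
sparse = csr_matrix(dense)
pointers = sparse.indptr        # delimits the predecessors of each state
prev_states = sparse.indices    # possible previous states
probabilities = sparse.data     # corresponding transition probabilities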
In [6]:
import numpy as np
import schimmel
In [7]:
class HealthTransitionModel(schimmel.TransitionModel):

    def __init__(self, literal_states, transition_probability):
        # save the literal names of the states
        self.literal_states = literal_states
        # convert the dictionary with transitions to dense lists
        states = []
        prev_states = []
        probabilities = []
        for from_state in transition_probability:
            for to_state in transition_probability[from_state]:
                states.append(self.literal_states.index(to_state))
                prev_states.append(self.literal_states.index(from_state))
                probabilities.append(transition_probability[from_state][to_state])
        # convert the lists to the sparse format
        sparse_arrays = self.make_sparse(states, prev_states, probabilities)
        # instantiate a TransitionModel with the sparse arrays
        super(HealthTransitionModel, self).__init__(*sparse_arrays)
In [8]:
htm = HealthTransitionModel(states, transition_probability)
print(htm.pointers)
print(htm.states)
print(htm.probabilities)
print(htm.log_probabilities)
This needs to be done only once, and the transition model can be stored/pickled afterwards, so there's no real need to optimise it.
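For example, storing and re-loading the model with the standard library's pickle module (a sketch; this assumes the Cython model object pickles cleanly):

import pickle

# store the transition model once ...
with open('health_tm.pkl', 'wb') as f:
    pickle.dump(htm, f)
# ... and re-load it whenever it is needed
with open('health_tm.pkl', 'rb') as f:
    htm = pickle.load(f)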
In [9]:
class HealthObservationModel(schimmel.ObservationModel):

    def __init__(self, transition_model, literal_observations, emission_probability):
        """
        Instantiate an ObservationModel.
        """
        # create the pointers array, mapping each state to its own column of
        # the densities ('Healthy' to the first, 'Fever' to the second)
        self.pointers = np.arange(transition_model.num_states, dtype=np.uint32)
        # save the literal names of the observations
        self.literal_observations = literal_observations
        # save the emission probabilities in a suitable format, i.e. a 2D
        # array with one row per observation and one column per state
        emission_probabilities = np.empty((transition_model.num_states,
                                           len(literal_observations)))
        for state in emission_probability:
            # row index of the state, column indices of its observations
            tm_idx = transition_model.literal_states.index(state)
            for obs_state in emission_probability[state]:
                obs_idx = self.literal_observations.index(obs_state)
                emission_probabilities[tm_idx, obs_idx] = \
                    emission_probability[state][obs_state]
        self.emission_probabilities = emission_probabilities.T

    def compute_log_densities_list(self, observations):
        """
        Compute the log densities of the observations for each state.

        :param observations: observations list
        :return: log densities [2D numpy array, np.float]
        """
        densities = np.empty((len(observations), len(self.pointers)))
        for i, obs in enumerate(observations):
            densities[i] = self.emission_probabilities[self.literal_observations.index(obs)]
        return np.log(densities)

    def compute_log_densities(self, observations):
        """
        Compute the log densities of the observations for each state.

        :param observations: observations [numpy array of observation indices]
        :return: log densities [2D numpy array, np.float]
        """
        densities = np.empty((len(observations), len(self.pointers)))
        densities[:] = self.emission_probabilities[observations]
        return np.log(densities)
In [10]:
hom = HealthObservationModel(htm, observations, emission_probability)
print(hom.emission_probabilities)
print(hom.compute_log_densities_list(observations))
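With the rows ordered like the observations ('normal', 'cold', 'dizzy') and the columns like the states ('Healthy', 'Fever'), we expect the emission probability matrix to be [[0.5, 0.1], [0.4, 0.3], [0.1, 0.6]].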
Let's time the computation of the observation log densities:
In [11]:
%timeit hom.compute_log_densities_list(long_observations)
Relatively fast, but if we use integer indices instead of literal observation values, Cython does not need to call back into Python any more and is thus much faster:
In [12]:
observation_indices = np.empty(len(observations), dtype=int)
for i, obs in enumerate(observations):
    observation_indices[i] = observations.index(obs)
long_observation_indices = np.tile(observation_indices, 10000)
print(np.allclose(hom.compute_log_densities_list(observations),
                  hom.compute_log_densities(observation_indices)))
%timeit hom.compute_log_densities(long_observation_indices)
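The conversion itself can also be written with a lookup dictionary instead of repeated index() calls, which avoids rescanning the observations tuple for every element (a minimal alternative sketch):

# build a lookup table once and map all observations with it
obs_to_idx = {obs: i for i, obs in enumerate(observations)}
long_observation_indices = np.array([obs_to_idx[obs] for obs in long_observations])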
Computing the observation densities takes a reasonable amount of time, so let's continue with the HMM and actually run the Viterbi algorithm.
In [13]:
hmm = schimmel.HMM(htm, hom, initial_distribution=np.asarray([0.6, 0.4]), num_threads=1)
In [14]:
hmm.viterbi(observation_indices)
Out[14]:
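Assuming viterbi() returns the best state path as an array of state indices together with its log probability (a hypothetical unpacking; check schimmel's docstrings for the actual return value), the literal state names can be recovered like this:

# hypothetical: map the decoded state indices back to literal names
path, log_prob = hmm.viterbi(observation_indices)
print([states[int(idx)] for idx in path])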
In [15]:
%timeit hmm.viterbi(long_observation_indices)
This is much faster (~4 ms instead of ~4 s, a speedup of roughly 1000x).
In [16]:
parallel_hmm = schimmel.HMM(htm, hom, np.asarray([0.6, 0.4]), num_threads=2)
In [17]:
%timeit parallel_hmm.viterbi(long_observation_indices)
Oops, using more threads really kills the performance (1.1 s vs. 4 ms). Parallel decoding is not useful at all if we have only 2 states and compute the best previous state for them in parallel; the threading overhead dominates the actual work. Additionally, since each of these 2 states has only 2 transitions leading to it, there is not much computation needed to determine the best predecessor.
Let's design a more demanding scenario. Assume we have 1000 states with 1000 transitions each:
In [18]:
import itertools as it

num_states = 1000
# define state indices
from_states = []
to_states = []
# define transitions between all pairs of states
for from_state, to_state in it.product(range(num_states), range(num_states)):
    from_states.append(from_state)
    to_states.append(to_state)
# define a probability distribution over the target states
density = np.random.uniform(low=0.0, high=1.0, size=num_states)
density /= np.sum(density)
# use the same distribution for every originating state
transition_probabilities = np.tile(density, num_states)
tm = schimmel.TransitionModel.from_dense(to_states, from_states, transition_probabilities)
tm.num_transitions
Out[18]:
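Since all 1000 * 1000 transition probabilities drawn above are non-zero, the model should contain 1000000 transitions.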
In [19]:
class RandomObservationModel(schimmel.ObservationModel):

    def compute_log_densities(self, observations):
        """
        Return random observation densities.

        :param observations: observations
        :return: log densities as a 2D numpy array with the number of rows
                 being equal to the number of observations and the columns
                 representing the different observation log probability
                 densities. The type must be np.float.
        """
        # random values are sufficient, since we only want to time the decoding
        return np.random.rand(len(observations), len(self.pointers))

om = RandomObservationModel(np.arange(tm.num_states, dtype=np.uint32))
In [20]:
%timeit om.compute_log_densities(np.zeros(2000))
In [21]:
# random initial distribution (not normalised, which is fine for timing)
initial = np.random.rand(num_states)
In [22]:
random_hmm = schimmel.HMM(tm, om, initial, num_threads=1)
random_parallel_hmm = schimmel.HMM(tm, om, initial, num_threads=4)
In [23]:
%timeit random_hmm.viterbi(np.zeros(2000))
%timeit random_parallel_hmm.viterbi(np.zeros(2000))
If the model has enough states and transitions, parallel processing helps.