In [1]:
from __future__ import print_function, division
import numpy as np
import pandas as pd

The a simple version of viterbi algorithm is implemented responsible for part-of-speech tagging.


In [2]:
transi = pd.DataFrame({'VB': [.019, .0038, .83, .0040, .23],
                       'TO': [.0043, .035, 0, .016, .00079],
                       'NN': [.041, .047, .00047, .087, .0012],
                       'PPSS': [.067, .0070, 0, .0045, .00014]},
                      index=['s', 'VB', 'TO', 'NN', 'PPSS'],
                      columns=['VB', 'TO', 'NN', 'PPSS'],
                      dtype='float128')
transi


Out[2]:
VB TO NN PPSS
s 0.0190 0.00430 0.04100 0.06700
VB 0.0038 0.03500 0.04700 0.00700
TO 0.8300 0.00000 0.00047 0.00000
NN 0.0040 0.01600 0.08700 0.00450
PPSS 0.2300 0.00079 0.00120 0.00014

In [3]:
obser = pd.DataFrame({'I': [0, 0, 0, .37],
                      'want': [.0093, 0, .000054, 0],
                      'to': [0, .99, 0, 0],
                      'race': [.00012, 0, .00057, 0]},
                     index=['VB', 'TO', 'NN', 'PPSS'],
                     columns=['I', 'want', 'to', 'race'],
                     dtype='float128')
obser


Out[3]:
I want to race
VB 0.00 0.009300 0.00 0.00012
TO 0.00 0.000000 0.99 0.00000
NN 0.00 0.000054 0.00 0.00057
PPSS 0.37 0.000000 0.00 0.00000

In [5]:
def VITERBI(N, T):
    viterbi = np.zeros((N+2, T))
    bp = np.zeros((N, T))
    for state in range(N):
        viterbi[state, 0] = transi.iloc[0, state] * obser.iloc[state, 0]
        bp[state, 0] = 0
    for t in range(1, T):
        for s in range(N):
            viterbi[s, t] = max(viterbi[s, t-1] * \
                                transi.iloc[s+1, sbar] * \
                                obser.iloc[sbar, tbar] for sbar in range(N)
                                                     for tbar in range(1,T))
    # print(viterbi)
    #print(bp)
    viterbi[-1, -1] = max(viterbi[s, -1] * transi.iloc[s, -1] for s in range(N))
    return viterbi

print(VITERBI(4,4))


[[  0.00000000e+00   0.00000000e+00   0.00000000e+00   0.00000000e+00]
 [  0.00000000e+00   0.00000000e+00   0.00000000e+00   0.00000000e+00]
 [  0.00000000e+00   0.00000000e+00   0.00000000e+00   0.00000000e+00]
 [  2.47900000e-02   5.30258100e-05   1.13422208e-07   2.42610102e-10]
 [  0.00000000e+00   0.00000000e+00   0.00000000e+00   0.00000000e+00]
 [  0.00000000e+00   0.00000000e+00   0.00000000e+00   1.09174546e-12]]

Here's a more elaborated example

I'm creating a viterbi function that takes 3 parameters: the transition probability matrix A (transition from one syntactic category to the next), the emission probability B (probability of an observed word given it's hidden syntactic category) and the observation sequence observ. It will print out all the trellises and return the most likely tags for each observed word.


In [8]:
def viterbi(A, B, observ):
    """Hidden Markov models need to solve 3 fundamental problems:
1. For a given HMM model M and an observation sequence O, what is the likelihood of P(O|M);
2. For a given HMM model M and an observation sequence O, what is the best hidden state sequence Q(think of O as words, and Q as syntactic categories);
3. For an observation sequence O, and the set of hidden states, learn the HMM parameters A and B;

The viterbi aims at solving the second problem: for a given model and observation sequence, find out the most likely hidden state sequence.

The viterbi algorithm is very similar to the forward algorithm, except that within the inner for loop, it looks for the maximum value of the trellis instead of the sum of all trellises. It also keeps track of the path.

There are different ways to keep track of the largest path. The one I use here looks for the argmax of each column, and keep the x and y args in a tuple, and append this tuple to a list
----------------------------
params:
A: the transition probability matrix; probability to change from one syntactic category to the next
A type: pandas dataframe

B: the emission probability matrix; probability to emit a symbol given a syntactic category
B type: pandas dataframe

observ: the observed sequence of words
observ type: tuple of strings"""
    # initialization
    T = len(observ)
    N = B.shape[0]
    backpointer = [(0, 0)]
    vtb = pd.DataFrame(np.zeros((N+2, T+1)),
                       columns=('start',)+observ,
                       index=('start',)+tuple(B.index)+('end',))
    # viterbi algorithm
    # initialize the second column
    for s in vtb.index[1:-1]:
        vtb.loc[s, observ[0]] = A.loc['start', s] * B.loc[s, observ[0]]
    # compute the trellises starting from the 3rd column; recursion
    for t in xrange(2, T+1):
        for s in xrange(1, N+1):
            vtb.iloc[s, t] = max(vtb.iloc[s_p, t-1] * A.iloc[s_p, s] * B.iloc[s-1, t-1]
                            for s_p in xrange(1, N+1))
    # find the argmax of each column and create a path
    pointers = [(vtb.loc[:, i].argmax(), i) for i in observ]
    backpointer.extend(pointers)
    vtb.iloc[-1, T] = max(vtb.iloc[s, T]* A.iloc[s, -1] for s in xrange(1, N+1))
    print(vtb)
    return backpointer

In [9]:
# plug in the transition and emission probability matrices
# normally these are computed by counting appearances and smoothing
A_matrix = np.matrix(((0, 0.2767, 0.0006, 0.0031, 0.0453, 0.0449, 0.0510, 0.2026),
                      (0, 0.3777, 0.0110, 0.0009, 0.0084, 0.0584, 0.0090, 0.0025),
                      (0, 0.0008, 0.0002, 0.7968, 0.0005, 0.0008, 0.1698, 0.0041),
                      (0, 0.0322, 0.0005, 0.0050, 0.0837, 0.0615, 0.0514, 0.2231),
                      (0, 0.0366, 0.0004, 0.0001, 0.0733, 0.4509, 0.0036, 0.0036),
                      (0, 0.0096, 0.0176, 0.0014, 0.0086, 0.1216, 0.0177, 0.0068),
                      (0, 0.0068, 0.0102, 0.1011, 0.1012, 0.0120, 0.0728, 0.0479),
                      (0, 0.1147, 0.0021, 0.0002, 0.2157, 0.4744, 0.0102, 0.0017))
                    )
A = pd.DataFrame(A_matrix,
                 columns=('start','NNP', 'MD', 'VB', 'JJ',
                          'NN', 'RB', 'DT'),
                 index=('start', 'NNP', 'MD', 'VB', 'JJ',
                        'NN', 'RB', 'DT'))
B_matrix = {'NNP': {'Janet': 0.000032, 'will': 0, 'back': 0,
            'the': 0.000048, 'bill': 0},
            'MD': {'Janet': 0, 'will': 0.308431, 'back': 0,
            'the': 0, 'bill': 0},
            'VB': {'Janet': 0, 'will': 0.000028, 'back': 0.000672,
            'the':0, 'bill':0.000028},
            'JJ': {'Janet': 0, 'will': 0, 'back': 0.000340,
            'the': 0.000097, 'bill': 0},
            'NN': {'Janet': 0, 'will': 0.0002, 'back': 0.000223,
            'the': 0.000006, 'bill': 0.002337},
            'RB': {'Janet': 0, 'will': 0, 'back': 0.010446,
            'the': 0, 'bill': 0},
            'DT': {'Janet': 0, 'will': 0, 'back': 0,
            'the': 0.506099, 'bill': 0}
           }
B = pd.DataFrame(B_matrix).T.reindex(A.index[1:],
                                     columns=['Janet', 'will', 'back',
                                              'the', 'bill'])

In [12]:
OBS = ('Janet', 'will', 'back', 'the', 'bill')
RESULT = viterbi(A, B, OBS)
print(RESULT)


       start     Janet          will          back           the          bill
start    0.0  0.000000  0.000000e+00  0.000000e+00  0.000000e+00  0.000000e+00
NNP      0.0  0.000009  0.000000e+00  0.000000e+00  2.486140e-17  0.000000e+00
MD       0.0  0.000000  3.004069e-08  0.000000e+00  0.000000e+00  0.000000e+00
VB       0.0  0.000000  2.231309e-13  1.608527e-11  0.000000e+00  1.017072e-20
JJ       0.0  0.000000  0.000000e+00  5.106917e-15  5.230579e-16  0.000000e+00
NN       0.0  0.000000  1.034194e-10  5.359258e-15  5.935466e-18  2.013571e-15
RB       0.0  0.000000  0.000000e+00  5.328409e-11  0.000000e+00  0.000000e+00
DT       0.0  0.000000  0.000000e+00  0.000000e+00  1.816199e-12  0.000000e+00
end      0.0  0.000000  0.000000e+00  0.000000e+00  0.000000e+00  1.369228e-17
[(0, 0), ('NNP', 'Janet'), ('MD', 'will'), ('RB', 'back'), ('DT', 'the'), ('NN', 'bill')]

In [ ]: