This notebook implements several HMM algorithms in Theano. They may be useful for incorporating HMMs into the computation graphs of other Theano models. The algorithms follow the same approach as in the other notebook on HMMs.
We start with a toy HMM example from Rabiner's paper.
Many improvements could be made here (and may be added in the future):
everything should be moved to the log domain - for numerical stability. This is especially important for GPUs, which usually have lower floating-point precision. The example here works only for the smallest problems (a few observations).
continuous density models - this discrete example cannot be applied to most practical problems.
custom topologies - this is a simple ergodic example. In practice, we'd want to design a specific transition graph for a particular use (a small sketch follows this list).
combining models - real applications rely on combining several models and training them jointly for specific problems, e.g. combining many tri-phone models for speech recognition. This should also include the ability to do state tying.
better training algorithms - only Baum-Welch and the simplest gradient descent are demonstrated here. Other methods could work better or faster.
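To illustrate the custom-topologies point, here is a minimal sketch (the function name and band width are made up for this example) of how a left-to-right (Bakis) transition matrix could be initialized in NumPy, with each row normalized to sum to 1:
In [ ]:
import numpy as np

def left_to_right_transitions(N, max_jump=2):
    #banded upper-triangular matrix: each state may stay put or move up to max_jump states forward
    a = np.zeros((N, N))
    for i in range(N):
        a[i, i:i + max_jump + 1] = 1.0
    return a / a.sum(axis=1, keepdims=True)

print left_to_right_transitions(4)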
The example below demonstrates the code using notation and equations similar to those in Rabiner's paper. First we import the required modules:
In [1]:
import numpy as np
import theano
import theano.tensor as T
from theano.ifelse import ifelse
import pickle
from collections import OrderedDict
In [2]:
class DiscreteHMM:
def __init__(self, N=3, M=4):
        updates=OrderedDict()
        pi = theano.shared((np.ones(N)/N).astype(theano.config.floatX))
        a = theano.shared((np.ones((N,N))/N).astype(theano.config.floatX))
        b = theano.shared((np.ones((N,M))/M).astype(theano.config.floatX))
N = theano.shared(N)
M = theano.shared(M)
self.pi=pi
self.a=a
self.b=b
self.N=N
self.M=M
O = T.ivector()
TT = O.shape[0]
#forward algorithm:
alpha0=pi*b[:,O[0]]
alpha_scan,upd = theano.scan(fn=lambda O,alpha_p: T.dot(alpha_p,a)*b[:,O],
sequences=O[1:],
outputs_info=alpha0)
updates.update(upd)
alpha=T.concatenate((alpha0.reshape((1,N)),alpha_scan))
#backward algorithm:
beta0=T.ones(N).astype(theano.config.floatX)
beta_scan,upd = theano.scan(fn=lambda O,beta_p: T.dot(beta_p*b[:,O],a.T),
sequences=O[1:],
outputs_info=beta0,
go_backwards=True)
updates.update(upd)
beta=T.concatenate((beta_scan[::-1],beta0.reshape((1,N))))
#full model probability:
full_prob = alpha_scan[-1].sum()
#forward-backward probabilities:
gamma=alpha*beta/full_prob
#viterbi algorithm:
def viterbi_rec_step(O, delta_p, phi_p):
m=delta_p*a.T
phi=m.argmax(axis=1)
delta=m[T.arange(N),phi]*b[:,O]
return delta,phi
phi0=T.zeros(N).astype('int64')
[delta_scan, phi_scan], upd = theano.scan(fn=viterbi_rec_step,
sequences=O[1:],
outputs_info=[alpha0,phi0])
updates.update(upd)
        QT=delta_scan[-1].argmax()
vite_prob = delta_scan[-1,QT]
Q_scan, upd = theano.scan(fn=lambda phi, Q: phi[Q],
sequences=phi_scan,
outputs_info=QT,
go_backwards=True)
updates.update(upd)
Q=T.concatenate((Q_scan[::-1],QT.reshape((1,))))
#transition probabilities
xi=alpha[:-1].reshape((TT-1,N,1))*a.reshape((1,N,N))*b[:,O[1:]].T.reshape((TT-1,1,N))*beta[1:].reshape((TT-1,1,N))/full_prob
#expected values
exp_pi=gamma[0]
exp_a=xi.sum(axis=0)/gamma[:-1].sum(axis=0).reshape((N,1))
exp_b_map, upd = theano.map(fn=lambda k: T.sum(gamma[T.eq(O,k).nonzero()],axis=0)/T.sum(gamma,axis=0),
sequences=T.arange(M))
updates.update(upd)
exp_b = exp_b_map.T
exp_err = T.concatenate(((pi-exp_pi).ravel(),(a-exp_a).ravel(),(b-exp_b).ravel()))
exp_mean_err = T.mean(exp_err**2)
#Baum-Welch updates:
baum_welch_updates=OrderedDict()
exp_updates={pi:exp_pi,a:exp_a,b:exp_b}
baum_welch_updates.update(updates)
baum_welch_updates.update(exp_updates)
#Gradient descent:
pi_grad=T.grad(cost=full_prob,wrt=pi)
a_grad=T.grad(cost=full_prob,wrt=a)
b_grad=T.grad(cost=full_prob,wrt=b)
lr=T.scalar()
pi_upd=pi*(pi_grad**lr)
norm_pi_upd=pi_upd/pi_upd.sum()
a_upd=a*(a_grad**lr)
norm_a_upd=(a_upd.T/a_upd.sum(axis=1)).T
b_upd=b*(b_grad**lr)
        norm_b_upd=(b_upd.T/b_upd.sum(axis=1)).T
gd_updates=OrderedDict()
grad_updates={pi:norm_pi_upd,
a:norm_a_upd,
b:norm_b_upd}
gd_updates.update(updates)
gd_updates.update(grad_updates)
#function definitions
self.forward_fun = theano.function(inputs=[O], outputs=alpha, updates=updates)
self.backward_fun = theano.function(inputs=[O], outputs=beta, updates=updates)
self.full_prob_fun = theano.function(inputs=[O], outputs=full_prob, updates=updates)
self.gamma_fun = theano.function(inputs=[O], outputs=gamma, updates=updates)
self.viterbi_fun = theano.function(inputs=[O], outputs=[Q,vite_prob], updates=updates)
self.xi_fun = theano.function(inputs=[O], outputs=xi, updates=updates)
self.exp_fun = theano.function(inputs=[O], outputs=[exp_pi,exp_a,exp_b], updates=updates)
self.baum_welch_fun = theano.function(inputs=[O], outputs=[full_prob,exp_mean_err], updates=baum_welch_updates)
self.gd_fun = theano.function(inputs=[O,lr], outputs=full_prob, updates=gd_updates)
def setModel(self,pi,a,b,N,M):
self.pi.set_value(pi.astype(theano.config.floatX))
self.a.set_value(a.astype(theano.config.floatX))
self.b.set_value(b.astype(theano.config.floatX))
self.N.set_value(N)
self.M.set_value(M)
def getModel(self):
return self.pi.get_value(),self.a.get_value(),self.b.get_value(),self.N.get_value(),self.M.get_value()
def forward(self, O):
return self.forward_fun(O.astype('int32'))
def backward(self, O):
return self.backward_fun(O.astype('int32'))
def full_prob(self, O):
return self.full_prob_fun(O.astype('int32'))
def gamma(self, O):
return self.gamma_fun(O.astype('int32'))
def viterbi(self, O):
return self.viterbi_fun(O.astype('int32'))
def xi(self, O):
return self.xi_fun(O.astype('int32'))
def exp_values(self, O):
return self.exp_fun(O.astype('int32'))
def baum_welch(self,O):
return self.baum_welch_fun(O.astype('int32'))
def gradient_descent(self,O,lr=0.01):
return self.gd_fun(O.astype('int32'),lr)
In [3]:
with open('../data/hmm.pkl') as f:
O,pi,a,b,N,M,Time=pickle.load(f)
print 'Number of states: {}'.format(N)
print 'Number of observation classes: {}'.format(M)
print 'Number of time steps: {}'.format(Time) #T is taken by theano.tensor
print 'Observation sequence: {}'.format(O)
print 'Priors: {}'.format(pi)
print 'Transition matrix:\n{}'.format(a)
print 'Observation probability matrix:\n{}'.format(b)
Here we construct the HMM object. The constructor needs to compile everything, and since there are quite a few functions, it may take a little while:
In [4]:
%time hmm=DiscreteHMM()
#we can also set the model parameters
hmm.setModel(pi,a,b,N,M)
In [5]:
print 'Forward probabilities:\n{}'.format(hmm.forward(O))
print 'Backward probabilities:\n{}'.format(hmm.backward(O))
print 'Full model probability: {}'.format(hmm.full_prob(O))
print 'Complete state probability:\n{}'.format(hmm.gamma(O))
seq,vite_prob=hmm.viterbi(O)
print 'Viterbi sequence: {} its probability {}'.format(seq,vite_prob)
print 'State transition probability:\n{}'.format(hmm.xi(O))
In [6]:
exp_pi,exp_a,exp_b=hmm.exp_values(O)
print 'Expected priors: {}'.format(exp_pi)
print 'Expected transitions:\n{}'.format(exp_a)
print 'Expected observations:\n{}'.format(exp_b)
We will run 15 iterations of the Baum-Welch EM reestimation here. We will also output the model probability (which should increase with each iteration) and also the mean difference between the model parameters and their expected values (which will decrease to 0 as the model converges on the optimum).
In [7]:
hmm.setModel(pi,a,b,N,M)
for i in range(15):
prob,exp_err=hmm.baum_welch(O)
print 'Iteration #{} P={} delta_exp={}'.format(i+1,prob,exp_err)
In [8]:
hmm.setModel(pi,a,b,N,M)
for i in range(20):
prob=hmm.gradient_descent(O,0.2)
print 'Iteration #{} P={}'.format(i+1,prob)
print hmm.full_prob(O)
pi_n,a_n,b_n,N_n,M_n=hmm.getModel()
np.set_printoptions(suppress=True)
print 'PI: {}'.format(pi_n)
print 'A:\n{}'.format(a_n)
print 'B:\n{}'.format(b_n)
np.set_printoptions(suppress=False)
This method quickly converges to the optimum, although in this example the optimum is not a very useful model, because it stays mostly in one state all the time. Several different sequences would probably make a better test for this method; a rough sketch of that idea follows.
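As a rough sketch of training on several sequences (the list Os below is made up - a proper multi-sequence Baum-Welch would instead accumulate the expected counts over all sequences before updating the parameters), we could simply take one gradient step per sequence:
In [ ]:
#hypothetical set of training sequences - here just the original one and its reverse
Os = [O, O[::-1].copy()]
hmm.setModel(pi,a,b,N,M)
for epoch in range(10):
    for Ok in Os:
        prob = hmm.gradient_descent(Ok, 0.2)
    print 'Epoch #{} P (last sequence)={}'.format(epoch+1, prob)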
This is the same class as above, but moved into the log domain. All the parameters and calculations are kept in the log domain.
Some log arithmetic hints can be found here.
In the log domain, multiplication and division become trivial (they turn into addition and subtraction of logs), but plain addition, subtraction and summation become a nuisance. They have to be reimplemented by pulling the values back into the linear domain, performing the operation, and then returning to the log domain. So add becomes LogAddExp, sum becomes LogSumExp, and so on. Done naively, this round trip reintroduces the very overflow problems we are trying to avoid, which is why the helpers below rely on the max-trick.
In [9]:
from pylearn2.expr.basic import log_sum_exp
def LogDot(a,b):
return log_sum_exp(a + b.T, axis=1)
def LogSum(a,axis=None):
return log_sum_exp(a,axis)
def LogAdd(a,b):
    #numerically stable log(exp(a)+exp(b)) using the max-trick
    m=T.maximum(a,b)
    return m+T.log1p(T.exp(-T.abs_(a-b)))
def LogSub(a,b):
    #numerically stable log(exp(a)-exp(b)), assuming a>=b
    return a+T.log1p(-T.exp(b-a))
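A quick numeric illustration of why the max-trick matters (the values are arbitrary): the naive formula overflows to infinity, while shifting by the maximum stays finite:
In [ ]:
x = np.array([1000.0, 1001.0])
print np.log(np.sum(np.exp(x)))          #naive log-sum-exp overflows to inf
m = x.max()
print m + np.log(np.sum(np.exp(x - m)))  #max-trick: approximately 1001.3133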
Here is the actual class in the log domain:
In [10]:
class LogDiscreteHMM:
def __init__(self, N=3, M=4):
        updates=OrderedDict()
        #parameters are stored as log-probabilities, initialized to the logs of uniform distributions
        pi = theano.shared(np.log(np.ones(N)/N).astype(theano.config.floatX))
        a = theano.shared(np.log(np.ones((N,N))/N).astype(theano.config.floatX))
        b = theano.shared(np.log(np.ones((N,M))/M).astype(theano.config.floatX))
N = theano.shared(N)
M = theano.shared(M)
self.pi=pi
self.a=a
self.b=b
self.N=N
self.M=M
O = T.ivector()
TT = O.shape[0]
#forward algorithm:
alpha0=pi+b[:,O[0]]
alpha_scan,upd = theano.scan(fn=lambda O,alpha_p: LogDot(alpha_p,a)+b[:,O],
sequences=O[1:],
outputs_info=alpha0)
updates.update(upd)
alpha=T.concatenate((alpha0.reshape((1,N)),alpha_scan))
#backward algorithm:
beta0=T.zeros(N).astype(theano.config.floatX)
beta_scan,upd = theano.scan(fn=lambda O,beta_p: LogDot(beta_p+b[:,O],a.T),
sequences=O[1:],
outputs_info=beta0,
go_backwards=True)
updates.update(upd)
beta=T.concatenate((beta_scan[::-1],beta0.reshape((1,N))))
#full model probability:
full_prob = LogSum(alpha_scan[-1])
#forward-backward probabilities:
gamma=alpha+beta-full_prob
#viterbi algorithm:
def viterbi_rec_step(O, delta_p, phi_p):
m=delta_p+a.T
phi=m.argmax(axis=1)
delta=m[T.arange(N),phi]+b[:,O]
return delta,phi
phi0=T.zeros(N).astype('int64')
[delta_scan, phi_scan], upd = theano.scan(fn=viterbi_rec_step,
sequences=O[1:],
outputs_info=[alpha0,phi0])
updates.update(upd)
        QT=delta_scan[-1].argmax()
vite_prob = delta_scan[-1,QT]
Q_scan, upd = theano.scan(fn=lambda phi, Q: phi[Q],
sequences=phi_scan,
outputs_info=QT,
go_backwards=True)
updates.update(upd)
Q=T.concatenate((Q_scan[::-1],QT.reshape((1,))))
#transition probabilities
xi=alpha[:-1].reshape((TT-1,N,1))+a.reshape((1,N,N))+b[:,O[1:]].T.reshape((TT-1,1,N))+beta[1:].reshape((TT-1,1,N))-full_prob
#expected values
exp_pi=gamma[0]
exp_a=LogSum(xi,axis=0)-LogSum(gamma[:-1],axis=0).reshape((N,1))
        def exp_b_fun(k):
            #if symbol k never occurs in O, fall back to a very small log-probability (-99)
            return ifelse(T.eq(gamma[T.eq(O,k).nonzero()].shape[0],0),
                          T.ones((a.shape[1],))*(-99),
                          LogSum(gamma[T.eq(O,k).nonzero()],axis=0)-LogSum(gamma,axis=0))
exp_b_map, upd = theano.map(fn=exp_b_fun, sequences=T.arange(M))
updates.update(upd)
exp_b = exp_b_map.T
        exp_err = T.concatenate(((T.exp(pi)-T.exp(exp_pi)).ravel(),
                                 (T.exp(a)-T.exp(exp_a)).ravel(),
                                 (T.exp(b)-T.exp(exp_b)).ravel()))
exp_mean_err = T.mean(exp_err**2)
#Baum-Welch updates:
baum_welch_updates=OrderedDict()
exp_updates={pi:exp_pi,a:exp_a,b:exp_b}
baum_welch_updates.update(updates)
baum_welch_updates.update(exp_updates)
#Gradient descent:
pi_grad=T.grad(cost=full_prob,wrt=pi)
a_grad=T.grad(cost=full_prob,wrt=a)
b_grad=T.grad(cost=full_prob,wrt=b)
lr=T.scalar()
pi_upd=pi+(pi_grad*lr)
norm_pi_upd=pi_upd-LogSum(pi_upd)
a_upd=a+(a_grad*lr)
norm_a_upd=(a_upd.T-LogSum(a_upd,axis=1)).T
b_upd=b+(b_grad*lr)
        norm_b_upd=(b_upd.T-LogSum(b_upd,axis=1)).T
gd_updates=OrderedDict()
grad_updates={pi:norm_pi_upd,
a:norm_a_upd,
b:norm_b_upd}
gd_updates.update(updates)
gd_updates.update(grad_updates)
#function definitions
self.forward_fun = theano.function(inputs=[O], outputs=alpha, updates=updates)
self.backward_fun = theano.function(inputs=[O], outputs=beta, updates=updates)
self.full_prob_fun = theano.function(inputs=[O], outputs=full_prob, updates=updates)
self.gamma_fun = theano.function(inputs=[O], outputs=gamma, updates=updates)
self.viterbi_fun = theano.function(inputs=[O], outputs=[Q,vite_prob], updates=updates)
self.xi_fun = theano.function(inputs=[O], outputs=xi, updates=updates)
self.exp_fun = theano.function(inputs=[O], outputs=[exp_pi,exp_a,exp_b], updates=updates)
self.baum_welch_fun = theano.function(inputs=[O], outputs=[full_prob,exp_mean_err], updates=baum_welch_updates)
self.gd_fun = theano.function(inputs=[O,lr], outputs=full_prob, updates=gd_updates)
def setModel(self,pi,a,b,N,M):
self.pi.set_value(pi.astype(theano.config.floatX))
self.a.set_value(a.astype(theano.config.floatX))
self.b.set_value(b.astype(theano.config.floatX))
self.N.set_value(N)
self.M.set_value(M)
def getModel(self):
return self.pi.get_value(),self.a.get_value(),self.b.get_value(),self.N.get_value(),self.M.get_value()
def forward(self, O):
return self.forward_fun(O.astype('int32'))
def backward(self, O):
return self.backward_fun(O.astype('int32'))
def full_prob(self, O):
return self.full_prob_fun(O.astype('int32'))
def gamma(self, O):
return self.gamma_fun(O.astype('int32'))
def viterbi(self, O):
return self.viterbi_fun(O.astype('int32'))
def xi(self, O):
return self.xi_fun(O.astype('int32'))
def exp_values(self, O):
return self.exp_fun(O.astype('int32'))
def baum_welch(self,O):
return self.baum_welch_fun(O.astype('int32'))
def gradient_descent(self,O,lr=0.01):
return self.gd_fun(O.astype('int32'),lr)
Here we construct the object. It is not much more complicated than the one above:
In [11]:
%time loghmm=LogDiscreteHMM()
Since all the parameters are in the log domain, we have to take logarithms of all the values that were used above:
In [12]:
loghmm.setModel(np.log(pi),np.log(a),np.log(b),N,M)
And we have to exponentiate the results to get back into the linear domain. The results are the same as above:
In [13]:
print 'Forward probabilities:\n{}'.format(np.exp(loghmm.forward(O)))
print 'Backward probabilities:\n{}'.format(np.exp(loghmm.backward(O)))
print 'Full model probability: {}'.format(np.exp(loghmm.full_prob(O)))
print 'Complete state probability:\n{}'.format(np.exp(loghmm.gamma(O)))
seq,vite_prob=loghmm.viterbi(O)
print 'Viterbi sequence: {} its probability {}'.format(seq,np.exp(vite_prob))
print 'State transition probability:\n{}'.format(np.exp(loghmm.xi(O)))
The expected values for Baum-Welch are also correct:
In [14]:
exp_pi,exp_a,exp_b=loghmm.exp_values(O)
print 'Expected priors: {}'.format(np.exp(exp_pi))
print 'Expected transitions:\n{}'.format(np.exp(exp_a))
print 'Expected observations:\n{}'.format(np.exp(exp_b))
And the Baum-Welch procedure works the same as well. The only exception is that the exp_err value is not returned in the log domain, since it's more convenient this way:
In [15]:
loghmm.setModel(np.log(pi),np.log(a),np.log(b),N,M)
for i in range(15):
prob,exp_err=loghmm.baum_welch(O)
print 'Iteration #{} P={} delta_exp={}'.format(i+1,np.exp(prob),exp_err)
Finally, gradient descent works similarly to the one above:
In [16]:
loghmm.setModel(np.log(pi),np.log(a),np.log(b),N,M)
for i in range(20):
prob=loghmm.gradient_descent(O,0.2)
print 'Iteration #{} P={}'.format(i+1,np.exp(prob))