Kalman Filter


In [1]:
import sys
# Add the symgp folder path to the sys.path list
module_path = r'/Users/jaduol/Documents/Uni (original)/Part II/IIB/MEng Project/'
if module_path not in sys.path:
    sys.path.append(module_path)

from symgp import SuperMatSymbol, utils, MVG, Variable, SuperDiagMat, Kernel, Covariance, Constant, Mean
from sympy import symbols, ZeroMatrix, Identity
from IPython.display import display, Math, Latex, clear_output

In [2]:
# Shapes
m, n, t = symbols('m n t')

1. Variables


In [3]:
# Variables
x_prev, x_t, y_prev, y_t, b_t, d_t, w_t, e_t = utils.variables('x_{t-1} x_t y_{1:t-1} y_t b_t d_t w_t e_t', 
                                                                [m, m, (n,t-1), n, m, n, m, n])

2. Constant Parameters


In [4]:
# Constant parameters
A, C = utils.constants('A C',[(m,m), (n,m)])                                  
Q = Covariance(w_t, name='Q')
R = Covariance(e_t, name='R')

3. p(x(t-1)|y(1:t-1))


In [5]:
# p(x_(t-1)|y_(1:t-1))
f_prev = Mean(x_prev, name='m_{t-1|t-1}') 
F_prev = Covariance(x_prev, name='P_{t-1|t-1}') 
p_xprev = MVG([x_prev], mean=f_prev, cov=F_prev, cond_vars=[y_prev])

print("p_xprev:")
display(Latex(utils.matLatex(p_xprev)))


p_xprev:
\begin{align*} p\left(\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}\right)&= \mathcal{N}\left(\mathbf{x_{t-1}};\mathbf{m}_{\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}},\mathbf{\Sigma}_{\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}}\right)\\ \mathbf{m}_{\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}} &= \mathbf{m_{t-1|t-1}}\\ \mathbf{\Sigma}_{\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}} &= \mathbf{P_{t-1|t-1}}\\ \end{align*}

In [6]:
print(p_xprev.covar.expanded)


P_{t-1|t-1}

4. p(xt|x(t-1))


In [6]:
p_xt = MVG([x_t], mean=A*x_prev, cov=Q, cond_vars=[x_prev])

print("p_xt:")
display(Latex(utils.matLatex(p_xt)))


p_xt:
\begin{align*} p\left(\mathbf{x_t}|\mathbf{x_{t-1}}\right)&= \mathcal{N}\left(\mathbf{x_t};\mathbf{m}_{\mathbf{x_t}|\mathbf{x_{t-1}}},\mathbf{\Sigma}_{\mathbf{x_t}|\mathbf{x_{t-1}}}\right)\\ \mathbf{m}_{\mathbf{x_t}|\mathbf{x_{t-1}}} &= \mathbf{A} \mathbf{x_{t-1}}\\ \mathbf{\Sigma}_{\mathbf{x_t}|\mathbf{x_{t-1}}} &= \mathbf{Q}\\ \end{align*}

In [7]:
print(Covariance._used_names)


['x_{t-1}', 'x_t', 'y_{1:t-1}', 'y_t', 'b_t', 'd_t', 'w_t', 'e_t', 'A', 'C', 'Q', 'R', 'm_{t-1|t-1}', 'P_{t-1|t-1}', 'm_{x_{t-1}|y_{1:t-1}}', 'S_{x_{t-1},x_{t-1}|y_{1:t-1}}', 'm_{x_t|x_{t-1}}', 'S_{x_t,x_t|x_{t-1}}']

In [9]:
print(p_xprev.covar)


S_{x_{t-1},x_{t-1}|y_{1:t-1}}

5. p(xt,x(t-1)|y_(1:t-1))


In [7]:
p_xt_xprev = p_xt*p_xprev

print("p_xt_xprev:")
display(Latex(utils.matLatex(p_xt_xprev)))


p_xt_xprev:
\begin{align*} p\left(\mathbf{x_t},\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}\right)&= \mathcal{N}\left(\left[\begin{smallmatrix}\mathbf{x_t}\\\mathbf{x_{t-1}}\end{smallmatrix}\right];\mathbf{m}_{\mathbf{x_t},\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}},\mathbf{\Sigma}_{\mathbf{x_t},\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}}\right)\\ \mathbf{m}_{\mathbf{x_t},\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}} &= \left[\begin{smallmatrix}\mathbf{A} \mathbf{m_{t-1|t-1}}\\\mathbf{m_{t-1|t-1}}\end{smallmatrix}\right]\\ \mathbf{\Sigma}_{\mathbf{x_t},\mathbf{x_{t-1}}|\mathbf{y_{1:t-1}}} &= \left[\begin{smallmatrix}\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T&\mathbf{A} \mathbf{P_{t-1|t-1}}\\\mathbf{P_{t-1|t-1}} \mathbf{A}^T&\mathbf{P_{t-1|t-1}}\end{smallmatrix}\right]\\ \end{align*}

6. p(xt|y(1:t-1))


In [8]:
p_xt_predict = p_xt_xprev.marginalise([x_prev])

print("p_xt_predict:")
display(Latex(utils.matLatex(p_xt_predict)))


p_xt_predict:
\begin{align*} p\left(\mathbf{x_t}|\mathbf{y_{1:t-1}}\right)&= \mathcal{N}\left(\mathbf{x_t};\mathbf{m}_{\mathbf{x_t}|\mathbf{y_{1:t-1}}},\mathbf{\Sigma}_{\mathbf{x_t}|\mathbf{y_{1:t-1}}}\right)\\ \mathbf{m}_{\mathbf{x_t}|\mathbf{y_{1:t-1}}} &= \mathbf{A} \mathbf{m_{t-1|t-1}}\\ \mathbf{\Sigma}_{\mathbf{x_t}|\mathbf{y_{1:t-1}}} &= \mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\\ \end{align*}

7. p(y_t|x_t)


In [9]:
p_yt = MVG([y_t], mean=C*x_t, cov=R, cond_vars=[x_t])

print("p_yt:")
display(Latex(utils.matLatex(p_yt)))


p_yt:
\begin{align*} p\left(\mathbf{y_t}|\mathbf{x_t}\right)&= \mathcal{N}\left(\mathbf{y_t};\mathbf{m}_{\mathbf{y_t}|\mathbf{x_t}},\mathbf{\Sigma}_{\mathbf{y_t}|\mathbf{x_t}}\right)\\ \mathbf{m}_{\mathbf{y_t}|\mathbf{x_t}} &= \mathbf{C} \mathbf{x_t}\\ \mathbf{\Sigma}_{\mathbf{y_t}|\mathbf{x_t}} &= \mathbf{R}\\ \end{align*}

8. p(y_t ,xt|y(1:t-1))


In [10]:
p_yt_xt = p_yt*p_xt_predict

print("p_yt_xt:")
display(Latex(utils.matLatex(p_yt_xt)))


p_yt_xt:
\begin{align*} p\left(\mathbf{y_t},\mathbf{x_t}|\mathbf{y_{1:t-1}}\right)&= \mathcal{N}\left(\left[\begin{smallmatrix}\mathbf{y_t}\\\mathbf{x_t}\end{smallmatrix}\right];\mathbf{m}_{\mathbf{y_t},\mathbf{x_t}|\mathbf{y_{1:t-1}}},\mathbf{\Sigma}_{\mathbf{y_t},\mathbf{x_t}|\mathbf{y_{1:t-1}}}\right)\\ \mathbf{m}_{\mathbf{y_t},\mathbf{x_t}|\mathbf{y_{1:t-1}}} &= \left[\begin{smallmatrix}\mathbf{C} \mathbf{A} \mathbf{m_{t-1|t-1}}\\\mathbf{A} \mathbf{m_{t-1|t-1}}\end{smallmatrix}\right]\\ \mathbf{\Sigma}_{\mathbf{y_t},\mathbf{x_t}|\mathbf{y_{1:t-1}}} &= \left[\begin{smallmatrix}\mathbf{R} + \mathbf{C} \left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right) \mathbf{C}^T&\mathbf{C} \left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right)\\\left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right) \mathbf{C}^T&\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\end{smallmatrix}\right]\\ \end{align*}

9. p(xt|y(1:t))


In [11]:
p_xt_update = p_yt_xt.condition([y_t])

print("p_xt_update:")
display(Latex(utils.matLatex(p_xt_update)))


p_xt_update:
\begin{align*} p\left(\mathbf{x_t}|\mathbf{y_{1:t-1}},\mathbf{y_t}\right)&= \mathcal{N}\left(\mathbf{x_t};\mathbf{m}_{\mathbf{x_t}|\mathbf{y_{1:t-1}},\mathbf{y_t}},\mathbf{\Sigma}_{\mathbf{x_t}|\mathbf{y_{1:t-1}},\mathbf{y_t}}\right)\\ \mathbf{m}_{\mathbf{x_t}|\mathbf{y_{1:t-1}},\mathbf{y_t}} &= \mathbf{A} \mathbf{m_{t-1|t-1}} + \left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right) \mathbf{C}^T \left(\mathbf{R} + \mathbf{C} \left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right) \mathbf{C}^T\right)^{-1} \left(\mathbf{y_t} - \mathbf{C} \mathbf{A} \mathbf{m_{t-1|t-1}}\right)\\ \mathbf{\Sigma}_{\mathbf{x_t}|\mathbf{y_{1:t-1}},\mathbf{y_t}} &= \mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T - \left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right) \mathbf{C}^T \left(\mathbf{R} + \mathbf{C} \left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right) \mathbf{C}^T\right)^{-1} \mathbf{C} \left(\mathbf{Q} + \mathbf{A} \mathbf{P_{t-1|t-1}} \mathbf{A}^T\right)\\ \end{align*}

Numerical Evaluation


In [12]:
from sympy import Matrix, Identity
import numpy as np

d = {'A': np.array([[1, 0, 1, 0],[0, 1, 0, 1],[0, 0, 1, 0],[0, 0, 0, 1]],dtype=np.float32),
     'C': np.array([[1, 0, 0, 0],[0, 1, 0, 0]], dtype=np.float32),
     'Q': 0.001*np.eye(4),#Matrix(Identity(4)),
     'R': np.eye(2),#Matrix(Identity(2)),
     'm_{t-1|t-1}': np.array([[8],[10],[1],[0]],dtype=np.float32),
     'P_{t-1|t-1}': np.eye(4),#Matrix(Identity(4)),
     'b_t': np.array([[0],[0],[0],[0]],dtype=np.float32),
     'd_t': np.array([[0],[0]],dtype=np.float32)}

In [13]:
def ldsSample(A, C, Q, R, initmu, T):
    """
        Simulates a run of a linear dynamical system
    """
    
    A, C = np.array(A.tolist(), dtype=np.float32), np.array(C.tolist(), dtype=np.float32)
    Q, R = np.array(Q.tolist(), dtype=np.float32), np.array(R.tolist(), dtype=np.float32)
    initmu = np.array(initmu.tolist(), dtype=np.float32).reshape((-1,))
    
    ss = A.shape[0]
    os = C.shape[0]
    
    x = np.zeros((ss,T))
    y = np.zeros((os,T))
    
    x[:,0] = np.dot(A,initmu) + np.random.multivariate_normal(np.zeros(ss),Q)
    y[:,0] = np.dot(C,x[:,0]) + np.random.multivariate_normal(np.zeros(os),R)
    
    for t in range(1,T):
        x[:,t] = np.dot(A,x[:,t-1]) + np.random.multivariate_normal(np.zeros(ss),Q)
        y[:,t] = np.dot(C,x[:,t-1]) + np.random.multivariate_normal(np.zeros(os),R)
    
    return x,y

In [14]:
T = 15
x, y = ldsSample(d['A'], d['C'], d['Q'], d['R'], d['m_{t-1|t-1}'], T)

d['y_t'] = Matrix(y[:,0].reshape((-1,1)))

In [17]:
def kalmanFilter(y, A, C, Q, R, init_m, init_V, debug=False):
    
    A, C = np.array(A.tolist(), dtype=np.float32), np.array(C.tolist(), dtype=np.float32)
    Q, R = np.array(Q.tolist(), dtype=np.float32), np.array(R.tolist(), dtype=np.float32)
    init_m = np.array(init_m.tolist(), dtype=np.float32).reshape((-1,))
    init_V = np.array(init_V.tolist(), dtype=np.float32)
    
    os, T = y.shape   #Observations shape 
    ss = A.shape[0]   #State space size
    
    m = np.zeros((ss, T))
    mpred = np.zeros((ss, T))
    V = np.zeros((ss, ss, T))
    Vpred = np.zeros((ss, ss, T))
    
    for t in range(T):
        if t==0:
            d['m_{t-1|t-1}'] = init_m
            d['P_{t-1|t-1}'] = init_V
        else:
            d['m_{t-1|t-1}'] = m[:,t-1]
            d['P_{t-1|t-1}'] = V[:,:,t-1]
        
        d['y_t'] = Matrix(y[:,t].reshape((-1,1)))
        
        if debug:
            print("t: ", t)
            print("d['m_{t-1|t-1}']: ",d['m_{t-1|t-1}'])
            print("d['P_{t-1|t-1}']: ",d['P_{t-1|t-1}'])
            print("d['y_t']: ", d['y_t'])
        
    
        m[:,t] =  utils.replace_with_num(p_xt_update.mean.to_full_expr(), d, debug)
        V[:,:,t] = utils.replace_with_num(p_xt_update.covar.to_full_expr(), d, debug)
        mpred[:,t] = utils.replace_with_num(p_xt_predict.mean.to_full_expr(), d, debug)
        Vpred[:,:,t] = utils.replace_with_num(p_xt_predict.covar.to_full_expr(), d, debug)
                
    
    return m, V, mpred, Vpred

In [18]:
mfilt, Vfilt, mpred, Vpred = kalmanFilter(y, d['A'], d['C'], d['Q'], d['R'], d['m_{t-1|t-1}'], d['P_{t-1|t-1}'])

In [19]:
#Configure the matplotlib backend as plotting inline
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.patches as patches

#Enables latex
from matplotlib import rc
rc('font',**{'family':'sans-serif','sans-serif':['Helvetica'],'size':'20'})
## for Palatino and other serif fonts use:
#rc('font',**{'family':'serif','serif':['Palatino']})
rc('text', usetex=True)

def plotGaussianEllipse(ax, m, V, ellipsecolor, markercolor,label=None,t=None):
    """
        Plots an ellipse representing the covariance matrix of a Gaussian.
        
        Args:
            ax - The Axes object on which to plot the ellipses
            m - The centre of the ellipse
            V - Covariance matrix
            ellipsecolor - Ellipse color border colour
            markercolor - Centre marker colour
          (Optional)
            label - label for the plot containing this point
            t - point at which to add new data
    """
    
    s = 4.605  #90% confidence interval
    eigVals, eigVecs = np.linalg.eig(V)
        
    #Sort eigvalues to make sure that large eigval is first
    idx = eigVals.argsort()[::-1]
    eigVals = eigVals[idx]
    eigVecs = eigVecs[:,idx]
        
    #Add ellipse to axes
    border = patches.Arc(xy=m,width = 2*(s*eigVals[0])**0.5,height = 2*(s*eigVals[1])**0.5,
                        angle=np.degrees(np.arctan2(eigVecs[1,0],eigVecs[0,0])), \
                         linewidth=0.5, linestyle='--',color=ellipsecolor,fill=False, zorder=2)
    ax.add_patch(border)
    
    #Either plot points that are joined by a line or plot them individually
    if t != None:
        data = ax.lines[-1].get_data()
        if t >= len(data[0]):
            new_xdata = np.append(data[0],m[0])
            new_ydata = np.append(data[1],m[1])
            ax.lines[-1].set_data((new_xdata, new_ydata))
        else:
            data[0][t] = m[0]
            data[1][t] = m[1]
            ax.lines[-1].set_data((data[0],data[1]))        
    else:   
        ax.plot(m[0],m[1],'x-',ms=10,mew=2,c=markercolor,label=label)
    
def plotLine(ax, a, b, color):
    """
        Plots a line between two points
    """
    ax.plot([a[0], b[0]],[a[1], b[1]], color=color,linestyle='solid')

In [20]:
#Set up figure
fig = plt.figure()
fig.set_size_inches(20,7)
ax = fig.add_subplot(111)
plt.axis('equal')

#Plot observations and true data
ax.set_xlim(left=5,right=25)
ax.set_ylim(bottom=5,top=15)
ax.plot(y[0,:],y[1,:],'ko',label=r'Obs')  
ax.plot(x[0,:],x[1,:],'g-s',label=r'True')
ax.set_xlabel('$x_{1}$')
ax.set_ylabel('$x_{2}$')
fig.tight_layout()

runTillEnd = False

#Plot Kalman filter estimate
for t in range(mfilt.shape[1]):
    if not runTillEnd:
        if input("Press enter to continue (Predict). Type 'rte' to run simulation to end.") == "rte":
            runTillEnd = True
            
    clear_output(wait=True)
    
    #Plot prediction
    plotGaussianEllipse(ax, mpred[:,t], Vpred[:,:,t], \
                    ellipsecolor='green', markercolor='green')
    display(plt.gcf())
    
    if not runTillEnd:
        if input("Press enter to continue (Measurement). Type 'rte' to run simulation to end.") == "rte":
            runTillEnd = True
            
    clear_output(wait=True)
    
    #Remove last prediction ellipse
    del ax.patches[-1]
    del ax.lines[-1]
            
    #Plot update
    if t == 0:
        plotGaussianEllipse(ax, mfilt[:2,t],Vfilt[:2,:2,t],\
                    ellipsecolor='blue', markercolor='red',label=r'Filter')
        ax.legend()
    else:
        plotGaussianEllipse(ax, mfilt[:2,t],Vfilt[:2,:2,t],\
                    ellipsecolor='blue', markercolor='red',t=t)
    display(plt.gcf())

clear_output(wait=True)


Simplification

Covariance of p(xt|y{1:t})


In [12]:
simp_exprs, subs = utils.simplify(p_xt_update.covar.to_full_expr())

In [15]:
print(p_xt_update.covar.to_full_expr())


Q + A*P_{t-1|t-1}*A' + (-1)*(Q + A*P_{t-1|t-1}*A')*C'*(R + C*(Q + A*P_{t-1|t-1}*A')*C')^-1*C*(Q + A*P_{t-1|t-1}*A')

In [13]:
for s in simp_exprs:
    print("s: ", s)

for s in subs.keys():
    print("%s: %s"%(s,subs[s]))


s:  (I + (-1)*A_{2}*C)*A_{1}
s:  (-1)*A_{2}*C*A_{1} + A_{1}
s:  A_{1}*(I + (-1)*C'*(R + C*A_{1}*C')^-1*C*A_{1})
s:  (I + (-1)*A_{1}*C'*(R + C*A_{1}*C')^-1*C)*A_{1}
s:  (-1)*A_{1}*C'*(R + C*A_{1}*C')^-1*C*A_{1} + A_{1}
s:  (Q + A_{0})*(I + (-1)*C'*(R + C*(Q + A_{0})*C')^-1*C*(Q + A_{0}))
s:  (I + (-1)*(Q + A_{0})*C'*(R + C*(Q + A_{0})*C')^-1*C)*(Q + A_{0})
s:  Q + (-1)*(Q + A_{0})*C'*(R + C*(Q + A_{0})*C')^-1*C*(Q + A_{0}) + A_{0}
Q + A_{0}: A_{1}
A*P_{t-1|t-1}*A': A_{0}
A_{1}*C'*(R + C*A_{1}*C')^-1: A_{2}

In [ ]: