In [1]:
import sys
# Add the symgp folder path to the sys.path list
module_path = r'/Users/jaduol/Documents/Uni (original)/Part II/IIB/MEng Project/'
if module_path not in sys.path:
sys.path.append(module_path)
from symgp import SuperMatSymbol, utils, MVG, Variable, SuperDiagMat, Kernel, Covariance, Constant, Mean
from sympy import symbols, ZeroMatrix, Identity
from IPython.display import display, Math, Latex, clear_output
In [2]:
# Shapes
m, n, t = symbols('m n t')
In [3]:
# Variables
x_prev, x_t, y_prev, y_t, b_t, d_t, w_t, e_t = utils.variables('x_{t-1} x_t y_{1:t-1} y_t b_t d_t w_t e_t',
[m, m, (n,t-1), n, m, n, m, n])
In [4]:
# Constant parameters
A, C = utils.constants('A C',[(m,m), (n,m)])
Q = Covariance(w_t, name='Q')
R = Covariance(e_t, name='R')
In [5]:
# p(x_(t-1)|y_(1:t-1))
f_prev = Mean(x_prev, name='m_{t-1|t-1}')
F_prev = Covariance(x_prev, name='P_{t-1|t-1}')
p_xprev = MVG([x_prev], mean=f_prev, cov=F_prev, cond_vars=[y_prev])
print("p_xprev:")
display(Latex(utils.matLatex(p_xprev)))
In [6]:
print(p_xprev.covar.expanded)
In [6]:
p_xt = MVG([x_t], mean=A*x_prev, cov=Q, cond_vars=[x_prev])
print("p_xt:")
display(Latex(utils.matLatex(p_xt)))
In [7]:
print(Covariance._used_names)
In [9]:
print(p_xprev.covar)
In [7]:
p_xt_xprev = p_xt*p_xprev
print("p_xt_xprev:")
display(Latex(utils.matLatex(p_xt_xprev)))
In [8]:
p_xt_predict = p_xt_xprev.marginalise([x_prev])
print("p_xt_predict:")
display(Latex(utils.matLatex(p_xt_predict)))
In [9]:
p_yt = MVG([y_t], mean=C*x_t, cov=R, cond_vars=[x_t])
print("p_yt:")
display(Latex(utils.matLatex(p_yt)))
In [10]:
p_yt_xt = p_yt*p_xt_predict
print("p_yt_xt:")
display(Latex(utils.matLatex(p_yt_xt)))
In [11]:
p_xt_update = p_yt_xt.condition([y_t])
print("p_xt_update:")
display(Latex(utils.matLatex(p_xt_update)))
In [12]:
from sympy import Matrix, Identity
import numpy as np
d = {'A': np.array([[1, 0, 1, 0],[0, 1, 0, 1],[0, 0, 1, 0],[0, 0, 0, 1]],dtype=np.float32),
'C': np.array([[1, 0, 0, 0],[0, 1, 0, 0]], dtype=np.float32),
'Q': 0.001*np.eye(4),#Matrix(Identity(4)),
'R': np.eye(2),#Matrix(Identity(2)),
'm_{t-1|t-1}': np.array([[8],[10],[1],[0]],dtype=np.float32),
'P_{t-1|t-1}': np.eye(4),#Matrix(Identity(4)),
'b_t': np.array([[0],[0],[0],[0]],dtype=np.float32),
'd_t': np.array([[0],[0]],dtype=np.float32)}
In [13]:
def ldsSample(A, C, Q, R, initmu, T):
"""
Simulates a run of a linear dynamical system
"""
A, C = np.array(A.tolist(), dtype=np.float32), np.array(C.tolist(), dtype=np.float32)
Q, R = np.array(Q.tolist(), dtype=np.float32), np.array(R.tolist(), dtype=np.float32)
initmu = np.array(initmu.tolist(), dtype=np.float32).reshape((-1,))
ss = A.shape[0]
os = C.shape[0]
x = np.zeros((ss,T))
y = np.zeros((os,T))
x[:,0] = np.dot(A,initmu) + np.random.multivariate_normal(np.zeros(ss),Q)
y[:,0] = np.dot(C,x[:,0]) + np.random.multivariate_normal(np.zeros(os),R)
for t in range(1,T):
x[:,t] = np.dot(A,x[:,t-1]) + np.random.multivariate_normal(np.zeros(ss),Q)
y[:,t] = np.dot(C,x[:,t-1]) + np.random.multivariate_normal(np.zeros(os),R)
return x,y
In [14]:
T = 15
x, y = ldsSample(d['A'], d['C'], d['Q'], d['R'], d['m_{t-1|t-1}'], T)
d['y_t'] = Matrix(y[:,0].reshape((-1,1)))
In [17]:
def kalmanFilter(y, A, C, Q, R, init_m, init_V, debug=False):
A, C = np.array(A.tolist(), dtype=np.float32), np.array(C.tolist(), dtype=np.float32)
Q, R = np.array(Q.tolist(), dtype=np.float32), np.array(R.tolist(), dtype=np.float32)
init_m = np.array(init_m.tolist(), dtype=np.float32).reshape((-1,))
init_V = np.array(init_V.tolist(), dtype=np.float32)
os, T = y.shape #Observations shape
ss = A.shape[0] #State space size
m = np.zeros((ss, T))
mpred = np.zeros((ss, T))
V = np.zeros((ss, ss, T))
Vpred = np.zeros((ss, ss, T))
for t in range(T):
if t==0:
d['m_{t-1|t-1}'] = init_m
d['P_{t-1|t-1}'] = init_V
else:
d['m_{t-1|t-1}'] = m[:,t-1]
d['P_{t-1|t-1}'] = V[:,:,t-1]
d['y_t'] = Matrix(y[:,t].reshape((-1,1)))
if debug:
print("t: ", t)
print("d['m_{t-1|t-1}']: ",d['m_{t-1|t-1}'])
print("d['P_{t-1|t-1}']: ",d['P_{t-1|t-1}'])
print("d['y_t']: ", d['y_t'])
m[:,t] = utils.replace_with_num(p_xt_update.mean.to_full_expr(), d, debug)
V[:,:,t] = utils.replace_with_num(p_xt_update.covar.to_full_expr(), d, debug)
mpred[:,t] = utils.replace_with_num(p_xt_predict.mean.to_full_expr(), d, debug)
Vpred[:,:,t] = utils.replace_with_num(p_xt_predict.covar.to_full_expr(), d, debug)
return m, V, mpred, Vpred
In [18]:
mfilt, Vfilt, mpred, Vpred = kalmanFilter(y, d['A'], d['C'], d['Q'], d['R'], d['m_{t-1|t-1}'], d['P_{t-1|t-1}'])
In [19]:
#Configure the matplotlib backend as plotting inline
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.patches as patches
#Enables latex
from matplotlib import rc
rc('font',**{'family':'sans-serif','sans-serif':['Helvetica'],'size':'20'})
## for Palatino and other serif fonts use:
#rc('font',**{'family':'serif','serif':['Palatino']})
rc('text', usetex=True)
def plotGaussianEllipse(ax, m, V, ellipsecolor, markercolor,label=None,t=None):
"""
Plots an ellipse representing the covariance matrix of a Gaussian.
Args:
ax - The Axes object on which to plot the ellipses
m - The centre of the ellipse
V - Covariance matrix
ellipsecolor - Ellipse color border colour
markercolor - Centre marker colour
(Optional)
label - label for the plot containing this point
t - point at which to add new data
"""
s = 4.605 #90% confidence interval
eigVals, eigVecs = np.linalg.eig(V)
#Sort eigvalues to make sure that large eigval is first
idx = eigVals.argsort()[::-1]
eigVals = eigVals[idx]
eigVecs = eigVecs[:,idx]
#Add ellipse to axes
border = patches.Arc(xy=m,width = 2*(s*eigVals[0])**0.5,height = 2*(s*eigVals[1])**0.5,
angle=np.degrees(np.arctan2(eigVecs[1,0],eigVecs[0,0])), \
linewidth=0.5, linestyle='--',color=ellipsecolor,fill=False, zorder=2)
ax.add_patch(border)
#Either plot points that are joined by a line or plot them individually
if t != None:
data = ax.lines[-1].get_data()
if t >= len(data[0]):
new_xdata = np.append(data[0],m[0])
new_ydata = np.append(data[1],m[1])
ax.lines[-1].set_data((new_xdata, new_ydata))
else:
data[0][t] = m[0]
data[1][t] = m[1]
ax.lines[-1].set_data((data[0],data[1]))
else:
ax.plot(m[0],m[1],'x-',ms=10,mew=2,c=markercolor,label=label)
def plotLine(ax, a, b, color):
"""
Plots a line between two points
"""
ax.plot([a[0], b[0]],[a[1], b[1]], color=color,linestyle='solid')
In [20]:
#Set up figure
fig = plt.figure()
fig.set_size_inches(20,7)
ax = fig.add_subplot(111)
plt.axis('equal')
#Plot observations and true data
ax.set_xlim(left=5,right=25)
ax.set_ylim(bottom=5,top=15)
ax.plot(y[0,:],y[1,:],'ko',label=r'Obs')
ax.plot(x[0,:],x[1,:],'g-s',label=r'True')
ax.set_xlabel('$x_{1}$')
ax.set_ylabel('$x_{2}$')
fig.tight_layout()
runTillEnd = False
#Plot Kalman filter estimate
for t in range(mfilt.shape[1]):
if not runTillEnd:
if input("Press enter to continue (Predict). Type 'rte' to run simulation to end.") == "rte":
runTillEnd = True
clear_output(wait=True)
#Plot prediction
plotGaussianEllipse(ax, mpred[:,t], Vpred[:,:,t], \
ellipsecolor='green', markercolor='green')
display(plt.gcf())
if not runTillEnd:
if input("Press enter to continue (Measurement). Type 'rte' to run simulation to end.") == "rte":
runTillEnd = True
clear_output(wait=True)
#Remove last prediction ellipse
del ax.patches[-1]
del ax.lines[-1]
#Plot update
if t == 0:
plotGaussianEllipse(ax, mfilt[:2,t],Vfilt[:2,:2,t],\
ellipsecolor='blue', markercolor='red',label=r'Filter')
ax.legend()
else:
plotGaussianEllipse(ax, mfilt[:2,t],Vfilt[:2,:2,t],\
ellipsecolor='blue', markercolor='red',t=t)
display(plt.gcf())
clear_output(wait=True)
In [12]:
simp_exprs, subs = utils.simplify(p_xt_update.covar.to_full_expr())
In [15]:
print(p_xt_update.covar.to_full_expr())
In [13]:
for s in simp_exprs:
print("s: ", s)
for s in subs.keys():
print("%s: %s"%(s,subs[s]))
In [ ]: