In [ ]:
import numpy as np
import random
import sympy as sp
import numpy.linalg as la
import control as pc
%matplotlib notebook
import matplotlib.pyplot as plt
plt.ioff()
plt.rcParams['figure.figsize'] = [10, 5]
In [ ]:
''' function A = getA(s,w,p)
Creates symbolic A Matrix with given vectors of symbol names.
Creates ordered A, first complex poles, then real poles
w: symbols of real parts of complex poles
s: symbols of imaginary part of complex poles
p: symbols of purely real poles
A: sympy symbolical matrix
'''
def getA(s,w,p):
if not isinstance(w, tuple):
w = (w,)
if not isinstance(s, tuple):
s = (s,)
if not isinstance(p, tuple):
p = (p,)
A = sp.zeros(len(2*w+p), len(2*w)+len(p))
for i,(v,r) in enumerate(zip(w,s)):
XX = sp.Matrix([[r, v],[-v,r]])
ix = 2*i
A[ix, ix] = r
A[ix+1, ix+1] = r
A[ix, ix+1] = v
A[ix+1, ix] = -v
for i,r in enumerate(p):
A[len(2*w)+i, len(2*w)+i] = r
return A
# Test getA
#wx = sp.symbols('w_1:3')
#sx = sp.symbols('s_1:3')
#px = sp.symbols('p_1:3')
#sp.pprint(getA(wx,sx,px))
''' function A, (ss, s_init), (ws, w_init), (ps, p_init) = getA2(init_arr)
Creates symbolic A Matrix and tuples of ( (symbolic names), [given values]),
with given numerical vector of (complex) poles.
Creates A with the order of the given poles, and return tupels
to map the numerical values to the respective symbolic name.
init_arr: (numpy) array of numerical poles
ws: symbols of real parts of complex poles
w_init: numerical values of real parts of complex poles
ss: symbols of imaginary parts of complex poles
s_init: numerical values of imaginary parts of complex poles
ps: symbols of real poles
p_init: numerical values of real poles
'''
def getA2(init_arr):
num_cplx_poles = 0
num_real_poles = 0
A = None
w_init = []
ws = []
s_init = []
ss = []
p_init = []
ps = []
for val in init_arr:
if np.iscomplex(val):
if np.conjugate(val) in init_arr:
print "Ignored additional complex conjugate pole: {}".format(np.conjugate(val))
init_arr.remove(np.conjugate(val))
num_cplx_poles = num_cplx_poles + 1
w = sp.symbols('w_{}'.format(num_cplx_poles)) # creates tuple (w_1)
s = sp.symbols('s_{}'.format(num_cplx_poles)) # creates tuple (s_1)
X = sp.Matrix([[s, w],[-w,s]])
s_init.append(val.real)
ss.append(s)
w_init.append(val.imag)
ws.append(w)
else:
num_real_poles = num_real_poles + 1
p = sp.symbols('p_{}'.format(num_real_poles))
X = sp.Matrix([p])
p_init.append(val)
ps.append(p)
if A is None:
A = X
else:
if not A:
A = X
else:
A = A.row_insert(A.shape[0], sp.zeros(X.shape[0], A.shape[1]))
A = A.col_insert(A.shape[1], sp.zeros(A.shape[1], X.shape[0]).row_insert(A.shape[0], X))
return A, (tuple(ss), s_init), (tuple(ws), w_init), (tuple(ps), p_init)
# Test getA2()
#A, (ss, s_init), (ws, w_init), (ps, p_init) = getA2([-1, 2+3j, -2])
#sp.pprint(A)
#print ss, s_init
#print ws, w_init
#print ps, p_init
In [ ]:
## Helper functions
''' Create sympy vector from symbols tuple
'''
def to_sympy_vector(syms):
return sp.Matrix(syms)
sp.pprint(to_sympy_vector(sp.symbols('w_1:5')))
''' Lookup of numerical value from 'values' by mapping 'syms' on 'all_syms'
-> Slower than 'get_accessor' function
'''
def get_value_by_symbol(values, all_syms, syms):
res = []
for sy in syms:
idx = list(all_syms).index(sy)
res.append(values[idx])
return np.array(res).squeeze()
# Test
#wx = sp.symbols('w_1:3')
#sx = sp.symbols('s_1:3')
#assert(np.all(get_value_by_symbol(np.arange(len(sx+wx)), sx+wx, sx) == np.arange(len(sx))))
''' Creates lamdified function to lookup 'syms' in a vector structured like 'all_syms'
-> Faster than 'get_value_by_symbol', yet less general (error prone if structure changes)
'''
def get_accessor(all_syms_vector, syms):
return sp.lambdify(all_syms_vector, syms)
In [ ]:
'''
Unroll for np.matrix instead of np.array
'''
def _c(M):
if isinstance(M, np.matrix):
print "Try to avoid numpy matrices"
sh = M.shape
if sh[0] == 1:
return M.tolist()[0]
elif sh[1] == 1:
return M.T.tolist()[0]
else:
print "WARNING! Unrolling can go horribly wrong with matrizes!"
return M.ravel().tolist[0]
elif isinstance(M, np.ndarray):
return M
else:
raise ValueError('No numpy input to unroll')
#_c(np.matrix([1,2,3]))
#_c(np.matrix([1,2,3]))
#_c([1,2,3]
In [ ]:
''' Define Idetification System model with initial values
System is characterized by number of poles given with numerical values
'''
### Define poles with initial vlues
num_poles = [-5+3j, -4]
A, (s, s_init), (w, w_init), (p, p_init) = getA2(num_poles)
num_cplx_poles = len(s_init)
num_real_poles = len(p_init)
# Define all symbolic names in sympy tuples
b = sp.symbols('b_1:{}'.format(2*num_cplx_poles+num_real_poles+1)) # creates tuple (b_1, b_2, b_3, b_4)
d = sp.symbols('d_1:2') # creates tuple (d_1)
x = sp.symbols('x_1:{}'.format(2*num_cplx_poles+num_real_poles+1)) # creates tuple (x_1, x_2, x_3, x_4)
c = sp.symbols('c_1:{}'.format(2*num_cplx_poles+num_real_poles+1)) # creates tuple (c_1, c_2, c_3, c_4)
u = sp.symbols('u_1:2')
# define structures / matrizes / vectors of symbols
X = to_sympy_vector(x)
U = to_sympy_vector(u)
B = to_sympy_vector(b)
C = to_sympy_vector(c).T
D = to_sympy_vector(d)
z = x+s+w+p+b+c+d # tuple of all symbols in Z (state vector)
Z = to_sympy_vector(z)
zu = z+u
fx = A*X+B*U # State space representation -> This changes over time (has dynamics)
f = sp.Matrix([0 for _ in range(len(z)-len(x))]) # -> All parameters do not have dynamics but shall remain the same (the correct estimate)
f = fx.col_join(f) # join together for full state transition vector
print "f="; sp.pprint(f)
F = f.jacobian(z) # First order derivitive of f
print "F="; sp.pprint(F)
h = C*X+D*U
print "h="; sp.pprint(h)
H = h.jacobian(z) # First order derivitive of h
print "H="; sp.pprint(H)
# Create (fast) numericial functions to get numerical matrices/vectors with given values
# ("paste values to the right positions")
AA = sp.lambdify(z, A)
BB = sp.lambdify(z, B)
CC = sp.lambdify(z, C)
DD = sp.lambdify(z, D)
ff = sp.lambdify(zu, f)
FF = sp.lambdify(zu, F)
hh = sp.lambdify(zu, h)
HH = sp.lambdify(zu, H)
In [ ]:
# Define R and Q
R = np.eye(len(d)) * 1.0
print "R:", R
rho = 0.1
# Some certainty for values not in x or c.
# TODO: Read paper why...
q = np.array([])
for sym in z:
if sym in X.T.row_join(C):
q = np.hstack((q, 0))
else:
q = np.hstack((q, 1))
Q = rho * np.diag(q)
print "Q:\n", Q
In [ ]:
In [ ]:
'''
Initialize Testsystem
'''
import control as pc
class TestSystem:
def __init__(self):
Af = np.array([[ -2, 4],
[ -4, -2]])
Bf = np.array([[ 1],
[ 1]])
Cf = np.array([[ 1/0.3, 0.]])
Df = np.array([[ 0.]])
self.T = 0.1
ss2 = pc.ss(Af, Bf, Cf, Df)
#plt.plot(*pc.step_response(ss2))
# Time delay approx.
numerator, denominator = pc.pade(self.T, 15)
tf3 = pc.series(ss2, pc.tf(numerator,denominator))
ss3 = pc.tf2ss(tf3)
#plt.plot(*pc.step_response(ss3))
#plt.show()
#print pc.dcgain(ss2)
#print ss2.pole()
self.A = ss3.A
self.B = ss3.B
self.C = ss3.C
self.D = ss3.D
self.x0 = np.zeros((len(self.B),1))
self.xt = self.x0
def calc_TestSystem(self, u):
print "u", u.shape
u = np.array([u])
x_dot = self.A.dot(self.xt) + self.B.dot(u)
y = self.C.dot(self.xt) + self.D.dot(u)
self.xt = self.xt + x_dot*self.T
return y
sys = TestSystem()
In [ ]:
import control as con
Af = np.array([[ -2, 4],
[ -4, -2]])
Bf = np.array([[ 1],
[ 1]])
Cf = np.array([[ 1/0.3, 0.]])
Df = np.array([[ 0.]])
Tf = 0.1
ss2 = con.ss(Af, Bf, Cf, Df)
plt.plot(*con.step_response(ss2))
plt.show()
numerator, denominator = con.pade(Tf, 15)
tf3 = con.series(ss2, con.tf(numerator,denominator))
ss3 = con.tf2ss(tf3)
plt.plot(*con.step_response(ss3))
plt.show()
Af = ss3.A
Bf = ss3.B
Cf = ss3.C
Df = ss3.D
print con.dcgain(ss2)
print ss2.pole()
### Testsystem ####
xx_0 = np.zeros((len(Bf),1))
x_t_real = xx_0
def calc_TestSystem(u):
global Af, Bf, Cf, Df, x_t_real
#print "Af:\n", Af
#print "x_t:\n", x_t_real
x_dot = Af.dot(x_t_real)
x_dot = x_dot + Bf.dot(u)
y = Cf.dot(x_t_real) + Df.dot(u)
#print x_dot
x_t_real = x_t_real + x_dot*T
return y
In [ ]:
# return real part of a vector
def real_vector(vector):
return map(lambda x: x.real, vector)
# return imaginary part of a vector
def imag_vector(vector):
return map(lambda x: x.imag, vector)
In [ ]:
# initial
random.seed(42) # Set the random number generator to a fixed sequence.
np.random.seed(123) # Set the random number generator for fixed step length t
# Assign initial values to the poles
s_k0 = np.array([s_init])
#print s_k0, s_k0.shape
w_k0 = np.array([w_init])
#print w_k0, w_k0.shape
p_k0 = np.array([p_init])
#print p_k0, p_k0.shape
# Assign random values to the state space system
b_k0 = np.random.rand(len(b), 1)
#print b_k0, b_k0.shape
c_k0 = np.random.rand(1, len(c))
#print c_k0, c_k0.shape
d_k0 = np.zeros((len(d), len(d)))
#print d_k0, d_k0.shape
# Initial P
P_k0 = np.eye(len(z))
#print P_k0, P_k0.shape
# initial x0
x_k0 = np.zeros((len(x),1))
#print x_k0, x_k0.shape
# first step height u, also mean step height for all steps
u_k0 = np.array([[0.1]])
# first step time
step_time = 0.01
# Sampling time
T = 0.001
# internal Sampling
T_internal = 0.0005
T_internal = 0.001
# First values
x_k = x_k0
#print x_k0, x_k0.shape
u_k = u_k0
#print u_k, u_k.shape
z_k = np.hstack((x_k0.T, s_k0, w_k0, p_k0, b_k0.T, c_k0, d_k0)).T
#print z_k, z_k.shape
P_k = P_k0
#print P_k, P_k.shape
# naive Logging
poles = []
y = []
y_e = []
u = []
nor0 = []
nor1 = []
nor2 = []
nor3 = []
nor4 = []
# Create accessors
x_from_z = get_accessor(Z, to_sympy_vector(x))
s_from_z = get_accessor(Z, to_sympy_vector(s))
w_from_z = get_accessor(Z, to_sympy_vector(w))
p_from_z = get_accessor(Z, to_sympy_vector(p))
swp_from_z = get_accessor(Z, to_sympy_vector(s+w+p))
b_from_z = get_accessor(Z, to_sympy_vector(b))
c_from_z = get_accessor(Z, to_sympy_vector(c))
d_from_z = get_accessor(Z, to_sympy_vector(d))
In [ ]:
#%%time
##########################################################
# Loop start
for cnt, t in enumerate(np.arange(0, 30, T)):
####################################################
# Every time_step a new step is performed
if np.isclose(t, step_time, T):
#if cnt%1000 == 0:
u_k = u_k0 + random.uniform(-1, 1) # random step height
step = np.random.randint(low=0, high=3000)*T
step_time = step + t # random step length
print "step @ {} for {}: u={}".format(t, step, u_k)
# >>> Logging of pole trajectories
s_k = np.array(s_from_z(*z_k.ravel())).T
w_k = np.array(w_from_z(*z_k.ravel())).T
p_k = np.array(p_from_z(*z_k.ravel())).T
poles.append(np.array([complex(si,wi) for si,wi in zip(s_k, w_k)] +
[complex(si,-wi) for si,wi in zip(s_k, w_k)] +
[pi for pi in p_k]))
#print u_k, type(u_k), u_k.shape
# >>> Logging of u
u.append(u_k[0,0])
####################################################
##### Get system response y
y_t = calc_TestSystem(u_k)
#print u_k, u_k.shape
#print y_t, y_t.shape
# >>> Logging of y
y.append(y_t[0,0])
#print "enter"
for t_int in np.arange(0, T, T_internal):
#for t_int in np.arange(0, T, T):
#print t_int
##### Reconstruction from z (current state of estimation)
#print z_k, z_k.shape, type(z_k)
x_k = x_from_z(*z_k.ravel())
#print x_k, x_k.shape, type(x_k)
swp_k = np.array(swp_from_z(*z_k.ravel()))
#print s_k0, w_k0, p_k0
#print swp_k, swp_k.shape
b_k = np.array(b_from_z(*z_k.ravel()))
#print b_k0, b_k0.shape
#print b_k, b_k.shape
c_k = np.array(c_from_z(*z_k.ravel())).T
#print c_k0, c_k0.shape
#print c_k, c_k.shape
d_k = np.array(d_from_z(*z_k.ravel()))
#print d_k0, d_k0.shape
#print d_k, d_k.shape
## Recreate System matrices from z
A_k = AA(*z_k.ravel())
#print A_k, type(A_k), A_k.shape
B_k = BB(*z_k.ravel())
#print B_k, type(B_k), B_k.shape
C_k = CC(*z_k.ravel())
#print C_k, type(C_k), C_k.shape
D_k = DD(*z_k.ravel())
#print D_k, type(D_k), D_k.shape
##### Evaluation
# State space equation
dx_k = A_k.dot(x_k) + B_k.dot(u_k)
#print dx_k, dx_k.shape
y_k = C_k.dot(x_k) + D_k.dot(u_k)
#print y_k, y_k.shape
# Concate values for lambdafied lookups
zu_k = np.vstack((z_k, u_k.T))
#print zu_k, zu_k.shape
# current h
h_k = hh(*zu_k.ravel()) # h = y_k (predicted)
#print h_k,h_k.shape
# Current H
H_k = HH(*zu_k.ravel()) # h.Jacobian
#print H_k, H_k.shape
# Current f
f_k = ff(*zu_k.ravel()) # f
#print f_k, f_k.shape
# Current F
F_k = FF(*zu_k.ravel()) #f.Jacobian
#print F_k, F_k.shape
##### Time discrete Kalman Filter (DKF)
# Correction error covariance P_k1 = P + T*(F*P*F^T + Q)
P_k1 = P_k + T*(F_k.dot(P_k) + P_k.dot(F_k.T) + Q)
#print "P_k1:\n", P_k1, P_k1.shape
#print P_k, P_k.shape
# Compute Kalman gain: K = P_k * H^T * (H P_k H^T + R)^(-1)
K_k = P_k1.dot(H_k.T).dot(la.inv(H_k.dot(P_k1).dot(H_k.T) + R))
#print "K_k:\n", K_k, K_k.shape
# Update estimate with measurements z_k
# Innovation + Euler intergration
# z*_k = z_k + T*f_k* K*(y_t - h_k)
z_k1 = (z_k + T_internal*f_k + K_k.dot((y_t - h_k)))
#print "z_k1:", z_k1, z_k1.shape, type(z_k1)
#print "z_k:", z_k, z_k.shape, type(z_k)
# Error covariance P*_k: P* = (I - KH)*P_k
P_k = (np.eye(len(z)) - K_k.dot(H_k)).dot(P_k1) # P*_k
#print "P_k:\n", P_k, P_k.shape
# Project ahead
z_k = np.array(z_k1) # y_t is numpy matrix for some reason
#print "z_k:\n", z_k, z_k.shape, type(z_k)
#P_k = P_k1
#print "P_k:\n", P_k
# >>> Logging of estimated y (y_e) according to current state z
y_e.append(y_k[0,0])
# Naive tracking of performance
nor0.append(np.trace(P_k))
nor1.append(la.norm(P_k, 'fro'))
nor2.append(la.norm(P_k, np.inf))
nor3.append(la.norm(P_k, 1))
nor4.append(la.norm(P_k, 2))
print "------------------"
In [ ]:
# another approach to plot
real_part = real_vector(poles)
imag_part = imag_vector(poles)
# Display a window with a plot of real, imag
plt.plot(real_part, imag_part, 'b-')
plt.plot(real_vector(poles), imag_vector(poles), 'gx')
plt.plot(real_part[0], imag_part[0], 'r.')
plt.plot(real_part[-1], imag_part[-1], 'ro')
plt.show()
In [ ]:
#plt.rcParams['figure.figsize'] = [15, 7]
#import mpld3
#mpld3.enable_notebook()
def rolling_window(a, window):
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def every_nth(arr, n):
return [range(0, len(arr))[::n], arr[::n]]
In [ ]:
ax1 = plt.subplot(2, 1, 1)
#ax1.plot(*every_nth(nor0, 100), label="trace")
ax1.plot(*every_nth(nor1, 100), label="frobenius")
#ax1.plot(*every_nth(nor2, 100), label="inf norm")
#ax1.plot(*every_nth(nor3, 100), label="1 norm")
#ax1.plot(*every_nth(nor4, 100), label="2 norm")
#ax1.plot(*every_nth(0.1*np.sum(rolling_window(np.abs(np.array(y)-np.array(y_e)), 2000), -1), 100), label="roll dist 2000")
#roll_dist_5000 = 0.1*np.sum(rolling_window(np.abs(np.array(y)-np.array(y_e)), 5000), -1)
#ax1.plot(*every_nth(roll_dist_5000, 100), label="roll dist 5000")
#mux_5000 = nor1[0:len(roll_dist_5000)] * roll_dist_5000
#ax1.plot(*every_nth(mux_5000, 100), label="mux")
#ax1.plot(*every_nth(1*np.sum(rolling_window(np.power(np.array(y)-np.array(y_e),2), 2000), -1), 100), label="roll dist^2 2000")
#ax1.plot(*every_nth(1*np.sum(rolling_window(np.power(np.array(y)-np.array(y_e),2), 5000), -1), 100), label="roll dist^2 5000")
ax1.legend()
# H-inf Norm in mov-window
# Entropy in mov-window
# Highpass
ax2 = plt.subplot(2, 1, 2, sharex=ax1)
plt.plot(y, 'b', label="real y")
plt.plot(u, 'r', label="setp u")
plt.plot(y_e, 'y-', label="esti y")
plt.show()
In [ ]:
#print A_k, A_k.shape
#print b_k, b_k.shape
#print c_k, c_k.shape
#print d_k, d_k.shape
ss = pc.ss(A_k, b_k, c_k, d_k)
#print poles[-1]
print ss
In [ ]:
print pc.dcgain(ss2)
print ss2.pole()
print ss2.zero()
print pc.dcgain(ss)
print ss.pole()
print ss.zero()
In [ ]:
yout3, T3 = pc.step_response(ss3)
plt.plot(yout3, T3, 'r')
#plt.show()
yout4, T4 = pc.step_response(ss)
plt.plot(yout4, T4, 'b--')
plt.show()
In [ ]:
In [ ]:
In [ ]:
In [ ]: