Extended Kalman Filter design for bicycle's real data


In [1]:
# Import dependencies
from __future__ import division, print_function
%matplotlib inline

import scipy

from BicycleTrajectory2D import *
from BicycleUtils import *
from FormatUtils import *
from PlotUtils import *

from DatasetHelper import *
from RealDatasetImporter import *

In [2]:
global_path = '../../bags/experiments/10_09_2017/csv/'
file_name = 'field_half/field_half_preprocessed.csv'
#file_name = 'field_external_optical/field_external_optical_preprocessed.csv'

#global_path = '../../bags/experiments/08_07_2017/csv/'
#file_name = 'football_field_external/football_field_external_preprocessed.csv'
#file_name = 'parking_2/parking_2_preprocessed.csv'
#file_name = 'football_field_line_3/football_field_line_3_preprocessed.csv'
#file_name = 'mensa_to_mensa/mensa_to_mensa_preprocessed.csv'

# Import CSV as pandas dataframe and define time as index
data = pd.read_csv(global_path + file_name, index_col=0, parse_dates=True)
data['time_index'] = pd.to_datetime(data['time'])
data = data.set_index('time_index', drop=True, verify_integrity=True)
data['time'] = data.index
di = RealDatasetHelper(data)

# Convert pandas DataFrame into np matrix
linear_a = data[[di.linear_a]].as_matrix()
angular_phi = data[[di.angular_vel_phi]].as_matrix()
angular_delta = data[[di.angular_vel_delta]].as_matrix()

# select time: convert from pandas to numpy and fix initial offset
time = data[[di.time]].as_matrix()
time = time.astype('float64')/1e9
time = time - time[0]

dpi = 150

In [3]:
# Select which velocity to use: wheel encoder of GSP -based:

# [v, v_scale] = [di.real_v, 1.0]  # use Wheel encoder velocity, 0.23 scale for old datasets
[v, v_scale] = [di.real_v_gps, 1.0]  # use GPS velocity

# Use optical steering angle or IMU steering angle
use_optical_steering = True

sim = di.data.filter(items=[di.real_xf, di.real_xr, di.real_yf, di.real_yr,
                            di.real_zf, di.real_zr, di.real_za,
                            di.real_delta if use_optical_steering else di.real_delta_imu, 
                            di.real_psi, di.real_phi]).as_matrix()

sim_view = sim.copy()    

# Fix initial offset:
offset_x = sim[0, 0]
offset_y = sim[0, 2]
offset_delta = sim_view[0, 7]

sim_view[:, 0] -= offset_x
sim_view[:, 1] -= offset_x
sim_view[:, 2] -= offset_y 
sim_view[:, 3] -= offset_y
sim_view[:, 7] -= offset_delta

# select imputs:
U = di.data.filter(items=[v, di.angular_vel_phi, di.angular_vel_delta]).as_matrix()
U[:, 0] *= v_scale

# plot state variables
path_output_simulation =  'experiments/' + file_name.split("/")[0] + '/'
    
plot_real_data_state_variables(U=U, sim=sim_view, time=time, file_name=path_output_simulation, dpi=dpi)



In [4]:
class EKF_sigma_model_fusion(object):
    """Implements an EKF to bicycle model"""
    def __init__(self, xs, P, R_std, Q_std, wheel_distance=1.2, dt=0.1, alpha=1.0):
        self.w = wheel_distance        #Set the distance between the wheels
        self.xs = xs    #Set the initial state
        self.P = P      #Set the initial Covariance
        self.dt = dt
        self.R_std = R_std
        self.Q_std = Q_std
        self.alpha = alpha
        self.K = np.zeros((6, 6)) # Kalman gain
        
        #Set the process noise covariance
        self.Q = np.diag([self.Q_std[0], # v
                          self.Q_std[1], # phi_dot
                          self.Q_std[2]  # delta_dot
                          ])
        
        # Set the measurement noise covariance
        self.R = np.diag([self.R_std[0],  # xf
                          self.R_std[1],  # xr
                          self.R_std[2],  # yf
                          self.R_std[3],  # yr
                          self.R_std[4],  # zf
                          self.R_std[5],  # zr
                          self.R_std[6],  # za
                          self.R_std[7],  # sigma
                          self.R_std[8],  # psi
                          self.R_std[9]]) # phi
        
        # Linear relationship H -  z = Hx
        self.H = np.zeros((10, 6))  # 10 measurements x 6 state variables
        [self.H[0, 0], self.H[1, 0]] = [1.0, 1.0]  # x
        [self.H[2, 1], self.H[3, 1]] = [1.0, 1.0]  # y
        [self.H[4, 2], self.H[5, 2], self.H[6, 2]] = [1.0, 1.0, 1.0]  # z
        [self.H[7, 3], self.H[8, 4], self.H[9, 5]] = [1.0, 1.0, 1.0]  # sigma - psi - phi
        
    def Fx(self, xs, u):
        """ Linearize the system with the Jacobian of the x """
        F_result = np.eye(len(xs))
        
        v = u[0]
        phi_dot = u[1]
        delta_dot = u[2]
        
        sigma = xs[3]
        psi = xs[4]
        phi = xs[5]
        t = self.dt

        F04 = -t * v * np.sin(psi) 
        F14 = t * v * np.cos(psi)
        F33 = (2 * t * delta_dot * sigma * self.w) + 1
        F43 = (t * v)/np.cos(phi)
        F45 = t * sigma * v * np.sin(phi) / np.cos(phi)**2

        F_result[0, 4] = F04
        F_result[1, 4] = F14
        F_result[3, 3] = F33
        F_result[4, 3] = F43
        F_result[4, 5] = F45

        return F_result
    
    def Fu(self, xs, u):
        """ Linearize the system with the Jacobian of the u """
        v = u[0]
        phi_dot = u[1]
        delta_dot = u[2]
        
        sigma = xs[3]
        psi = xs[4]
        phi = xs[5]
        t = self.dt
        
        V_result = np.zeros((len(xs), len(u)))
        
        V00 = t * np.cos(psi)
        V10 = t * np.sin(psi)
        V32 = (t/self.w)*((sigma**2)*(self.w**2) + 1)
        V40 = t * sigma / np.cos(phi)
        V51 = t

        V_result[0, 0] = V00
        V_result[1, 0] = V10
        V_result[3, 2] = V32
        V_result[4, 0] = V40
        V_result[5, 1] = V51

        return V_result
    
    def f(self, xs, u):
        """ Estimate the non-linear state of the system """
        v = u[0]
        phi_dot = u[1]
        delta_dot = u[2]
        
        sigma = xs[3]
        psi = xs[4]
        phi = xs[5]
        t = self.dt
               
        fxu_result = np.zeros((len(xs), 1))
        
        fxu_result[0] = xs[0] + t * v * np.cos(psi)
        fxu_result[1] = xs[1] + t * v * np.sin(psi)
        fxu_result[2] = xs[2]
        fxu_result[3] = xs[3] + (t*phi_dot/self.w)*((sigma**2)*(self.w**2) +1)
        fxu_result[4] = xs[4] + t * v * sigma / np.cos(phi)
        fxu_result[5] = xs[5] + t * phi_dot
        
        return fxu_result

    def h(self, x):
        """ takes a state variable and returns the measurement
        that would correspond to that state. """   
        sensor_out = np.zeros((10, 1))
        sensor_out[0] = x[0]
        sensor_out[1] = x[0]
        sensor_out[2] = x[1]
        sensor_out[3] = x[1]
        sensor_out[4] = x[2]
        sensor_out[5] = x[2]
        sensor_out[6] = x[2]
        sensor_out[7] = x[3] # sigma
        sensor_out[8] = x[4] # psi
        sensor_out[9] = x[5] # phi
        
        return sensor_out

    def Prediction(self, u):
        x_ = self.xs
        P_ = self.P
        self.xs = self.f(x_, u)
        self.P = self.alpha * self.Fx(x_, u).dot(P_).dot((self.Fx(x_,u)).T) + \
             self.Fu(x_,u).dot(self.Q).dot((self.Fu(x_,u)).T)
    
    def Update(self, z):
        """Update the Kalman Prediction using the meazurement z"""
        y = z - self.h(self.xs)
        self.K = self.P.dot(self.H.T).dot(np.linalg.inv(self.H.dot(self.P).dot(self.H.T) + self.R))
        
        self.xs = self.xs + self.K.dot(y)
        self.P = (np.eye(len(self.xs)) - self.K.dot(self.H)).dot(self.P)

In [5]:
[t, wheel_distance, number_state_variables] = [0.0, 1.1, 6]

z = sim

# set UTM offset at first measurement
utm_offset_x = z[0, 0]
utm_offset_y = z[0, 2]
utm_offset_z = z[0, 4]

X_init = np.array([utm_offset_x, utm_offset_y, utm_offset_z, 0.0, 0.0, 0.0])  # [x, y, z, sigma, psi, phi]

alpha = 1.06

# covariance matrix
P = np.eye(number_state_variables) * 10

dt = 1.0/5.0 # Slower Sample Rate of the Measurements is 5Hz

# defining how bad things may goes, take max acceleratin value x margin
margin = 2

# process noise covariance Q -----------------------------------------------------------------------
# Maximum change (acceleration) for given dataset
max_acc_v = float(di.data[[di.real_v]].diff().max()) * margin
max_acc_phi_dot = float(di.data[[di.angular_vel_phi]].diff().max()) * margin
max_acc_delta_dot = float(di.data[[di.angular_vel_delta]].diff().max()) * margin

sigma_v = (max_acc_v*dt)**2
sigma_phi_dot = (max_acc_phi_dot*dt)**2
sigma_delta_dot = (max_acc_delta_dot*dt)**2

Q_std = [sigma_v, sigma_phi_dot, sigma_delta_dot] # v, phi_dot, delta_dot

# print("max_acc_v: " + str(max_acc_v))
# print("max_acc_phi_dot: " + str(max_acc_phi_dot))
# print("max_acc_delta_dot: " + str(max_acc_delta_dot))

# measurement noise covariance R ---------------------------------------------------------------------
R_std = [0.1**2, 0.1**2,  # x
         0.1**2, 0.1**2,  # y
         0.1**2, 0.1**2, 0.1**2,  # z
         0.001**2, 0.001**2, 0.005**2] # delta - psi - phi

#[offset_psi, offset_phi, offset_delta] = [-1.0, 0.0, 0.05]
[offset_psi, offset_phi, offset_delta] = [0.0, 0.0, 0.0]

filter_ekf = EKF_sigma_model_fusion(X_init, P, R_std=R_std, Q_std=Q_std, wheel_distance=wheel_distance, dt=dt, alpha=alpha)

Ut = np.array([0.0, 0.0, 0.0]) # [v, phi_dot, delta_dot]

xs = np.zeros((len(time), number_state_variables))
ps = np.zeros((len(time), number_state_variables, number_state_variables))
PU = np.zeros((len(time), number_state_variables))
KU = np.zeros((len(time), number_state_variables))
z_t = np.zeros((10, 1))
t = range(1, len(time))

dt_real = dt

for i in range(0, len(time)): 
    if i > 1:
        dt_real = float(time[i] - time[i-1]) # time: nano to seg
        
    filter_ekf.dt = dt_real
    
    P = filter_ekf.P
    K = filter_ekf.K
    PU[i] = [P[0,0], P[1,1], P[2,2], P[3,3], P[4,4], P[5,5]]
    KU[i] = [K[0,0], K[1,1], K[2,2], K[3,3], K[4,4], K[5,5]]
    xs[i] = filter_ekf.xs.T
    
    # update U
    Ut[0] = float(U[i, 0])
    Ut[1] = float(U[i, 1])
    Ut[2] = float(U[i, 2])
    
    # predict
    filter_ekf.Prediction(Ut)

    #Update measurements [xf, xr, yf, yr, zf, zr, za, delta, psi, phi]
    z_t[0] = z[i, 0] # xf
    z_t[1] = z[i, 1] # xr
    z_t[2] = z[i, 2] # yf
    z_t[3] = z[i, 3] # yr
    z_t[4] = z[i, 4] # zf
    z_t[5] = z[i, 5] # zr
    z_t[6] = z[i, 6] # za
    z_t[7] = np.tan(z[i, 7])/wheel_distance  # sigma
    z_t[8] = z[i, 8]  # psi
    z_t[9] = z[i, 9]  # phi

    filter_ekf.Update(z_t)

# update delta based on sigma
xs[:, 3] = np.arctan2(xs[:, 3], 1/wheel_distance) # delta

# Fix initial offset (for visualization):
xs_view = xs.copy()
xs_view[:, 0] -= offset_x
xs_view[:, 1] -= offset_y

z_view = z.copy()
z_view[:, 0] -= offset_x 
z_view[:, 1] -= offset_x 
z_view[:, 2] -= offset_y
z_view[:, 3] -= offset_y 

path_output_filter = 'filters/EKF/experiments/' + file_name.split("/")[0] + "/"
plot_filter_results_real_data(xs=xs_view, sim=z_view, time=time, file_name=path_output_filter, filter_name="EKF",
                              dpi=dpi, format='png')



In [6]:
plot_EKF_gain_covariance(time, KU, PU, path_output_filter, autoscale_axis=True, format_file="png")


Export to Google Earth


In [7]:
import utm
import datetime
from simplekml import Kml, Model, AltitudeMode, Orientation, Scale

# Get real time
time = di.data[[di.time]].as_matrix()

# Get gps_zone_number and  gps_zone_letter
gps_zone = di.data[[di.gps_zone_number, di.gps_zone_letter]].as_matrix()

car={}
car['when']=[]
car['filter']=[]
car['gps_front']=[]
car['gps_rear']=[]

for i in range(0, len(time)):
    (lat_filter, lon_filter) = utm.to_latlon(xs[i, 0], xs[i, 1], gps_zone[i, 0], gps_zone[i, 1])
    (lat_gps_front, lon_gps_front) = utm.to_latlon(z[i, 0], z[i, 2], gps_zone[i, 0], gps_zone[i, 1])
    (lat_gps_rear, lon_gps_rear) = utm.to_latlon(z[i, 1], z[i, 3], gps_zone[i, 0], gps_zone[i, 1])
    
    d = datetime.datetime.fromtimestamp(float(time[i])/1e9)
    car["when"].append(d.strftime("%Y-%m-%dT%H:%M:%SZ"))
    car["filter"].append((lon_filter, lat_filter , 0))
    car["gps_front"].append((lon_gps_front, lat_gps_front, 0))
    car["gps_rear"].append((lon_gps_rear, lat_gps_rear, 0))

# Create the KML document
kml = Kml(name=d.strftime("%Y-%m-%d %H:%M"), open=1)

# Create the track
trk = kml.newgxtrack(name="EKF", altitudemode=AltitudeMode.clamptoground,
                     description="State Estimation from EKF")

gps_front = kml.newgxtrack(name="GPS FRONT", altitudemode=AltitudeMode.clamptoground,
                     description="Raw data from GPS FRONT (input to filter)")

gps_rear = kml.newgxtrack(name="GPS REAR", altitudemode=AltitudeMode.clamptoground,
                     description="Raw data from GPS REAR (input to filter)")

# Add all the information to the track
trk.newwhen(car["when"])
trk.newgxcoord(car["filter"])

gps_front.newwhen(car["when"][::5])
gps_front.newgxcoord((car["gps_front"][::5]))

gps_rear.newwhen(car["when"][::5])
gps_rear.newgxcoord((car["gps_rear"][::5]))

# Style of the Track
trk.iconstyle.icon.href = "http://maps.google.com/mapfiles/kml/paddle/red-circle.png"
trk.labelstyle.scale = 1
trk.linestyle.width = 4
trk.linestyle.opacity = 1
trk.linestyle.color = '64F01414' # aabbggrr

gps_front.iconstyle.icon.href = ""
gps_front.labelstyle.scale = 0
gps_front.linestyle.width = 4
gps_front.linestyle.opacity = 1
gps_front.linestyle.color = '6414B400'

gps_rear.iconstyle.icon.href = ""
gps_rear.labelstyle.scale = 0
gps_rear.linestyle.width = 4
gps_rear.linestyle.opacity = 1
gps_rear.linestyle.color = '6414F00A'

# Saving
name_output_kmz = path_output_filter + "EKF_google_earth.kmz"
kml.savekmz(name_output_kmz)

print("Google Earth file saved into: " + name_output_kmz)


Google Earth file saved into: filters/EKF/experiments/field_half/EKF_google_earth.kmz

In [ ]: