In [1]:
import numpy as np
import scipy
import scipy.signal
import scipy.fftpack

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
sns.set_style('whitegrid')
%matplotlib inline

In [ ]:
# run cell for interactive plots
import mpld3
mpld3.enable_notebook()

In [2]:
# impulse response log files
logs = [
    'implresp_v5.pb.cobs.gz',
    'implresp_v6.pb.cobs.gz',
    'implresp_v7.pb.cobs.gz',
]

# impulse response log files using
# 5 element moving average filter
logs_ma5 = [
    'implresp_v5_ma5.pb.cobs.gz',
    'implresp_v6_ma5.pb.cobs.gz',
    'implresp_v7_ma5.pb.cobs.gz',
]

In [3]:
%%javascript 
var kernel = Jupyter.notebook.kernel; 
var command = ["notebookName = ",
               "'", window.document.body.dataset.notebookName, "'" ].join('')
kernel.execute(command)



In [4]:
import os
import sys
file_dir = os.path.dirname(os.path.abspath(notebookName))
sys.path.append(os.path.abspath(os.path.join(file_dir, os.pardir)))
from plot_sim import plot_states
from load_sim import load_messages, get_records_from_messages, get_time_vector
from simulate_whipple_benchmark import simulate

In [5]:
records = []
for logname in logs:
    msg = load_messages(os.path.join(file_dir, logname))
    # ignore first sample as it is transmitted before the simulation loop
    data = get_records_from_messages(msg[1:])
    rec = dict()
    rec['data'] = data
    rec['metadata'] = msg[0]
    records.append(rec)

In [6]:
for rec in records:
    data = rec['data']
    m = rec['metadata']
    # redefine zero time
    t = get_time_vector(data)
    t -= t[0]
    rec['time'] = t
        
    fig, ax = plot_states(t, data.state[:, 1:],
                          second_yaxis=False, to_degrees=False)
    fig.set_size_inches(11, 6)
    
    try:
        title = 'impulse response, v = {}, {}'.format(
            m.model.v, m.gitsha1.f.decode())
        mpld3
    except NameError:
        fig.suptitle(title)
    else:
        ax.set_title(title)



In [7]:
def fft(x, sample_period, window_type=None):
    """ Calculate the Fourier transform for signal x. Input is assumed real and
    the first half of the frequencies and amplitudes are returned. window_type
    specifies the windowing function used, hamming is default.
    """
    if window_type is None:
        window_type = scipy.signal.hamming
    n = len(x)
    windowed_x = np.multiply(x, window_type(n))

    # only use first half of fft since real signals are mirrored about nyquist
    # frequency
    xf = 2/n * np.abs(scipy.fftpack.fft(windowed_x)[:n//2])
    freq = np.linspace(0, 1/(2*sample_period), n//2)
    return freq, xf

In [8]:
def roi_index(rec):
    steer_rate = rec['data'].state[:, 4]
    std = steer_rate.std()
    abs_rate = np.abs(steer_rate)
    ind_std = np.argwhere(abs_rate > 1*std)
    try:
        # this occurs for response decays after impulse
        i = np.argwhere(abs_rate > 2*std)[0][0]
    except IndexError:
        # use single standard dev for region of interest
        i = ind_std[0][0]
        j = ind_std[-1][0]
    else:
        # find index of max value
        i = np.argmax(abs_rate)
        j = np.argwhere(abs_rate > 0.2*std)[-1][0]
    #print('impulse after {} seconds, until {} seconds'.format(
    #    rec['time'][i], rec['time'][j]))
    return i, j

In [9]:
def plot_fft(states, dt, xlim=None):
    labels = [
        'roll angle',
        'steer angle',
        'roll rate',
        'steer rate',
    ]
    colors = np.roll(sns.color_palette('Paired', 10), 2, axis=0)
    
    xf = []
    for s in range(len(labels)):
        freq, xfs = fft(states[:, s], m.model.dt)
        xf.append(xfs)
    
    try:
        i = np.argwhere(freq < xlim[0])[-1][0]
    except:
        i = 0
    try:
        j = np.argwhere(freq > xlim[1])[0][0]
    except:
        j = -1
    fig, ax = plt.subplots()
    for s in range(len(labels)):
        ax.plot(freq[i:j], xf[s][i:j], color=colors[2*s + 1], label=labels[s])
    ax.set_yscale('log')
    ax.set_xlabel('frequency [Hz]')
    ax.set_ylabel('amplitude')
    ax.legend()
    return fig, ax

In [10]:
for rec in records:
    data = rec['data']
    m = rec['metadata']
    t = rec['time']
    i, j = roi_index(rec)
        
    fig1, ax1 = plot_states(t[i:j], data.state[i:j, 1:],
                          second_yaxis=False, to_degrees=False)
    fig1.set_size_inches(11, 6)
    try:
        title = 'impulse response (region of interest t=[{}:{}])'.format(
            t[i], t[j])
        title +=  ', v = {}, {}'.format(m.model.v, m.gitsha1.f.decode())
        mpld3
    except NameError:
        fig1.suptitle(title)
    else:
        ax1.set_title(title)
        
    fig2, ax2 = plot_fft(data.state[i:j, 1:], m.model.dt)
    fig2.set_size_inches(11, 6)
    try:
        title = 'state fft (region of interest), v = {}, {}'.format(
            m.model.v, m.gitsha1.f.decode())
        mpld3
    except NameError:
        fig2.suptitle(title)
    else:
        ax2.set_title(title)



In [11]:
# compute Wiener filter for steer rate signal
# use Savitzky-Golay filtered steer rate signal for
# our assumed original signal, window length 55, order 5
rec = records[0]
states = rec['data'].state[:, 1:]
t = rec['time']
m = rec['metadata']
steer_angle = states[:, 1]
steer_rate = states[:, 3]

# get savitzky-golay filtered steer rate
steer_rate_savgol = scipy.signal.savgol_filter(steer_rate, 55, 5)
# get savitzky-golay filtered steer angle
steer_angle_savgol = scipy.signal.savgol_filter(
    steer_angle, 111, 7, deriv=1, delta=m.model.dt)

# plot savgol and measured steer rate
colors = np.roll(sns.color_palette('Paired', 10), 2, axis=0)
fig, ax = plt.subplots()
ax.plot(t, steer_angle, label='steer angle',
        color=colors[2], zorder=0, alpha=0.5)
ax.plot(t, steer_rate, label='steer rate',
        color=colors[6], zorder=0, alpha=0.5)
ax.plot(t, steer_rate_savgol, label='steer rate savgol',
        color=colors[7],  zorder=1)
ax.plot(t, steer_angle_savgol, label='steer angle savgol',
        color=colors[3],  zorder=1)

def wiener(original_signal, observed_signal, filter_length):
    s = original_signal
    x = observed_signal
    M = filter_length
    
    # estimate cross PSDs using Welch technique
    f, Pxx = scipy.signal.csd(x, x, nperseg=M)
    f, Psx = scipy.signal.csd(s, x, nperseg=M)
    # compute Wiener filter
    H = Psx/Pxx
    # shift for causal filter
    H = H* np.exp(-1j*2*np.pi/len(H)*np.arange(len(H))*(len(H)//2))
    h = np.fft.irfft(H) # Wiender FIR taps

    # apply Wiener filter to observed signal
    y = np.convolve(np.hstack((np.zeros(M - 1), x)), h, mode='valid')
    return h, y

h8, y8 = wiener(steer_rate_savgol, steer_rate, 8)
h16, y16 = wiener(steer_rate_savgol, steer_rate, 16)
h32, y32 = wiener(steer_rate_savgol, steer_rate, 32)
h64, y64 = wiener(steer_rate_savgol, steer_rate, 64)

# add to plot
husl = sns.husl_palette(4)
ax.plot(t, y8, label='steer rate wiener 8',
        color=husl[0], zorder=1)
ax.plot(t, y16, label='steer rate wiener 16',
        color=husl[1], zorder=1)
ax.plot(t, y32, label='steer rate wiener 32',
        color=husl[2], zorder=1)
ax.plot(t, y64, label='steer rate wiener 64',
        color=husl[3], zorder=1)
ax.legend()
ax.set_xlabel('time [s]')
ax.set_ylabel('[rad, rad/s]')
fig.set_size_inches(11, 6)



In [12]:
def kalman(Ad, Bd, Cd, Q, R, P0, x0, z, u):
    n = z.shape[0]
    xhat = np.zeros((n, Ad.shape[0], 1))
    xhatminus = np.zeros(xhat.shape)
    P = np.zeros((n,) + Ad.shape)
    Pminus = np.zeros(P.shape)
    K = np.zeros((n,) + tuple(reversed(Cd.shape)))
    
    I = np.eye(Ad.shape[0])
    for i in range(n):
        if i == 0:
            # time upstate state
            xhatminus[i, :] = np.dot(Ad, x0) + np.dot(Bd, u[i, :])
            # time update error covariance
            Pminus[i, :] = np.dot(np.dot(Ad, P0), Ad.T) + Q
        else:
            # time upstate state
            xhatminus[i, :] = np.dot(Ad, xhat[i - 1, :]) + np.dot(Bd, u[i, :])
            # time update error covariance
            Pminus[i, :] = np.dot(np.dot(Ad, P[i - 1, :]), Ad.T) + Q
        
        # measurement update kalman gain
        S = np.dot(np.dot(Cd, Pminus[i, :]), Cd.T) + R
        K[i, :] = np.linalg.lstsq(S, np.dot(Cd, Pminus[i, :].T))[0].T
        # measurement update state
        xhat[i, :] = (xhatminus[i, :] +
                      np.dot(K[i, :], (z[i, :] - np.dot(Cd, xhatminus[i, :]))))
        P[i, :] = np.dot(I - np.dot(K[i, :], Cd), Pminus[i, :])
    return xhat, P, K

In [13]:
# compute Kalman filter estimate for steer rate signal
from plot_sim import get_kollmorgen_applied_torque, get_kistler_sensor_torque
from phobos.constants import sa

rec = records[0]
T_M = rec['T_M'] = get_kollmorgen_applied_torque(
    rec['data'].sensors.kollmorgen_actual_torque)
T_S = rec['T_S'] = get_kistler_sensor_torque(
    rec['data'].sensors.kistler_measured_torque)
m = (sa.FULL_ASSEMBLY_INERTIA_WITHOUT_WEIGHT -
     sa.UPPER_ASSEMBLY_INERTIA_PHYSICAL)

dt = rec['metadata'].model.dt
Ad = np.array([
    [1, dt],
    [0,  1]
])
Bd = np.array([
    [1/2*dt**2],
    [dt]
])
Cd = np.eye(2)
Q = np.array([
    [1/4*dt**4, 1/2*dt**3],
    [1/2*dt**3,     dt**2]
])
# measured steer angle/rate variance
## 2.03049-6
## 0.02604
# reduce steer angle measurement covariance
# due to time delay in steer rate measurement
R = np.array([
    [2.03049e-6 / 10,       0],
    [         0, 0.02604]
])
# steer angle and rate observations
z = np.reshape(
    rec['data'].state[:, [2, 4]],
    (-1, 2, 1))
u = np.reshape((T_M - T_S)/m, (-1, 1, 1))
P0 = np.zeros((2, 2))
x0 = np.zeros((2, 1))

# reduce points for plotting
limit = slice(0, 2030)
t = rec['time'][limit]
z = z[limit, :]
u = u[limit, :]

xhat = []
q = [int(i) for i in np.logspace(1, 3, 10)]
for qi in q:
    xhat.append(kalman(Ad, Bd, Cd, qi*Q, R, P0, x0, z, u)[0])

# get savitzky-golay filtered steer angle
steer_angle_savgol = scipy.signal.savgol_filter(
    np.squeeze(z[:, 0]), 111, 7, deriv=1, delta=dt)

colors = np.roll(sns.color_palette('Paired', 10), 2, axis=0)
fig, ax = plt.subplots()
ax.plot(t, z[:, 0], label='steer angle',
        color=colors[3], zorder=0)
ax.plot(t, z[:, 1], label='steer rate measured',
        linewidth=2,
        color=colors[8], zorder=0, alpha=0.9)
#ax.plot(t, (T_M - T_S)/m, label='steer accel (from torque diff)',
#        color=colors[8], zorder=0, alpha=0.9)
ax.plot(t, steer_angle_savgol, label='steer angle savgol, size=111, order=7, deriv=1',
        color=colors[5],  zorder=1)

import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    qcolors = sns.diverging_palette(10, 275, n=len(q))
for i in range(len(q)):
    ax.plot(t, xhat[i][:, 1], label='steer rate kalman q={}'.format(q[i]),
            color=qcolors[i], zorder=1)
    
ax.legend()
ax.set_xlabel('time [s]')
ax.set_ylabel('[rad, rad/s]')
fig.set_size_inches(11, 6)