This script is necessary to recompute the parameters when kinematic apex data are "sub-frame" computed.
Requires IPython to run in "pylab" mode. If this is not the case, at least insert
from pylab import *
somewhere
In [1]:
%connect_info
In [1]:
import mutils.io as mio
import mutils.misc as mi
import os
import re
ws = mio.saveable()
ws.subject = 7
ws.subject_doc = 'id of selected subject'
ws.ttype = 1
ws.ttype_doc = 'trial type (here: only 1 allowed)'
# load SLIP data
ws.files = []
ws.files_doc = 'list of filenames of (original) SLIP data'
#datadir = '../data/2011-mmcl_mat/SLIP/'
ws.datadir='data/2011-mmcl_mat/SLIP/new/'
ws.datadir_doc = 'absolute path to SLIP data'
ws.datadir_out='data/2011-mmcl_mat/SLIP/new4/'
!mkdir -p 'data/2011-mmcl_mat/SLIP/new4/'
ws.datadir_out_doc = 'absolute path to store the recomputed SLIP data'
for fn in os.listdir(ws.datadir):
if re.match('^params3D_s%it%ir[0-9]' % (ws.subject, ws.ttype), fn):
if fn != 'params3D_s8t1r2.dict': # this dataset is corrputed
ws.files.append(fn)
ws.files.sort()
# NOTE: use ESLIP_params, *not* SLIP_params
ws.SlipData = [mi.Struct(mio.mload(ws.datadir + fn)) for fn in ws.files]
ws.SlipData_doc = 'list of original SLIP data (one for each trial)'
for d in ws.SlipData:
d.IC = vstack([d.y0, d.vx, d.vz]).T[:-1,:]
d.P = hstack([d.ESLIP_params[:, :4], hstack(d.dE)[:, newaxis]])
In [2]:
# load continous data
ws.k = mio.KinData(data_dir = 'data/2011-mmcl_mat/')
ws.k.selection = ['com_x', 'com_y', 'com_z']
ws.k.load(ws.subject, ws.ttype)
ws.k_doc = 'KinData object that handles the continuous data'
In [3]:
# find apices (at sampling rate accuracy)
all_apices = []
all_apexphases = []
for r in ws.k.raw_dat:
apices = []
apexphases = []
c = r['com'][:, 2]
phi = r['phi2'].squeeze()
rising = c[1] > c[0]
for idx, z in enumerate(c[1:]):
if rising and c[idx] > z:
apices.append(idx)
apexphases.append(phi[idx])
rising = False
if z >= c[idx]:
rising = True
all_apices.append(apices[:])
all_apexphases.append(apexphases[:])
# find nadirs (at sampling rate accuracy)
all_nadirs = []
all_nadirphases = []
for r in ws.k.raw_dat:
nadirs = []
nadirphases = []
c = r['com'][:, 2]
phi = r['phi2'].squeeze()
falling = c[1] < c[0]
for idx, z in enumerate(c[1:]):
if falling and c[idx] < z:
nadirs.append(idx)
nadirphases.append(phi[idx])
falling = False
if z <= c[idx]:
falling = True
all_nadirs.append(nadirs[:])
all_nadirphases.append(nadirphases[:])
ws.n_idcs = []
ws.n_idcs_doc = 'list of indices of (nearby) nadir events (one for each trial)'
ws.a_idcs = []
ws.a_idcs_doc = 'list of indices of (nearby) apex events (one for each trial)'
ws.n_phi = []
ws.n_phi_doc = 'list of phases of (nearby) nadir events (one for each trial)'
ws.a_phi = []
ws.a_phi_doc = 'list of phases of (nearby) apex events (one for each trial)'
nr = 0
for n, a, np, ap in zip(all_nadirs, all_apices, all_nadirphases, all_apexphases):
firstidx = 0
while n[firstidx] < a[0]:
firstidx += 1
#print "first:", firstidx, n[firstidx], a[firstidx], n[firstidx] - a[0]
ws.n_idcs.append(n[firstidx:][:])
ws.a_idcs.append(a)
ws.n_phi.append(np[firstidx:][:])
ws.a_phi.append(ap)
In [4]:
# interpolate apex and nadir values
ws.nx_idcs = []
ws.nx_idcs_doc = 'list of indices of (exact) nadir events (one for each trial)'
ws.ax_idcs = []
ws.ax_idcs_doc = 'list of indices of (exact) apex events (one for each trial)'
ws.nx_vals = []
ws.nx_vals_doc = 'list of heights of nadir events (one for each trial)'
ws.ax_vals = []
ws.ax_vals_doc = 'list of heights of apex events (one for each trial)'
for rep in range(len(ws.n_idcs)):
c = ws.k.raw_dat[rep]['com'][:, 2]
# start with nadir events
nx_idx = []
nx_val = []
for nidx in ws.n_idcs[rep]:
delta = 3
if nidx < 3 or nidx > 60000 - 3:
delta = 1
print "WARNING - nadir close to border detected"
pos, val = mi.get_minmax(c[nidx-delta:nidx+delta])
pos += nidx-delta
nx_idx.append(pos)
nx_val.append(val)
ws.nx_vals.append(nx_val)
ws.nx_idcs.append(nx_idx)
# continue with apex events
ax_idx = []
ax_val = []
for aidx in ws.a_idcs[rep]:
delta = 3
if aidx < 3 or aidx > 60000 - 3:
delta = 1
print "WARNING - apex close to border detected"
pos, val = mi.get_minmax(c[aidx-delta:aidx+delta])
pos += aidx-delta
if abs(val - c[aidx]) > .0001:
print aidx, ": val:", val, " orig:", c[aidx]
ax_idx.append(pos)
ax_val.append(val)
ws.ax_vals.append(ax_val)
ws.ax_idcs.append(ax_idx)
print "number of trials:", len(all_apices)
In [5]:
# sanity check: (almost) every nadix is AFTER the corresponding apex
if min(hstack([array(ws.nx_idcs[t])[:500] - array(ws.ax_idcs[t])[:500] for t in range(len(ws.nx_idcs))])) < 0:
raise ValueError, "This should not happen!"
In [7]:
import models.fitSlip as fit
reload(fit)
t = linspace(0, 240, 60000, endpoint=False)
dt = t[2] - t[1]
ka = ws.k.get_kin_apex(ws.a_phi) # list of 6-by-n apex states
# add belt speed. add this also to the dictionary that will be saved!
for rep in range(len(all_apices)):
vb = mean(ws.k.raw_frc[rep]['vb'])
mass = ws.SlipData[rep].mass
#raise NotImplementedError('PHASES: calculate new phases from raw_dat!')
#phases = ws.SlipData[rep].phases
phi = ws.k.raw_dat[rep]['phi2'].squeeze()
mdp = mean(diff(phi))
phases = []
orig = ws.SlipData[rep].ESLIP_params
ESLIP_params = []
SLIP_params = ws.SlipData[rep].SLIP_params # just copy to keep format intact
vx = []
y0 = []
vz = []
ymin = []
dE = []
T_exp = []
for step in range(len(all_apices[rep]) - 1):
T = (ws.ax_idcs[rep][step+1] - ws.ax_idcs[rep][step]) * dt
dp = (1. - mod(ws.ax_idcs[rep][step], 1))*(phi[ws.ax_idcs[rep][step]+1] - phi[ws.ax_idcs[rep][step]])
if dp > 2*mdp:
raise ValueError("Interpolation between two sampled phases gave too large value!")
phi0 = phi[ws.ax_idcs[rep][step]] + dp
IC = [ws.ax_vals[rep][step], ka[rep][4, step] + vb, ka[rep][3, step]] # [y0, vx, vz]
FS = [ws.ax_vals[rep][step+1], ka[rep][4, step+1] + vb, ka[rep][3, step+1]] # [y0, vx, vz]
E0 = mass * 9.81 * IC[0] + .5 * mass * (IC[1]**2 + IC[2]**2)
Ee = mass * 9.81 * FS[0] + .5 * mass * (FS[1]**2 + FS[2]**2)
dE_l = Ee - E0 # name "dE" already taken
dE.append(dE_l)
T_exp.append(T)
phases.append(phi0)
vx.append(IC[1])
y0.append(IC[0])
vz.append(IC[2])
ymin.append(ws.nx_vals[rep][step])
if False: # old code
# model- and step parameters
mp = { 'IC' : IC,
'm' : mass,
'dE' : dE_l}
sp = (ws.ax_vals[rep][step+1], T ,
ws.nx_vals[rep][step], ka[rep][3, step+1])
#Initial guess: take from "original", but modify according to modified means mean
if len(ESLIP_params) < 10:
x0 = orig[step, :]
x0[2] -= .01
else:
x0 = mean(vstack(ESLIP_params), axis=0) + (orig[step,:] - mean(orig, axis=0))
x0[2] -= .005
# check if starting condition is valid:
while x0[2] * sin(x0[1]) > IC[0]:
print "updated starting parameter"
x0[2] -= .01
#Initial guess: take from "original", but modify according to modified means mean
# NOTE: actually, x0[4] is the wrong value, but it's ignored in the calculation.
if len(ESLIP_params) < 10:
x0 = orig[step, :5]
x0[2] -= .01
else:
x0 = mean(vstack(ESLIP_params), axis=0) + (orig[step,:5] - mean(orig[:, :5], axis=0))
x0[2] -= .005
#fit.calcSlipParams3D2(
pars = fit.calcSlipParams3D2(IC, mass, array(FS), ws.nx_vals[rep][step], T) # P0 = x0)
#ESLIP_params.append( fit.calcSlipParams3D(sp,mp, x0[:4]), )
ESLIP_params.append( pars)
if not mod(step, 50):
print "solution found (step:", step, "rep:", rep, ")"
# store data
fn = ws.datadir_out + ws.files[rep]
ESLIP_params = vstack(ESLIP_params)
# use "old" format.
ESLIP_out = hstack([ESLIP_params[:, :4], zeros((ESLIP_params.shape[0], 2))])
dE = ESLIP_params[:,4]
dat = {
'SLIP_params' : SLIP_params,
'T_exp' : T_exp,
'phases' : phases,
'dE' : array(dE),
'P' : ESLIP_params, # hstack([vstack(ESLIP_params)[:, :4], array(dE)[:, newaxis]]),
'IC' : vstack([y0, vx, vz]).T,
'ESLIP_params' : ESLIP_params,
'mass' : mass,
'vx' : array(vx),
'y0' : array(y0),
'vz' : array(vz),
'ymin' : array(ymin),
'vb' : vb
}
mio.msave(fn, dat)
print "data stored"
print "done!"
In [ ]:
raise NotImplementedError("STOP")
In [ ]:
x = mio.mload(ws.datadir + ws.files[0])
x.keys()
In [ ]:
#import models.sliputil as su
#ESLIP_params.append( fit.calcSlipParams3D(sp,mp, x0[:4]), )
#ESLIP_params[-1]
print ESLIP_params[-1]
pars = hstack([ESLIP_params[-1][:4], dE_l])
print pars
su.finalState(IC, pars, addDict={'m' : mass, 'g' : -9.81})
In [ ]:
dt
In [ ]:
print len(ESLIP_params)
figure()
titles=['k','alpha','l0','beta']
for dim in range(4):
subplot(2,2,dim+1)
plot(vstack(ESLIP_params)[:, dim],'r+')
plot(orig[:, dim],'go')
title(titles[dim])
In [ ]:
print len(ws.SlipData[rep].phases)
print x.ESLIP_params.shape
print len(ws.ax_idcs[-1])
In [ ]:
## Things required to compute:
#ESLIP_params (nx6)
#T_exp <- from exact apices
#phases <- from "original" SLIP data; verify that apices
#dE <- compute from kinematics
#mass <- from "original" SLIP data
#vx <- get_kin_apex
#y0 <- interpolated
#vz <- get_kin_apex
#ymin <- interpolated
# vb <- from force data
# IC
## P
In [ ]:
#verify data
t_vec = linspace(0, 240, 60000, endpoint=False)
figure()
plot(t_vec, c, 'b.-', label='com vertical')
plot(t_vec[apices], c[apices], 'rd', label='apices')
plot(t_vec[nadirs], c[nadirs], 'gd', label='nadirs')
legend()
In [ ]:
# apex times (list; one for each trial)
T = k.get_kin_apex(all_apexphases, return_times=True)
# apex heights for a single trial
zi = []
for idx, apt in zip(apices, T[-1]):
p = polyfit(tvec[idx-3:idx+4], c[idx-3:idx+4], 2)
zi.append(polyval(p, apt))
zi = array(zi)
In [ ]:
# how to compute slip params later on
In [ ]:
z2 = k.get_kin_apex(all_apexphases) # !NOT! suitable for CoM_z!
In [ ]:
T[0].shape
In [ ]:
In [ ]:
z = c[apices]
In [ ]:
figure()
hist(mod(mi.upper_phases(hstack(all_apexphases), sep=0), 2.*pi), bins=100)
hist(mod(mi.upper_phases(hstack([d.phases for d in SlipData]), sep=0), 2.*pi), color='g', alpha=.5, bins=100)
In [ ]:
# prepare: load data
import models.sliputil as su
import models.fitSlip as fitSlip
import mutils.io as mio
reload(mio)
k = mio.KinData()
k.selection = ['com_x', 'com_y', 'com_z']
k.load(subject,ttype)
vBelt = [mean(frc['vb']) for frc in k.raw_frc]
idx_right = [mi.upper_phases(d.phases[:-1], sep=0, return_indices=True) for d in SlipData]
idx_left = [mi.upper_phases(d.phases[:-1], sep=0, return_indices=True) for d in SlipData]
trial_starts_right = [x < y for x, y in zip(idx_right, idx_left)]
apexstates = [part[[2,4,3], :].T for part in k.get_kin_apex([d.phases for d in SlipData],)]
apextimes = [part for part in k.get_kin_apex([d.phases for d in SlipData], return_times = True)]
all_dE = []
all_T = []
for apx, d, T, vB in zip(apexstates, SlipData, apextimes, vBelt):
energies = [d.mass * ( 9.81 * apx[row, 0] + .5 * ((apx[row, 1] + vB)**2 + apx[row, 2]**2)) for row in arange(apx.shape[0])]
all_dE.append(diff(energies))
all_T.append(diff(T))
all_step_params = []
for apx, d, T in zip(apexstates, SlipData, all_T):
step_params = [ (y2, dT, ymin, vz2) for y2, dT, ymin, vz2 in zip(apx[1:, 0], T, d.ymin, apx[1:, 2]) ]
all_step_params.append(step_params)
In [ ]:
import time
for rep in arange(len(all_dE)):
new_params = []
delta_E = all_dE[rep]
apx = apexstates[rep]
d = SlipData[rep]
all_IC = []
for nr in arange(len(delta_E)):
if mod(nr, 20) == 0:
print time.ctime(), " rep: ", k.reps[rep], "step nr ", nr + 1, " of ", len(delta_E)
all_IC.append(apx[nr, :] + [0, vBelt[rep], 0])
model_params = {'IC': apx[nr, :] + [0, vBelt[rep], 0],
'm': d.mass,
'dE': delta_E[nr] }
step_params = all_step_params[rep]
x0 = d.P[nr, :4]
sp = step_params[nr]
p = fitSlip.calcSlipParams3D(sp, model_params, x0, )
new_params.append(hstack([p[:4], delta_E[nr]]))
# store data
print "storing data ..."
dat = { 'phases': d.phases,
'IC' : all_IC,
'P' : new_params,
'mass' : d.mass,
'ymin' : d.ymin,
'T' : all_T[rep],
'vBelt' : vBelt[rep],
'g' : -9.81}
mio.msave('../data/2013-newSlip/SLIP_s%it%ir%i.dict' % (subject, ttype, k.reps[rep]), dat)
print "done!"
In [ ]:
import mutils.misc as mi
subject = 2
ttype = 1
rep = 4
stepnr = 590
dat = mi.Struct(mio.mload('../data/2013-newSlip/SLIP_s%it%ir%i.dict' % (subject, ttype, rep)))
print "simulation:", su.finalState(dat.IC[stepnr], dat.P[stepnr], {'m' : dat.mass, 'g': dat.g})
print '==================='
print "experiment:", dat.IC[stepnr + 1]
In [ ]: