In [2]:
import scipy.integrate
import scipy.sparse
import numpy as np
import collections
import matplotlib.pyplot as plt
%matplotlib inline
class IVPResult:
    """Plain attribute container for the output of ``solve_ivp``.

    ``solve_ivp`` sets the following attributes on the instance:

    * ``ts``      -- the requested output times.
    * ``xs``      -- states at ``ts``, shape ``(len(ts), nx)``.
    * ``success`` -- ``False`` if the integrator reported a failure.
    * ``Gxs``, ``Gps`` -- sensitivity matrices at ``ts`` (only if ``sens``).
    * ``trajectory_t``, ``trajectory_x`` -- every step recorded through the
      integrator's ``solout`` callback (only if ``store_trajectory``).
    """


def _unpack_augmented_state(y, n_x, n_p):
    """Split the flat vector ``[x, Gx.ravel(), Gp.ravel()]`` into its parts."""
    gx_end = n_x + n_x * n_x
    x = y[:n_x]
    Gx = y[n_x:gx_end].reshape([n_x, n_x])
    Gp = y[gx_end:].reshape([n_x, n_p])
    return x, Gx, Gp


def solve_ivp(f, ts, x0, p=None, integrator='dopri5', store_trajectory=False,
              sens=False, dfx=None, dfp=None):
    """
    Solve the initial value problem

        d/dt x = f(t, x, p);  x(t0) = x0.

    The solution is evaluated at the time points in ``ts``, with
    ``ts[0] = t0``.  If ``sens`` is true, the sensitivity matrices
    ``Gx = dx/dx0`` and ``Gp = dx/dp`` are integrated alongside the state via
    the variational equations

        d/dt Gx = dfx . Gx,            Gx(t0) = I
        d/dt Gp = dfx . Gp + dfp,      Gp(t0) = 0

    Parameters
    ----------
    f : callable ``f(t, x, p)`` returning dx/dt.
    ts : 1-D sequence of output times (list, tuple or ndarray).
    x0 : initial state (sequence or ndarray of length nx).
    p : parameter object handed to ``f`` (and ``dfx``/``dfp``) unchanged;
        must be a vector when ``sens`` is true.
    integrator : name passed to ``scipy.integrate.ode.set_integrator``.
    store_trajectory : record every step reported through the integrator's
        ``solout`` callback (scipy rejects ``set_solout`` for integrators
        that do not support it).
    sens : also integrate the sensitivities; requires ``dfx``, ``dfp``, ``p``.
    dfx, dfp : callables ``(t, x, p)`` returning df/dx (nx x nx) and
        df/dp (nx x np).

    Returns
    -------
    IVPResult
        See the class docstring for the attributes.  If the integrator
        fails, ``success`` is ``False`` and the rows of ``xs``/``Gxs``/``Gps``
        from the failing index onwards must not be used.

    Raises
    ------
    ValueError
        If ``ts`` is empty, or if ``sens`` is true but ``dfx``, ``dfp`` or
        ``p`` is missing.
    """
    # Accept plain sequences; ndarrays are passed through without copying.
    ts = np.asarray(ts)
    if ts.size == 0:
        raise ValueError("ts must contain at least the initial time t0")
    x0 = np.atleast_1d(np.asarray(x0, dtype=float))
    n_x = x0.shape[0]
    n_p = 0

    if sens:
        if dfx is None or dfp is None:
            raise ValueError("sens=True requires both dfx and dfp")
        if p is None:
            raise ValueError("sens=True requires a parameter vector p")
        p = np.atleast_1d(np.asarray(p))
        n_p = p.shape[0]

        def rhs(t, y, p):
            # Right-hand side of the augmented system [x, Gx, Gp].
            x, Gx, Gp = _unpack_augmented_state(y, n_x, n_p)
            dx = f(t, x, p)
            jac_x = dfx(t, x, p)
            jac_p = dfp(t, x, p)
            dGx = jac_x.dot(Gx)
            dGp = jac_x.dot(Gp) + jac_p
            return np.concatenate([dx, dGx.reshape(-1), dGp.reshape(-1)])

        y0 = np.concatenate([
            x0,
            np.eye(n_x).reshape(-1),
            np.zeros([n_x, n_p]).reshape(-1),
        ])
    else:
        rhs = f
        y0 = x0

    ivp = scipy.integrate.ode(rhs)
    ivp.set_integrator(integrator)

    if store_trajectory:
        times = []
        points = []

        def solout(t, y):
            # Skip a point already recorded at the same time (e.g. the start
            # of a new integrate() call repeating the previous end point).
            if not times or t != times[-1]:
                times.append(t)
                # Copy: only the state part is kept, and the buffer handed
                # to the callback must not be stored by reference.
                points.append(np.copy(y[:n_x]))

        # Registered before set_initial_value, as in the scipy examples.
        ivp.set_solout(solout)

    ivp.set_initial_value(y0, ts[0])
    ivp.set_f_params(p)

    result = IVPResult()
    result.ts = ts
    result.xs = np.zeros([ts.shape[0], n_x])
    result.xs[0, :] = x0
    if sens:
        result.Gxs = np.zeros([ts.shape[0], n_x, n_x])
        result.Gps = np.zeros([ts.shape[0], n_x, n_p])
        # Sensitivity of x(t0) with respect to x0 is the identity; with
        # respect to p it is zero (already the case after np.zeros).
        result.Gxs[0, :, :] = np.eye(n_x)
    result.success = True

    for ii in range(1, ts.shape[0]):
        ivp.integrate(ts[ii])
        if sens:
            x, Gx, Gp = _unpack_augmented_state(ivp.y, n_x, n_p)
            result.xs[ii, :] = x
            result.Gxs[ii, :, :] = Gx
            result.Gps[ii, :, :] = Gp
        else:
            result.xs[ii, :] = ivp.y[:n_x]
        if not ivp.successful():
            result.success = False
            break

    if store_trajectory:
        result.trajectory_t = np.array(times)
        result.trajectory_x = np.array(points)
    return result
# delta x should be a vector of dimension equal to the number
# of variables, which is true only if we do not change the parameters.
def evaluate_parest_single_shooting(f, dfx, dfp, meas_times, meas_values, s, p):
    """Integrate the ODE over each interval between consecutive measurements.

    For every pair ``(meas_times[k], meas_times[k + 1])`` the system is
    integrated with ``solve_ivp`` starting from the node value ``s[k]``,
    with sensitivities enabled and the full step trajectory recorded.

    Parameters
    ----------
    f, dfx, dfp : callables ``(t, x, p)`` forwarded to ``solve_ivp``.
    meas_times : sequence of measurement times; ``m`` times give ``m - 1``
        intervals.
    meas_values : measured values; currently unused (see the TODO below).
    s : indexable collection of start states, one per interval.
    p : parameter vector forwarded to ``solve_ivp``.

    Returns
    -------
    (t_traject, x_traject) : two lists with one entry per interval, holding
        ``result.trajectory_t`` and ``result.trajectory_x`` respectively.
    """
    # solve_ivp reads .shape on its array arguments, so hand it ndarrays.
    p = np.asarray(p)
    t_traject = []
    x_traject = []
    # Pair each time with its successor: m times -> m - 1 intervals.  The
    # previous range(m) loop indexed meas_times[m] on its last iteration.
    interval_bounds = zip(meas_times[:-1], meas_times[1:])
    for k, (t_start, t_end) in enumerate(interval_bounds):
        interval = np.array([t_start, t_end])
        start_state = np.asarray(s[k], dtype=float)
        result = solve_ivp(f, interval, start_state, p=p, sens=True,
                           dfx=dfx, dfp=dfp, store_trajectory=True)
        x_traject.append(result.trajectory_x)
        t_traject.append(result.trajectory_t)
    # TODO: residual/Jacobian assembly is not implemented yet.  An earlier,
    # disabled draft integrated once over all meas_times, formed the residual
    # F = (meas_values - result.xs) as a column vector, and filled each row
    # of J with [row of Gxs[i], row of Gps[i]].  That draft referenced an
    # undefined `s0` and hard-coded the state dimension as 2 in the row
    # index (`2 * i + j` should be `nx * i + j`).
    return t_traject, x_traject
In [ ]:
# Build synthetic measurements: integrate the model from a known initial
# state and parameter set, then perturb the solution with seeded noise.
s0_init = np.array([20.0, 10.0])
param_init = np.array([0.2, 0.01, 0.001, 0.1])

# 21 sample times, 5 time units apart, starting at 0.
meas_times = 5 * np.arange(21)

# Fixed seed so the noise (uniform on [0, 5)) is reproducible; one value per
# sample time and state component.
np.random.seed(31)
noise = np.random.rand(meas_times.shape[0], s0_init.shape[0]) * 5.0

result = solve_ivp(dX_dt, meas_times, s0_init, p=param_init)
meas_values = noise + result.xs