On développe le filtre de Kalman étendu pour un modèle à espace d'état continu/discret.
On considère le modèle à espace d'état composé du modèle de chemostat (écrit sous forme canonique) couplé à un modèle d'observation: \begin{align} \label{eq.espace.etat} \left\{ \begin{array}{rl} \dot X(t) &= f(X(t)) + \xi(t)\,,\hskip2em X(0)=X_0\,,\\ Y_k &= h(X(t_k)) + \sigma \, V_k\,, \end{array} \right. \end{align} où: \begin{itemize} \item $\xi(t)$ bruit blanc gaussien (en temps continu) $N(0,Q_\xi)$, on suppose $Q_\xi$ diagonale et on note $\sigma_1^2$ et $\sigma_2^2$ les termes de sa diagonale; \item $V_k$ bruit blanc gaussien scalaire (en temps discret) $N(0,1)$, on suppose que ces bruits sont indépendants et indépendants de $X_0$; \end{itemize} \begin{align*} X=\begin{pmatrix} S\\ B\end{pmatrix}=\begin{pmatrix} x_1\\ x_2\end{pmatrix}\hskip3em \begin{array}{l} \textrm{(concentration en subtrat)}\\ \textrm{(concentration en biomasse)}\end{array} \end{align*} avec: \begin{align*} f(X) &= f(S,B) = \begin{pmatrix} f_1(S,B) \\ f_2(S,B) \end{pmatrix} = \begin{pmatrix} D\,(S_{in}-S)-\kappa\,\mu(S)\,B \\ (\mu(S)-D)\,B \end{pmatrix} = \begin{pmatrix} D\,(S_{in}-x_1)-\kappa\,\mu(x_1)\,x_2 \\ (\mu(x_1)-D)\,x_2 \end{pmatrix} \\ h(X) &= h(S,B) = \mu(S)\,B=\mu(x_1)\,x_2 \\ \mu(S) &= \mu_{max}\,\frac{S}{K+S} = \mu_{max}\,\frac{x_1}{K+x_1} \end{align*}
Afin de simuler \eqref{eq.espace.etat}, on utilise un schéma d'Euler-Maruyama, on se donne $\delta>0$ on pose $t_k=k\,\delta$, et:
\begin{align*} \left\{ \begin{array}{rll} S(t_{k+1}) & = S(t_k) + f_1(S(t_k),B(t_k))\,\delta+\sigma_1\,\sqrt{\delta}\,W^1_k\,, & S(0) = S_0\,,\\ B(t_{k+1}) & = B(t_k) + f_2(S(t_k),B(t_k))\,\delta+\sigma_2\,\sqrt{\delta}\,W^2_k\,, & B(0) = B_0\,,\\ Y_k &= h(X(t_k))+\sigma\, V_k\,, \end{array} \right. \end{align*}où \begin{align*} W^1_k = (\delta\,\sigma_1^2)^{-1/2}\,\int_{t_k}^{t_{k+1}} \xi^1(s)\,{\rm d}s\,, \hskip 2em W^2_k = (\delta\,\sigma_2^2)^{-1/2}\,\int_{t_k}^{t_{k+1}} \xi^2(s)\,{\rm d}s \end{align*} sont des variables aléatoires gaussiennes et indépendantes $N(0,1)$.
On supposera $S_0\sim N(\bar S_0,Q^S_0)$ et $B_0\sim N(\bar B_0,Q^B_0)$ (indépendants).
On peut donc simuler \eqref{eq.espace.etat} avec des observations à tous les instants $t_k=k\delta$ avec $\delta$ "petit", et on pourra choisir la fréquence des observations en ne retenant que les observations $Y(t_{k\,{\rm f_{obs}}})$ où ${\rm f_{obs}}$ est la fréquence des observations.
In [1]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
#___________________________________________________________paramètres
# paramètres du chemostat
smin, smax = 0, 3
bmin, bmax = 0, 1.5
ka = 2; # stoichiometric coefficient
s_in = 2.4; # input substrate concentration
D = 0.1; # dillution rate
mu_max = 5; # maximim of the specific rate (Monod case)
k_s = 10; # half saturation coefficient (Monod case)
# paramètres du modèle stochastique
esp_S0, esp_B0 = 1.8, 0.05 # concentration initiales moyennes
var_S0, var_B0 = 1e-2, 1e-4 # covariance initiale
sigma = np.array([1e-2,1e-2,4e-2]) # intensités des bruits:
# 0 : état substrat \sigma_1
# 1 : état biomasse \sigma_2
# 2 : observation \sigma
# paramètres de simulation
t_final = 60 # simu pour t in [0,t_final]
sim_time_steps = 501 # nombre de pas de temps pour la simulation
#____________________________________________________________fonctions
def f(x, t):
"second memnbre de l'EDO chemostat"
mu = mu_max*x[0]/(k_s+x[0])
f1 = D*(s_in-x[0])-ka*mu*x[1]
f2 = (mu-D)*x[1]
return f1,f2
def h(x):
"fonction observation"
mu = mu_max*x[0]/(k_s+x[0])
return mu*x[1]
def simu_chemostat(esp_S0,esp_B0,var_S0,var_B0,sigma):
"simulation d'une trajectoire et d'observations"
delta = tt[1]
S = np.ones_like(tt)
B = np.ones_like(tt)
Y = np.ones_like(tt)
S[0] = esp_S0 + np.random.randn()*np.sqrt(var_S0) # subtrat initiale
B[0] = esp_B0 + np.random.randn()*np.sqrt(var_B0) # biomasse initiale
sigma_dS = sigma[0]*np.sqrt(delta)
sigma_dB = sigma[1]*np.sqrt(delta)
sigma_obs = sigma[2]
for i in range(sim_time_steps):
fS, fB = f([S[i],B[i]], 0)
S[i+1] = S[i] + delta * fS + sigma_dS*np.random.randn()
B[i+1] = B[i] + delta * fB + sigma_dB*np.random.randn()
Y[i+1] = h([S[i+1],B[i+1]]) + sigma_obs*np.random.randn()
return S,B,Y
#___________________________________________________________simulation
tt = np.linspace(0, t_final, sim_time_steps+1, endpoint=True)
S,B,Y = simu_chemostat(esp_S0,esp_B0,var_S0,var_B0,sigma)
In [2]:
%matplotlib inline
from matplotlib.pyplot import *
from IPython.display import set_matplotlib_formats # sorties de meilleur qualité
set_matplotlib_formats('png', 'pdf') # sous nbconvert
#___________________________________________________________graphiques
plt.plot(tt,S,label='substrat', color = 'b', linewidth=0.3)
plt.plot(tt,B,label='biomasse', color = 'r', linewidth=0.3)
plt.plot(tt[1:],Y[1:],label='observations', color = 'g', linewidth=0.3)
plt.xlabel(r'temps $t$')
plt.legend()
plt.show()
On ne peut pas calculer exactement la loi de ce processus car il est solution d'un équation non-linéaire dont on ne connait pas de solution explicite. On peut toutefois utiliser l'étape de prédiction du FKE afin de calculer une approximation de cette loi sous la forme d'une loi gaussienne de moyenne ${\bar X}(t)$ et de covariance $Q(t)$ données par:
\begin{align*} \frac{\rm d}{{\rm d}t} \begin{pmatrix}\bar X(t) \\ Q(t)\end{pmatrix} = \begin{pmatrix}f(\bar X(t)) \\ \nabla f(\bar X(t))\,Q(t)+Q(t)\,\nabla f(\bar X(t))^* + Q_\xi \end{pmatrix} \end{align*}donc en plus des simulation précédentes, nous pouvons tracer les moyennes approchées ${\bar S}(t)$, ${\bar B}(t)$ ainsi que les "tubes" associés ${\bar S}(t)\pm 2\,\sqrt{Q_{11}(t)}$ et ${\bar B}(t)\pm 2\,\sqrt{Q_{22}(t)}$. Il faudra calculer les fonctions suivantes:
\begin{align*} \nabla f(X) &= \begin{pmatrix} -D-\kappa\,\mu'(S)\,B & -\kappa\,\mu(S) \\ \mu'(S)\,B & \mu(S)-D \end{pmatrix} = \begin{pmatrix} -D-\kappa\,\mu'(x_1)\,x_2 & -\kappa\,\mu(x_1) \\ \mu'(x_1)\,x_2 & \mu(x_1)-D \end{pmatrix} \\ \nabla h(X) &= \nabla h(S,B) = \begin{pmatrix} \mu'(S)\,B & \mu(S) \end{pmatrix} = \begin{pmatrix} \mu'(x_1)\,x_2 & \mu(x_1) \end{pmatrix} \\ \mu'(S) &= \mu_{max}\,\frac{(K+S)\,S'-(K+S)'\,S}{(K+S)^2} = \mu_{max}\,\frac{K}{(K+S)^2} = \mu_{max}\,\frac{K}{(K+x_1)^2} \end{align*}
In [3]:
def Df(x):
"differentielle second memnbre de l'EDO chemostat"
denom = k_s+x[0]
mu = mu_max*x[0]/denom
mup = mu_max*k_s/(denom*denom)
Df1DS = -D-ka*mup*x[1]
Df1DB = -ka*mu
Df2DS = mup*x[1]
Df2DB = mu-D
return Df1DS,Df1DB,Df2DS,Df2DB
def Dh(x):
"differentielle fonction observation"
denom = k_s+x[0]
mu = mu_max*x[0]/denom
mup = mu_max*k_s/(denom*denom)
return mup*x[1],mu
"""
IMPORTANT: voir annexe pour des commentaires sur la fonction
suivante f_etendu
"""
def f_etendu(x_etendu, t):
'''second membre étendu :
EDO chemostat couplé à l'EDO de Riccati pour FKE'''
S = x_etendu[0]
B = x_etendu[1]
RSS = x_etendu[2]
RSB = x_etendu[3]
RBS = x_etendu[4]
RBB = x_etendu[5]
denom = k_s+S
mu = mu_max*S/denom
mup = mu_max*k_s/(denom*denom)
f1 = D*(s_in-S)-ka*mu*B
f2 = (mu-D)*B
Df1DS = -D-ka*mup*B
Df1DB = -ka*mu
Df2DS = mup*B
Df2DB = mu-D
# on ne calcule que la partie surdiagonale de la matrice
DRSS = Df1DB*RBS + Df1DB*RSB + 2*Df1DS*RSS + Qxi[0,0]
DRSB = Df1DB*RBB + Df1DS*RSB + Df2DB*RSB + Df2DS*RSS + Qxi[0,1]
DRBB = 2*Df2DB*RBB + Df2DS*RBS + Df2DS*RSB + Qxi[1,1]
return [f1, f2, DRSS, DRSB, DRSB, DRBB]
def fke_pred(E0,Q0,tt):
"""
consiste à packer les données, passer les arguments à odeint
et à unpacker les résultats
"""
x_etendu_0 = [E0[0], E0[1], Q0[0,0], Q0[0,1], Q0[1,0], Q0[1,1]]
x_etendu_t = odeint(f_etendu, x_etendu_0, tt)
bar_X_t = x_etendu_t[:,0:2]
Q_t = x_etendu_t[:,2:6].reshape((len(tt),2, 2))
return bar_X_t, Q_t
In [4]:
# on simule plusieurs trajectoires du chemostat
for i_simu in range(0, 40):
S,B,Y = simu_chemostat(esp_S0,esp_B0,var_S0,var_B0,sigma)
plt.plot(tt,S, color = 'b', linewidth=0.3,alpha=0.3)
plt.plot(tt,B, color = 'r', linewidth=0.3,alpha=0.3)
S,B,Y = simu_chemostat(esp_S0,esp_B0,var_S0,var_B0,sigma)
plt.plot(tt,S,label='substrat', color = 'b', linewidth=0.3)
plt.plot(tt,B,label='biomasse', color = 'r', linewidth=0.3)
plt.plot(tt[1:],Y[1:],label='observations', color = 'g', linewidth=0.3)
plt.xlabel(r'temps $t$')
# calcul de l'approximation de la loi
Qxi = np.array([ [sigma[0]**2,0] , [0,sigma[1]**2] ])
bar_X_0 = np.array([ [esp_S0] , [esp_B0] ])
Q_0 = np.array([ [var_S0,0] , [0,var_B0] ])
bar_X_t, Q_t = fke_pred(bar_X_0,Q_0, tt)
bar_S_t = bar_X_t[:,0]
bar_B_t = bar_X_t[:,1]
S_stdv_t = np.sqrt(Q_t[:,0,0])
B_stdv_t = np.sqrt(Q_t[:,1,1])
plt.plot(tt,bar_S_t,label='substrat moyen', color = 'b', linestyle = ':')
plt.fill_between(tt,bar_S_t+2*S_stdv_t,bar_S_t-2*S_stdv_t, color = 'b', alpha=0.1)
plt.plot(tt,bar_B_t,label='biomasse moyenne',color = 'r', linestyle = ':')
plt.fill_between(tt,bar_B_t+2*B_stdv_t,bar_B_t-2*B_stdv_t, color = 'r', alpha=0.1)
#plt.ylim([1,1.3])
plt.legend()
plt.show()
In [ ]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import odeint
"""
simulation de \widehat{X}^-(t) R^-(t)
"""
x_etendu_0 = [S_Esp0, B_Esp0, S_Var0, 0, 0, B_Var0]
x_etendu_t = odeint(f_etendu, x_etendu_0, tt)
S_Esp_t = x_etendu_t[:,0]
B_Esp_t = x_etendu_t[:,1]
S_stdv_t = np.sqrt(x_etendu_t[:,2])
B_stdv_t = np.sqrt(x_etendu_t[:,5])
X_Esp_t, R_t = fke_pred(E0,Q0, tt)
S_Esp_t = X_Esp_t[:,0]
B_Esp_t = X_Esp_t[:,1]
S_stdv_t = np.sqrt(R_t[:,0,0])
B_stdv_t = np.sqrt(R_t[:,1,1])
"""
graphiques
"""
plt.plot(tt,S,label='substrat', color = 'b', linewidth=0.3)
plt.plot(tt,B,label='biomasse', color = 'r', linewidth=0.3)
plt.plot(tt[1:],Y[1:],label='observations', color = 'g', linewidth=0.3)
plt.xlabel(r'temps $t$')
plt.plot(tt,S_Esp_t,label='substrat moyen', color = 'b', linestyle = ':')
plt.fill_between(tt,S_Esp_t+2*S_stdv_t,S_Esp_t-2*S_stdv_t, color = 'b', alpha=0.1)
plt.plot(tt,B_Esp_t,label='biomasse moyen',color = 'r', linestyle = ':')
plt.fill_between(tt,B_Esp_t+2*B_stdv_t,B_Esp_t-2*B_stdv_t, color = 'r', alpha=0.1)
plt.ylim(0, 2.5)
plt.legend()
plt.show()
On se donne fke_step
et on suppose que l'on dispose des observations suivantes: Y[k*fke_step]
pour k=1,2,...
exemple:
sim_time_steps : 6
t_final : 20
fke_steps : 5
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 (=sim_time_steps)
|___|___|___|___|___|___|___|___|___|___|___|___|___|___|___|___|
0 20 (=t_final)
on fait tt = np.linspace(0, t_final, sim_time_steps+1, endpoint=True)
donc
|___|
delta=tt[1]=1.25
nb_obs = sim_time_steps//fke_step = 3
|___________________*___________________*___________________*___| instants d'observation
| | | | | |
-------------------
fke_steps
In [ ]:
t_final = 20 # simu pour t in [0,t_final]
sim_time_steps = 16 # nombre de pas de temps pour la simulation
tt = np.linspace(0, t_final, sim_time_steps+1, endpoint=True)
delta = tt[1]
fke_step = 5
nb_obs = sim_time_steps//fke_step # nombre d'observations
fke_X = np.zeros((2, sim_time_steps+nb_obs+1))
fke_R = np.zeros((2,2, sim_time_steps+nb_obs+1))
zz = np.dot(fke_R[:,:,0],fke_X[:,0])
nb_obs
In [ ]:
fke_X[0,:], fke_R[0,:,:]
"""
résultat du filtrage: fke_X et fke_R à tous les instants de simulation (sur tt)
ainsi que les corrections aux instants d'observation
"""
fke_X = np.zeros((2, sim_time_steps+nb_obs+1)) # allocation de mémoire
fke_R = np.zeros((2, 2, sim_time_steps+nb_obs+1)) # "
# _____________________________________________________________ initialisation
i_run, X_run, R_run = 0, E0, Q0
fke_X[0,:], fke_R[0,:,:] = E0, Q0
# _________________________________________________________________ itérations
for k in range(nb_obs):
# _____________________________________________________________ prédiction
ttt = tt[ nb_obs*fke_step : (nb_obs+1)*fke_step +1]
XX, RR = fke_pred(X_run,R_run, ttt)
fke_X[i_run:i_run+fke_step+1,:] = XX
fke_R[i_run:i_run+fke_step+1,:,:] = RR
X_run = fke_X[i_run+fke_step,:]
R_run = fke_R[i_run+fke_step,:]
i_run = i_run+fke_step+1
# ______________________________________________________________ correction
Y_k = xxxxx
h_k = h(X_run)
H_k = Dh(X_run)
K1 = np.ot(R_run, H_k.transpose()) # calcul du gain
K2 = np.multi_dot(H_k, R_run, H_k.transpose()) + Qv # "
K_k = K1 / K2 # "
X_update = X_run + K_k * (Y_k-h_k) # state update
R_update = R_run - K_k * np.ot(H_k,R_run) # covariance update
i_run = i_run+1
fke_X[i_run,:] = X_update
fke_R[i_run,:] = R_update
X_run, R_run = X_update, R_update
# éventuellement finir en prédiction
La fonction f_etendu
a été optimisée en comparaison de la fonction f_etendu_slow
ci-dessous.
La seconde est plus lisible et la première plus rapide; dans la seconde on effectue
un produit de matrices mais pour cela on doit faire du unpack/pack/unpack donc perte de temps. Pour faire le produit des matrices explicitement on peut faire appel comme ici à sympy
pour faire du calcul symbolique.
In [8]:
def f_etendu_slow(x_etendu, t):
'''second membre étendu :
EDO chemostat couplé à l'EDO de Riccati pour FKE'''
x = x_etendu[0:2].reshape((2,1)) # unpacking du couple (etat,Cov)
cov = x_etendu[2:6].reshape((2,2)) # -------
denom = k_s+x[0]
mu = mu_max*x[0]/denom
mup = mu_max*k_s/(denom*denom)
f1 = D*(s_in-x[0])-ka*mu*x[1]
f2 = (mu-D)*x[1]
Df1DS = -D-ka*mup*x[1]
Df1DB = -ka*mu
Df2DS = mup*x[1]
Df2DB = mu-D
DF = np.array([Df1DS, Df1DB, Df2DS, Df2DB]).reshape((2, 2))
Fcov = np.dot(DF,cov) + np.dot(cov,DF.transpose()) + Qxi
"attention je retourne deux fois Fcov[1,0]"
return [f1, f2, Fcov[0,0], Fcov[1,0], Fcov[1,0], Fcov[1,1]]
def fke_pred_slow(E0,Q0,tt):
"""
consiste à packer les données, passer les arguments à odeint
et à unpacker les résultats
"""
x_etendu_0 = [E0[0], E0[1], Q0[0,0], Q0[0,1], Q0[1,0], Q0[1,1]]
x_etendu_t = odeint(f_etendu_slow, x_etendu_0, tt)
bar_X_t = x_etendu_t[:,0:2]
Q_t = x_etendu_t[:,2:6].reshape((len(tt),2, 2))
return bar_X_t, Q_t
In [9]:
import sympy
Df1DS = sympy.Symbol("Df1DS")
Df1DB = sympy.Symbol("Df1DB")
Df2DS = sympy.Symbol("Df2DS")
Df2DB = sympy.Symbol("Df2DB")
RSS = sympy.Symbol("RSS")
RSB = sympy.Symbol("RSB")
RBS = sympy.Symbol("RBS")
RBB = sympy.Symbol("RBB")
zQxiSS = sympy.Symbol("zQxiSS")
zQxiSB = sympy.Symbol("zQxiSB")
zQxiBS = sympy.Symbol("zQxiBS")
zQxiBB = sympy.Symbol("zQxiBB")
Df = np.array([ [Df1DS,Df1DB] , [Df2DS,Df2DB]])
R = np.array([ [RSS, RSB] , [RBS, RBB]])
zQxi = np.array([ [zQxiSS,zQxiSB] , [zQxiBS,zQxiBB]])
DR=np.dot(Df,R)+np.dot(R,Df.transpose())+zQxi
print(DR[0,0])
print(DR[0,1])
print(DR[1,0])
print(DR[1,1])
In [14]:
# paramètres de simulation
t_final = 60 # simu pour t in [0,t_final]
sim_time_steps = 3001 # nombre de pas de temps pour la simulation
tt = np.linspace(0, t_final, sim_time_steps+1, endpoint=True)
import time
t_depart = time.process_time()
bar_X_t2, Q_t2 = fke_pred_slow(bar_X_0,Q_0, tt)
print("temps de calcul version lente : ",time.process_time() - t_depart)
t_depart = time.process_time()
bar_X_t, Q_t = fke_pred(bar_X_0,Q_0, tt)
print("temps de calcul version rapide : ",time.process_time() - t_depart)
from numpy import linalg as LA
print("différence sur la moyenne :",LA.norm(bar_X_t2-bar_X_t))
print("différence sur la variance :",LA.norm(Q_t2-Q_t))
In [ ]: