In [ ]:
from __future__ import division, print_function, absolute_import
import GPy
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import cm
from matplotlib import patches
%matplotlib inline
from safe_learning import *
The following code models an inverted pendulum, and uses a GP model to determine the safe region of attraction (ROA). The following is inteded to illustrate the algorithm, not to be high-performance code. As such, the code will run very slowly, with the main bottleneck being the repeated GP predictions of the dynamics. There are several obvious points that could make the code run faster
We define the dynamics of an inverted pendulum $$\ddot{\theta}(t) = \frac{mgl \sin(\theta(t)) + u(t)}{m l^2},$$ where $m$ is the mass, $g$ the gravitational constant, $l$ the length of the pendulum, $u$ the control input (torque), and $\theta$ the angle.
The prior model that we use considers no friction, as well as a mass that is $0.5\,$kg lighter.
In [ ]:
n = 2
m = 1
# 'Wrong' model parameters
mass = 0.1
friction = 0.
length = 0.5
gravity = 9.81
inertia = mass * length ** 2
# True model parameters
true_mass = 0.15
true_friction = 0.05
true_length = length
true_inertia = true_mass * true_length ** 2
# Input saturation
x_max = np.deg2rad(30)
u_max = gravity * true_mass * true_length * np.sin(x_max)
# LQR cost matrices
Q = np.array([[1, 0], [0, 1]], dtype=np.float)
R = np.array([[0.1]], dtype=np.float)
In order for the LQR to return meaningful results, as well as for the GP model to have simpler kernel parameters, we normalize the system dynamics (all dimensions have similar magnitudes).
$\theta$ is normalized withing the maximum controllable angle, $\dot{\theta}$ is normalized with the eigenfrequency of the dynamics, and $u$ is normlized with the maximum allowed control input.
In [ ]:
# Normalize the cost functions for the LQR computation
# x_normalized = inv(Tx) * x
Tx = np.diag([x_max, np.sqrt(gravity / length)])
Tu = np.array([[u_max]])
Tx_inv = np.diag(np.diag(Tx)**(-1))
Tu_inv = np.diag(np.diag(Tu)**(-1))
def normalize_x(x):
"""Normalize x vector"""
x = np.asarray(x)
return x.dot(Tx_inv)
def denormalize_x(x):
"""Denormalize x vector"""
x = np.asarray(x)
return x.dot(Tx)
def normalize_u(u):
"""Normalize u vector"""
u = np.asarray(u)
return u.dot(Tu_inv)
def denormalize_u(u):
"""Denormalize u vector"""
u = np.asarray(u)
return u.dot(Tu)
In [ ]:
# Nonlinear dynamics
def ode(x, u):
"""True ode of the dynamics.
Parameters
----------
x: np.array
2D array with one, normalized state
at each column
u: np.array
2D array with one, normalized input
at each column
Returns
-------
x_dot: np.array
The normalized derivative of the dynamics
"""
# Denormalize
x = denormalize_x(np.atleast_2d(x))
u = denormalize_u(np.asarray(u))
# Physical dynamics
x_dot = np.hstack([x[:, [1]],
(gravity / true_length * np.sin(x[:, [0]]) +
u / true_inertia
- true_friction / true_inertia * x[:, [1]])])
# Normalize
return normalize_x(x_dot)
# Linearized dynamics
A = np.array([[0, 1],
[gravity / length, -friction / inertia]])
B = np.array([[0],
[1 / inertia]])
# Normalize linear dynamics
An = Tx_inv.dot(A.dot(Tx))
Bn = Tx_inv.dot(B.dot(Tu))
# Obtain LQR controlelr gain and cost-to-go matrix
Kn, Pn = lqr(An, Bn, Q, R)
u_max_norm = normalize_u(u_max)
def control_law(x):
"""LQR controller with bounded (normalized) inputs.
Parameters
----------
x: np.array
2D array with one normalized state on each column
Returns
-------
u: np.array
2D array with normalized inputs on each column
"""
x = np.asarray(x)
u = -x.dot(Kn.T)
np.clip(u, -u_max_norm, u_max_norm, out=u)
return u
def true_dynamics(x):
"""Return the true closed-loop, normalized dynamics.
Parameters
----------
x: np.array
2D array with one normalized state on each column
Returns
-------
x_dot: np.array
2D array with normalized derivative states on each column
"""
x = np.asarray(x)
u = control_law(x)
return ode(x, u)
def prior_dynamics(x):
"""Return the linearized, closed-loop, prior, normalized dynamics.
Parameters
----------
x: np.array
2D array with one normalized state on each column
Returns
-------
x_dot: np.array
2D array with normalized derivative states on each column
"""
x = np.asarray(x)
u = control_law(x)
return x.dot(An.T) + u.dot(Bn.T)
In [ ]:
# Discretization constant
tau = 0.002
# x_min, x_max, accuracy
grid_param = [(-0.5, 0.5, tau),
(-0.5, 0.5, tau)]
# Used to plot the safe set later
extent = np.array([grid_param[0][0], grid_param[0][1],
grid_param[1][0], grid_param[1][1]])
# Define a grid with combinations of states
grid = [np.arange(*x) for x in grid_param]
num_samples = [len(x) for x in grid]
grid = combinations(grid)
# Initial safe set
grid_true = denormalize_x(grid)
S0 = np.logical_and(np.abs(grid_true[:, 0]) < np.deg2rad(5),
np.abs(grid_true[:, 1]) < np.deg2rad(10))
if not np.any(S0):
print('No initial safe points!')
print('Grid size: {0} combinations in {1}x{2} discretized with tau={3}'
.format(len(grid), extent[:2], extent[2:], tau))
We define the state vector $\mathbf{x} = [\mathbf{x}_1, \mathbf{x}_2] = [\theta, \dot{\theta}]$, so that the dynamics can be written as $$ \dot{\mathbf{x}} = \left[ \begin{matrix} \mathbf{x}_2 \\ \frac{mgl \sin(\mathbf{x}_1) + \tau}{m l^2} \end{matrix} \right] $$
The first part of this equation says that the angle is equal to the integrated angular velocity. This is a intuitively true, irrespective of model errors. As such, we only learn the model error of the second part of the dynamics. That is $$\dot{\mathbf{x}} = \left[ \begin{matrix} \mathbf{x}_2 \\ \frac{mgl \sin(\mathbf{x}_1) + \tau}{m l^2} + g_\pi(\mathbf{x}) \end{matrix} \right] $$
As a kernel we choose $k(x,x') = k_{\mathrm{linear}}(x, x') * k_{\mathrm{Matern}}(x, x')$, the product of a linear and a Matern kernel. This encodes nonlinear functions with linearly increasing amplitude. For more details what this kernel encodes, see the one-dimensional example.
In [ ]:
# Mean function for the GP with the prior dynamics
mf = GPy.core.Mapping(2, 1)
mf.f = lambda x: prior_dynamics(x)[:, [1]]
mf.update_gradients = lambda a,b: None
# Matern kernel multiplied with linear kernel
kernel = (GPy.kern.Matern32(input_dim=2, lengthscale=.2, variance=5, name='radial') *
GPy.kern.Linear(input_dim=2, name='linear', variances=1))
# Measurement model
likelihood = GPy.likelihoods.Gaussian(variance=0.05**2)
# GP with initial measurement at (0, 0), 0
gp = GPy.core.GP(np.array([[0, 0]]), np.array([[0]]),
kernel, likelihood, mean_function=mf)
def predict_model(gp, x):
"""Predict the model using the gp dynamics
Given that the model error only affects the second derivative,
the first state has zero variance and is equal to the prior model.
Parameters
----------
gp: GPy.core.GP
The GP model of the dynamics (including prior)
x: np.array
2D array. Each column has one state at which
to predict the dynamics
Returns
-------
mean: np.array
The mean dynamics at x
var: np.array
Variance of the dynamics at x
"""
gp_mean, gp_var = gp._raw_predict(x)
# Augment with deterministic model for first state
gp_mean = np.hstack([prior_dynamics(x)[:, [0]], gp_mean])
gp_var = np.hstack([np.zeros_like(gp_var), gp_var])
return gp_mean, gp_var
The Lipschitz constant is defined via the high-probability Lipschitz constant of the GP model, as well as the linear dynamics. Importantly, here we use the local Lipschitz constants. Since the kernel we have choosen implies increasing Lipschitz constants with distance from the origin. the worst-case Lipschitz constant would be too conservative.
In [ ]:
# Lyapunov function:
V, dV = quadratic_lyapunov_function(grid, Pn)
V_max = np.max(V)
accuracy = V_max / 1e10
# Lipschitz constants of Lyapunov function
B_dV = L_V = np.max(np.abs(dV), axis=1)
L_dV = np.max(Pn)
# Kernel parameters
kernel_lengthscale = np.min(gp.kern.radial.lengthscale).squeeze()
kernel_var = gp.kern.radial.variance.values.squeeze()
linear_var = gp.kern.linear.Kdiag(grid).squeeze()
# Dynamics Lipschitz constants
L_g = 2 * np.sqrt(kernel_var * linear_var) / kernel_lengthscale
L_f = np.max(np.abs(An - Bn.dot(Kn)))
# Function bounds
B_g = 2 * np.sqrt(kernel_var * linear_var)
B_f = prior_dynamics(grid)[:, 1]
L = (B_g + B_f) * L_dV + B_dV * (L_g + L_f)
To get an intuition about the task at hand, we compute the maximum, safe level set (ROA) according to the true and prior dynamics. The learning algorithm only has access to the prior dynamics model, not the true model!
The plot shows the maximum level set (orange), and the region where $\dot{V}$ is sufficiently small (red). It can be seen that the prior model estimates a safe region that is too large, since it considers a lighter mass. Also, the third plot shows that we cannot recover the maximum level set with the learning method, since it considers $\dot{V}(x) < -L\tau$, rather than $\dot{V}(x) < 0$. For finer discretizations the two sets will get closer and closer to each other.
In [ ]:
V_dot_true = compute_v_dot_upper_bound(dV, true_dynamics(grid), None)
V_dot_prior = compute_v_dot_upper_bound(dV, prior_dynamics(grid), None)
fig, axes = plt.subplots(1, 3, figsize=(10, 20))
S_true = get_safe_set(V_dot_true, 0, S0=None)
axes[0].imshow(np.reshape(S_true, num_samples).T, extent=extent, origin='lower')
c_true = find_max_levelset(S_true, V, accuracy)
axes[0].imshow(np.reshape(V <= c_true, num_samples).T, extent=extent, origin='lower', alpha=0.3, cmap='viridis')
axes[0].set_title('True safe set (V_dot < 0)')
S_prior = get_safe_set(V_dot_prior, 0, S0=S0)
c_prior = find_max_levelset(S_prior, V, accuracy)
axes[1].imshow(np.reshape(S_prior, num_samples).T, extent=extent, origin='lower')
axes[1].set_title('Prior safe set (V_dot < 0)')
axes[1].imshow(np.reshape(V < c_prior, num_samples).T, extent=extent, origin='lower', alpha=0.3, cmap='viridis')
S_true_L = get_safe_set(V_dot_true, -L*tau, S0=S0)
c_true_L = find_max_levelset(S_true_L, V, accuracy)
axes[2].imshow(np.reshape(S_true_L, num_samples).T, extent=extent, origin='lower')
axes[2].set_title('True safe set (V_dot < -L*tau)')
axes[2].imshow(np.reshape(V < c_true_L, num_samples).T, extent=extent, origin='lower', alpha=0.3, cmap='viridis')
plt.show()
print('Number of true safe points: {0}/{3}\n'
'Number of prior safe points: {1}/{3}\n'
'Number of finite safe points: {2}/{3}\n'.format(np.count_nonzero(V < c_true),
np.count_nonzero(V < c_prior),
np.count_nonzero(V < c_true_L),
grid.shape[0]))
In [ ]:
V, dV = quadratic_lyapunov_function(grid, Pn)
def update_gp():
dynamics_mean, dynamics_var = predict_model(gp, grid)
V_dot = compute_v_dot_upper_bound(dV, dynamics_mean, dynamics_var, beta=2.)
S = get_safe_set(V_dot, -L*tau, S0=S0)
c = find_max_levelset(S, V, accuracy)
S[:] = V <= c
max_id = np.argmax(dynamics_var[S, 1])
max_state = grid[S][[max_id], :].copy()
gp.set_XY(np.vstack([gp.X, max_state]),
np.vstack([gp.Y, true_dynamics(max_state)[:, [1]]]))
return S
In [ ]:
# Try to import a nice progress bar
try:
from tqdm import tqdm
except:
tqdm = lambda x: x
# Update the GP model 100 times
for i in tqdm(range(100)):
S = update_gp()
print('Number of estimated safe points: {0}% relative to true dynamics with V_dot < 0'
.format(np.count_nonzero(S) / np.count_nonzero(V < c_true)))
In [ ]:
def denorm_ellipse(P, level):
"""Return the ellipse _bounds, but denormalized."""
x0, x1_u, x1_l = ellipse_bounds(P, level)
return Tx[0,0] * x0, Tx[1,1] * x1_u, Tx[1,1] * x1_l
c_est = find_max_levelset(S, V, accuracy)
colors = ['b', 'm', 'r']
plt.fill_between(*denorm_ellipse(Pn, c_prior), color=colors[0], alpha=0.5)
plt.fill_between(*denorm_ellipse(Pn, c_true), color=colors[1], alpha=0.5)
plt.fill_between(*denorm_ellipse(Pn, c_est), color=colors[2], alpha=0.5)
patch0 = patches.Patch(color=colors[0], alpha=0.5, label='Prior safe set')
patch1 = patches.Patch(color=colors[1], alpha=0.5, label='True safe set')
patch2 = patches.Patch(color=colors[2], alpha=0.5, label='Estimated safe set')
legs = [patch0, patch1, patch2]
labels = [x.get_label() for x in legs]
leg = plt.legend(legs, labels, loc=3, borderaxespad=0)
data = denormalize_x(gp.X[1:, :])
plt.plot(data[:, 0], data[:, 1], 'x')
plt.xlabel(r'Angle $\theta$')
plt.ylabel(r'Angular velocity $\dot{\theta}$')
plt.show()
In [ ]: