In [1]:
%%javascript
$.getScript('https://kmahelona.github.io/ipython_notebook_goodies/ipython_notebook_toc.js')
Guided Policy Search (GPS) is a technique that transforms the Reinforcement Learning (RL) task of policy search into a Supervised Learning problem, where the training set is generated by a simple trajectory-centric RL algorithm.
This algorithm optimizes linear-Gaussian controllers $p_i (u_t | x_t)$. Each $p_i (u_t | x_t)$ succeeds in the task from a different initial state, which helps the learned policy generalize to other states from the same distribution. The final policy $\pi_\theta(u_t | o_t )$ learned with GPS is only provided with observations $o_t$ of the full state $x_t$, and the system dynamics are assumed to be unknown.
We draw sample trajectories $\tau_i^j$ for each initial state on the physical system by running the corresponding controller $p_i(u_t | x_t)$. The samples are used to fit the dynamics $p_i (x_{t+1} | x_t, u_t)$, which in turn are used to improve the controllers $p_i(u_t | x_t)$, and they also serve as training data for the policy $\pi_\theta(u_t | o_t )$. The method therefore alternates between optimizing each trajectory distribution $p_i (\tau)$ and optimizing the policy $\pi_\theta(u_t | o_t )$ to match these trajectory distributions.
This work is based on https://arxiv.org/abs/1504.00702. Refer to http://rll.berkeley.edu/gps/ for the original implementation.
Symbol | Definition
---|---
$p_i(u_t \mid x_t)$ | linear-Gaussian controllers; they induce the trajectory distributions $p_i (\tau)$
$\hat{p_i}(u_t \mid x_t)$ | controllers from the previous iteration
$\pi_\theta(u_t \mid o_t )$ | final policy learned with GPS
$p_i (\tau)$ | trajectory distribution induced by the linear-Gaussian controllers (guiding distribution)
$\tau_i^j$ | sample trajectories drawn from $p_i (\tau)$
$o_t$ | observations
$x_t$ | full state
$p_i (x_{t+1} \mid x_t, u_t)$ | system dynamics
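Putting these pieces together, the outer loop alternates between collecting samples with the current controllers and running one algorithm iteration (dynamics fitting, C-step, S-step). The sketch below is illustrative only: it assumes an `agent` object exposing a `sample(policy, condition)` method (as in the original implementation's Agent class) and an `algorithm` object such as the `AlgorithmBADMM`/`AlgorithmMDGPS` classes defined later in this notebook.

def run_gps_sketch(agent, algorithm, conditions, iterations, samples_per_condition):
    """ Schematic GPS outer loop (hypothetical sketch, not the original GPSMain driver). """
    for itr in range(iterations):
        # 1. Roll out each linear-Gaussian controller p_i(u_t | x_t) on the
        #    system to draw sample trajectories tau_i^j for every condition.
        sample_lists = [
            SampleList([agent.sample(algorithm.cur[m].traj_distr, m)
                        for _ in range(samples_per_condition)])
            for m in range(conditions)
        ]
        # 2. Fit the local dynamics p_i(x_{t+1} | x_t, u_t), improve the
        #    controllers (C-step), and regress the policy pi_theta(u_t | o_t)
        #    onto the controller actions (S-step).
        algorithm.iteration(sample_lists)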
In [2]:
import numpy as np
def gauss_fit_joint_prior(pts, mu0, Phi, m, n0, dwts, dX, dU, sig_reg):
""" Perform Gaussian fit to data with a prior. """
# Build weights matrix.
D = np.diag(dwts)
# Compute empirical mean and covariance.
mun = np.sum((pts.T * dwts).T, axis=0)
diff = pts - mun
empsig = diff.T.dot(D).dot(diff)
empsig = 0.5 * (empsig + empsig.T)
# MAP estimate of joint distribution.
N = dwts.shape[0]
mu = mun
sigma = (N * empsig + Phi + (N * m) / (N + m) *
np.outer(mun - mu0, mun - mu0)) / (N + n0)
sigma = 0.5 * (sigma + sigma.T)
# Add sigma regularization.
sigma += sig_reg
# Conditioning to get dynamics.
fd = np.linalg.solve(sigma[:dX, :dX], sigma[:dX, dX:dX+dU]).T
fc = mu[dX:dX+dU] - fd.dot(mu[:dX])
dynsig = sigma[dX:dX+dU, dX:dX+dU] - fd.dot(sigma[:dX, :dX]).dot(fd.T)
dynsig = 0.5 * (dynsig + dynsig.T)
return fd, fc, dynsig
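As a quick sanity check, the snippet below recovers a known linear map from noisy joint samples when the prior is weak. The data, prior values, and weights are made up purely for illustration.

# Illustrative check: recover y = A x + b from noisy joint samples [x; y],
# using an assumed weak prior (not values from the original experiments).
np.random.seed(0)
N, d_in, d_out = 50, 3, 2
A_true = np.random.randn(d_out, d_in)
b_true = np.random.randn(d_out)
x = np.random.randn(N, d_in)
y = x.dot(A_true.T) + b_true + 1e-3 * np.random.randn(N, d_out)
pts = np.c_[x, y]                          # N x (d_in + d_out) joint points
mu0 = np.zeros(d_in + d_out)               # weak prior mean
Phi = np.eye(d_in + d_out)                 # weak prior covariance
dwts = np.ones(N) / N                      # uniform sample weights
sig_reg = 1e-6 * np.eye(d_in + d_out)
fd, fc, dynsig = gauss_fit_joint_prior(pts, mu0, Phi, 1, 1, dwts,
                                       d_in, d_out, sig_reg)
print(np.round(fd - A_true, 2))            # close to zero when the prior is weak
print(np.round(fc - b_true, 2))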
In [3]:
from gps_pb2 import ACTION
class Sample(object):
"""
Class that handles the representation of a trajectory and stores a
single trajectory.
Note: must be serializable for easy saving, no C++ references!
"""
def __init__(self, agent):
self.agent = agent
self.T = agent.T
self.dX = agent.dX
self.dU = agent.dU
self.dO = agent.dO
self.dM = agent.dM
# Dictionary containing the sample data from various sensors.
self._data = {}
self._X = np.empty((self.T, self.dX))
self._X.fill(np.nan)
self._obs = np.empty((self.T, self.dO))
self._obs.fill(np.nan)
self._meta = np.empty(self.dM)
self._meta.fill(np.nan)
def set(self, sensor_name, sensor_data, t=None):
""" Set trajectory data for a particular sensor. """
if t is None:
self._data[sensor_name] = sensor_data
self._X.fill(np.nan) # Invalidate existing X.
self._obs.fill(np.nan) # Invalidate existing obs.
self._meta.fill(np.nan) # Invalidate existing meta data.
else:
if sensor_name not in self._data:
self._data[sensor_name] = \
np.empty((self.T,) + sensor_data.shape)
self._data[sensor_name].fill(np.nan)
self._data[sensor_name][t, :] = sensor_data
self._X[t, :].fill(np.nan)
self._obs[t, :].fill(np.nan)
def get(self, sensor_name, t=None):
""" Get trajectory data for a particular sensor. """
return (self._data[sensor_name] if t is None
else self._data[sensor_name][t, :])
def get_X(self, t=None):
""" Get the state. Put it together if not precomputed. """
X = self._X if t is None else self._X[t, :]
if np.any(np.isnan(X)):
for data_type in self._data:
if data_type not in self.agent.x_data_types:
continue
data = (self._data[data_type] if t is None
else self._data[data_type][t, :])
self.agent.pack_data_x(X, data, data_types=[data_type])
return X
def get_U(self, t=None):
""" Get the action. """
return self._data[ACTION] if t is None else self._data[ACTION][t, :]
def get_obs(self, t=None):
""" Get the observation. Put it together if not precomputed. """
obs = self._obs if t is None else self._obs[t, :]
if np.any(np.isnan(obs)):
for data_type in self._data:
if data_type not in self.agent.obs_data_types:
continue
if data_type in self.agent.meta_data_types:
continue
data = (self._data[data_type] if t is None
else self._data[data_type][t, :])
self.agent.pack_data_obs(obs, data, data_types=[data_type])
return obs
def get_meta(self):
""" Get the meta data. Put it together if not precomputed. """
meta = self._meta
if np.any(np.isnan(meta)):
for data_type in self._data:
if data_type not in self.agent.meta_data_types:
continue
data = self._data[data_type]
self.agent.pack_data_meta(meta, data, data_types=[data_type])
return meta
# For pickling.
def __getstate__(self):
state = self.__dict__.copy()
state.pop('agent')
return state
# For unpickling.
def __setstate__(self, state):
self.__dict__ = state
self.__dict__['agent'] = None
In [4]:
#NOISE = 19 # initially generated by the protocol buffer compiler.
from gps_pb2 import NOISE
class SampleList(object):
""" Class that handles writes and reads to sample data. """
def __init__(self, samples):
self._samples = samples
def get_X(self, idx=None):
""" Returns N x T x dX numpy array of states. """
if idx is None:
idx = range(len(self._samples))
return np.asarray([self._samples[i].get_X() for i in idx])
def get_U(self, idx=None):
""" Returns N x T x dU numpy array of actions. """
if idx is None:
idx = range(len(self._samples))
return np.asarray([self._samples[i].get_U() for i in idx])
def get_noise(self, idx=None):
""" Returns N x T x dU numpy array of noise generated during rollouts. """
if idx is None:
idx = range(len(self._samples))
return np.asarray([self._samples[i].get(NOISE) for i in idx])
def get_obs(self, idx=None):
""" Returns N x T x dO numpy array of features. """
if idx is None:
idx = range(len(self._samples))
return np.asarray([self._samples[i].get_obs() for i in idx])
def get_samples(self, idx=None):
""" Returns N sample objects. """
if idx is None:
idx = range(len(self._samples))
return [self._samples[i] for i in idx]
def num_samples(self):
""" Returns number of samples. """
return len(self._samples)
# Convenience methods.
def __len__(self):
return self.num_samples()
def __getitem__(self, idx):
return self.get_samples([idx])[0]
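A quick illustration of the stacking performed by SampleList, using a hypothetical stand-in sample class (the real Sample objects above require an Agent instance, which is defined outside this notebook):

class _StubSample(object):
    """ Hypothetical stand-in exposing the interface SampleList expects. """
    def __init__(self, T=5, dX=3, dU=2, dO=3):
        self._X = np.random.randn(T, dX)
        self._U = np.random.randn(T, dU)
        self._obs = np.random.randn(T, dO)
    def get_X(self, t=None):
        return self._X
    def get_U(self, t=None):
        return self._U
    def get_obs(self, t=None):
        return self._obs
    def get(self, sensor_name, t=None):
        return np.random.randn(*self._U.shape)  # e.g. per-step action noise

stub_samples = SampleList([_StubSample() for _ in range(4)])
print(stub_samples.get_X().shape)    # (4, 5, 3) -> N x T x dX
print(stub_samples.get_U().shape)    # (4, 5, 2) -> N x T x dU
print(len(stub_samples))             # 4; indexing returns individual samples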
In [5]:
import abc
class Dynamics(object):
""" Dynamics superclass. """
__metaclass__ = abc.ABCMeta
def __init__(self, hyperparams):
#def __init__(self):
self._hyperparams = hyperparams
# TODO - Currently assuming that dynamics will always be linear
# with X.
# TODO - Allocate arrays using hyperparams dU, dX, T.
# Fitted dynamics: x_t+1 = Fm * [x_t;u_t] + fv.
self.Fm = np.array(np.nan)
self.fv = np.array(np.nan)
self.dyn_covar = np.array(np.nan) # Covariance.
@abc.abstractmethod
def update_prior(self, sample):
""" Update dynamics prior. """
raise NotImplementedError("Must be implemented in subclass.")
@abc.abstractmethod
def get_prior(self):
""" Returns prior object. """
raise NotImplementedError("Must be implemented in subclass.")
@abc.abstractmethod
def fit(self, sample_list):
""" Fit dynamics. """
raise NotImplementedError("Must be implemented in subclass.")
def copy(self):
""" Return a copy of the dynamics estimate. """
dyn = type(self)(self._hyperparams)
#dyn = type(self)()
dyn.Fm = np.copy(self.Fm)
dyn.fv = np.copy(self.fv)
dyn.dyn_covar = np.copy(self.dyn_covar)
return dyn
We now define the Gaussian Mixture Model (GMM) class.
In [6]:
import logging
import scipy.linalg
# Module-level logger used by the GMM, dynamics, and algorithm classes below.
LOGGER = logging.getLogger(__name__)
def logsum(vec, axis=0, keepdims=True):
    """ Numerically stable log-sum-exp of vec along the given axis. """
    maxv = np.max(vec, axis=axis, keepdims=keepdims)
    maxv[maxv == -float('inf')] = 0
    return np.log(np.sum(np.exp(vec-maxv), axis=axis, keepdims=keepdims)) + maxv
class GMM(object):
""" Gaussian Mixture Model. """
def __init__(self, init_sequential=False, eigreg=False, warmstart=True):
self.init_sequential = init_sequential
self.eigreg = eigreg
self.warmstart = warmstart
self.sigma = None
def inference(self, pts):
"""
Evaluate dynamics prior.
Args:
pts: A N x D array of points.
"""
# Compute posterior cluster weights.
logwts = self.clusterwts(pts)
# Compute posterior mean and covariance.
mu0, Phi = self.moments(logwts)
# Set hyperparameters.
m = self.N
n0 = m - 2 - mu0.shape[0]
# Normalize.
m = float(m) / self.N
n0 = float(n0) / self.N
return mu0, Phi, m, n0
def estep(self, data):
"""
Compute log observation probabilities under GMM.
Args:
data: A N x D array of points.
Returns:
logobs: A N x K array of log probabilities (for each point
on each cluster).
"""
# Constants.
N, D = data.shape
K = self.sigma.shape[0]
logobs = -0.5*np.ones((N, K))*D*np.log(2*np.pi)
for i in range(K):
mu, sigma = self.mu[i], self.sigma[i]
L = scipy.linalg.cholesky(sigma, lower=True)
logobs[:, i] -= np.sum(np.log(np.diag(L)))
diff = (data - mu).T
soln = scipy.linalg.solve_triangular(L, diff, lower=True)
logobs[:, i] -= 0.5*np.sum(soln**2, axis=0)
logobs += self.logmass.T
return logobs
def moments(self, logwts):
"""
Compute the moments of the cluster mixture with logwts.
Args:
logwts: A K x 1 array of log cluster probabilities.
Returns:
mu: A (D,) mean vector.
sigma: A D x D covariance matrix.
"""
# Exponentiate.
wts = np.exp(logwts)
# Compute overall mean.
mu = np.sum(self.mu * wts, axis=0)
# Compute overall covariance.
diff = self.mu - np.expand_dims(mu, axis=0)
diff_expand = np.expand_dims(self.mu, axis=1) * \
np.expand_dims(diff, axis=2)
wts_expand = np.expand_dims(wts, axis=2)
sigma = np.sum((self.sigma + diff_expand) * wts_expand, axis=0)
return mu, sigma
def clusterwts(self, data):
"""
Compute cluster weights for specified points under GMM.
Args:
data: An N x D array of points
Returns:
A K x 1 array of average cluster log probabilities.
"""
# Compute probability of each point under each cluster.
logobs = self.estep(data)
# Renormalize to get cluster weights.
logwts = logobs - logsum(logobs, axis=1)
# Average the cluster probabilities.
logwts = logsum(logwts, axis=0) - np.log(data.shape[0])
return logwts.T
def update(self, data, K, max_iterations=100):
"""
Run EM to update clusters.
Args:
data: An N x D data matrix, where N = number of data points.
K: Number of clusters to use.
"""
# Constants.
N = data.shape[0]
Do = data.shape[1]
LOGGER.debug('Fitting GMM with %d clusters on %d points', K, N)
if (not self.warmstart or self.sigma is None or
K != self.sigma.shape[0]):
# Initialization.
LOGGER.debug('Initializing GMM.')
self.sigma = np.zeros((K, Do, Do))
self.mu = np.zeros((K, Do))
self.logmass = np.log(1.0 / K) * np.ones((K, 1))
self.mass = (1.0 / K) * np.ones((K, 1))
self.N = data.shape[0]
N = self.N
# Set initial cluster indices.
if not self.init_sequential:
cidx = np.random.randint(0, K, size=(1, N))
else:
raise NotImplementedError()
# Initialize.
for i in range(K):
cluster_idx = (cidx == i)[0]
mu = np.mean(data[cluster_idx, :], axis=0)
diff = (data[cluster_idx, :] - mu).T
sigma = (1.0 / K) * (diff.dot(diff.T))
self.mu[i, :] = mu
self.sigma[i, :, :] = sigma + np.eye(Do) * 2e-6
prevll = -float('inf')
for itr in range(max_iterations):
# E-step: compute cluster probabilities.
logobs = self.estep(data)
# Compute log-likelihood.
ll = np.sum(logsum(logobs, axis=1))
LOGGER.debug('GMM itr %d/%d. Log likelihood: %f',
itr, max_iterations, ll)
if ll < prevll:
# TODO: Why does log-likelihood decrease sometimes?
LOGGER.debug('Log-likelihood decreased! Ending on itr=%d/%d',
itr, max_iterations)
break
if np.abs(ll-prevll) < 1e-5*prevll:
LOGGER.debug('GMM converged on itr=%d/%d',
itr, max_iterations)
break
prevll = ll
# Renormalize to get cluster weights.
logw = logobs - logsum(logobs, axis=1)
assert logw.shape == (N, K)
# Renormalize again to get weights for refitting clusters.
logwn = logw - logsum(logw, axis=0)
assert logwn.shape == (N, K)
w = np.exp(logwn)
# M-step: update clusters.
# Fit cluster mass.
self.logmass = logsum(logw, axis=0).T
self.logmass = self.logmass - logsum(self.logmass, axis=0)
assert self.logmass.shape == (K, 1)
self.mass = np.exp(self.logmass)
# Reboot small clusters.
w[:, (self.mass < (1.0 / K) * 1e-4)[:, 0]] = 1.0 / N
# Fit cluster means.
w_expand = np.expand_dims(w, axis=2)
data_expand = np.expand_dims(data, axis=1)
self.mu = np.sum(w_expand * data_expand, axis=0)
# Fit covariances.
wdata = data_expand * np.sqrt(w_expand)
assert wdata.shape == (N, K, Do)
for i in range(K):
# Compute weighted outer product.
XX = wdata[:, i, :].T.dot(wdata[:, i, :])
mu = self.mu[i, :]
self.sigma[i, :, :] = XX - np.outer(mu, mu)
if self.eigreg: # Use eigenvalue regularization.
raise NotImplementedError()
else: # Use quick and dirty regularization.
sigma = self.sigma[i, :, :]
self.sigma[i, :, :] = 0.5 * (sigma + sigma.T) + \
1e-6 * np.eye(Do)
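As a small illustration (made-up data, not part of the original code), we can fit the GMM to two well-separated synthetic clusters and query the prior moments that DynamicsPriorGMM will later consume:

np.random.seed(0)
demo_data = np.vstack([np.random.randn(100, 2) + 4.0,
                       np.random.randn(100, 2) - 4.0])
demo_gmm = GMM()
demo_gmm.update(demo_data, K=2)                 # run EM with 2 clusters
mu0, Phi, m, n0 = demo_gmm.inference(demo_data[:10])
print(demo_gmm.mu)                              # cluster means (typically one near each blob)
print(mu0.shape, Phi.shape, m, n0)              # (2,), (2, 2), normalized pseudo-counts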
DynamicsPriorGMM
Optimizing the linear-Gaussian controllers $p_i(u_t | x_t)$ (that induce the trajectories $p_i (\tau)$) requires fitting the system dynamics $p_i (x_{t+1} | x_t, u_t)$ at each iteration to samples generated on the physical system from the previous controller $\hat{p_i}(u_t | x_t)$.
The linear-Gaussian dynamics are defined as $p_i (x_{t+1} | x_t, u_t) = \mathcal{N} (f_{xt}x_t + f_{ut}u_t + f_{ct}, F_t)$, and the data we obtain from the robot can be viewed as tuples $\{x_t^i, u_t^i, x_{t+1}^i\}$. A simple way to fit these linear-Gaussian dynamics is to use linear regression to determine $f_{xt}$, $f_{ut}$ and $f_{ct}$, and to fit $F_t$ based on the residual errors; however, the sample complexity of linear regression scales with the dimensionality of the full state space $x_t$.
Although this can be an issue for high-dimensional robotic systems, the dynamics at nearby time steps are strongly correlated, which means we can dramatically reduce the sample complexity of the dynamics fitting by bringing in information from previous time steps. This implementation fits a global model to all of the transitions $\{x_t^i, u_t^i, x_{t+1}^i\}$ for all $t$, including tuples from prior iterations, and then uses this model as a prior for fitting the dynamics at each time step.
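Concretely, `gauss_fit_joint_prior` above estimates a joint Gaussian over $[x_t; u_t; x_{t+1}]$ (regularized by the normal-inverse-Wishart prior) and then conditions it to obtain the dynamics:

$$f_{[x,u],t} = \Sigma_{x'[x,u]}\,\Sigma_{[x,u][x,u]}^{-1}, \qquad f_{ct} = \mu_{x'} - f_{[x,u],t}\,\mu_{[x,u]}, \qquad F_t = \Sigma_{x'x'} - f_{[x,u],t}\,\Sigma_{[x,u][x,u]}\,f_{[x,u],t}^\top,$$

where $\mu$ and $\Sigma$ are the posterior mean and covariance of the joint distribution, $[x,u]$ indexes the state-action block, and $x'$ the next state. These correspond to the `fd`, `fc`, and `dynsig` values returned by `gauss_fit_joint_prior`.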
Below is the definition of the GMM prior used for dynamics estimation.
In [7]:
import copy
# DynamicsPriorGMM
DYN_PRIOR_GMM = {
'min_samples_per_cluster': 20,
'max_clusters': 50,
'max_samples': 20,
'strength': 1.0,
}
# As defined in the code examples
DYN_PRIOR_GMM_example = {
'min_samples_per_cluster': 40,
'max_clusters': 20,
'max_samples': 20,
'strength': 1.0,
}
class DynamicsPriorGMM(object):
"""
A dynamics prior encoded as a GMM over [x_t, u_t, x_t+1] points.
See:
S. Levine*, C. Finn*, T. Darrell, P. Abbeel, "End-to-end
training of Deep Visuomotor Policies", arXiv:1504.00702,
Appendix A.3.
"""
def __init__(self, hyperparams):
"""
Hyperparameters:
min_samples_per_cluster: Minimum samples per cluster.
max_clusters: Maximum number of clusters to fit.
max_samples: Maximum number of trajectories to use for
fitting the GMM at any given time.
strength: Adjusts the strength of the prior.
"""
config = copy.deepcopy(DYN_PRIOR_GMM)
#config = copy.deepcopy(DYN_PRIOR_GMM_example)
config.update(hyperparams)
self._hyperparams = config
self.X = None
self.U = None
self.gmm = GMM()
self._min_samp = self._hyperparams['min_samples_per_cluster']
self._max_samples = self._hyperparams['max_samples']
self._max_clusters = self._hyperparams['max_clusters']
self._strength = self._hyperparams['strength']
# Should we use copy.min_samples_per_cluster, etc. instead?
#self._min_samp = DYN_PRIOR_GMM_example.min_samples_per_cluster
#self._max_samples = DYN_PRIOR_GMM_example.max_samples
#self._max_clusters = DYN_PRIOR_GMM_example.max_clusters
#self._strength = DYN_PRIOR_GMM_example.strength
def initial_state(self):
""" Return dynamics prior for initial time step. """
# Compute mean and covariance.
mu0 = np.mean(self.X[:, 0, :], axis=0)
Phi = np.diag(np.var(self.X[:, 0, :], axis=0))
# Factor in multiplier.
n0 = self.X.shape[2] * self._strength
m = self.X.shape[2] * self._strength
# Multiply Phi by m (since it was normalized before).
Phi = Phi * m
return mu0, Phi, m, n0
def update(self, X, U):
"""
Update prior with additional data.
Args:
X: A N x T x dX matrix of sequential state data.
U: A N x T x dU matrix of sequential control data.
"""
# Constants.
T = X.shape[1] - 1
# Append data to dataset.
if self.X is None:
self.X = X
else:
self.X = np.concatenate([self.X, X], axis=0)
if self.U is None:
self.U = U
else:
self.U = np.concatenate([self.U, U], axis=0)
# Remove excess samples from dataset.
start = max(0, self.X.shape[0] - self._max_samples + 1)
self.X = self.X[start:, :]
self.U = self.U[start:, :]
# Compute cluster dimensionality.
Do = X.shape[2] + U.shape[2] + X.shape[2] #TODO: Use Xtgt.
# Create dataset.
N = self.X.shape[0]
xux = np.reshape(
np.c_[self.X[:, :T, :], self.U[:, :T, :], self.X[:, 1:(T+1), :]],
[T * N, Do]
)
# Choose number of clusters.
K = int(max(2, min(self._max_clusters,
np.floor(float(N * T) / self._min_samp))))
LOGGER.debug('Generating %d clusters for dynamics GMM.', K)
# Update GMM.
self.gmm.update(xux, K)
def eval(self, Dx, Du, pts):
"""
Evaluate prior.
Args:
pts: A N x Dx+Du+Dx matrix.
"""
# Construct query data point by rearranging entries and adding
# in reference.
assert pts.shape[1] == Dx + Du + Dx
# Perform query and fix mean.
mu0, Phi, m, n0 = self.gmm.inference(pts)
# Factor in multiplier.
n0 = n0 * self._strength
m = m * self._strength
# Multiply Phi by m (since it was normalized before).
Phi *= m
return mu0, Phi, m, n0
In [8]:
#regularization = 1e-6
class DynamicsLRPrior(Dynamics):
""" Dynamics with linear regression, with arbitrary prior. """
def __init__(self, hyperparams):
#def __init__(self):
Dynamics.__init__(self, hyperparams)
#Dynamics.__init__(self)
self.Fm = None
self.fv = None
self.dyn_covar = None
#self.prior = DynamicsPriorGMM() # Refer to the corresponding class for a deeper understanding
self.prior = self._hyperparams['prior']['type'](self._hyperparams['prior'])
def update_prior(self, samples):
""" Update dynamics prior. """
X = samples.get_X()
U = samples.get_U()
self.prior.update(X, U)
def get_prior(self):
""" Return the dynamics prior. """
return self.prior
#TODO: Merge this with DynamicsLR.fit - lots of duplicated code.
def fit(self, X, U):
""" Fit dynamics. """
N, T, dX = X.shape
dU = U.shape[2]
if N == 1:
raise ValueError("Cannot fit dynamics on 1 sample")
self.Fm = np.zeros([T, dX, dX+dU])
self.fv = np.zeros([T, dX])
self.dyn_covar = np.zeros([T, dX, dX])
it = slice(dX+dU)
ip = slice(dX+dU, dX+dU+dX)
# Fit dynamics with least squares regression.
dwts = (1.0 / N) * np.ones(N)
for t in range(T - 1):
Ys = np.c_[X[:, t, :], U[:, t, :], X[:, t+1, :]]
# Obtain Normal-inverse-Wishart prior.
mu0, Phi, mm, n0 = self.prior.eval(dX, dU, Ys)
sig_reg = np.zeros((dX+dU+dX, dX+dU+dX))
sig_reg[it, it] = self._hyperparams['regularization']
#sig_reg[it, it] = regularization
Fm, fv, dyn_covar = gauss_fit_joint_prior(Ys,
mu0, Phi, mm, n0, dwts, dX+dU, dX, sig_reg)
self.Fm[t, :, :] = Fm
self.fv[t, :] = fv
self.dyn_covar[t, :, :] = dyn_covar
return self.Fm, self.fv, self.dyn_covar
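To see the pieces fit together, here is a small end-to-end check on random data. It is purely illustrative; the hyperparameter values are assumptions for this demo, not the ones used in the original experiments.

np.random.seed(0)
N, T, dX, dU = 10, 12, 3, 2
X_demo = np.random.randn(N, T, dX)
U_demo = np.random.randn(N, T, dU)
demo_hyper = {
    'regularization': 1e-6,
    'prior': {
        'type': DynamicsPriorGMM,
        'max_clusters': 2,
        'min_samples_per_cluster': 20,
        'max_samples': 20,
        'strength': 1.0,
    },
}
demo_dyn = DynamicsLRPrior(demo_hyper)
demo_dyn.prior.update(X_demo, U_demo)         # warm up the GMM prior directly
Fm, fv, dyn_covar = demo_dyn.fit(X_demo, U_demo)
print(Fm.shape, fv.shape, dyn_covar.shape)    # (T, dX, dX+dU), (T, dX), (T, dX, dX)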
In [9]:
class BundleType(object):
"""
This class bundles many fields, similar to a record or a mutable
namedtuple.
"""
def __init__(self, variables):
for var, val in variables.items():
object.__setattr__(self, var, val)
# Freeze fields so new ones cannot be set.
def __setattr__(self, key, value):
if not hasattr(self, key):
raise AttributeError("%r has no attribute %s" % (self, key))
object.__setattr__(self, key, value)
def check_shape(value, expected_shape, name=''):
"""
Throws a ValueError if value.shape != expected_shape.
Args:
value: Matrix to shape check.
expected_shape: A tuple or list of integers.
name: An optional name to add to the exception message.
"""
if value.shape != tuple(expected_shape):
raise ValueError('Shape mismatch %s: Expected %s, got %s' %
(name, str(expected_shape), str(value.shape)))
def finite_differences(func, inputs, func_output_shape=(), epsilon=1e-5):
"""
Computes gradients via finite differences.
derivative = (func(x+epsilon) - func(x-epsilon)) / (2*epsilon)
Args:
func: Function to compute gradient of. Inputs and outputs can be
arbitrary dimension.
inputs: Vector value to compute gradient at.
func_output_shape: Shape of the output of func. Default is
empty-tuple, which works for scalar-valued functions.
epsilon: Difference to use for computing gradient.
Returns:
Gradient vector of each dimension of func with respect to each
dimension of input.
"""
gradient = np.zeros(inputs.shape+func_output_shape)
for idx, _ in np.ndenumerate(inputs):
test_input = np.copy(inputs)
test_input[idx] += epsilon
obj_d1 = func(test_input)
assert obj_d1.shape == func_output_shape
test_input = np.copy(inputs)
test_input[idx] -= epsilon
obj_d2 = func(test_input)
assert obj_d2.shape == func_output_shape
diff = (obj_d1 - obj_d2) / (2 * epsilon)
gradient[idx] += diff
return gradient
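As a quick illustration (made-up test function, not from the original code), finite_differences and check_shape can be exercised on a simple quadratic, for which the central difference is essentially exact:

quad = lambda x: np.sum(x ** 2)              # scalar-valued test function
x0 = np.array([1.0, -2.0, 0.5])
grad_fd = finite_differences(quad, x0)       # numerical gradient, approx. 2 * x0
check_shape(grad_fd, (3,))                   # raises ValueError on shape mismatch
print(np.allclose(grad_fd, 2 * x0, atol=1e-6))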
In [10]:
class IterationData(BundleType):
""" Collection of iteration variables. """
def __init__(self):
variables = {
'sample_list': None, # List of samples for the current iteration.
'traj_info': None, # Current TrajectoryInfo object.
'pol_info': None, # Current PolicyInfo object.
'traj_distr': None, # Initial trajectory distribution.
'new_traj_distr': None, # Updated trajectory distribution.
'cs': None, # Sample costs of the current iteration.
'step_mult': 1.0, # KL step multiplier for the current iteration.
'eta': 1.0, # Dual variable used in LQR backward pass.
}
BundleType.__init__(self, variables)
class TrajectoryInfo(BundleType):
""" Collection of trajectory-related variables. """
def __init__(self):
variables = {
'dynamics': None, # Dynamics object for the current iteration.
'x0mu': None, # Mean for the initial state, used by the dynamics.
'x0sigma': None, # Covariance for the initial state distribution.
'cc': None, # Cost estimate constant term.
'cv': None, # Cost estimate vector term.
'Cm': None, # Cost estimate matrix term.
'last_kl_step': float('inf'), # KL step of the previous iteration.
}
BundleType.__init__(self, variables)
In [11]:
class PolicyInfo(BundleType):
""" Collection of policy-related variables. """
def __init__(self, hyperparams):
T, dU, dX = hyperparams['T'], hyperparams['dU'], hyperparams['dX']
variables = {
'lambda_k': np.zeros((T, dU)), # Dual variables.
'lambda_K': np.zeros((T, dU, dX)), # Dual variables.
'pol_wt': hyperparams['init_pol_wt'] * np.ones(T), # Policy weight.
'pol_mu': None, # Mean of the current policy output.
'pol_sig': None, # Covariance of the current policy output.
'pol_K': np.zeros((T, dU, dX)), # Policy linearization.
'pol_k': np.zeros((T, dU)), # Policy linearization.
'pol_S': np.zeros((T, dU, dU)), # Policy linearization covariance.
'chol_pol_S': np.zeros((T, dU, dU)), # Cholesky decomp of covar.
'prev_kl': None, # Previous KL divergence.
'init_kl': None, # The initial KL divergence, before the iteration.
'policy_samples': [], # List of current policy samples.
'policy_prior': None, # Current prior for policy linearization.
}
BundleType.__init__(self, variables)
def traj_distr(self):
""" Create a trajectory distribution object from policy info. """
T, dU, dX = self.pol_K.shape
# Compute inverse policy covariances.
inv_pol_S = np.empty_like(self.chol_pol_S)
for t in range(T):
inv_pol_S[t, :, :] = np.linalg.solve(
self.chol_pol_S[t, :, :],
np.linalg.solve(self.chol_pol_S[t, :, :].T, np.eye(dU))
)
return LinearGaussianPolicy(self.pol_K, self.pol_k, self.pol_S,
self.chol_pol_S, inv_pol_S)
In [12]:
import random
def extract_condition(hyperparams, m):
"""
Pull the relevant hyperparameters corresponding to the specified
condition, and return a new hyperparameter dictionary.
"""
return {var: val[m] if isinstance(val, list) else val
for var, val in hyperparams.items()}
# Algorithm
ALG = {
'inner_iterations': 1, # Number of iterations.
'min_eta': 1e-5, # Minimum initial lagrange multiplier in DGD for
# trajectory optimization.
    'kl_step': 0.2,
    'min_step_mult': 0.01,
    'max_step_mult': 10.0,
    'min_mult': 0.1,
    'max_mult': 5.0,
    # Trajectory settings.
    'initial_state_var': 1e-6,
'init_traj_distr': None, # A list of initial LinearGaussianPolicy
# objects for each condition.
# Trajectory optimization.
'traj_opt': None,
# Weight of maximum entropy term in trajectory optimization.
'max_ent_traj': 0.0,
# Dynamics hyperaparams.
'dynamics': None,
# Costs.
'cost': None, # A list of Cost objects for each condition.
# Whether or not to sample with neural net policy (only for badmm/mdgps).
'sample_on_policy': False,
    # Indicates if the algorithm requires fitting of the dynamics.
'fit_dynamics': True,
}
class Algorithm(object):
""" Algorithm superclass. """
__metaclass__ = abc.ABCMeta
def __init__(self, hyperparams):
config = copy.deepcopy(ALG)
config.update(hyperparams)
self._hyperparams = config
if 'train_conditions' in hyperparams:
self._cond_idx = hyperparams['train_conditions']
self.M = len(self._cond_idx)
else:
self.M = hyperparams['conditions']
self._cond_idx = range(self.M)
self._hyperparams['train_conditions'] = self._cond_idx
self._hyperparams['test_conditions'] = self._cond_idx
self.iteration_count = 0
# Grab a few values from the agent.
agent = self._hyperparams['agent']
self.T = self._hyperparams['T'] = agent.T
self.dU = self._hyperparams['dU'] = agent.dU
self.dX = self._hyperparams['dX'] = agent.dX
self.dO = self._hyperparams['dO'] = agent.dO
init_traj_distr = config['init_traj_distr']
init_traj_distr['x0'] = agent.x0
init_traj_distr['dX'] = agent.dX
init_traj_distr['dU'] = agent.dU
del self._hyperparams['agent'] # Don't want to pickle this.
# IterationData objects for each condition.
self.cur = [IterationData() for _ in range(self.M)]
self.prev = [IterationData() for _ in range(self.M)]
if self._hyperparams['fit_dynamics']:
dynamics = self._hyperparams['dynamics']
for m in range(self.M):
self.cur[m].traj_info = TrajectoryInfo()
if self._hyperparams['fit_dynamics']:
self.cur[m].traj_info.dynamics = dynamics['type'](dynamics)
init_traj_distr = extract_condition(
self._hyperparams['init_traj_distr'], self._cond_idx[m]
)
self.cur[m].traj_distr = init_traj_distr['type'](init_traj_distr)
self.traj_opt = hyperparams['traj_opt']['type'](
hyperparams['traj_opt']
)
if type(hyperparams['cost']) == list:
self.cost = [
hyperparams['cost'][i]['type'](hyperparams['cost'][i])
for i in range(self.M)
]
else:
self.cost = [
hyperparams['cost']['type'](hyperparams['cost'])
for _ in range(self.M)
]
self.base_kl_step = self._hyperparams['kl_step']
@abc.abstractmethod
def iteration(self, sample_list):
""" Run iteration of the algorithm. """
raise NotImplementedError("Must be implemented in subclass")
def _update_dynamics(self):
"""
Instantiate dynamics objects and update prior. Fit dynamics to
current samples.
"""
for m in range(self.M):
cur_data = self.cur[m].sample_list
X = cur_data.get_X()
U = cur_data.get_U()
# Update prior and fit dynamics.
self.cur[m].traj_info.dynamics.update_prior(cur_data)
self.cur[m].traj_info.dynamics.fit(X, U)
# Fit x0mu/x0sigma.
x0 = X[:, 0, :]
x0mu = np.mean(x0, axis=0)
self.cur[m].traj_info.x0mu = x0mu
self.cur[m].traj_info.x0sigma = np.diag(
np.maximum(np.var(x0, axis=0),
self._hyperparams['initial_state_var'])
)
prior = self.cur[m].traj_info.dynamics.get_prior()
if prior:
mu0, Phi, priorm, n0 = prior.initial_state()
N = len(cur_data)
self.cur[m].traj_info.x0sigma += \
Phi + (N*priorm) / (N+priorm) * \
np.outer(x0mu-mu0, x0mu-mu0) / (N+n0)
def _update_trajectories(self):
"""
Compute new linear Gaussian controllers.
"""
if not hasattr(self, 'new_traj_distr'):
self.new_traj_distr = [
self.cur[cond].traj_distr for cond in range(self.M)
]
for cond in range(self.M):
self.new_traj_distr[cond], self.cur[cond].eta = \
self.traj_opt.update(cond, self)
def _eval_cost(self, cond):
"""
Evaluate costs for all samples for a condition.
Args:
cond: Condition to evaluate cost on.
"""
# Constants.
T, dX, dU = self.T, self.dX, self.dU
N = len(self.cur[cond].sample_list)
# Compute cost.
cs = np.zeros((N, T))
cc = np.zeros((N, T))
cv = np.zeros((N, T, dX+dU))
Cm = np.zeros((N, T, dX+dU, dX+dU))
for n in range(N):
sample = self.cur[cond].sample_list[n]
# Get costs.
l, lx, lu, lxx, luu, lux = self.cost[cond].eval(sample)
cc[n, :] = l
cs[n, :] = l
# Assemble matrix and vector.
cv[n, :, :] = np.c_[lx, lu]
Cm[n, :, :, :] = np.concatenate(
(np.c_[lxx, np.transpose(lux, [0, 2, 1])], np.c_[lux, luu]),
axis=1
)
# Adjust for expanding cost around a sample.
X = sample.get_X()
U = sample.get_U()
yhat = np.c_[X, U]
rdiff = -yhat
rdiff_expand = np.expand_dims(rdiff, axis=2)
cv_update = np.sum(Cm[n, :, :, :] * rdiff_expand, axis=1)
cc[n, :] += np.sum(rdiff * cv[n, :, :], axis=1) + 0.5 * \
np.sum(rdiff * cv_update, axis=1)
cv[n, :, :] += cv_update
# Fill in cost estimate.
self.cur[cond].traj_info.cc = np.mean(cc, 0) # Constant term (scalar).
self.cur[cond].traj_info.cv = np.mean(cv, 0) # Linear term (vector).
self.cur[cond].traj_info.Cm = np.mean(Cm, 0) # Quadratic term (matrix).
self.cur[cond].cs = cs # True value of cost.
def _advance_iteration_variables(self):
"""
Move all 'cur' variables to 'prev', and advance iteration
counter.
"""
self.iteration_count += 1
self.prev = copy.deepcopy(self.cur)
# TODO: change IterationData to reflect new stuff better
for m in range(self.M):
self.prev[m].new_traj_distr = self.new_traj_distr[m]
self.cur = [IterationData() for _ in range(self.M)]
for m in range(self.M):
self.cur[m].traj_info = TrajectoryInfo()
self.cur[m].traj_info.dynamics = copy.deepcopy(self.prev[m].traj_info.dynamics)
self.cur[m].step_mult = self.prev[m].step_mult
self.cur[m].eta = self.prev[m].eta
self.cur[m].traj_distr = self.new_traj_distr[m]
delattr(self, 'new_traj_distr')
def _set_new_mult(self, predicted_impr, actual_impr, m):
"""
Adjust step size multiplier according to the predicted versus
actual improvement.
"""
# Model improvement as I = predicted_dI * KL + penalty * KL^2,
# where predicted_dI = pred/KL and penalty = (act-pred)/(KL^2).
# Optimize I w.r.t. KL: 0 = predicted_dI + 2 * penalty * KL =>
# KL' = (-predicted_dI)/(2*penalty) = (pred/2*(pred-act)) * KL.
# Therefore, the new multiplier is given by pred/2*(pred-act).
new_mult = predicted_impr / (2.0 * max(1e-4,
predicted_impr - actual_impr))
new_mult = max(0.1, min(5.0, new_mult))
new_step = max(
min(new_mult * self.cur[m].step_mult,
self._hyperparams['max_step_mult']),
self._hyperparams['min_step_mult']
)
self.cur[m].step_mult = new_step
if new_mult > 1:
LOGGER.debug('Increasing step size multiplier to %f', new_step)
else:
LOGGER.debug('Decreasing step size multiplier to %f', new_step)
def _measure_ent(self, m):
""" Measure the entropy of the current trajectory. """
ent = 0
for t in range(self.T):
ent = ent + np.sum(
np.log(np.diag(self.cur[m].traj_distr.chol_pol_covar[t, :, :]))
)
return ent
# For pickling.
def __getstate__(self):
state = self.__dict__.copy()
state['_random_state'] = random.getstate()
state['_np_random_state'] = np.random.get_state()
return state
# For unpickling.
def __setstate__(self, state):
self.__dict__ = state
random.setstate(state.pop('_random_state'))
np.random.set_state(state.pop('_np_random_state'))
In [13]:
import scipy as sp
# AlgorithmBADMM
ALG_BADMM = {
'inner_iterations': 4,
'policy_dual_rate': 0.1,
'policy_dual_rate_covar': 0.0,
'fixed_lg_step': 0,
'lg_step_schedule': 10.0,
'ent_reg_schedule': 0.0,
'init_pol_wt': 0.01,
'policy_sample_mode': 'add',
'exp_step_increase': 2.0,
'exp_step_decrease': 0.5,
'exp_step_upper': 0.5,
'exp_step_lower': 1.0,
}
class AlgorithmBADMM(Algorithm):
"""
Sample-based joint policy learning and trajectory optimization with
BADMM-based guided policy search algorithm.
"""
def __init__(self, hyperparams):
config = copy.deepcopy(ALG_BADMM)
config.update(hyperparams)
Algorithm.__init__(self, config)
policy_prior = self._hyperparams['policy_prior']
for m in range(self.M):
self.cur[m].pol_info = PolicyInfo(self._hyperparams)
self.cur[m].pol_info.policy_prior = \
policy_prior['type'](policy_prior)
self.policy_opt = self._hyperparams['policy_opt']['type'](
self._hyperparams['policy_opt'], self.dO, self.dU
)
def iteration(self, sample_lists):
"""
Run iteration of BADMM-based guided policy search.
Args:
sample_lists: List of SampleList objects for each condition.
"""
for m in range(self.M):
self.cur[m].sample_list = sample_lists[m]
self._set_interp_values()
        self._update_dynamics()  # Update dynamics model using all samples.
self._update_step_size() # KL Divergence step size.
for m in range(self.M):
# save initial kl for debugging / visualization
self.cur[m].pol_info.init_kl = self._policy_kl(m)[0]
# Run inner loop to compute new policies.
for inner_itr in range(self._hyperparams['inner_iterations']):
#TODO: Could start from init controller.
if self.iteration_count > 0 or inner_itr > 0:
# Update the policy.
self._update_policy(inner_itr)
for m in range(self.M):
self._update_policy_fit(m) # Update policy priors.
if self.iteration_count > 0 or inner_itr > 0:
step = (inner_itr == self._hyperparams['inner_iterations'] - 1)
# Update dual variables.
for m in range(self.M):
self._policy_dual_step(m, step=step)
self._update_trajectories()
self._advance_iteration_variables()
def _set_interp_values(self):
"""
Use iteration-based interpolation to set values of some
schedule-based parameters.
"""
# Compute temporal interpolation value.
t = min((self.iteration_count + 1.0) /
(self._hyperparams['iterations'] - 1), 1)
# Perform iteration-based interpolation of entropy penalty.
if type(self._hyperparams['ent_reg_schedule']) in (int, float):
self.policy_opt.set_ent_reg(self._hyperparams['ent_reg_schedule'])
else:
sch = self._hyperparams['ent_reg_schedule']
self.policy_opt.set_ent_reg(
np.exp(np.interp(t, np.linspace(0, 1, num=len(sch)),
np.log(sch)))
)
# Perform iteration-based interpolation of Lagrange multiplier.
if type(self._hyperparams['lg_step_schedule']) in (int, float):
self._hyperparams['lg_step'] = self._hyperparams['lg_step_schedule']
else:
sch = self._hyperparams['lg_step_schedule']
self._hyperparams['lg_step'] = np.exp(
np.interp(t, np.linspace(0, 1, num=len(sch)), np.log(sch))
)
def _update_step_size(self):
""" Evaluate costs on samples, and adjust the step size. """
# Evaluate cost function for all conditions and samples.
for m in range(self.M):
self._update_policy_fit(m, init=True)
self._eval_cost(m)
# Adjust step size relative to the previous iteration.
if self.iteration_count >= 1 and self.prev[m].sample_list:
self._stepadjust(m)
def _update_policy(self, inner_itr):
""" Compute the new policy. """
dU, dO, T = self.dU, self.dO, self.T
# Compute target mean, cov, and weight for each sample.
obs_data, tgt_mu = np.zeros((0, T, dO)), np.zeros((0, T, dU))
tgt_prc, tgt_wt = np.zeros((0, T, dU, dU)), np.zeros((0, T))
for m in range(self.M):
samples = self.cur[m].sample_list
X = samples.get_X()
N = len(samples)
if inner_itr > 0:
traj, pol_info = self.new_traj_distr[m], self.cur[m].pol_info
else:
traj, pol_info = self.cur[m].traj_distr, self.cur[m].pol_info
mu = np.zeros((N, T, dU))
prc = np.zeros((N, T, dU, dU))
wt = np.zeros((N, T))
# Get time-indexed actions.
for t in range(T):
# Compute actions along this trajectory.
prc[:, t, :, :] = np.tile(traj.inv_pol_covar[t, :, :],
[N, 1, 1])
for i in range(N):
mu[i, t, :] = \
(traj.K[t, :, :].dot(X[i, t, :]) + traj.k[t, :]) - \
np.linalg.solve(
prc[i, t, :, :] / pol_info.pol_wt[t],
pol_info.lambda_K[t, :, :].dot(X[i, t, :]) + \
pol_info.lambda_k[t, :]
)
wt[:, t].fill(pol_info.pol_wt[t])
tgt_mu = np.concatenate((tgt_mu, mu))
tgt_prc = np.concatenate((tgt_prc, prc))
tgt_wt = np.concatenate((tgt_wt, wt))
obs_data = np.concatenate((obs_data, samples.get_obs()))
self.policy_opt.update(obs_data, tgt_mu, tgt_prc, tgt_wt)
def _update_policy_fit(self, m, init=False):
"""
Re-estimate the local policy values in the neighborhood of the
trajectory.
Args:
m: Condition
init: Whether this is the initial fitting of the policy.
"""
dX, dU, T = self.dX, self.dU, self.T
# Choose samples to use.
samples = self.cur[m].sample_list
N = len(samples)
pol_info = self.cur[m].pol_info
X = samples.get_X()
obs = samples.get_obs().copy()
pol_mu, pol_sig = self.policy_opt.prob(obs)[:2]
pol_info.pol_mu, pol_info.pol_sig = pol_mu, pol_sig
# Update policy prior.
policy_prior = pol_info.policy_prior
if init:
samples = SampleList(self.cur[m].sample_list)
mode = self._hyperparams['policy_sample_mode']
else:
samples = SampleList([])
mode = 'add' # Don't replace with empty samples
policy_prior.update(samples, self.policy_opt, mode)
# Fit linearization and store in pol_info.
pol_info.pol_K, pol_info.pol_k, pol_info.pol_S = \
policy_prior.fit(X, pol_mu, pol_sig)
for t in range(T):
pol_info.chol_pol_S[t, :, :] = \
sp.linalg.cholesky(pol_info.pol_S[t, :, :])
def _policy_dual_step(self, m, step=False):
"""
Update the dual variables for the specified condition.
Args:
m: Condition
step: Whether or not to update pol_wt.
"""
dU, T = self.dU, self.T
samples = self.cur[m].sample_list
N = len(samples)
X = samples.get_X()
if 'new_traj_distr' in dir(self):
traj, pol_info = self.new_traj_distr[m], self.cur[m].pol_info
else:
traj, pol_info = self.cur[m].traj_distr, self.cur[m].pol_info
# Compute trajectory action at each sampled state.
traj_mu = np.zeros((N, T, dU))
for i in range(N):
for t in range(T):
traj_mu[i, t, :] = traj.K[t, :, :].dot(X[i, t, :]) + \
traj.k[t, :]
# Compute policy action at each sampled state.
pol_mu = pol_info.pol_mu
# Compute the difference and increment based on pol_wt.
for t in range(T):
tU, pU = traj_mu[:, t, :], pol_mu[:, t, :]
# Increment mean term.
pol_info.lambda_k[t, :] -= self._hyperparams['policy_dual_rate'] * \
pol_info.pol_wt[t] * \
traj.inv_pol_covar[t, :, :].dot(np.mean(tU - pU, axis=0))
# Increment covariance term.
t_covar, p_covar = traj.K[t, :, :], pol_info.pol_K[t, :, :]
pol_info.lambda_K[t, :, :] -= \
self._hyperparams['policy_dual_rate_covar'] * \
pol_info.pol_wt[t] * \
traj.inv_pol_covar[t, :, :].dot(t_covar - p_covar)
# Compute KL divergence.
kl_m = self._policy_kl(m)[0]
if step:
lg_step = self._hyperparams['lg_step']
# Increment pol_wt based on change in KL divergence.
if self._hyperparams['fixed_lg_step'] == 1:
# Take fixed size step.
pol_info.pol_wt = np.array([
max(wt + lg_step, 0) for wt in pol_info.pol_wt
])
elif self._hyperparams['fixed_lg_step'] == 2:
# (In/De)crease based on change in constraint
# satisfaction.
if hasattr(pol_info, 'prev_kl'):
kl_change = kl_m / pol_info.prev_kl
for i in range(len(pol_info.pol_wt)):
if kl_change[i] < 0.8:
pol_info.pol_wt[i] *= 0.5
elif kl_change[i] >= 0.95:
pol_info.pol_wt[i] *= 2.0
elif self._hyperparams['fixed_lg_step'] == 3:
# (In/De)crease based on difference from average.
if hasattr(pol_info, 'prev_kl'):
lower = np.mean(kl_m) - \
self._hyperparams['exp_step_lower'] * np.std(kl_m)
upper = np.mean(kl_m) + \
self._hyperparams['exp_step_upper'] * np.std(kl_m)
for i in range(len(pol_info.pol_wt)):
if kl_m[i] < lower:
pol_info.pol_wt[i] *= \
self._hyperparams['exp_step_decrease']
elif kl_m[i] >= upper:
pol_info.pol_wt[i] *= \
self._hyperparams['exp_step_increase']
else:
# Standard DGD step.
pol_info.pol_wt = np.array([
max(pol_info.pol_wt[t] + lg_step * kl_m[t], 0)
for t in range(T)
])
pol_info.prev_kl = kl_m
def _advance_iteration_variables(self):
"""
Move all 'cur' variables to 'prev', reinitialize 'cur'
variables, and advance iteration counter.
"""
Algorithm._advance_iteration_variables(self)
for m in range(self.M):
self.cur[m].traj_info.last_kl_step = \
self.prev[m].traj_info.last_kl_step
self.cur[m].pol_info = copy.deepcopy(self.prev[m].pol_info)
def _stepadjust(self, m):
"""
Calculate new step sizes.
Args:
m: Condition
"""
# Compute values under Laplace approximation. This is the policy
# that the previous samples were actually drawn from under the
# dynamics that were estimated from the previous samples.
prev_laplace_obj, prev_laplace_kl = self._estimate_cost(
self.prev[m].traj_distr, self.prev[m].traj_info, self.prev[m].pol_info, m
)
# This is the policy that we just used under the dynamics that
# were estimated from the previous samples (so this is the cost
# we thought we would have).
new_pred_laplace_obj, new_pred_laplace_kl = self._estimate_cost(
self.cur[m].traj_distr, self.prev[m].traj_info, self.prev[m].pol_info, m
)
# This is the actual cost we have under the current trajectory
# based on the latest samples.
new_actual_laplace_obj, new_actual_laplace_kl = self._estimate_cost(
self.cur[m].traj_distr, self.cur[m].traj_info, self.cur[m].pol_info, m
)
# Measure the entropy of the current trajectory (for printout).
ent = self._measure_ent(m)
# Compute actual objective values based on the samples.
prev_mc_obj = np.mean(np.sum(self.prev[m].cs, axis=1), axis=0)
new_mc_obj = np.mean(np.sum(self.cur[m].cs, axis=1), axis=0)
# Compute sample-based estimate of KL divergence between policy
# and trajectories.
new_mc_kl = self._policy_kl(m)[0]
if self.iteration_count >= 1 and self.prev[m].sample_list:
prev_mc_kl = self._policy_kl(m, prev=True)[0]
else:
prev_mc_kl = np.zeros_like(new_mc_kl)
# Compute full policy KL divergence objective terms by applying
# the Lagrange multipliers.
pol_wt = self.cur[m].pol_info.pol_wt
prev_laplace_kl_sum = np.sum(prev_laplace_kl * pol_wt)
new_pred_laplace_kl_sum = np.sum(new_pred_laplace_kl * pol_wt)
new_actual_laplace_kl_sum = np.sum(new_actual_laplace_kl * pol_wt)
prev_mc_kl_sum = np.sum(prev_mc_kl * pol_wt)
new_mc_kl_sum = np.sum(new_mc_kl * pol_wt)
LOGGER.debug(
'Trajectory step: ent: %f cost: %f -> %f KL: %f -> %f',
ent, prev_mc_obj, new_mc_obj, prev_mc_kl_sum, new_mc_kl_sum
)
# Compute predicted and actual improvement.
predicted_impr = np.sum(prev_laplace_obj) + prev_laplace_kl_sum - \
np.sum(new_pred_laplace_obj) - new_pred_laplace_kl_sum
actual_impr = np.sum(prev_laplace_obj) + prev_laplace_kl_sum - \
np.sum(new_actual_laplace_obj) - new_actual_laplace_kl_sum
# Print improvement details.
LOGGER.debug('Previous cost: Laplace: %f MC: %f',
np.sum(prev_laplace_obj), prev_mc_obj)
LOGGER.debug('Predicted new cost: Laplace: %f MC: %f',
np.sum(new_pred_laplace_obj), new_mc_obj)
LOGGER.debug('Actual new cost: Laplace: %f MC: %f',
np.sum(new_actual_laplace_obj), new_mc_obj)
LOGGER.debug('Previous KL: Laplace: %f MC: %f',
np.sum(prev_laplace_kl), np.sum(prev_mc_kl))
LOGGER.debug('Predicted new KL: Laplace: %f MC: %f',
np.sum(new_pred_laplace_kl), np.sum(new_mc_kl))
LOGGER.debug('Actual new KL: Laplace: %f MC: %f',
np.sum(new_actual_laplace_kl), np.sum(new_mc_kl))
LOGGER.debug('Previous w KL: Laplace: %f MC: %f',
prev_laplace_kl_sum, prev_mc_kl_sum)
LOGGER.debug('Predicted w new KL: Laplace: %f MC: %f',
new_pred_laplace_kl_sum, new_mc_kl_sum)
LOGGER.debug('Actual w new KL: Laplace %f MC: %f',
new_actual_laplace_kl_sum, new_mc_kl_sum)
LOGGER.debug('Predicted/actual improvement: %f / %f',
predicted_impr, actual_impr)
# Compute actual KL step taken at last iteration.
actual_step = self.cur[m].traj_info.last_kl_step / \
(self._hyperparams['kl_step'] * self.T)
if actual_step < self.cur[m].step_mult:
self.cur[m].step_mult = max(actual_step,
self._hyperparams['min_step_mult'])
self._set_new_mult(predicted_impr, actual_impr, m)
def _policy_kl(self, m, prev=False):
"""
Monte-Carlo estimate of KL divergence between policy and
trajectory.
"""
dU, T = self.dU, self.T
if prev:
traj, pol_info = self.prev[m].traj_distr, self.cur[m].pol_info
samples = self.prev[m].sample_list
else:
traj, pol_info = self.cur[m].traj_distr, self.cur[m].pol_info
samples = self.cur[m].sample_list
N = len(samples)
X, obs = samples.get_X(), samples.get_obs()
kl, kl_m = np.zeros((N, T)), np.zeros(T)
kl_l, kl_lm = np.zeros((N, T)), np.zeros(T)
# Compute policy mean and covariance at each sample.
pol_mu, _, pol_prec, pol_det_sigma = self.policy_opt.prob(obs.copy())
# Compute KL divergence.
for t in range(T):
# Compute trajectory action at sample.
traj_mu = np.zeros((N, dU))
for i in range(N):
traj_mu[i, :] = traj.K[t, :, :].dot(X[i, t, :]) + traj.k[t, :]
diff = pol_mu[:, t, :] - traj_mu
tr_pp_ct = pol_prec[:, t, :, :] * traj.pol_covar[t, :, :]
k_ln_det_ct = 0.5 * dU + np.sum(
np.log(np.diag(traj.chol_pol_covar[t, :, :]))
)
ln_det_cp = np.log(pol_det_sigma[:, t])
# IMPORTANT: Note that this assumes that pol_prec does not
# depend on state!!!!
# (Only the last term makes this assumption.)
d_pp_d = np.sum(diff * (diff.dot(pol_prec[1, t, :, :])), axis=1)
kl[:, t] = 0.5 * np.sum(np.sum(tr_pp_ct, axis=1), axis=1) - \
k_ln_det_ct + 0.5 * ln_det_cp + 0.5 * d_pp_d
tr_pp_ct_m = np.mean(tr_pp_ct, axis=0)
kl_m[t] = 0.5 * np.sum(np.sum(tr_pp_ct_m, axis=0), axis=0) - \
k_ln_det_ct + 0.5 * np.mean(ln_det_cp) + \
0.5 * np.mean(d_pp_d)
# Compute trajectory action at sample with Lagrange
# multiplier.
traj_mu = np.zeros((N, dU))
for i in range(N):
traj_mu[i, :] = \
(traj.K[t, :, :] - pol_info.lambda_K[t, :, :]).dot(
X[i, t, :]
) + (traj.k[t, :] - pol_info.lambda_k[t, :])
# Compute KL divergence with Lagrange multiplier.
diff_l = pol_mu[:, t, :] - traj_mu
d_pp_d_l = np.sum(diff_l * (diff_l.dot(pol_prec[1, t, :, :])),
axis=1)
kl_l[:, t] = 0.5 * np.sum(np.sum(tr_pp_ct, axis=1), axis=1) - \
k_ln_det_ct + 0.5 * ln_det_cp + 0.5 * d_pp_d_l
kl_lm[t] = 0.5 * np.sum(np.sum(tr_pp_ct_m, axis=0), axis=0) - \
k_ln_det_ct + 0.5 * np.mean(ln_det_cp) + \
0.5 * np.mean(d_pp_d_l)
return kl_m, kl, kl_lm, kl_l
def _estimate_cost(self, traj_distr, traj_info, pol_info, m):
"""
Compute Laplace approximation to expected cost.
Args:
traj_distr: A linear Gaussian policy object.
traj_info: A TrajectoryInfo object.
pol_info: Policy linearization info.
m: Condition number.
"""
# Constants.
T, dU, dX = self.T, self.dU, self.dX
# Perform forward pass (note that we repeat this here, because
# traj_info may have different dynamics from the ones that were
# used to compute the distribution already saved in traj).
mu, sigma = self.traj_opt.forward(traj_distr, traj_info)
# Compute cost.
predicted_cost = np.zeros(T)
for t in range(T):
predicted_cost[t] = traj_info.cc[t] + 0.5 * \
(np.sum(sigma[t, :, :] * traj_info.Cm[t, :, :]) +
mu[t, :].T.dot(traj_info.Cm[t, :, :]).dot(mu[t, :])) + \
mu[t, :].T.dot(traj_info.cv[t, :])
# Compute KL divergence.
predicted_kl = np.zeros(T)
for t in range(T):
inv_pS = np.linalg.solve(
pol_info.chol_pol_S[t, :, :],
np.linalg.solve(pol_info.chol_pol_S[t, :, :].T, np.eye(dU))
)
Ufb = pol_info.pol_K[t, :, :].dot(mu[t, :dX].T) + \
pol_info.pol_k[t, :]
diff = mu[t, dX:] - Ufb
Kbar = traj_distr.K[t, :, :] - pol_info.pol_K[t, :, :]
predicted_kl[t] = 0.5 * (diff).dot(inv_pS).dot(diff) + \
0.5 * np.sum(traj_distr.pol_covar[t, :, :] * inv_pS) + \
0.5 * np.sum(
sigma[t, :dX, :dX] * Kbar.T.dot(inv_pS).dot(Kbar)
) + np.sum(
np.log(np.diag(pol_info.chol_pol_S[t, :, :]))
) - np.sum(
np.log(np.diag(traj_distr.chol_pol_covar[t, :, :]))
) + 0.5 * dU
return predicted_cost, predicted_kl
def compute_costs(self, m, eta, augment=True):
""" Compute cost estimates used in the LQR backward pass. """
traj_info, traj_distr = self.cur[m].traj_info, self.cur[m].traj_distr
if not augment: # Whether to augment cost with term to penalize KL
return traj_info.Cm, traj_info.cv
pol_info = self.cur[m].pol_info
multiplier = self._hyperparams['max_ent_traj']
T, dU, dX = traj_distr.T, traj_distr.dU, traj_distr.dX
Cm, cv = np.copy(traj_info.Cm), np.copy(traj_info.cv)
# Modify policy action via Lagrange multiplier.
cv[:, dX:] -= pol_info.lambda_k
Cm[:, dX:, :dX] -= pol_info.lambda_K
Cm[:, :dX, dX:] -= np.transpose(pol_info.lambda_K, [0, 2, 1])
#Pre-process the costs with KL-divergence terms.
TKLm = np.zeros((T, dX+dU, dX+dU))
TKLv = np.zeros((T, dX+dU))
PKLm = np.zeros((T, dX+dU, dX+dU))
PKLv = np.zeros((T, dX+dU))
fCm, fcv = np.zeros(Cm.shape), np.zeros(cv.shape)
for t in range(T):
K, k = traj_distr.K[t, :, :], traj_distr.k[t, :]
inv_pol_covar = traj_distr.inv_pol_covar[t, :, :]
# Trajectory KL-divergence terms.
TKLm[t, :, :] = np.vstack([
np.hstack([
K.T.dot(inv_pol_covar).dot(K),
-K.T.dot(inv_pol_covar)]),
np.hstack([-inv_pol_covar.dot(K), inv_pol_covar])
])
TKLv[t, :] = np.concatenate([
K.T.dot(inv_pol_covar).dot(k), -inv_pol_covar.dot(k)
])
# Policy KL-divergence terms.
inv_pol_S = np.linalg.solve(
pol_info.chol_pol_S[t, :, :],
np.linalg.solve(pol_info.chol_pol_S[t, :, :].T, np.eye(dU))
)
KB, kB = pol_info.pol_K[t, :, :], pol_info.pol_k[t, :]
PKLm[t, :, :] = np.vstack([
np.hstack([KB.T.dot(inv_pol_S).dot(KB), -KB.T.dot(inv_pol_S)]),
np.hstack([-inv_pol_S.dot(KB), inv_pol_S])
])
PKLv[t, :] = np.concatenate([
KB.T.dot(inv_pol_S).dot(kB), -inv_pol_S.dot(kB)
])
wt = pol_info.pol_wt[t]
fCm[t, :, :] = (Cm[t, :, :] + TKLm[t, :, :] * eta +
PKLm[t, :, :] * wt) / (eta + wt + multiplier)
fcv[t, :] = (cv[t, :] + TKLv[t, :] * eta +
PKLv[t, :] * wt) / (eta + wt + multiplier)
return fCm, fcv
In [14]:
# AlgorithmMDGPS
ALG_MDGPS = {
# TODO: remove need for init_pol_wt in MDGPS
'init_pol_wt': 0.01,
'policy_sample_mode': 'add',
    # Whether to use 'laplace' or 'mc' cost in step adjustment
'step_rule': 'laplace',
}
class AlgorithmMDGPS(Algorithm):
"""
Sample-based joint policy learning and trajectory optimization with
(approximate) mirror descent guided policy search algorithm.
"""
def __init__(self, hyperparams):
config = copy.deepcopy(ALG_MDGPS)
config.update(hyperparams)
Algorithm.__init__(self, config)
policy_prior = self._hyperparams['policy_prior']
for m in range(self.M):
self.cur[m].pol_info = PolicyInfo(self._hyperparams)
self.cur[m].pol_info.policy_prior = \
policy_prior['type'](policy_prior)
self.policy_opt = self._hyperparams['policy_opt']['type'](
self._hyperparams['policy_opt'], self.dO, self.dU
)
def iteration(self, sample_lists):
"""
Run iteration of MDGPS-based guided policy search.
Args:
sample_lists: List of SampleList objects for each condition.
"""
# Store the samples and evaluate the costs.
for m in range(self.M):
self.cur[m].sample_list = sample_lists[m]
self._eval_cost(m)
# Update dynamics linearizations.
self._update_dynamics()
# On the first iteration, need to catch policy up to init_traj_distr.
if self.iteration_count == 0:
self.new_traj_distr = [
self.cur[cond].traj_distr for cond in range(self.M)
]
self._update_policy()
# Update policy linearizations.
for m in range(self.M):
self._update_policy_fit(m)
# C-step
if self.iteration_count > 0:
self._stepadjust()
self._update_trajectories()
# S-step
self._update_policy()
# Prepare for next iteration
self._advance_iteration_variables()
def _update_policy(self):
""" Compute the new policy. """
dU, dO, T = self.dU, self.dO, self.T
# Compute target mean, cov, and weight for each sample.
obs_data, tgt_mu = np.zeros((0, T, dO)), np.zeros((0, T, dU))
tgt_prc, tgt_wt = np.zeros((0, T, dU, dU)), np.zeros((0, T))
for m in range(self.M):
samples = self.cur[m].sample_list
X = samples.get_X()
N = len(samples)
traj, pol_info = self.new_traj_distr[m], self.cur[m].pol_info
mu = np.zeros((N, T, dU))
prc = np.zeros((N, T, dU, dU))
wt = np.zeros((N, T))
# Get time-indexed actions.
for t in range(T):
# Compute actions along this trajectory.
prc[:, t, :, :] = np.tile(traj.inv_pol_covar[t, :, :],
[N, 1, 1])
for i in range(N):
mu[i, t, :] = (traj.K[t, :, :].dot(X[i, t, :]) + traj.k[t, :])
wt[:, t].fill(pol_info.pol_wt[t])
tgt_mu = np.concatenate((tgt_mu, mu))
tgt_prc = np.concatenate((tgt_prc, prc))
tgt_wt = np.concatenate((tgt_wt, wt))
obs_data = np.concatenate((obs_data, samples.get_obs()))
self.policy_opt.update(obs_data, tgt_mu, tgt_prc, tgt_wt)
def _update_policy_fit(self, m):
"""
Re-estimate the local policy values in the neighborhood of the
trajectory.
Args:
m: Condition
"""
dX, dU, T = self.dX, self.dU, self.T
# Choose samples to use.
samples = self.cur[m].sample_list
N = len(samples)
pol_info = self.cur[m].pol_info
X = samples.get_X()
obs = samples.get_obs().copy()
pol_mu, pol_sig = self.policy_opt.prob(obs)[:2]
pol_info.pol_mu, pol_info.pol_sig = pol_mu, pol_sig
# Update policy prior.
policy_prior = pol_info.policy_prior
samples = SampleList(self.cur[m].sample_list)
mode = self._hyperparams['policy_sample_mode']
policy_prior.update(samples, self.policy_opt, mode)
# Fit linearization and store in pol_info.
pol_info.pol_K, pol_info.pol_k, pol_info.pol_S = \
policy_prior.fit(X, pol_mu, pol_sig)
for t in range(T):
pol_info.chol_pol_S[t, :, :] = \
sp.linalg.cholesky(pol_info.pol_S[t, :, :])
def _advance_iteration_variables(self):
"""
Move all 'cur' variables to 'prev', reinitialize 'cur'
variables, and advance iteration counter.
"""
Algorithm._advance_iteration_variables(self)
for m in range(self.M):
self.cur[m].traj_info.last_kl_step = \
self.prev[m].traj_info.last_kl_step
self.cur[m].pol_info = copy.deepcopy(self.prev[m].pol_info)
def _stepadjust(self):
"""
Calculate new step sizes. This version uses the same step size
for all conditions.
"""
# Compute previous cost and previous expected cost.
prev_M = len(self.prev) # May be different in future.
prev_laplace = np.empty(prev_M)
prev_mc = np.empty(prev_M)
prev_predicted = np.empty(prev_M)
for m in range(prev_M):
prev_nn = self.prev[m].pol_info.traj_distr()
prev_lg = self.prev[m].new_traj_distr
# Compute values under Laplace approximation. This is the policy
# that the previous samples were actually drawn from under the
# dynamics that were estimated from the previous samples.
prev_laplace[m] = self.traj_opt.estimate_cost(
prev_nn, self.prev[m].traj_info
).sum()
# This is the actual cost that we experienced.
prev_mc[m] = self.prev[m].cs.mean(axis=0).sum()
# This is the policy that we just used under the dynamics that
# were estimated from the prev samples (so this is the cost
# we thought we would have).
prev_predicted[m] = self.traj_opt.estimate_cost(
prev_lg, self.prev[m].traj_info
).sum()
# Compute current cost.
cur_laplace = np.empty(self.M)
cur_mc = np.empty(self.M)
for m in range(self.M):
cur_nn = self.cur[m].pol_info.traj_distr()
# This is the actual cost we have under the current trajectory
# based on the latest samples.
cur_laplace[m] = self.traj_opt.estimate_cost(
cur_nn, self.cur[m].traj_info
).sum()
cur_mc[m] = self.cur[m].cs.mean(axis=0).sum()
# Compute predicted and actual improvement.
prev_laplace = prev_laplace.mean()
prev_mc = prev_mc.mean()
prev_predicted = prev_predicted.mean()
cur_laplace = cur_laplace.mean()
cur_mc = cur_mc.mean()
if self._hyperparams['step_rule'] == 'laplace':
predicted_impr = prev_laplace - prev_predicted
actual_impr = prev_laplace - cur_laplace
elif self._hyperparams['step_rule'] == 'mc':
predicted_impr = prev_mc - prev_predicted
actual_impr = prev_mc - cur_mc
LOGGER.debug('Previous cost: Laplace: %f, MC: %f',
prev_laplace, prev_mc)
LOGGER.debug('Predicted cost: Laplace: %f', prev_predicted)
LOGGER.debug('Actual cost: Laplace: %f, MC: %f',
cur_laplace, cur_mc)
for m in range(self.M):
self._set_new_mult(predicted_impr, actual_impr, m)
def compute_costs(self, m, eta, augment=True):
""" Compute cost estimates used in the LQR backward pass. """
traj_info, traj_distr = self.cur[m].traj_info, self.cur[m].traj_distr
if not augment: # Whether to augment cost with term to penalize KL
return traj_info.Cm, traj_info.cv
pol_info = self.cur[m].pol_info
multiplier = self._hyperparams['max_ent_traj']
T, dU, dX = traj_distr.T, traj_distr.dU, traj_distr.dX
Cm, cv = np.copy(traj_info.Cm), np.copy(traj_info.cv)
PKLm = np.zeros((T, dX+dU, dX+dU))
PKLv = np.zeros((T, dX+dU))
fCm, fcv = np.zeros(Cm.shape), np.zeros(cv.shape)
for t in range(T):
# Policy KL-divergence terms.
inv_pol_S = np.linalg.solve(
pol_info.chol_pol_S[t, :, :],
np.linalg.solve(pol_info.chol_pol_S[t, :, :].T, np.eye(dU))
)
KB, kB = pol_info.pol_K[t, :, :], pol_info.pol_k[t, :]
PKLm[t, :, :] = np.vstack([
np.hstack([KB.T.dot(inv_pol_S).dot(KB), -KB.T.dot(inv_pol_S)]),
np.hstack([-inv_pol_S.dot(KB), inv_pol_S])
])
PKLv[t, :] = np.concatenate([
KB.T.dot(inv_pol_S).dot(kB), -inv_pol_S.dot(kB)
])
fCm[t, :, :] = (Cm[t, :, :] + PKLm[t, :, :] * eta) / (eta + multiplier)
fcv[t, :] = (cv[t, :] + PKLv[t, :] * eta) / (eta + multiplier)
return fCm, fcv
In [15]:
class AlgorithmTrajOpt(Algorithm):
""" Sample-based trajectory optimization. """
def __init__(self, hyperparams):
Algorithm.__init__(self, hyperparams)
def iteration(self, sample_lists):
"""
Run iteration of LQR.
Args:
sample_lists: List of SampleList objects for each condition.
"""
for m in range(self.M):
self.cur[m].sample_list = sample_lists[m]
# Update dynamics model using all samples.
self._update_dynamics()
self._update_step_size() # KL Divergence step size.
# Run inner loop to compute new policies.
for _ in range(self._hyperparams['inner_iterations']):
self._update_trajectories()
self._advance_iteration_variables()
def _update_step_size(self):
""" Evaluate costs on samples, and adjust the step size. """
# Evaluate cost function for all conditions and samples.
for m in range(self.M):
self._eval_cost(m)
# Adjust step size relative to the previous iteration.
for m in range(self.M):
if self.iteration_count >= 1 and self.prev[m].sample_list:
self._stepadjust(m)
def _stepadjust(self, m):
"""
Calculate new step sizes.
Args:
m: Condition
"""
# Compute values under Laplace approximation. This is the policy
# that the previous samples were actually drawn from under the
# dynamics that were estimated from the previous samples.
previous_laplace_obj = self.traj_opt.estimate_cost(
self.prev[m].traj_distr, self.prev[m].traj_info
)
# This is the policy that we just used under the dynamics that
# were estimated from the previous samples (so this is the cost
# we thought we would have).
new_predicted_laplace_obj = self.traj_opt.estimate_cost(
self.cur[m].traj_distr, self.prev[m].traj_info
)
# This is the actual cost we have under the current trajectory
# based on the latest samples.
new_actual_laplace_obj = self.traj_opt.estimate_cost(
self.cur[m].traj_distr, self.cur[m].traj_info
)
# Measure the entropy of the current trajectory (for printout).
ent = self._measure_ent(m)
# Compute actual objective values based on the samples.
previous_mc_obj = np.mean(np.sum(self.prev[m].cs, axis=1), axis=0)
new_mc_obj = np.mean(np.sum(self.cur[m].cs, axis=1), axis=0)
LOGGER.debug('Trajectory step: ent: %f cost: %f -> %f',
ent, previous_mc_obj, new_mc_obj)
# Compute predicted and actual improvement.
predicted_impr = np.sum(previous_laplace_obj) - \
np.sum(new_predicted_laplace_obj)
actual_impr = np.sum(previous_laplace_obj) - \
np.sum(new_actual_laplace_obj)
# Print improvement details.
LOGGER.debug('Previous cost: Laplace: %f MC: %f',
np.sum(previous_laplace_obj), previous_mc_obj)
LOGGER.debug('Predicted new cost: Laplace: %f MC: %f',
np.sum(new_predicted_laplace_obj), new_mc_obj)
LOGGER.debug('Actual new cost: Laplace: %f MC: %f',
np.sum(new_actual_laplace_obj), new_mc_obj)
LOGGER.debug('Predicted/actual improvement: %f / %f',
predicted_impr, actual_impr)
self._set_new_mult(predicted_impr, actual_impr, m)
def compute_costs(self, m, eta, augment=True):
""" Compute cost estimates used in the LQR backward pass. """
traj_info, traj_distr = self.cur[m].traj_info, self.cur[m].traj_distr
if not augment: # Whether to augment cost with term to penalize KL
return traj_info.Cm, traj_info.cv
multiplier = self._hyperparams['max_ent_traj']
fCm, fcv = traj_info.Cm / (eta + multiplier), traj_info.cv / (eta + multiplier)
K, ipc, k = traj_distr.K, traj_distr.inv_pol_covar, traj_distr.k
# Add in the trajectory divergence term.
for t in range(self.T - 1, -1, -1):
fCm[t, :, :] += eta / (eta + multiplier) * np.vstack([
np.hstack([
K[t, :, :].T.dot(ipc[t, :, :]).dot(K[t, :, :]),
-K[t, :, :].T.dot(ipc[t, :, :])
]),
np.hstack([
-ipc[t, :, :].dot(K[t, :, :]), ipc[t, :, :]
])
])
fcv[t, :] += eta / (eta + multiplier) * np.hstack([
K[t, :, :].T.dot(ipc[t, :, :]).dot(k[t, :]),
-ipc[t, :, :].dot(k[t, :])
])
return fCm, fcv
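The step adjustment above compares a Laplace (Gaussian-quadratic) estimate of the expected cost against the Monte-Carlo cost of the samples. For a quadratic cost $\frac{1}{2} z^T C z + z^T c + c_0$ and a Gaussian marginal $z \sim \mathcal{N}(\mu, \Sigma)$, the expectation is $c_0 + \frac{1}{2}\mathrm{tr}(C\Sigma) + \frac{1}{2}\mu^T C \mu + \mu^T c$, which is what estimate_cost (defined further below) evaluates at each time step. A quick Monte-Carlo sanity check of this identity, with made-up values:
import numpy as np

rng = np.random.RandomState(1)
d = 4
A = rng.randn(d, d)
C = A.dot(A.T)                          # symmetric quadratic cost term
c, c0 = rng.randn(d), 0.7               # linear and constant cost terms
mu = rng.randn(d)
B = rng.randn(d, d)
Sigma = B.dot(B.T) + np.eye(d)          # Gaussian covariance
z = rng.multivariate_normal(mu, Sigma, size=200000)
mc = np.mean(0.5 * np.einsum('ni,ij,nj->n', z, C, z) + z.dot(c) + c0)
analytic = c0 + 0.5 * np.sum(Sigma * C) + 0.5 * mu.dot(C).dot(mu) + mu.dot(c)
print(mc, analytic)                     # the two estimates should agree closely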
In [16]:
import abc
class TrajOpt(object):
""" Trajectory optimization superclass. """
__metaclass__ = abc.ABCMeta
def __init__(self, hyperparams):
self._hyperparams = hyperparams
@abc.abstractmethod
def update(self):
""" Update trajectory distributions. """
raise NotImplementedError("Must be implemented in subclass.")
In [17]:
""" This file defines code for iLQG-based trajectory optimization. """
from numpy.linalg import LinAlgError
import scipy as sp
# Constants used in TrajOptLQR.
DGD_MAX_ITER = 50
DGD_MAX_LS_ITER = 20
DGD_MAX_GD_ITER = 200
ALPHA, BETA1, BETA2, EPS = 0.005, 0.9, 0.999, 1e-8 # Adam parameters
def traj_distr_kl(new_mu, new_sigma, new_traj_distr, prev_traj_distr, tot=True):
"""
Compute KL divergence between new and previous trajectory
distributions.
Args:
new_mu: T x dX, mean of new trajectory distribution.
new_sigma: T x dX x dX, variance of new trajectory distribution.
new_traj_distr: A linear Gaussian policy object, new
distribution.
prev_traj_distr: A linear Gaussian policy object, previous
distribution.
tot: Whether or not to sum KL across all time steps.
Returns:
kl_div: The KL divergence between the new and previous
trajectories.
"""
# Constants.
T = new_mu.shape[0]
dU = new_traj_distr.dU
# Initialize vector of divergences for each time step.
kl_div = np.zeros(T)
# Step through trajectory.
for t in range(T):
# Fetch matrices and vectors from trajectory distributions.
mu_t = new_mu[t, :]
sigma_t = new_sigma[t, :, :]
K_prev = prev_traj_distr.K[t, :, :]
K_new = new_traj_distr.K[t, :, :]
k_prev = prev_traj_distr.k[t, :]
k_new = new_traj_distr.k[t, :]
chol_prev = prev_traj_distr.chol_pol_covar[t, :, :]
chol_new = new_traj_distr.chol_pol_covar[t, :, :]
# Compute log determinants and precision matrices.
logdet_prev = 2 * sum(np.log(np.diag(chol_prev)))
logdet_new = 2 * sum(np.log(np.diag(chol_new)))
prc_prev = sp.linalg.solve_triangular(
chol_prev, sp.linalg.solve_triangular(chol_prev.T, np.eye(dU),
lower=True)
)
prc_new = sp.linalg.solve_triangular(
chol_new, sp.linalg.solve_triangular(chol_new.T, np.eye(dU),
lower=True)
)
# Construct matrix, vector, and constants.
M_prev = np.r_[
np.c_[K_prev.T.dot(prc_prev).dot(K_prev), -K_prev.T.dot(prc_prev)],
np.c_[-prc_prev.dot(K_prev), prc_prev]
]
M_new = np.r_[
np.c_[K_new.T.dot(prc_new).dot(K_new), -K_new.T.dot(prc_new)],
np.c_[-prc_new.dot(K_new), prc_new]
]
v_prev = np.r_[K_prev.T.dot(prc_prev).dot(k_prev),
-prc_prev.dot(k_prev)]
v_new = np.r_[K_new.T.dot(prc_new).dot(k_new), -prc_new.dot(k_new)]
c_prev = 0.5 * k_prev.T.dot(prc_prev).dot(k_prev)
c_new = 0.5 * k_new.T.dot(prc_new).dot(k_new)
# Compute KL divergence at timestep t.
kl_div[t] = max(
0,
-0.5 * mu_t.T.dot(M_new - M_prev).dot(mu_t) -
mu_t.T.dot(v_new - v_prev) - c_new + c_prev -
0.5 * np.sum(sigma_t * (M_new-M_prev)) - 0.5 * logdet_new +
0.5 * logdet_prev
)
# Add up divergences across time to get total divergence.
return np.sum(kl_div) if tot else kl_div
def traj_distr_kl_alt(new_mu, new_sigma, new_traj_distr, prev_traj_distr, tot=True):
"""
    This function computes the same quantity as the function above, but in a
    form that is easier to modify and reason about; in particular, passing in
    a different mu and sigma to this function behaves as expected.
"""
T, dX, dU = new_mu.shape[0], new_traj_distr.dX, new_traj_distr.dU
kl_div = np.zeros(T)
for t in range(T):
K_prev = prev_traj_distr.K[t, :, :]
K_new = new_traj_distr.K[t, :, :]
k_prev = prev_traj_distr.k[t, :]
k_new = new_traj_distr.k[t, :]
sig_prev = prev_traj_distr.pol_covar[t, :, :]
sig_new = new_traj_distr.pol_covar[t, :, :]
chol_prev = prev_traj_distr.chol_pol_covar[t, :, :]
chol_new = new_traj_distr.chol_pol_covar[t, :, :]
inv_prev = prev_traj_distr.inv_pol_covar[t, :, :]
inv_new = new_traj_distr.inv_pol_covar[t, :, :]
logdet_prev = 2 * sum(np.log(np.diag(chol_prev)))
logdet_new = 2 * sum(np.log(np.diag(chol_new)))
K_diff, k_diff = K_prev - K_new, k_prev - k_new
mu, sigma = new_mu[t, :dX], new_sigma[t, :dX, :dX]
kl_div[t] = max(
0,
0.5 * (logdet_prev - logdet_new - new_traj_distr.dU +
np.sum(np.diag(inv_prev.dot(sig_new))) +
k_diff.T.dot(inv_prev).dot(k_diff) +
mu.T.dot(K_diff.T).dot(inv_prev).dot(K_diff).dot(mu) +
np.sum(np.diag(K_diff.T.dot(inv_prev).dot(K_diff).dot(sigma))) +
2 * k_diff.T.dot(inv_prev).dot(K_diff).dot(mu))
)
return np.sum(kl_div) if tot else kl_div
# TrajOptLQRPython
TRAJ_OPT_LQR = {
# Dual variable updates for non-PD Q-function.
'del0': 1e-4,
'eta_error_threshold': 1e16,
'min_eta': 1e-8,
'max_eta': 1e16,
'cons_per_step': False, # Whether or not to enforce separate KL constraints at each time step.
'use_prev_distr': False, # Whether or not to measure expected KL under the previous traj distr.
'update_in_bwd_pass': True, # Whether or not to update the TVLG controller during the bwd pass.
}
class TrajOptLQRPython(TrajOpt):
""" LQR trajectory optimization, Python implementation. """
def __init__(self, hyperparams):
config = copy.deepcopy(TRAJ_OPT_LQR)
config.update(hyperparams)
TrajOpt.__init__(self, config)
self.cons_per_step = config['cons_per_step']
self._use_prev_distr = config['use_prev_distr']
self._update_in_bwd_pass = config['update_in_bwd_pass']
# TODO - Add arg and return spec on this function.
def update(self, m, algorithm):
""" Run dual gradient decent to optimize trajectories. """
T = algorithm.T
eta = algorithm.cur[m].eta
if self.cons_per_step and type(eta) in (int, float):
eta = np.ones(T) * eta
step_mult = algorithm.cur[m].step_mult
traj_info = algorithm.cur[m].traj_info
if isinstance(algorithm, AlgorithmMDGPS):
# For MDGPS, constrain to previous NN linearization
prev_traj_distr = algorithm.cur[m].pol_info.traj_distr()
else:
# For BADMM/trajopt, constrain to previous LG controller
prev_traj_distr = algorithm.cur[m].traj_distr
# Set KL-divergence step size (epsilon).
kl_step = algorithm.base_kl_step * step_mult
if not self.cons_per_step:
kl_step *= T
# We assume at min_eta, kl_div > kl_step, opposite for max_eta.
if not self.cons_per_step:
min_eta = self._hyperparams['min_eta']
max_eta = self._hyperparams['max_eta']
LOGGER.debug("Running DGD for trajectory %d, eta: %f", m, eta)
else:
min_eta = np.ones(T) * self._hyperparams['min_eta']
max_eta = np.ones(T) * self._hyperparams['max_eta']
LOGGER.debug("Running DGD for trajectory %d, avg eta: %f", m,
np.mean(eta[:-1]))
max_itr = (DGD_MAX_LS_ITER if self.cons_per_step else
DGD_MAX_ITER)
for itr in range(max_itr):
if not self.cons_per_step:
LOGGER.debug("Iteration %d, bracket: (%.2e , %.2e , %.2e)", itr,
min_eta, eta, max_eta)
# Run fwd/bwd pass, note that eta may be updated.
# Compute KL divergence constraint violation.
traj_distr, eta = self.backward(prev_traj_distr, traj_info,
eta, algorithm, m)
if not self._use_prev_distr:
new_mu, new_sigma = self.forward(traj_distr, traj_info)
kl_div = traj_distr_kl(
new_mu, new_sigma, traj_distr, prev_traj_distr,
tot=(not self.cons_per_step)
)
else:
prev_mu, prev_sigma = self.forward(prev_traj_distr, traj_info)
kl_div = traj_distr_kl_alt(
prev_mu, prev_sigma, traj_distr, prev_traj_distr,
tot=(not self.cons_per_step)
)
con = kl_div - kl_step
# Convergence check - constraint satisfaction.
if self._conv_check(con, kl_step):
if not self.cons_per_step:
LOGGER.debug("KL: %f / %f, converged iteration %d", kl_div,
kl_step, itr)
else:
LOGGER.debug(
"KL: %f / %f, converged iteration %d",
np.mean(kl_div[:-1]), np.mean(kl_step[:-1]), itr
)
break
if not self.cons_per_step:
# Choose new eta (bisect bracket or multiply by constant)
if con < 0: # Eta was too big.
max_eta = eta
geom = np.sqrt(min_eta*max_eta) # Geometric mean.
new_eta = max(geom, 0.1*max_eta)
LOGGER.debug("KL: %f / %f, eta too big, new eta: %f",
kl_div, kl_step, new_eta)
else: # Eta was too small.
min_eta = eta
geom = np.sqrt(min_eta*max_eta) # Geometric mean.
new_eta = min(geom, 10.0*min_eta)
LOGGER.debug("KL: %f / %f, eta too small, new eta: %f",
kl_div, kl_step, new_eta)
# Logarithmic mean: log_mean(x,y) = (y - x)/(log(y) - log(x))
eta = new_eta
else:
for t in range(T):
if con[t] < 0:
max_eta[t] = eta[t]
geom = np.sqrt(min_eta[t]*max_eta[t])
eta[t] = max(geom, 0.1*max_eta[t])
else:
min_eta[t] = eta[t]
geom = np.sqrt(min_eta[t]*max_eta[t])
eta[t] = min(geom, 10.0*min_eta[t])
if itr % 10 == 0:
LOGGER.debug("avg KL: %f / %f, avg new eta: %f",
np.mean(kl_div[:-1]), np.mean(kl_step[:-1]),
np.mean(eta[:-1]))
if (self.cons_per_step and not self._conv_check(con, kl_step)):
m_b, v_b = np.zeros(T-1), np.zeros(T-1)
for itr in range(DGD_MAX_GD_ITER):
traj_distr, eta = self.backward(prev_traj_distr, traj_info,
eta, algorithm, m)
if not self._use_prev_distr:
new_mu, new_sigma = self.forward(traj_distr, traj_info)
kl_div = traj_distr_kl(
new_mu, new_sigma, traj_distr, prev_traj_distr,
tot=False
)
else:
prev_mu, prev_sigma = self.forward(prev_traj_distr,
traj_info)
kl_div = traj_distr_kl_alt(
prev_mu, prev_sigma, traj_distr, prev_traj_distr,
tot=False
)
con = kl_div - kl_step
if self._conv_check(con, kl_step):
LOGGER.debug(
"KL: %f / %f, converged iteration %d",
np.mean(kl_div[:-1]), np.mean(kl_step[:-1]), itr
)
break
m_b = (BETA1 * m_b + (1-BETA1) * con[:-1])
m_u = m_b / (1 - BETA1 ** (itr+1))
v_b = (BETA2 * v_b + (1-BETA2) * np.square(con[:-1]))
v_u = v_b / (1 - BETA2 ** (itr+1))
eta[:-1] = np.minimum(
np.maximum(eta[:-1] + ALPHA * m_u / (np.sqrt(v_u) + EPS),
self._hyperparams['min_eta']),
self._hyperparams['max_eta']
)
if itr % 10 == 0:
LOGGER.debug("avg KL: %f / %f, avg new eta: %f",
np.mean(kl_div[:-1]), np.mean(kl_step[:-1]),
np.mean(eta[:-1]))
if (np.mean(kl_div) > np.mean(kl_step) and
not self._conv_check(con, kl_step)):
LOGGER.warning(
"Final KL divergence after DGD convergence is too high."
)
return traj_distr, eta
def estimate_cost(self, traj_distr, traj_info):
""" Compute Laplace approximation to expected cost. """
# Constants.
T = traj_distr.T
# Perform forward pass (note that we repeat this here, because
# traj_info may have different dynamics from the ones that were
# used to compute the distribution already saved in traj).
mu, sigma = self.forward(traj_distr, traj_info)
# Compute cost.
predicted_cost = np.zeros(T)
for t in range(T):
predicted_cost[t] = traj_info.cc[t] + 0.5 * \
np.sum(sigma[t, :, :] * traj_info.Cm[t, :, :]) + 0.5 * \
mu[t, :].T.dot(traj_info.Cm[t, :, :]).dot(mu[t, :]) + \
mu[t, :].T.dot(traj_info.cv[t, :])
return predicted_cost
def forward(self, traj_distr, traj_info):
"""
Perform LQR forward pass. Computes state-action marginals from
dynamics and policy.
Args:
traj_distr: A linear Gaussian policy object.
traj_info: A TrajectoryInfo object.
Returns:
            mu: A T x (dX+dU) mean state-action vector.
            sigma: A T x (dX+dU) x (dX+dU) state-action covariance matrix.
"""
# Compute state-action marginals from specified conditional
# parameters and current traj_info.
T = traj_distr.T
dU = traj_distr.dU
dX = traj_distr.dX
# Constants.
idx_x = slice(dX)
# Allocate space.
sigma = np.zeros((T, dX+dU, dX+dU))
mu = np.zeros((T, dX+dU))
# Pull out dynamics.
Fm = traj_info.dynamics.Fm
fv = traj_info.dynamics.fv
dyn_covar = traj_info.dynamics.dyn_covar
# Set initial covariance (initial mu is always zero).
sigma[0, idx_x, idx_x] = traj_info.x0sigma
mu[0, idx_x] = traj_info.x0mu
for t in range(T):
sigma[t, :, :] = np.vstack([
np.hstack([
sigma[t, idx_x, idx_x],
sigma[t, idx_x, idx_x].dot(traj_distr.K[t, :, :].T)
]),
np.hstack([
traj_distr.K[t, :, :].dot(sigma[t, idx_x, idx_x]),
traj_distr.K[t, :, :].dot(sigma[t, idx_x, idx_x]).dot(
traj_distr.K[t, :, :].T
) + traj_distr.pol_covar[t, :, :]
])
])
mu[t, :] = np.hstack([
mu[t, idx_x],
traj_distr.K[t, :, :].dot(mu[t, idx_x]) + traj_distr.k[t, :]
])
if t < T - 1:
sigma[t+1, idx_x, idx_x] = \
Fm[t, :, :].dot(sigma[t, :, :]).dot(Fm[t, :, :].T) + \
dyn_covar[t, :, :]
mu[t+1, idx_x] = Fm[t, :, :].dot(mu[t, :]) + fv[t, :]
return mu, sigma
def backward(self, prev_traj_distr, traj_info, eta, algorithm, m):
"""
Perform LQR backward pass. This computes a new linear Gaussian
policy object.
Args:
prev_traj_distr: A linear Gaussian policy object from
previous iteration.
traj_info: A TrajectoryInfo object.
eta: Dual variable.
algorithm: Algorithm object needed to compute costs.
m: Condition number.
Returns:
traj_distr: A new linear Gaussian policy.
new_eta: The updated dual variable. Updates happen if the
Q-function is not PD.
"""
# Constants.
T = prev_traj_distr.T
dU = prev_traj_distr.dU
dX = prev_traj_distr.dX
if self._update_in_bwd_pass:
traj_distr = prev_traj_distr.nans_like()
else:
traj_distr = prev_traj_distr.copy()
# Store pol_wt if necessary
if type(algorithm) == AlgorithmBADMM:
pol_wt = algorithm.cur[m].pol_info.pol_wt
idx_x = slice(dX)
idx_u = slice(dX, dX+dU)
# Pull out dynamics.
Fm = traj_info.dynamics.Fm
fv = traj_info.dynamics.fv
# Non-SPD correction terms.
del_ = self._hyperparams['del0']
if self.cons_per_step:
del_ = np.ones(T) * del_
eta0 = eta
# Run dynamic programming.
fail = True
while fail:
fail = False # Flip to true on non-symmetric PD.
# Allocate.
Vxx = np.zeros((T, dX, dX))
Vx = np.zeros((T, dX))
Qtt = np.zeros((T, dX+dU, dX+dU))
Qt = np.zeros((T, dX+dU))
if not self._update_in_bwd_pass:
new_K, new_k = np.zeros((T, dU, dX)), np.zeros((T, dU))
new_pS = np.zeros((T, dU, dU))
new_ipS, new_cpS = np.zeros((T, dU, dU)), np.zeros((T, dU, dU))
fCm, fcv = algorithm.compute_costs(
m, eta, augment=(not self.cons_per_step)
)
# Compute state-action-state function at each time step.
for t in range(T - 1, -1, -1):
# Add in the cost.
Qtt[t] = fCm[t, :, :] # (X+U) x (X+U)
Qt[t] = fcv[t, :] # (X+U) x 1
# Add in the value function from the next time step.
if t < T - 1:
if type(algorithm) == AlgorithmBADMM:
multiplier = (pol_wt[t+1] + eta)/(pol_wt[t] + eta)
else:
multiplier = 1.0
Qtt[t] += multiplier * \
Fm[t, :, :].T.dot(Vxx[t+1, :, :]).dot(Fm[t, :, :])
Qt[t] += multiplier * \
Fm[t, :, :].T.dot(Vx[t+1, :] +
Vxx[t+1, :, :].dot(fv[t, :]))
# Symmetrize quadratic component.
Qtt[t] = 0.5 * (Qtt[t] + Qtt[t].T)
if not self.cons_per_step:
inv_term = Qtt[t, idx_u, idx_u]
k_term = Qt[t, idx_u]
K_term = Qtt[t, idx_u, idx_x]
else:
inv_term = (1.0 / eta[t]) * Qtt[t, idx_u, idx_u] + \
prev_traj_distr.inv_pol_covar[t]
k_term = (1.0 / eta[t]) * Qt[t, idx_u] - \
prev_traj_distr.inv_pol_covar[t].dot(prev_traj_distr.k[t])
K_term = (1.0 / eta[t]) * Qtt[t, idx_u, idx_x] - \
prev_traj_distr.inv_pol_covar[t].dot(prev_traj_distr.K[t])
# Compute Cholesky decomposition of Q function action
# component.
try:
U = sp.linalg.cholesky(inv_term)
L = U.T
except LinAlgError as e:
# Error thrown when Qtt[idx_u, idx_u] is not
# symmetric positive definite.
LOGGER.debug('LinAlgError: %s', e)
fail = t if self.cons_per_step else True
break
if self._hyperparams['update_in_bwd_pass']:
# Store conditional covariance, inverse, and Cholesky.
traj_distr.inv_pol_covar[t, :, :] = inv_term
traj_distr.pol_covar[t, :, :] = sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, np.eye(dU), lower=True)
)
traj_distr.chol_pol_covar[t, :, :] = sp.linalg.cholesky(
traj_distr.pol_covar[t, :, :]
)
# Compute mean terms.
traj_distr.k[t, :] = -sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, k_term, lower=True)
)
traj_distr.K[t, :, :] = -sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, K_term, lower=True)
)
else:
# Store conditional covariance, inverse, and Cholesky.
new_ipS[t, :, :] = inv_term
new_pS[t, :, :] = sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, np.eye(dU), lower=True)
)
new_cpS[t, :, :] = sp.linalg.cholesky(
new_pS[t, :, :]
)
# Compute mean terms.
new_k[t, :] = -sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, k_term, lower=True)
)
new_K[t, :, :] = -sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, K_term, lower=True)
)
# Compute value function.
if (self.cons_per_step or
not self._hyperparams['update_in_bwd_pass']):
Vxx[t, :, :] = Qtt[t, idx_x, idx_x] + \
traj_distr.K[t].T.dot(Qtt[t, idx_u, idx_u]).dot(traj_distr.K[t]) + \
(2 * Qtt[t, idx_x, idx_u]).dot(traj_distr.K[t])
Vx[t, :] = Qt[t, idx_x].T + \
Qt[t, idx_u].T.dot(traj_distr.K[t]) + \
traj_distr.k[t].T.dot(Qtt[t, idx_u, idx_u]).dot(traj_distr.K[t]) + \
Qtt[t, idx_x, idx_u].dot(traj_distr.k[t])
else:
Vxx[t, :, :] = Qtt[t, idx_x, idx_x] + \
Qtt[t, idx_x, idx_u].dot(traj_distr.K[t, :, :])
Vx[t, :] = Qt[t, idx_x] + \
Qtt[t, idx_x, idx_u].dot(traj_distr.k[t, :])
Vxx[t, :, :] = 0.5 * (Vxx[t, :, :] + Vxx[t, :, :].T)
if not self._hyperparams['update_in_bwd_pass']:
traj_distr.K, traj_distr.k = new_K, new_k
traj_distr.pol_covar = new_pS
traj_distr.inv_pol_covar = new_ipS
traj_distr.chol_pol_covar = new_cpS
# Increment eta on non-SPD Q-function.
if fail:
if not self.cons_per_step:
old_eta = eta
eta = eta0 + del_
LOGGER.debug('Increasing eta: %f -> %f', old_eta, eta)
del_ *= 2 # Increase del_ exponentially on failure.
else:
old_eta = eta[fail]
eta[fail] = eta0[fail] + del_[fail]
LOGGER.debug('Increasing eta %d: %f -> %f',
fail, old_eta, eta[fail])
del_[fail] *= 2 # Increase del_ exponentially on failure.
if self.cons_per_step:
fail_check = (eta[fail] >= 1e16)
else:
fail_check = (eta >= 1e16)
if fail_check:
if np.any(np.isnan(Fm)) or np.any(np.isnan(fv)):
raise ValueError('NaNs encountered in dynamics!')
raise ValueError('Failed to find PD solution even for very \
large eta (check that dynamics and cost are \
reasonably well conditioned)!')
return traj_distr, eta
def _conv_check(self, con, kl_step):
"""Function that checks whether dual gradient descent has converged."""
if self.cons_per_step:
return all([abs(con[t]) < (0.1*kl_step[t]) for t in range(con.size)])
return abs(con) < 0.1 * kl_step
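The per-time-step building block of traj_distr_kl_alt is the closed-form KL divergence between two Gaussians, $KL(\mathcal{N}(\mu_1, \Sigma_1)\,\|\,\mathcal{N}(\mu_2, \Sigma_2)) = \frac{1}{2}\big(\log\frac{|\Sigma_2|}{|\Sigma_1|} - d + \mathrm{tr}(\Sigma_2^{-1}\Sigma_1) + (\mu_2-\mu_1)^T\Sigma_2^{-1}(\mu_2-\mu_1)\big)$, applied to the conditional action distributions and averaged over the state marginal. A standalone check of this formula against a Monte-Carlo estimate, with made-up values (not part of the GPS code):
import numpy as np
from scipy.stats import multivariate_normal

rng = np.random.RandomState(2)
d = 2
m1, m2 = rng.randn(d), rng.randn(d)
A1, A2 = rng.randn(d, d), rng.randn(d, d)
S1 = A1.dot(A1.T) + np.eye(d)
S2 = A2.dot(A2.T) + np.eye(d)
# Closed-form KL(N1 || N2).
kl = 0.5 * (np.log(np.linalg.det(S2)) - np.log(np.linalg.det(S1)) - d +
            np.trace(np.linalg.solve(S2, S1)) +
            (m2 - m1).dot(np.linalg.solve(S2, m2 - m1)))
# Monte-Carlo estimate of the same quantity.
x = rng.multivariate_normal(m1, S1, size=100000)
mc = np.mean(multivariate_normal.logpdf(x, m1, S1) -
             multivariate_normal.logpdf(x, m2, S2))
print(kl, mc)   # the two values should agree to roughly two decimal places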
First, we define the cost utilities and helper functions.
In [18]:
RAMP_CONSTANT = 1
RAMP_LINEAR = 2
RAMP_QUADRATIC = 3
RAMP_FINAL_ONLY = 4
# CostAction
COST_ACTION = {
'wu': np.array([]), # Torque penalties, must be 1 x dU numpy array.
}
# CostState
COST_STATE = {
'ramp_option': RAMP_CONSTANT, # How target cost ramps over time.
'l1': 0.0,
'l2': 1.0,
'alpha': 1e-2,
'wp_final_multiplier': 1.0, # Weight multiplier on final time step.
'data_types': {
'JointAngle': {
'target_state': None, # Target state - must be set.
'wp': None, # State weights - must be set.
},
},
}
# CostSum
COST_SUM = {
'costs': [], # A list of hyperparam dictionaries for each cost.
'weights': [], # Weight multipliers for each cost.
}
def get_ramp_multiplier(ramp_option, T, wp_final_multiplier=1.0):
"""
Return a time-varying multiplier.
Returns:
A (T,) float vector containing weights for each time step.
"""
if ramp_option == RAMP_CONSTANT:
wpm = np.ones(T)
elif ramp_option == RAMP_LINEAR:
wpm = (np.arange(T, dtype=np.float32) + 1) / T
elif ramp_option == RAMP_QUADRATIC:
wpm = ((np.arange(T, dtype=np.float32) + 1) / T) ** 2
elif ramp_option == RAMP_FINAL_ONLY:
wpm = np.zeros(T)
wpm[T-1] = 1.0
else:
raise ValueError('Unknown cost ramp requested!')
wpm[-1] *= wp_final_multiplier
return wpm
def evall1l2term(wp, d, Jd, Jdd, l1, l2, alpha):
"""
Evaluate and compute derivatives for combined l1/l2 norm penalty.
loss = (0.5 * l2 * d^2) + (l1 * sqrt(alpha + d^2))
Args:
wp: T x D matrix with weights for each dimension and time step.
d: T x D states to evaluate norm on.
Jd: T x D x Dx Jacobian - derivative of d with respect to state.
Jdd: T x D x Dx x Dx Jacobian - 2nd derivative of d with respect
to state.
l1: l1 loss weight.
l2: l2 loss weight.
alpha: Constant added in square root.
"""
# Get trajectory length.
T, _ = d.shape
# Compute scaled quantities.
sqrtwp = np.sqrt(wp)
dsclsq = d * sqrtwp
dscl = d * wp
dscls = d * (wp ** 2)
# Compute total cost.
l = 0.5 * np.sum(dsclsq ** 2, axis=1) * l2 + \
np.sqrt(alpha + np.sum(dscl ** 2, axis=1)) * l1
# First order derivative terms.
d1 = dscl * l2 + (
dscls / np.sqrt(alpha + np.sum(dscl ** 2, axis=1, keepdims=True)) * l1
)
lx = np.sum(Jd * np.expand_dims(d1, axis=2), axis=1)
# Second order terms.
psq = np.expand_dims(
np.sqrt(alpha + np.sum(dscl ** 2, axis=1, keepdims=True)), axis=1
)
d2 = l1 * (
(np.expand_dims(np.eye(wp.shape[1]), axis=0) *
(np.expand_dims(wp ** 2, axis=1) / psq)) -
((np.expand_dims(dscls, axis=1) *
np.expand_dims(dscls, axis=2)) / psq ** 3)
)
d2 += l2 * (
np.expand_dims(wp, axis=2) * np.tile(np.eye(wp.shape[1]), [T, 1, 1])
)
d1_expand = np.expand_dims(np.expand_dims(d1, axis=-1), axis=-1)
sec = np.sum(d1_expand * Jdd, axis=1)
Jd_expand_1 = np.expand_dims(np.expand_dims(Jd, axis=2), axis=4)
Jd_expand_2 = np.expand_dims(np.expand_dims(Jd, axis=1), axis=3)
d2_expand = np.expand_dims(np.expand_dims(d2, axis=-1), axis=-1)
lxx = np.sum(np.sum(Jd_expand_1 * Jd_expand_2 * d2_expand, axis=1), axis=1)
lxx += 0.5 * sec + 0.5 * np.transpose(sec, [0, 2, 1])
return l, lx, lxx
def evallogl2term(wp, d, Jd, Jdd, l1, l2, alpha):
"""
Evaluate and compute derivatives for combined l1/l2 norm penalty.
loss = (0.5 * l2 * d^2) + (0.5 * l1 * log(alpha + d^2))
Args:
wp: T x D matrix with weights for each dimension and time step.
d: T x D states to evaluate norm on.
Jd: T x D x Dx Jacobian - derivative of d with respect to state.
Jdd: T x D x Dx x Dx Jacobian - 2nd derivative of d with respect
to state.
l1: l1 loss weight.
l2: l2 loss weight.
        alpha: Constant added inside the log.
"""
# Get trajectory length.
T, _ = d.shape
# Compute scaled quantities.
sqrtwp = np.sqrt(wp)
dsclsq = d * sqrtwp
dscl = d * wp
dscls = d * (wp ** 2)
# Compute total cost.
l = 0.5 * np.sum(dsclsq ** 2, axis=1) * l2 + \
0.5 * np.log(alpha + np.sum(dscl ** 2, axis=1)) * l1
# First order derivative terms.
d1 = dscl * l2 + (
dscls / (alpha + np.sum(dscl ** 2, axis=1, keepdims=True)) * l1
)
lx = np.sum(Jd * np.expand_dims(d1, axis=2), axis=1)
# Second order terms.
psq = np.expand_dims(
alpha + np.sum(dscl ** 2, axis=1, keepdims=True), axis=1
)
#TODO: Need * 2.0 somewhere in following line, or * 0.0 which is
# wrong but better.
d2 = l1 * (
(np.expand_dims(np.eye(wp.shape[1]), axis=0) *
(np.expand_dims(wp ** 2, axis=1) / psq)) -
((np.expand_dims(dscls, axis=1) *
np.expand_dims(dscls, axis=2)) / psq ** 2)
)
d2 += l2 * (
np.expand_dims(wp, axis=2) * np.tile(np.eye(wp.shape[1]), [T, 1, 1])
)
d1_expand = np.expand_dims(np.expand_dims(d1, axis=-1), axis=-1)
sec = np.sum(d1_expand * Jdd, axis=1)
Jd_expand_1 = np.expand_dims(np.expand_dims(Jd, axis=2), axis=4)
Jd_expand_2 = np.expand_dims(np.expand_dims(Jd, axis=1), axis=3)
d2_expand = np.expand_dims(np.expand_dims(d2, axis=-1), axis=-1)
lxx = np.sum(np.sum(Jd_expand_1 * Jd_expand_2 * d2_expand, axis=1), axis=1)
lxx += 0.5 * sec + 0.5 * np.transpose(sec, [0, 2, 1])
return l, lx, lxx
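Both evaluators above apply, per dimension and per time step, a weighted combination of a quadratic term and a smoothed l1 term; for evall1l2term the scalar penalty is $\frac{1}{2} l_2 d^2 + l_1\sqrt{\alpha + d^2}$, whose derivative is $l_2 d + l_1 d / \sqrt{\alpha + d^2}$. A quick scalar finite-difference check of that derivative, with made-up values:
import numpy as np

l1, l2, alpha = 0.5, 1.0, 1e-2
loss = lambda d: 0.5 * l2 * d ** 2 + l1 * np.sqrt(alpha + d ** 2)
grad = lambda d: l2 * d + l1 * d / np.sqrt(alpha + d ** 2)   # analytic derivative
d, eps = 0.3, 1e-6
num_grad = (loss(d + eps) - loss(d - eps)) / (2 * eps)       # central difference
print(grad(d), num_grad)   # should match to several decimal places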
In [19]:
class Cost(object):
""" Cost superclass. """
__metaclass__ = abc.ABCMeta
def __init__(self, hyperparams):
self._hyperparams = hyperparams
@abc.abstractmethod
def eval(self, sample):
"""
Evaluate cost function and derivatives.
Args:
sample: A single sample.
"""
raise NotImplementedError("Must be implemented in subclass.")
class CostAction(Cost):
""" Computes torque penalties. """
def __init__(self, hyperparams):
config = copy.deepcopy(COST_ACTION)
config.update(hyperparams)
Cost.__init__(self, config)
def eval(self, sample):
"""
Evaluate cost function and derivatives on a sample.
Args:
sample: A single sample
"""
sample_u = sample.get_U()
T = sample.T
Du = sample.dU
Dx = sample.dX
l = 0.5 * np.sum(self._hyperparams['wu'] * (sample_u ** 2), axis=1)
lu = self._hyperparams['wu'] * sample_u
lx = np.zeros((T, Dx))
luu = np.tile(np.diag(self._hyperparams['wu']), [T, 1, 1])
lxx = np.zeros((T, Dx, Dx))
lux = np.zeros((T, Du, Dx))
return l, lx, lu, lxx, luu, lux
class CostState(Cost):
""" Computes l1/l2 distance to a fixed target state. """
def __init__(self, hyperparams):
config = copy.deepcopy(COST_STATE)
config.update(hyperparams)
Cost.__init__(self, config)
def eval(self, sample):
"""
Evaluate cost function and derivatives on a sample.
Args:
sample: A single sample
"""
T = sample.T
Du = sample.dU
Dx = sample.dX
final_l = np.zeros(T)
final_lu = np.zeros((T, Du))
final_lx = np.zeros((T, Dx))
final_luu = np.zeros((T, Du, Du))
final_lxx = np.zeros((T, Dx, Dx))
final_lux = np.zeros((T, Du, Dx))
for data_type in self._hyperparams['data_types']:
config = self._hyperparams['data_types'][data_type]
wp = config['wp']
tgt = config['target_state']
x = sample.get(data_type)
_, dim_sensor = x.shape
wpm = get_ramp_multiplier(
self._hyperparams['ramp_option'], T,
wp_final_multiplier=self._hyperparams['wp_final_multiplier']
)
wp = wp * np.expand_dims(wpm, axis=-1)
# Compute state penalty.
dist = x - tgt
# Evaluate penalty term.
l, ls, lss = evall1l2term(
wp, dist, np.tile(np.eye(dim_sensor), [T, 1, 1]),
np.zeros((T, dim_sensor, dim_sensor, dim_sensor)),
self._hyperparams['l1'], self._hyperparams['l2'],
self._hyperparams['alpha']
)
final_l += l
sample.agent.pack_data_x(final_lx, ls, data_types=[data_type])
sample.agent.pack_data_x(final_lxx, lss,
data_types=[data_type, data_type])
return final_l, final_lx, final_lu, final_lxx, final_luu, final_lux
class CostSum(Cost):
""" A wrapper cost function that adds other cost functions. """
def __init__(self, hyperparams):
config = copy.deepcopy(COST_SUM)
config.update(hyperparams)
Cost.__init__(self, config)
self._costs = []
self._weights = self._hyperparams['weights']
for cost in self._hyperparams['costs']:
self._costs.append(cost['type'](cost))
def eval(self, sample):
"""
Evaluate cost function and derivatives.
Args:
sample: A single sample
"""
l, lx, lu, lxx, luu, lux = self._costs[0].eval(sample)
# Compute weighted sum of each cost value and derivatives.
weight = self._weights[0]
l = l * weight
lx = lx * weight
lu = lu * weight
lxx = lxx * weight
luu = luu * weight
lux = lux * weight
for i in range(1, len(self._costs)):
pl, plx, plu, plxx, pluu, plux = self._costs[i].eval(sample)
weight = self._weights[i]
l = l + pl * weight
lx = lx + plx * weight
lu = lu + plu * weight
lxx = lxx + plxx * weight
luu = luu + pluu * weight
lux = lux + plux * weight
return l, lx, lu, lxx, luu, lux
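As a usage sketch, the cost classes above are typically composed through a hyperparameter dictionary: CostSum instantiates each sub-cost from its 'type' entry and applies the corresponding weight. The targets, torque weights, and weights below are made up for illustration, and JOINT_ANGLES is the gps_pb2 sensor constant used elsewhere in this notebook.
from gps_pb2 import JOINT_ANGLES

# Hypothetical cost configuration: torque penalty plus a joint-angle target.
cost_wu = {
    'type': CostAction,
    'wu': np.array([5e-5, 5e-5]),
}
cost_tgt = {
    'type': CostState,
    'data_types': {
        JOINT_ANGLES: {
            'target_state': np.array([0.0, 0.0]),
            'wp': np.array([1.0, 1.0]),
        },
    },
}
cost_sum_hyperparams = {
    'type': CostSum,
    'costs': [cost_wu, cost_tgt],
    'weights': [1e-3, 1.0],
}
# cost = CostSum(cost_sum_hyperparams); cost.eval(sample) would then return the
# weighted l, lx, lu, lxx, luu, lux terms consumed by the LQR backward pass.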
In [20]:
def generate_experiment_info(config):
"""
    Generate experiment info, to be displayed by the GPS Training GUI.
Assumes config is the config created in hyperparams.py
"""
common = config['common']
algorithm = config['algorithm']
if type(algorithm['cost']) == list:
algorithm_cost_type = algorithm['cost'][0]['type'].__name__
if (algorithm_cost_type) == 'CostSum':
algorithm_cost_type += '(%s)' % ', '.join(
map(lambda cost: cost['type'].__name__,
algorithm['cost'][0]['costs']))
else:
algorithm_cost_type = algorithm['cost']['type'].__name__
if (algorithm_cost_type) == 'CostSum':
algorithm_cost_type += '(%s)' % ', '.join(
map(lambda cost: cost['type'].__name__,
algorithm['cost']['costs']))
if 'dynamics' in algorithm:
alg_dyn = str(algorithm['dynamics']['type'].__name__)
else:
alg_dyn = 'None'
return (
'exp_name: ' + str(common['experiment_name']) + '\n' +
'alg_type: ' + str(algorithm['type'].__name__) + '\n' +
'alg_dyn: ' + alg_dyn + '\n' +
'alg_cost: ' + str(algorithm_cost_type) + '\n' +
'iterations: ' + str(config['iterations']) + '\n' +
'conditions: ' + str(algorithm['conditions']) + '\n' +
'samples: ' + str(config['num_samples']) + '\n'
)
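For example, a minimal hypothetical config built from classes defined in this notebook produces the summary string below; all names and numbers are made up.
config_example = {
    'common': {'experiment_name': 'box2d_arm_example'},
    'algorithm': {
        'type': AlgorithmTrajOpt,
        'cost': {'type': CostAction, 'wu': np.array([5e-5, 5e-5])},
        'conditions': 1,
    },
    'iterations': 10,
    'num_samples': 5,
}
print(generate_experiment_info(config_example))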
In [21]:
import logging
try:
import cPickle as pickle
except:
import pickle
LOGGER = logging.getLogger(__name__)
class DataLogger(object):
"""
This class pickles data into files and unpickles data from files.
TODO: Handle logging text to terminal, GUI text, and/or log file at
DEBUG, INFO, WARN, ERROR, FATAL levels.
TODO: Handle logging data to terminal, GUI text/plots, and/or data
files.
"""
def __init__(self):
pass
def pickle(self, filename, data):
""" Pickle data into file specified by filename. """
pickle.dump(data, open(filename, 'wb'))
def unpickle(self, filename):
""" Unpickle data from file specified by filename. """
try:
return pickle.load(open(filename, 'rb'))
except IOError:
LOGGER.debug('Unpickle error. Cannot find file: %s', filename)
return None
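A minimal usage sketch of DataLogger (the file path below is arbitrary): pickle an object to disk and read it back.
logger_example = DataLogger()
logger_example.pickle('/tmp/gps_example.pkl', {'iteration': 3, 'cost': 12.5})
print(logger_example.unpickle('/tmp/gps_example.pkl'))   # restores the pickled dict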
In [22]:
from gps_pb2 import ACTION
# Agent
AGENT = {
'dH': 0,
'x0var': 0,
'noisy_body_idx': np.array([]),
'noisy_body_var': np.array([]),
'pos_body_idx': np.array([]),
'pos_body_offset': np.array([]),
'smooth_noise': True,
'smooth_noise_var': 2.0,
'smooth_noise_renormalize': True,
}
class Agent(object):
"""
Agent superclass. The agent interacts with the environment to
collect samples.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, hyperparams):
config = copy.deepcopy(AGENT)
config.update(hyperparams)
self._hyperparams = config
# Store samples, along with size/index information for samples.
self._samples = [[] for _ in range(self._hyperparams['conditions'])]
self.T = self._hyperparams['T']
self.dU = self._hyperparams['sensor_dims'][ACTION]
self.x_data_types = self._hyperparams['state_include']
self.obs_data_types = self._hyperparams['obs_include']
if 'meta_include' in self._hyperparams:
self.meta_data_types = self._hyperparams['meta_include']
else:
self.meta_data_types = []
# List of indices for each data type in state X.
self._state_idx, i = [], 0
for sensor in self.x_data_types:
dim = self._hyperparams['sensor_dims'][sensor]
self._state_idx.append(list(range(i, i+dim)))
i += dim
self.dX = i
# List of indices for each data type in observation.
self._obs_idx, i = [], 0
for sensor in self.obs_data_types:
dim = self._hyperparams['sensor_dims'][sensor]
self._obs_idx.append(list(range(i, i+dim)))
i += dim
self.dO = i
# List of indices for each data type in meta data.
self._meta_idx, i = [], 0
for sensor in self.meta_data_types:
dim = self._hyperparams['sensor_dims'][sensor]
self._meta_idx.append(list(range(i, i+dim)))
i += dim
self.dM = i
self._x_data_idx = {d: i for d, i in zip(self.x_data_types,
self._state_idx)}
self._obs_data_idx = {d: i for d, i in zip(self.obs_data_types,
self._obs_idx)}
self._meta_data_idx = {d: i for d, i in zip(self.meta_data_types,
self._meta_idx)}
@abc.abstractmethod
def sample(self, policy, condition, verbose=True, save=True, noisy=True):
"""
Draw a sample from the environment, using the specified policy
and under the specified condition, with or without noise.
"""
raise NotImplementedError("Must be implemented in subclass.")
def reset(self, condition):
""" Reset environment to the specified condition. """
pass # May be overridden in subclass.
def get_samples(self, condition, start=0, end=None):
"""
Return the requested samples based on the start and end indices.
Args:
start: Starting index of samples to return.
end: End index of samples to return.
"""
return (SampleList(self._samples[condition][start:]) if end is None
else SampleList(self._samples[condition][start:end]))
def clear_samples(self, condition=None):
"""
Reset the samples for a given condition, defaulting to all conditions.
Args:
condition: Condition for which to reset samples.
"""
if condition is None:
self._samples = [[] for _ in range(self._hyperparams['conditions'])]
else:
self._samples[condition] = []
def delete_last_sample(self, condition):
""" Delete the last sample from the specified condition. """
self._samples[condition].pop()
def get_idx_x(self, sensor_name):
"""
Return the indices corresponding to a certain state sensor name.
Args:
sensor_name: The name of the sensor.
"""
return self._x_data_idx[sensor_name]
def get_idx_obs(self, sensor_name):
"""
Return the indices corresponding to a certain observation sensor name.
Args:
sensor_name: The name of the sensor.
"""
return self._obs_data_idx[sensor_name]
def pack_data_obs(self, existing_mat, data_to_insert, data_types,
axes=None):
"""
Update the observation matrix with new data.
Args:
existing_mat: Current observation matrix.
data_to_insert: New data to insert into the existing matrix.
data_types: Name of the sensors to insert data for.
axes: Which axes to insert data. Defaults to the last axes.
"""
num_sensor = len(data_types)
if axes is None:
# If axes not specified, assume indexing on last dimensions.
axes = list(range(-1, -num_sensor - 1, -1))
else:
# Make sure number of sensors and axes are consistent.
if num_sensor != len(axes):
raise ValueError(
'Length of sensors (%d) must equal length of axes (%d)',
num_sensor, len(axes)
)
# Shape checks.
insert_shape = list(existing_mat.shape)
for i in range(num_sensor):
# Make sure to slice along X.
if existing_mat.shape[axes[i]] != self.dO:
                raise ValueError('Axes must be along a dO=%d dimensional axis',
                                 self.dO)
insert_shape[axes[i]] = len(self._obs_data_idx[data_types[i]])
if tuple(insert_shape) != data_to_insert.shape:
raise ValueError('Data has shape %s. Expected %s',
data_to_insert.shape, tuple(insert_shape))
# Actually perform the slice.
index = [slice(None) for _ in range(len(existing_mat.shape))]
for i in range(num_sensor):
index[axes[i]] = slice(self._obs_data_idx[data_types[i]][0],
self._obs_data_idx[data_types[i]][-1] + 1)
existing_mat[index] = data_to_insert
def pack_data_meta(self, existing_mat, data_to_insert, data_types,
axes=None):
"""
Update the meta data matrix with new data.
Args:
existing_mat: Current meta data matrix.
data_to_insert: New data to insert into the existing matrix.
data_types: Name of the sensors to insert data for.
axes: Which axes to insert data. Defaults to the last axes.
"""
num_sensor = len(data_types)
if axes is None:
# If axes not specified, assume indexing on last dimensions.
axes = list(range(-1, -num_sensor - 1, -1))
else:
# Make sure number of sensors and axes are consistent.
if num_sensor != len(axes):
raise ValueError(
'Length of sensors (%d) must equal length of axes (%d)',
num_sensor, len(axes)
)
# Shape checks.
insert_shape = list(existing_mat.shape)
for i in range(num_sensor):
# Make sure to slice along X.
if existing_mat.shape[axes[i]] != self.dM:
                raise ValueError('Axes must be along a dM=%d dimensional axis',
                                 self.dM)
insert_shape[axes[i]] = len(self._meta_data_idx[data_types[i]])
if tuple(insert_shape) != data_to_insert.shape:
raise ValueError('Data has shape %s. Expected %s',
data_to_insert.shape, tuple(insert_shape))
# Actually perform the slice.
index = [slice(None) for _ in range(len(existing_mat.shape))]
for i in range(num_sensor):
index[axes[i]] = slice(self._meta_data_idx[data_types[i]][0],
self._meta_data_idx[data_types[i]][-1] + 1)
existing_mat[index] = data_to_insert
def pack_data_x(self, existing_mat, data_to_insert, data_types, axes=None):
"""
Update the state matrix with new data.
Args:
existing_mat: Current state matrix.
data_to_insert: New data to insert into the existing matrix.
data_types: Name of the sensors to insert data for.
axes: Which axes to insert data. Defaults to the last axes.
"""
num_sensor = len(data_types)
if axes is None:
# If axes not specified, assume indexing on last dimensions.
axes = list(range(-1, -num_sensor - 1, -1))
else:
# Make sure number of sensors and axes are consistent.
if num_sensor != len(axes):
raise ValueError(
'Length of sensors (%d) must equal length of axes (%d)',
num_sensor, len(axes)
)
# Shape checks.
insert_shape = list(existing_mat.shape)
for i in range(num_sensor):
# Make sure to slice along X.
if existing_mat.shape[axes[i]] != self.dX:
                raise ValueError('Axes must be along a dX=%d dimensional axis',
self.dX)
insert_shape[axes[i]] = len(self._x_data_idx[data_types[i]])
if tuple(insert_shape) != data_to_insert.shape:
raise ValueError('Data has shape %s. Expected %s',
data_to_insert.shape, tuple(insert_shape))
# Actually perform the slice.
index = [slice(None) for _ in range(len(existing_mat.shape))]
for i in range(num_sensor):
index[axes[i]] = slice(self._x_data_idx[data_types[i]][0],
self._x_data_idx[data_types[i]][-1] + 1)
existing_mat[index] = data_to_insert
def unpack_data_x(self, existing_mat, data_types, axes=None):
"""
Returns the requested data from the state matrix.
Args:
existing_mat: State matrix to unpack from.
            data_types: Names of the sensors to unpack.
axes: Which axes to unpack along. Defaults to the last axes.
"""
num_sensor = len(data_types)
if axes is None:
# If axes not specified, assume indexing on last dimensions.
axes = list(range(-1, -num_sensor - 1, -1))
else:
# Make sure number of sensors and axes are consistent.
if num_sensor != len(axes):
raise ValueError(
'Length of sensors (%d) must equal length of axes (%d)',
num_sensor, len(axes)
)
# Shape checks.
for i in range(num_sensor):
# Make sure to slice along X.
if existing_mat.shape[axes[i]] != self.dX:
                raise ValueError('Axes must be along a dX=%d dimensional axis',
self.dX)
# Actually perform the slice.
index = [slice(None) for _ in range(len(existing_mat.shape))]
for i in range(num_sensor):
index[axes[i]] = slice(self._x_data_idx[data_types[i]][0],
self._x_data_idx[data_types[i]][-1] + 1)
return existing_mat[index]
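The Agent constructor assigns each sensor a contiguous index range in the state vector, and pack_data_x / unpack_data_x slice along exactly those ranges. A standalone illustration of that bookkeeping with made-up dimensions (the real keys are gps_pb2 constants rather than strings):
sensor_dims_example = {'JOINT_ANGLES': 2, 'JOINT_VELOCITIES': 2,
                       'END_EFFECTOR_POINTS': 3}
state_include_example = ['JOINT_ANGLES', 'JOINT_VELOCITIES', 'END_EFFECTOR_POINTS']
state_idx, i = [], 0
for sensor in state_include_example:
    dim = sensor_dims_example[sensor]
    state_idx.append(list(range(i, i + dim)))
    i += dim
print(state_idx, 'dX =', i)   # [[0, 1], [2, 3], [4, 5, 6]] dX = 7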
In [23]:
import scipy.ndimage as sp_ndimage
def generate_noise(T, dU, hyperparams):
"""
    Generate a T x dU Gaussian-distributed noise matrix. Ignoring smoothing,
    each entry has approximately zero mean and unit variance.
Args:
T: Number of time steps.
dU: Dimensionality of actions.
Hyperparams:
smooth: Whether or not to perform smoothing of noise.
var : If smooth=True, applies a Gaussian filter with this
variance.
renorm : If smooth=True, renormalizes data to have variance 1
after smoothing.
"""
smooth, var = hyperparams['smooth_noise'], hyperparams['smooth_noise_var']
renorm = hyperparams['smooth_noise_renormalize']
noise = np.random.randn(T, dU)
if smooth:
# Smooth noise. This violates the controller assumption, but
# might produce smoother motions.
for i in range(dU):
noise[:, i] = sp_ndimage.filters.gaussian_filter(noise[:, i], var)
if renorm:
variance = np.var(noise, axis=0)
noise = noise / np.sqrt(variance)
return noise
def setup(value, n):
""" Go through various types of hyperparameters. """
if not isinstance(value, list):
try:
return [value.copy() for _ in range(n)]
except AttributeError:
return [value for _ in range(n)]
assert len(value) == n, \
'Number of elements must match number of conditions or 1.'
return value
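A quick check of generate_noise with the smoothing defaults from the AGENT dictionary above: the output has shape T x dU, and renormalization restores unit per-dimension variance after the Gaussian filter.
noise_example = generate_noise(100, 2, {'smooth_noise': True,
                                        'smooth_noise_var': 2.0,
                                        'smooth_noise_renormalize': True})
print(noise_example.shape, np.var(noise_example, axis=0))   # (100, 2), variances ~1.0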
In [24]:
from copy import deepcopy
from gps_pb2 import ACTION
AGENT_BOX2D = {
'render': True,
}
class AgentBox2D(Agent):
"""
All communication between the algorithms and Box2D is done through
this class.
"""
def __init__(self, hyperparams):
config = deepcopy(AGENT_BOX2D)
config.update(hyperparams)
Agent.__init__(self, config)
self._setup_conditions()
self._setup_world(self._hyperparams["world"],
self._hyperparams["target_state"],
self._hyperparams["render"])
def _setup_conditions(self):
"""
Helper method for setting some hyperparameters that may vary by
condition.
"""
conds = self._hyperparams['conditions']
for field in ('x0', 'x0var', 'pos_body_idx', 'pos_body_offset',
'noisy_body_idx', 'noisy_body_var'):
self._hyperparams[field] = setup(self._hyperparams[field], conds)
def _setup_world(self, world, target, render):
"""
Helper method for handling setup of the Box2D world.
"""
self.x0 = self._hyperparams["x0"]
self._worlds = [world(self.x0[i], target, render)
for i in range(self._hyperparams['conditions'])]
def sample(self, policy, condition, verbose=False, save=True, noisy=True):
"""
Runs a trial and constructs a new sample containing information
about the trial.
Args:
            policy: Policy to be used in the trial.
condition (int): Which condition setup to run.
verbose (boolean): Whether or not to plot the trial (not used here).
save (boolean): Whether or not to store the trial into the samples.
noisy (boolean): Whether or not to use noise during sampling.
"""
self._worlds[condition].run()
self._worlds[condition].reset_world()
b2d_X = self._worlds[condition].get_state()
new_sample = self._init_sample(b2d_X)
U = np.zeros([self.T, self.dU])
if noisy:
noise = generate_noise(self.T, self.dU, self._hyperparams)
else:
noise = np.zeros((self.T, self.dU))
for t in range(self.T):
X_t = new_sample.get_X(t=t)
obs_t = new_sample.get_obs(t=t)
U[t, :] = policy.act(X_t, obs_t, t, noise[t, :])
if (t+1) < self.T:
for _ in range(self._hyperparams['substeps']):
self._worlds[condition].run_next(U[t, :])
b2d_X = self._worlds[condition].get_state()
self._set_sample(new_sample, b2d_X, t)
new_sample.set(ACTION, U)
if save:
self._samples[condition].append(new_sample)
return new_sample
def _init_sample(self, b2d_X):
"""
Construct a new sample and fill in the first time step.
"""
sample = Sample(self)
self._set_sample(sample, b2d_X, -1)
return sample
def _set_sample(self, sample, b2d_X, t):
for sensor in b2d_X.keys():
sample.set(sensor, np.array(b2d_X[sensor]), t=t+1)
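AgentBox2D.sample only requires the policy object to expose act(x, obs, t, noise) returning a dU-dimensional action. A hypothetical do-nothing policy illustrating that interface (not part of the GPS code):
class ZeroPolicy(object):
    """ Applies only the exploration noise at every time step. """
    def __init__(self, dU):
        self.dU = dU
    def act(self, x, obs, t, noise):
        return np.zeros(self.dU) + noise

# agent.sample(ZeroPolicy(agent.dU), condition=0) would roll out one trajectory
# driven purely by exploration noise.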
In [25]:
class fwSettings(object):
""" This class contains the settings for Box2D's framwork. """
backend = 'pygame'
# Physics options
hz = 20.0
velocityIterations = 8
positionIterations = 3
enableWarmStarting = True
enableContinuous = True
enableSubStepping = False
# Drawing
drawStats = True
drawShapes = True
drawJoints = True
drawCoreShapes = False
drawAABBs = False
drawOBBs = False
drawPairs = False
drawContactPoints = False
maxContactPoints = 100
drawContactNormals = False
drawFPS = True
drawMenu = True # toggle by pressing F1
drawCOMs = False # Centers of mass
pointSize = 2.5 # pixel radius for drawing points
# Miscellaneous testbed options
pause = False
singleStep = False
onlyInit = False
# text variable
checkboxes = (("Warm Starting", "enableWarmStarting"),
("Time of Impact", "enableContinuous"),
("Sub-Stepping", "enableSubStepping"),
("Draw", None),
("Shapes", "drawShapes"),
("Joints", "drawJoints"),
("AABBs", "drawAABBs"),
("Pairs", "drawPairs"),
("Contact Points", "drawContactPoints"),
("Contact Normals", "drawContactNormals"),
("Center of Masses", "drawCOMs"),
("Statistics", "drawStats"),
("FPS", "drawFPS"),
("Control", None),
("Pause" "pause"),
("Single Step", "singleStep"))
sliders = [
{'name' : 'hz', 'text' : 'Hertz', 'min' : 5, 'max' : 200},
{'name' : 'positionIterations', 'text' :
'Pos Iters', 'min' : 0, 'max' : 100},
{'name' : 'velocityIterations', 'text' :
'Vel Iters', 'min' : 1, 'max' : 500},
]
list_options = [i for i in dir(fwSettings) if not i.startswith('_')]
In [26]:
import Box2D as b2
from time import time
class fwQueryCallback(b2.b2QueryCallback):
"""
    This callback is invoked for each fixture found by a world query; it
    stores the first dynamic fixture that contains the query point.
"""
def __init__(self, p):
super(fwQueryCallback, self).__init__()
self.point = p
self.fixture = None
def ReportFixture(self, fixture):
"""
This method is called to query for a fixture.
"""
body = fixture.body
if body.type == b2.b2_dynamicBody:
inside = fixture.TestPoint(self.point)
if inside:
self.fixture = fixture
# We found the object, so stop the query
return False
# Continue the query
return True
class FrameworkBase(b2.b2ContactListener):
"""
The base of the main Box2D GUI framework.
"""
name = "None"
description = None
TEXTLINE_START = 30
colors = {
'joint_line' : b2.b2Color(0.8, 0.8, 0.8),
'contact_add' : b2.b2Color(0.3, 0.95, 0.3),
'contact_persist' : b2.b2Color(0.3, 0.3, 0.95),
'contact_normal' : b2.b2Color(0.4, 0.9, 0.4),
}
def __reset(self):
""" Reset all of the variables to their starting values.
Not to be called except at initialization."""
# Box2D-related
self.points = []
self.settings = fwSettings
self.using_contacts = False
self.stepCount = 0
# Box2D-callbacks
self.destructionListener = None
self.renderer = None
def __init__(self):
super(FrameworkBase, self).__init__()
self.__reset()
# Box2D Initialization
self.world = b2.b2World(gravity=(0, -10), doSleep=True)
self.world.contactListener = self
self.t_steps, self.t_draws = [], []
def __del__(self):
pass
def Step(self, settings, action=None):
"""
The main physics step.
Takes care of physics drawing
(callbacks are executed after the world.Step() )
and drawing additional information.
"""
assert action is None,\
'action should only be used in subclass'
self.stepCount += 1
# Don't do anything if the setting's Hz are <= 0
if settings.hz > 0.0:
timeStep = 1.0 / settings.hz
else:
timeStep = 0.0
# Set the flags based on what the settings show
if self.renderer:
self.renderer.flags = dict(
drawShapes=settings.drawShapes,
drawJoints=settings.drawJoints,
drawAABBs=settings.drawAABBs,
drawPairs=settings.drawPairs,
drawCOMs=settings.drawCOMs,
convertVertices=isinstance(self.renderer, b2.b2DrawExtended)
)
# Set the other settings that aren't contained in the flags
self.world.warmStarting = settings.enableWarmStarting
self.world.continuousPhysics = settings.enableContinuous
self.world.subStepping = settings.enableSubStepping
# Reset the collision points
self.points = []
# Tell Box2D to step
t_step = time()
self.world.Step(timeStep, settings.velocityIterations,
settings.positionIterations)
t_step = time()-t_step
# Update the debug draw settings so that the vertices will be properly
# converted to screen coordinates
t_draw = time()
if self.renderer:
self.renderer.StartDraw()
self.world.DrawDebugData()
if self.renderer:
# Draw each of the contact points in different colors.
if self.settings.drawContactPoints:
for point in self.points:
if point['state'] == b2.b2_addState:
self.renderer.DrawPoint(self.renderer.to_screen(
point['position']), settings.pointSize,
self.colors['contact_add'])
elif point['state'] == b2.b2_persistState:
self.renderer.DrawPoint(self.renderer.to_screen(
point['position']), settings.pointSize,
self.colors['contact_persist'])
if settings.drawContactNormals:
for point in self.points:
p1 = self.renderer.to_screen(point['position'])
p2 = self.renderer.axisScale * point['normal'] + p1
self.renderer.DrawSegment(p1, p2,
self.colors['contact_normal'])
self.renderer.EndDraw()
t_draw = time()-t_draw
t_draw = max(b2.b2_epsilon, t_draw)
t_step = max(b2.b2_epsilon, t_step)
self.t_draws.append(1.0/t_draw)
self.t_steps.append(1.0/t_step)
def SimulationLoop(self, action):
"""
The main simulation loop. Don't override this, override Step instead.
"""
# Reset the text line to start the text from the top
self.textLine = self.TEXTLINE_START
# Draw the name of the test running
self.Print(self.name, (127, 127, 255))
if self.description:
# Draw the name of the test running
for s in self.description.split('\n'):
self.Print(s, (127, 255, 127))
self.Step(self.settings, action)
def PreSolve(self, contact, old_manifold):
"""
This is a critical function when there are many contacts in the world.
It should be optimized as much as possible.
"""
if not (self.settings.drawContactPoints or
self.settings.drawContactNormals or self.using_contacts):
return
elif len(self.points) > self.settings.maxContactPoints:
return
manifold = contact.manifold
if manifold.pointCount == 0:
return
_, state2 = b2.b2GetPointStates(old_manifold, manifold)
if not state2:
return
worldManifold = contact.worldManifold
for i, _ in enumerate(state2):
self.points.append(
{
'fixtureA' : contact.fixtureA,
'fixtureB' : contact.fixtureB,
'position' : worldManifold.points[i],
'normal' : worldManifold.normal,
'state' : state2[i]
})
#print(fwSettings.backend.lower()) # pygame
#print('gps.agent.box2d.'+'%s_framework' % (fwSettings.backend.lower())) # gps.agent.box2d.pygame_framework
#print('%sFramework'%fwSettings.backend.capitalize()) # PygameFramework
#framework_module = __import__('gps.agent.box2d.'+'%s_framework' %
# (fwSettings.backend.lower()),
# fromlist=['%sFramework'
# %fwSettings.backend.capitalize()])
#Framework = getattr(framework_module, '%sFramework' %
# fwSettings.backend.capitalize())
In [27]:
import Box2D as b2
import pygame
GUIEnabled = False
class PygameDraw(b2.b2DrawExtended):
"""
This debug draw class accepts callbacks from Box2D and
handles all of the rendering.
"""
surface = None
axisScale = 10.0
def __init__(self, test=None, **kwargs):
b2.b2DrawExtended.__init__(self, **kwargs)
self.flipX = False
self.flipY = True
self.convertVertices = True
self.test = test
def StartDraw(self):
"""
Called by renderer before drawing.
"""
self.zoom = self.test.viewZoom
self.center = self.test.viewCenter
self.offset = self.test.viewOffset
self.screenSize = self.test.screenSize
def EndDraw(self):
"""
Called by renderer when finished drawing.
"""
pass
def DrawPoint(self, p, size, color):
"""
Draw a single point at point p given a pixel size and color.
"""
self.DrawCircle(p, size/self.zoom, color, drawwidth=0)
def DrawAABB(self, aabb, color):
"""
Draw a wireframe around the AABB with the given color.
"""
points = [(aabb.lowerBound.x, aabb.lowerBound.y),
(aabb.upperBound.x, aabb.lowerBound.y),
(aabb.upperBound.x, aabb.upperBound.y),
(aabb.lowerBound.x, aabb.upperBound.y)]
pygame.draw.aalines(self.surface, color, True, points)
def DrawSegment(self, p1, p2, color):
"""
Draw the line segment from p1-p2 with the specified color.
"""
pygame.draw.aaline(self.surface, color.bytes, p1, p2)
def DrawTransform(self, xf):
"""
Draw the transform xf on the screen
"""
p1 = xf.position
p2 = self.to_screen(p1 + self.axisScale * xf.R.col1)
p3 = self.to_screen(p1 + self.axisScale * xf.R.col2)
p1 = self.to_screen(p1)
pygame.draw.aaline(self.surface, (255, 0, 0), p1, p2)
pygame.draw.aaline(self.surface, (0, 255, 0), p1, p3)
def DrawCircle(self, center, radius, color, drawwidth=1):
"""
Draw a wireframe circle given the center, radius, and color.
"""
radius *= self.zoom
if radius < 1:
radius = 1
else: radius = int(radius)
pygame.draw.circle(self.surface, color.bytes, center, radius, drawwidth)
def DrawSolidCircle(self, center, radius, axis, color):
"""
Draw a solid circle given the center, radius, and color.
"""
radius *= self.zoom
if radius < 1:
radius = 1
else: radius = int(radius)
pygame.draw.circle(self.surface, (color/2).bytes+[127],
center, radius, 0)
pygame.draw.circle(self.surface, color.bytes, center, radius, 1)
pygame.draw.aaline(self.surface, (255, 0, 0), center,
(center[0] - radius*axis[0], center[1] +
radius*axis[1]))
def DrawPolygon(self, vertices, color):
"""
Draw a wireframe polygon given the screen vertices with the given color.
"""
if not vertices:
return
if len(vertices) == 2:
            pygame.draw.aaline(self.surface, color.bytes, vertices[0], vertices[1])
else:
pygame.draw.polygon(self.surface, color.bytes, vertices, 1)
def DrawSolidPolygon(self, vertices, color):
"""
Draw a filled polygon given the screen vertices with the given color.
"""
if not vertices:
return
if len(vertices) == 2:
pygame.draw.aaline(self.surface, color.bytes, vertices[0],
vertices[1])
else:
pygame.draw.polygon(self.surface, (color/2).bytes+[127],
vertices, 0)
pygame.draw.polygon(self.surface, color.bytes, vertices, 1)
class PygameFramework(FrameworkBase):
"""
This class is the framework for running the simulation
"""
def __reset(self):
# Screen/rendering-related
self._viewZoom = 10.0
self._viewCenter = None
self._viewOffset = None
self.screenSize = None
self.rMouseDown = False
self.textLine = 30
self.font = None
self.fps = 0
# GUI-related (PGU)
self.gui_app = None
self.gui_table = None
def __init__(self):
super(PygameFramework, self).__init__()
self.__reset()
print('Initializing pygame framework...')
# Pygame Initialization
pygame.init()
caption = "Python Box2D Testbed - " + self.name
pygame.display.set_caption(caption)
self.screen = pygame.display.set_mode((640, 480))
self.screenSize = b2.b2Vec2(*self.screen.get_size())
self.renderer = PygameDraw(surface=self.screen, test=self)
self.world.renderer = self.renderer
self.font = pygame.font.Font(None, 15)
self.viewCenter = (0, 20.0)
self.groundbody = self.world.CreateBody()
def setCenter(self, value):
"""
Updates the view offset based on the center of the screen.
"""
self._viewCenter = b2.b2Vec2(*value)
self._viewCenter *= self._viewZoom
self._viewOffset = self._viewCenter - self.screenSize/2
def setZoom(self, zoom):
"""
Tells the display the zoom.
"""
self._viewZoom = zoom
viewZoom = property(lambda self: self._viewZoom, setZoom,
doc='Zoom factor for the display')
viewCenter = property(lambda self: self._viewCenter/self._viewZoom,
setCenter, doc='Screen center in camera coordinates')
viewOffset = property(lambda self: self._viewOffset,
doc='Offset of the top-left corner of the screen')
def run(self):
"""
        Begins the draw loop and tells the GUI to paint itself.
"""
# If any of the test constructors update the settings, reflect
# those changes on the GUI before running
if GUIEnabled:
self.gui_table.updateGUI(self.settings)
self.clock = pygame.time.Clock()
self.screen.fill((0, 0, 0))
# Run the simulation loop
self.SimulationLoop([0, 0, 0])
if GUIEnabled and self.settings.drawMenu:
self.gui_app.paint(self.screen)
pygame.display.flip()
self.clock.tick(self.settings.hz)
self.fps = self.clock.get_fps()
def run_next(self, action):
"""
Updates the screen and tells the GUI to paint itself.
"""
self.screen.fill((0, 0, 0))
# Run the simulation loop
self.SimulationLoop(action)
if GUIEnabled and self.settings.drawMenu:
self.gui_app.paint(self.screen)
pygame.display.flip()
self.clock.tick(self.settings.hz)
self.fps = self.clock.get_fps()
def Step(self, settings):
"""
Updates the simulation
"""
if GUIEnabled:
self.gui_table.updateSettings(self.settings)
super(PygameFramework, self).Step(settings)
if GUIEnabled:
self.gui_table.updateGUI(self.settings)
def ConvertScreenToWorld(self, x, y):
"""
Converts the display screen to the simulation's coordinates.
"""
return b2.b2Vec2((x + self.viewOffset.x) / self.viewZoom,
((self.screenSize.y - y + self.viewOffset.y)
/ self.viewZoom))
def DrawStringAt(self, x, y, s, color=(229, 153, 153, 255)):
"""
Draw some text, str, at screen coordinates (x, y).
"""
self.screen.blit(self.font.render(s, True, color), (x, y))
def Print(self, s, color=(229, 153, 153, 255)):
"""
Draw some text at the top status lines
and advance to the next line.
"""
self.screen.blit(self.font.render(s, True, color), (5, self.textLine))
self.textLine += 15
In [28]:
import Box2D as b2
#from framework import Framework
from gps_pb2 import JOINT_ANGLES, JOINT_VELOCITIES, END_EFFECTOR_POINTS
#class ArmWorld(Framework):
class ArmWorld(PygameFramework):
""" This class defines the 2 Link Arm and its environment."""
name = "2 Link Arm"
def __init__(self, x0, target, render):
self.render = render
if self.render:
super(ArmWorld, self).__init__()
else:
self.world = b2.b2World(gravity=(0, -10), doSleep=True)
self.world.gravity = (0.0, 0.0)
fixture_length = 5
self.x0 = x0
rectangle_fixture = b2.b2FixtureDef(
shape=b2.b2PolygonShape(box=(.5, fixture_length)),
density=.5,
friction=1,
)
square_fixture = b2.b2FixtureDef(
shape=b2.b2PolygonShape(box=(1, 1)),
density=100.0,
friction=1,
)
self.base = self.world.CreateBody(
position=(0, 15),
fixtures=square_fixture,
)
self.body1 = self.world.CreateDynamicBody(
position=(0, 2),
fixtures=rectangle_fixture,
angle=b2.b2_pi,
)
self.body2 = self.world.CreateDynamicBody(
fixtures=rectangle_fixture,
position=(0, 2),
angle=b2.b2_pi,
)
self.target1 = self.world.CreateDynamicBody(
fixtures=rectangle_fixture,
position=(0, 0),
angle=b2.b2_pi,
)
self.target2 = self.world.CreateDynamicBody(
fixtures=rectangle_fixture,
position=(0, 0),
angle=b2.b2_pi,
)
self.joint1 = self.world.CreateRevoluteJoint(
bodyA=self.base,
bodyB=self.body1,
localAnchorA=(0, 0),
localAnchorB=(0, fixture_length),
enableMotor=True,
maxMotorTorque=400,
enableLimit=False,
)
self.joint2 = self.world.CreateRevoluteJoint(
bodyA=self.body1,
bodyB=self.body2,
localAnchorA=(0, -(fixture_length - 0.5)),
localAnchorB=(0, fixture_length - 0.5),
enableMotor=True,
maxMotorTorque=400,
enableLimit=False,
)
self.set_joint_angles(self.body1, self.body2, x0[0], x0[1])
self.set_joint_angles(self.target1, self.target2, target[0], target[1])
self.target1.active = False
self.target2.active = False
self.joint1.motorSpeed = x0[2]
self.joint2.motorSpeed = x0[3]
def set_joint_angles(self, body1, body2, angle1, angle2):
""" Converts the given absolute angle of the arms to joint angles"""
pos = self.base.GetWorldPoint((0, 0))
body1.angle = angle1 + np.pi
new_pos = body1.GetWorldPoint((0, 5))
body1.position += pos - new_pos
body2.angle = angle2 + body1.angle
pos = body1.GetWorldPoint((0, -4.5))
new_pos = body2.GetWorldPoint((0, 4.5))
body2.position += pos - new_pos
def run(self):
"""Initiates the first time step
"""
if self.render:
super(ArmWorld, self).run()
else:
self.run_next(None)
def run_next(self, action):
"""Moves forward in time one step. Calls the renderer if applicable."""
if self.render:
super(ArmWorld, self).run_next(action)
else:
if action is not None:
self.joint1.motorSpeed = action[0]
self.joint2.motorSpeed = action[1]
self.world.Step(1.0 / fwSettings.hz, fwSettings.velocityIterations,
fwSettings.positionIterations)
def Step(self, settings, action):
"""Moves forward in time one step. Called by the renderer"""
self.joint1.motorSpeed = action[0]
self.joint2.motorSpeed = action[1]
super(ArmWorld, self).Step(settings)
def reset_world(self):
"""Returns the world to its intial state"""
self.world.ClearForces()
self.joint1.motorSpeed = 0
self.joint2.motorSpeed = 0
self.body1.linearVelocity = (0, 0)
self.body1.angularVelocity = 0
self.body2.linearVelocity = (0, 0)
self.body2.angularVelocity = 0
self.set_joint_angles(self.body1, self.body2, self.x0[0], self.x0[1])
def get_state(self):
"""Retrieves the state of the point mass"""
state = {JOINT_ANGLES: np.array([self.joint1.angle,
self.joint2.angle]),
JOINT_VELOCITIES: np.array([self.joint1.speed,
self.joint2.speed]),
END_EFFECTOR_POINTS: np.append(np.array(self.body2.position),[0])}
return state
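To sanity-check the environment in isolation, we can step the world without the pygame renderer. The cell below is a minimal sketch (not part of the original experiment): it assumes the framework cells above have been executed so that ArmWorld and the framework settings object fwSettings are available, and the motor speeds used as actions are arbitrary placeholder values.
In [ ]:
# Headless rollout of the 2-link arm (hypothetical values; assumes the cells above were run).
x0 = np.array([0.75 * np.pi, 0.5 * np.pi, 0.0, 0.0])  # initial joint angles and motor speeds
target = np.array([0.0, 0.0])                         # target joint angles
world = ArmWorld(x0, target, render=False)
world.run()                                           # take the first (zero-action) step
for _ in range(10):
    world.run_next(np.array([0.5, -0.5]))             # constant motor speeds for both joints
print(world.get_state()[JOINT_ANGLES])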
In [29]:
def guess_dynamics(gains, acc, dX, dU, dt):
"""
Initial guess at the model using position-velocity assumption.
Note: This code assumes joint positions occupy the first dU state
indices and joint velocities occupy the next dU.
Args:
gains: dU dimensional joint gains.
acc: dU dimensional joint acceleration.
dX: Dimensionality of the state.
dU: Dimensionality of the action.
dt: Length of a time step.
Returns:
Fd: A dX by dX+dU transition matrix.
fc: A dX-dimensional bias vector.
"""
#TODO: Use packing instead of assuming which indices are the joint
# angles.
Fd = np.vstack([
np.hstack([
np.eye(dU), dt * np.eye(dU), np.zeros((dU, dX - dU*2)),
dt ** 2 * np.diag(gains)
]),
np.hstack([
np.zeros((dU, dU)), np.eye(dU), np.zeros((dU, dX - dU*2)),
dt * np.diag(gains)
]),
np.zeros((dX - dU*2, dX+dU))
])
fc = np.hstack([acc * dt ** 2, acc * dt, np.zeros((dX - dU*2))])
return Fd, fc
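The guess above encodes a simple double-integrator model: positions advance by dt times the velocities, velocities advance by dt times the commanded acceleration scaled by the joint gains, and the remaining state entries are assumed static. The sketch below only inspects the resulting matrices, using the arm's dimensions (dX = 7, dU = 2) as an assumed example.
In [ ]:
# Inspect the linear model x_{t+1} ~= Fd.dot(np.r_[x_t, u_t]) + fc (assumed dimensions).
dX, dU, dt = 7, 2, 0.05
Fd, fc = guess_dynamics(gains=np.ones(dU), acc=np.zeros(dU), dX=dX, dU=dU, dt=dt)
print(Fd.shape, fc.shape)     # (7, 9) and (7,)
print(Fd[:dU, dX:])           # action block acting on positions: dt**2 * diag(gains)
print(Fd[dU:2*dU, dX:])       # action block acting on velocities: dt * diag(gains)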
In [30]:
import abc

class Policy(object):
""" Computes actions from states/observations. """
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def act(self, x, obs, t, noise):
"""
Args:
x: State vector.
obs: Observation vector.
t: Time step.
noise: A dU-dimensional noise vector.
Returns:
A dU dimensional action vector.
"""
raise NotImplementedError("Must be implemented in subclass.")
def set_meta_data(self, meta):
"""
Set meta data for policy (e.g., domain image, multi modal observation sizes)
Args:
meta: meta data.
"""
return
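As a concrete illustration of this interface (a hypothetical example, not part of the GPS codebase), a trivial subclass only needs to implement act:
In [ ]:
# Hypothetical subclass: a policy that ignores state, observation, time step and noise.
class ConstantPolicy(Policy):
    """ Always returns the same fixed action. """
    def __init__(self, u):
        self._u = np.asarray(u)

    def act(self, x, obs, t, noise):
        return self._u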
In [31]:
class LinearGaussianPolicy(Policy):
"""
Time-varying linear Gaussian policy.
u = K_t * x + k_t + noise, where noise ~ N(0, pol_covar) and chol_pol_covar is its Cholesky factor.
"""
def __init__(self, K, k, pol_covar, chol_pol_covar, inv_pol_covar):
Policy.__init__(self)
# Assume K has the correct shape, and make sure others match.
self.T = K.shape[0]
self.dU = K.shape[1]
self.dX = K.shape[2]
check_shape(k, (self.T, self.dU))
check_shape(pol_covar, (self.T, self.dU, self.dU))
check_shape(chol_pol_covar, (self.T, self.dU, self.dU))
check_shape(inv_pol_covar, (self.T, self.dU, self.dU))
self.K = K
self.k = k
self.pol_covar = pol_covar
self.chol_pol_covar = chol_pol_covar
self.inv_pol_covar = inv_pol_covar
def act(self, x, obs, t, noise=None):
"""
Return an action for a state.
Args:
x: State vector.
obs: Observation vector.
t: Time step.
noise: Action noise. This will be scaled by the variance.
"""
u = self.K[t].dot(x) + self.k[t]
if noise is not None:
u += self.chol_pol_covar[t].T.dot(noise)
return u
def fold_k(self, noise):
"""
Fold noise into k.
Args:
noise: A T x dU noise matrix with mean 0 and variance 1.
Returns:
k: A T x dU bias vector.
"""
k = np.zeros_like(self.k)
for i in range(self.T):
scaled_noise = self.chol_pol_covar[i].T.dot(noise[i])
k[i] = scaled_noise + self.k[i]
return k
def nans_like(self):
"""
Returns:
A new linear Gaussian policy object with the same dimensions
but all values filled with NaNs.
"""
policy = LinearGaussianPolicy(
np.zeros_like(self.K), np.zeros_like(self.k),
np.zeros_like(self.pol_covar), np.zeros_like(self.chol_pol_covar),
np.zeros_like(self.inv_pol_covar)
)
policy.K.fill(np.nan)
policy.k.fill(np.nan)
policy.pol_covar.fill(np.nan)
policy.chol_pol_covar.fill(np.nan)
policy.inv_pol_covar.fill(np.nan)
return policy
In [32]:
# Initial Linear Gaussian Trajectory Distributions, PD-based initializer.
# Note, PD is the default initializer type.
INIT_LG_PD = {
'init_var': 10.0,
'pos_gains': 10.0, # position gains
'vel_gains_mult': 0.01, # velocity gains multiplier on pos_gains
'init_action_offset': None,
}
# Initial Linear Gaussian Trajectory distribution, LQR-based initializer.
INIT_LG_LQR = {
'init_var': 1.0,
'stiffness': 1.0,
'stiffness_vel': 0.5,
'final_weight': 1.0,
# Parameters for guessing dynamics
'init_acc': [], # dU vector of accelerations, default zeros.
'init_gains': [], # dU vector of gains, default ones.
}
def init_lqr(hyperparams):
"""
Return initial gains for a time-varying linear Gaussian controller
that tries to hold the initial position.
"""
config = copy.deepcopy(INIT_LG_LQR)
config.update(hyperparams)
x0, dX, dU = config['x0'], config['dX'], config['dU']
dt, T = config['dt'], config['T']
#TODO: Use packing instead of assuming which indices are the joint
# angles.
# Notation notes:
# L = loss, Q = q-function (dX+dU dimensional),
# V = value function (dX dimensional), F = dynamics
# Vectors are lower-case, matrices are upper case.
# Derivatives: x = state, u = action, t = state+action (trajectory).
# The time index is denoted by _t after the above.
# Ex. Ltt_t = Loss, 2nd derivative (w.r.t. trajectory),
# indexed by time t.
# Constants.
idx_x = slice(dX) # Slices out state.
idx_u = slice(dX, dX+dU) # Slices out actions.
if len(config['init_acc']) == 0:
config['init_acc'] = np.zeros(dU)
if len(config['init_gains']) == 0:
config['init_gains'] = np.ones(dU)
# Set up simple linear dynamics model.
Fd, fc = guess_dynamics(config['init_gains'], config['init_acc'],
dX, dU, dt)
# Setup a cost function based on stiffness.
# Ltt = (dX+dU) by (dX+dU) - Hessian of loss with respect to
# trajectory at a single timestep.
Ltt = np.diag(np.hstack([
config['stiffness'] * np.ones(dU),
config['stiffness'] * config['stiffness_vel'] * np.ones(dU),
np.zeros(dX - dU*2), np.ones(dU)
]))
Ltt = Ltt / config['init_var'] # Cost function - quadratic term.
lt = -Ltt.dot(np.r_[x0, np.zeros(dU)]) # Cost function - linear term.
# Perform dynamic programming.
K = np.zeros((T, dU, dX)) # Controller gains matrix.
k = np.zeros((T, dU)) # Controller bias term.
PSig = np.zeros((T, dU, dU)) # Covariance of noise.
cholPSig = np.zeros((T, dU, dU)) # Cholesky decomposition.
invPSig = np.zeros((T, dU, dU)) # Inverse of covariance.
vx_t = np.zeros(dX) # Vx = dV/dX. Derivative of value function.
Vxx_t = np.zeros((dX, dX)) # Vxx = ddV/dXdX.
#TODO: A lot of this code is repeated with traj_opt_lqr_python.py
# backward pass.
for t in range(T - 1, -1, -1):
# Compute Q function at this step.
if t == (T - 1):
Ltt_t = config['final_weight'] * Ltt
lt_t = config['final_weight'] * lt
else:
Ltt_t = Ltt
lt_t = lt
# Qtt = (dX+dU) by (dX+dU) 2nd Derivative of Q-function with
# respect to trajectory (dX+dU).
Qtt_t = Ltt_t + Fd.T.dot(Vxx_t).dot(Fd)
# Qt = (dX+dU) 1st Derivative of Q-function with respect to
# trajectory (dX+dU).
qt_t = lt_t + Fd.T.dot(vx_t + Vxx_t.dot(fc))
# Compute preceding value function.
U = sp.linalg.cholesky(Qtt_t[idx_u, idx_u])
L = U.T
invPSig[t, :, :] = Qtt_t[idx_u, idx_u]
PSig[t, :, :] = sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, np.eye(dU), lower=True)
)
cholPSig[t, :, :] = sp.linalg.cholesky(PSig[t, :, :])
K[t, :, :] = -sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, Qtt_t[idx_u, idx_x], lower=True)
)
k[t, :] = -sp.linalg.solve_triangular(
U, sp.linalg.solve_triangular(L, qt_t[idx_u], lower=True)
)
Vxx_t = Qtt_t[idx_x, idx_x] + Qtt_t[idx_x, idx_u].dot(K[t, :, :])
vx_t = qt_t[idx_x] + Qtt_t[idx_x, idx_u].dot(k[t, :])
Vxx_t = 0.5 * (Vxx_t + Vxx_t.T)
return LinearGaussianPolicy(K, k, PSig, cholPSig, invPSig)
#TODO: Fix docstring
def init_pd(hyperparams):
"""
This function initializes the linear-Gaussian controller as a
proportional-derivative (PD) controller with Gaussian noise. The
position gains are controlled by the variable pos_gains, and the
velocity gains are controlled by pos_gains * vel_gains_mult.
"""
config = copy.deepcopy(INIT_LG_PD)
config.update(hyperparams)
dU, dQ, dX = config['dU'], config['dQ'], config['dX']
x0, T = config['x0'], config['T']
# Choose initialization mode.
Kp = 1.0
Kv = config['vel_gains_mult']
if dU < dQ:
# Lay position and velocity gains side by side (mirrors the dU == dQ branch below).
K = -config['pos_gains'] * np.tile(
np.hstack([
np.eye(dU) * Kp, np.zeros((dU, dQ-dU)),
np.eye(dU) * Kv, np.zeros((dU, dQ-dU))
]), [T, 1, 1]
)
else:
K = -config['pos_gains'] * np.tile(
np.hstack([
np.eye(dU) * Kp, np.eye(dU) * Kv,
np.zeros((dU, dX - dU*2))
]), [T, 1, 1]
)
k = np.tile(-K[0, :, :].dot(x0), [T, 1])
PSig = config['init_var'] * np.tile(np.eye(dU), [T, 1, 1])
cholPSig = np.sqrt(config['init_var']) * np.tile(np.eye(dU), [T, 1, 1])
invPSig = (1.0 / config['init_var']) * np.tile(np.eye(dU), [T, 1, 1])
return LinearGaussianPolicy(K, k, PSig, cholPSig, invPSig)
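For example, the LQR-based initializer can be exercised on its own with a hyperparameter dictionary matching the arm's dimensions. This is an assumed sketch (dX = 7, dU = 2 as in the experiment below) and relies on numpy, scipy (sp) and copy being imported as in the earlier cells.
In [ ]:
# Sketch: initial trajectory distribution for a 7-dimensional state, 2-dimensional action.
hyper = {
    'x0': np.zeros(7),   # dX-dimensional initial state
    'dX': 7,
    'dU': 2,
    'dt': 0.05,
    'T': 100,
}
traj_distr = init_lqr(hyper)
print(traj_distr.K.shape)        # (100, 2, 7) feedback gains
print(traj_distr.pol_covar[0])   # initial exploration covariance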
In [33]:
import matplotlib as mpl
# mpl.use('Qt4Agg')
qt_found=False
try:
import PyQt4
mpl.use('Qt4Agg')
qt_found = True
except ImportError:
qt_found = False
if not qt_found:
try:
import PyQt5
mpl.use('Qt5Agg')
qt_found = True
except ImportError:
qt_found = False
import logging
import imp
import os
import os.path
import sys
import argparse
import threading
import time
import traceback
# Add gps/python to path so that imports work.
#sys.path.append('/'.join(str.split(__file__, '/')[:-2]))
from gps.gui.gps_training_gui import GPSTrainingGUI
#from gps.utility.data_logger import DataLogger
#from gps.sample.sample_list import SampleList
class GPSMain(object):
""" Main class to run algorithms and experiments. """
def __init__(self, config, quit_on_end=False):
"""
Initialize GPSMain
Args:
config: Hyperparameters for experiment
quit_on_end: When true, quit automatically on completion
"""
self._quit_on_end = quit_on_end
self._hyperparams = config
self._conditions = config['common']['conditions']
if 'test_conditions' in config['common']:
self._train_idx = config['common']['train_conditions']
self._test_idx = config['common']['test_conditions']
else:
self._train_idx = range(self._conditions)
config['common']['train_conditions'] = config['common']['conditions']
self._hyperparams = config
self._test_idx = self._train_idx
self._data_files_dir = config['common']['data_files_dir']
self.agent = config['agent']['type'](config['agent'])
self.data_logger = DataLogger()
self.gui = GPSTrainingGUI(config['common']) if config['gui_on'] else None
config['algorithm']['agent'] = self.agent
self.algorithm = config['algorithm']['type'](config['algorithm'])
def run(self, itr_load=None):
"""
Run training by iteratively sampling and taking an iteration.
Args:
itr_load: If specified, loads algorithm state from that
iteration, and resumes training at the next iteration.
Returns: None
"""
try:
itr_start = self._initialize(itr_load)
for itr in range(itr_start, self._hyperparams['iterations']):
for cond in self._train_idx:
for i in range(self._hyperparams['num_samples']):
self._take_sample(itr, cond, i)
traj_sample_lists = [
self.agent.get_samples(cond, -self._hyperparams['num_samples'])
for cond in self._train_idx
]
# Clear agent samples.
self.agent.clear_samples()
self._take_iteration(itr, traj_sample_lists)
pol_sample_lists = self._take_policy_samples()
self._log_data(itr, traj_sample_lists, pol_sample_lists)
except Exception as e:
traceback.print_exception(*sys.exc_info())
finally:
self._end()
def test_policy(self, itr, N):
"""
Take N policy samples of the algorithm state at iteration itr,
for testing the policy to see how it is behaving.
(Called directly from the command line --policy flag).
Args:
itr: the iteration from which to take policy samples
N: the number of policy samples to take
Returns: None
"""
algorithm_file = self._data_files_dir + 'algorithm_itr_%02d.pkl' % itr
self.algorithm = self.data_logger.unpickle(algorithm_file)
if self.algorithm is None:
print("Error: cannot find '%s.'" % algorithm_file)
os._exit(1) # called instead of sys.exit(), since this is in a thread
traj_sample_lists = self.data_logger.unpickle(self._data_files_dir +
('traj_sample_itr_%02d.pkl' % itr))
pol_sample_lists = self._take_policy_samples(N)
self.data_logger.pickle(
self._data_files_dir + ('pol_sample_itr_%02d.pkl' % itr),
copy.copy(pol_sample_lists)
)
if self.gui:
self.gui.update(itr, self.algorithm, self.agent,
traj_sample_lists, pol_sample_lists)
self.gui.set_status_text(('Took %d policy sample(s) from ' +
'algorithm state at iteration %d.\n' +
'Saved to: data_files/pol_sample_itr_%02d.pkl.\n') % (N, itr, itr))
def _initialize(self, itr_load):
"""
Initialize from the specified iteration.
Args:
itr_load: If specified, loads algorithm state from that
iteration, and resumes training at the next iteration.
Returns:
itr_start: Iteration to start from.
"""
if itr_load is None:
if self.gui:
self.gui.set_status_text('Press \'go\' to begin.')
return 0
else:
algorithm_file = self._data_files_dir + 'algorithm_itr_%02d.pkl' % itr_load
self.algorithm = self.data_logger.unpickle(algorithm_file)
if self.algorithm is None:
print("Error: cannot find '%s.'" % algorithm_file)
os._exit(1) # called instead of sys.exit(), since this is in a thread
if self.gui:
traj_sample_lists = self.data_logger.unpickle(self._data_files_dir +
('traj_sample_itr_%02d.pkl' % itr_load))
if self.algorithm.cur[0].pol_info:
pol_sample_lists = self.data_logger.unpickle(self._data_files_dir +
('pol_sample_itr_%02d.pkl' % itr_load))
else:
pol_sample_lists = None
self.gui.set_status_text(
('Resuming training from algorithm state at iteration %d.\n' +
'Press \'go\' to begin.') % itr_load)
return itr_load + 1
def _take_sample(self, itr, cond, i):
"""
Collect a sample from the agent.
Args:
itr: Iteration number.
cond: Condition number.
i: Sample number.
Returns: None
"""
if self.algorithm._hyperparams['sample_on_policy'] \
and self.algorithm.iteration_count > 0:
pol = self.algorithm.policy_opt.policy
else:
pol = self.algorithm.cur[cond].traj_distr
if self.gui:
self.gui.set_image_overlays(cond) # Must call for each new cond.
redo = True
while redo:
while self.gui.mode in ('wait', 'request', 'process'):
if self.gui.mode in ('wait', 'process'):
time.sleep(0.01)
continue
# 'request' mode.
if self.gui.request == 'reset':
try:
self.agent.reset(cond)
except NotImplementedError:
self.gui.err_msg = 'Agent reset unimplemented.'
elif self.gui.request == 'fail':
self.gui.err_msg = 'Cannot fail before sampling.'
self.gui.process_mode() # Complete request.
self.gui.set_status_text(
'Sampling: iteration %d, condition %d, sample %d.' %
(itr, cond, i)
)
self.agent.sample(
pol, cond,
verbose=(i < self._hyperparams['verbose_trials'])
)
if self.gui.mode == 'request' and self.gui.request == 'fail':
redo = True
self.gui.process_mode()
self.agent.delete_last_sample(cond)
else:
redo = False
else:
self.agent.sample(
pol, cond,
verbose=(i < self._hyperparams['verbose_trials'])
)
def _take_iteration(self, itr, sample_lists):
"""
Take an iteration of the algorithm.
Args:
itr: Iteration number.
Returns: None
"""
if self.gui:
self.gui.set_status_text('Calculating.')
self.gui.start_display_calculating()
self.algorithm.iteration(sample_lists)
if self.gui:
self.gui.stop_display_calculating()
def _take_policy_samples(self, N=None):
"""
Take samples from the policy to see how it's doing.
Args:
N : number of policy samples to take per condition
Returns: A list of SampleList objects with the policy samples, or None when no policy samples are taken.
"""
if 'verbose_policy_trials' not in self._hyperparams:
# AlgorithmTrajOpt
return None
verbose = self._hyperparams['verbose_policy_trials']
if self.gui:
self.gui.set_status_text('Taking policy samples.')
pol_samples = [[None] for _ in range(len(self._test_idx))]
# Since this isn't noisy, just take one sample.
# TODO: Make this noisy? Add hyperparam?
# TODO: Take at all conditions for GUI?
for cond in range(len(self._test_idx)):
pol_samples[cond][0] = self.agent.sample(
self.algorithm.policy_opt.policy, self._test_idx[cond],
verbose=verbose, save=False, noisy=False)
return [SampleList(samples) for samples in pol_samples]
def _log_data(self, itr, traj_sample_lists, pol_sample_lists=None):
"""
Log data and algorithm, and update the GUI.
Args:
itr: Iteration number.
traj_sample_lists: trajectory samples as SampleList object
pol_sample_lists: policy samples as SampleList object
Returns: None
"""
if self.gui:
self.gui.set_status_text('Logging data and updating GUI.')
self.gui.update(itr, self.algorithm, self.agent,
traj_sample_lists, pol_sample_lists)
self.gui.save_figure(
self._data_files_dir + ('figure_itr_%02d.png' % itr)
)
if 'no_sample_logging' in self._hyperparams['common']:
return
self.data_logger.pickle(
self._data_files_dir + ('algorithm_itr_%02d.pkl' % itr),
copy.copy(self.algorithm)
)
self.data_logger.pickle(
self._data_files_dir + ('traj_sample_itr_%02d.pkl' % itr),
copy.copy(traj_sample_lists)
)
if pol_sample_lists:
self.data_logger.pickle(
self._data_files_dir + ('pol_sample_itr_%02d.pkl' % itr),
copy.copy(pol_sample_lists)
)
def _end(self):
""" Finish running and exit. """
if self.gui:
self.gui.set_status_text('Training complete.')
self.gui.end_mode()
if self._quit_on_end:
# Quit automatically (for running sequential expts)
os._exit(1)
In [34]:
from gps_pb2 import JOINT_ANGLES, JOINT_VELOCITIES, END_EFFECTOR_POINTS, ACTION
from datetime import datetime
import os
SENSOR_DIMS = {
JOINT_ANGLES: 2,
JOINT_VELOCITIES: 2,
END_EFFECTOR_POINTS: 3,
ACTION: 2
}
#BASE_DIR = '/'.join(str.split(gps_filepath, '/')[:-2])
BASE_DIR = '.'
#EXP_DIR = BASE_DIR + '/../experiments/box2d_arm_example/'
EXP_DIR = BASE_DIR + '/experiments/'
common = {
'experiment_name': 'box2d_arm_example' + '_' + \
datetime.strftime(datetime.now(), '%m-%d-%y_%H-%M'),
'experiment_dir': EXP_DIR,
'data_files_dir': EXP_DIR + 'data_files/',
'log_filename': EXP_DIR + 'log.txt',
'conditions': 1,
}
if not os.path.exists(common['data_files_dir']):
os.makedirs(common['data_files_dir'])
agent = {
'type': AgentBox2D,
'target_state' : np.array([0, 0]),
'world' : ArmWorld,
'render' : False,
'x0': np.array([0.75*np.pi, 0.5*np.pi, 0, 0, 0, 0, 0]),
'rk': 0,
'dt': 0.05,
'substeps': 1,
'conditions': common['conditions'],
'pos_body_idx': np.array([]),
'pos_body_offset': np.array([]),
'T': 100,
'sensor_dims': SENSOR_DIMS,
'state_include': [JOINT_ANGLES, JOINT_VELOCITIES, END_EFFECTOR_POINTS],
'obs_include': [],
}
algorithm = {
'type': AlgorithmTrajOpt,
'conditions': common['conditions'],
}
algorithm['init_traj_distr'] = {
'type': init_lqr,
'init_gains': np.zeros(SENSOR_DIMS[ACTION]),
'init_acc': np.zeros(SENSOR_DIMS[ACTION]),
'init_var': 0.1,
'stiffness': 0.01,
'dt': agent['dt'],
'T': agent['T'],
}
action_cost = {
'type': CostAction,
'wu': np.array([1, 1])
}
state_cost = {
'type': CostState,
'data_types' : {
JOINT_ANGLES: {
'wp': np.array([1, 1]),
'target_state': agent["target_state"],
},
},
}
algorithm['cost'] = {
'type': CostSum,
'costs': [action_cost, state_cost],
'weights': [1e-5, 1.0],
}
algorithm['dynamics'] = {
'type': DynamicsLRPrior,
'regularization': 1e-6,
'prior': {
'type': DynamicsPriorGMM,
'max_clusters': 20,
'min_samples_per_cluster': 40,
'max_samples': 20,
},
}
algorithm['traj_opt'] = {
'type': TrajOptLQRPython,
}
algorithm['policy_opt'] = {}
config_gps = {
'iterations': 10,
'num_samples': 5,
'verbose_trials': 5,
'common': common,
'agent': agent,
'gui_on': True,
'algorithm': algorithm,
}
common['info'] = generate_experiment_info(config_gps)
In [35]:
exp_name = "box2d_arm_example"
import random
import numpy as np
import matplotlib.pyplot as plt
#print(config)
#print("------------------------------")
#print(config_gps)
seed = config_gps.get('random_seed', 0)
random.seed(seed)
np.random.seed(seed)
gps = GPSMain(config_gps)
if config_gps['gui_on']:
run_gps = threading.Thread(
#target=lambda: gps.run(itr_load=resume_training_itr) # no iteration load used at this stage
target=lambda: gps.run()
)
run_gps.daemon = True
run_gps.start()
plt.ioff()
plt.show()
else:
gps.run()
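Once training completes, the pickles written to data_files/ can be inspected offline. The cell below is a hedged sketch: it assumes DataLogger from the earlier cells, a finished run with the default 10 iterations (so the last saved file is algorithm_itr_09.pkl), and only reads attributes already used above (cur[cond].traj_distr).
In [ ]:
# Sketch: load the final algorithm state and look at the learned linear-Gaussian controller.
logger = DataLogger()
algo = logger.unpickle(common['data_files_dir'] + 'algorithm_itr_09.pkl')
if algo is not None:
    traj = algo.cur[0].traj_distr        # controller for the first (only) condition
    print(traj.K.shape, traj.k.shape)    # (T, dU, dX) gains and (T, dU) bias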
In [ ]: