In [ ]:
%load_ext autoreload
%autoreload 2
In [ ]:
from IPython.core.debugger import Tracer # debugging
from IPython.display import clear_output
import time
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns # prettify matplotlib
from mpl_toolkits.mplot3d import Axes3D # for matplotlib 3D plots
import numpy as np
import sklearn.gaussian_process as gp
import scipy.optimize
from scipy.stats import norm # Gaussian/normal distribution
In [ ]:
# make deterministic
np.random.seed(100)
In [ ]:
#f = lambda x: x * np.cos(x)
#x_min = 0
#x_max = 12
f = lambda x: np.exp(-(x - 2)**2) + np.exp(-(x - 6)**2/10) + 1/ (x**2 + 1)
x_min = -2
x_max = 10
xs = np.linspace(x_min, x_max, 100)
ys = f(xs)
In [ ]:
plt.figure(figsize=(16,8))
plt.plot(xs, ys, 'g-')
plt.margins(0.1, 0.1)
plt.xlabel('x')
plt.ylabel('cost')
plt.show()
In [ ]:
inf = float('inf')

def make2D(arr):
    ''' convert a numpy array with shape (l,) into an array with shape (l,1)
        (np.atleast_2d behaves similarly but would give shape (1,l) instead)
    '''
    return arr.reshape(-1, 1)

def log_uniform(low, high):
    ''' sample a random number in the interval [low, high] distributed logarithmically within that space '''
    return np.exp(np.random.uniform(np.log(low), np.log(high)))

def range_type(range_):
    ''' determine whether the range is 'constant', 'linear', 'logarithmic' or 'arbitrary'
        range_: must be a numpy array
        note: range_ must be sorted either ascending or descending to be detected as linear or logarithmic
    '''
    if len(range_) == 1:
        return 'constant'
    # 'i' => integer, 'u' => unsigned integer, 'f' => floating point
    elif len(range_) < 2 or range_.dtype.kind not in 'iuf':
        return 'arbitrary'
    else:
        tmp = range_[1:] - range_[:-1]  # differences between element i and element i+1
        is_lin = np.all(np.isclose(tmp[0], tmp))  # same difference between each element
        if is_lin:
            return 'linear'
        else:
            tmp = np.log(range_)
            tmp = tmp[1:] - tmp[:-1]
            is_log = np.all(np.isclose(tmp[0], tmp))
            if is_log:
                return 'logarithmic'
            else:
                return 'arbitrary'
def sample_from(range_):
    ''' draw a random sample from the given range in an appropriate manner
        ie if the range is linear: sample uniformly, if the range is logarithmic: sample log-uniformly
        note: range_ can be in ascending or descending order
    '''
    assert len(range_) > 0, 'range_ must not be empty'
    t = range_type(range_)
    if t == 'linear':
        return np.random.uniform(range_[0], range_[-1])  # doesn't matter whether ascending or descending
    elif t == 'logarithmic':
        return log_uniform(range_[0], range_[-1])  # doesn't matter whether ascending or descending
    elif t == 'arbitrary' or t == 'constant':
        return np.random.choice(range_)
    else:
        assert False, 'invalid range type'
class Sample(object):
    def __init__(self, config, cost):
        self.config = config
        self.cost = cost
    def __repr__(self):
        return '(config={}, cost={})'.format(config_string(self.config), self.cost)
    def __iter__(self):
        ''' so you can write my_config, my_cost = my_sample '''
        yield self.config
        yield self.cost
    def __eq__(self, other):
        return self.config == other.config and self.cost == other.cost
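A quick demonstration of how these helpers behave (a minimal sketch; the ranges and values below are just illustrative examples):
In [ ]:
# sample_from picks an appropriate distribution based on the detected range type
print(sample_from(np.linspace(0, 10, num=50)))   # linear range => uniform over [0, 10]
print(sample_from(np.logspace(-4, 0, num=50)))   # logarithmic range => log-uniform over [1e-4, 1]
print(sample_from(np.array(['a', 'b', 'c'])))    # arbitrary range => uniform choice

# a Sample can be unpacked like a tuple thanks to __iter__
my_config, my_cost = Sample({'x': 1.0}, 0.5)
print(my_config, my_cost)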
In [ ]:
a = np.linspace(0,10, num=11)
t = a[1:] - a[:-1] # differences between element i and element i+1
np.all(np.isclose(t[0], t)) # same difference
b = np.logspace(1,10, num=11, base=3) # base=3, showing that base doesn't matter
t = np.log(b)
t = t[1:] - t[:-1]
np.all(np.isclose(t[0], t))
print(range_type(np.linspace(1, 10, num=12)) == 'linear')
print(range_type(np.linspace(10, 1, num=12)) == 'linear') # can be in descending order
print(range_type(np.array([1,2,3])) == 'linear')
print(range_type(np.array([0,10])) == 'linear') # can be 2 points
print(range_type(np.logspace(1, 10, num=2)) == 'linear') # need at least 3 points to determine logarithmic
print(range_type(np.logspace(1, 10, num=12)) == 'logarithmic')
print(range_type(np.logspace(10, 1, num=12)) == 'logarithmic') # can be in descending order
print(range_type(np.logspace(1, 10, num=12, base=14)) == 'logarithmic') # can be any base
print(range_type(np.array([12, 1, 10])) == 'arbitrary')
print(range_type(np.array([])) == 'arbitrary')
print(range_type(np.array(["hi", "there"])) == 'arbitrary')
print(range_type(np.array(["hi"])) == 'constant')
print(range_type(np.array([1])) == 'constant')
Resources
https://thuijskens.github.io/2016/12/29/bayesian-optimisation/
https://github.com/thuijskens/bayesian-optimization/blob/master/python/gp.py
https://github.com/fmfn/BayesianOptimization/blob/master/examples/visualization.ipynb
https://github.com/fmfn/BayesianOptimization
See also the `unique_rows` helper function in the BayesianOptimization repository linked above, which is used there to avoid fitting the GP to duplicate samples.
The acquisition function is maximised using 'L-BFGS-B', which stands for 'Limited-memory Broyden-Fletcher-Goldfarb-Shanno, Bounded': https://en.wikipedia.org/wiki/Limited-memory_BFGS
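A minimal sketch of this approach: scipy only provides minimisation, so a function is maximised by minimising its negation with `method='L-BFGS-B'` and box bounds (the toy function `g` below is just an illustrative stand-in).
In [ ]:
# maximise a toy function over [0, 10] by minimising its negation with L-BFGS-B
g = lambda x: -(x - 3)**2 + 5     # toy objective with its maximum at x=3
neg_g = lambda x: -g(x[0])        # minimize passes x as a 1D array; return a scalar
res = scipy.optimize.minimize(fun=neg_g, x0=np.array([8.0]),
                              bounds=[(0, 10)], method='L-BFGS-B')
print(res.x, -res.fun)            # approximately x=3, g(x)=5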
Expected improvement $EI$ is one possible acquisition function: $$EI(\mathbf x)=\mathbb E\left[\max(0,\; f(\mathbf x)-f(\mathbf x^+))\right]$$ where the expectation is taken over the surrogate model's (the GP's) predictive distribution of $f(\mathbf x)$, and $\mathbf x^+$ is the best known configuration so far.
Maximising the expected improvement selects the next configuration to test ($\mathbf x$) as the one the surrogate model expects to improve the most over $\mathbf x^+$ (but note that the surrogate is only an approximation to the real objective function). $$\mathbf x_{\mathrm{next}}=\arg\max_{\mathbf x}EI(\mathbf x)$$
If the surrogate is a Gaussian process (which it is in this case) then $EI$ can be calculated analytically:
$$EI(\mathbf x)=\begin{cases} \left(\mu(\mathbf x)-f(\mathbf x^+)\right)\Phi(Z) \;+\; \sigma(\mathbf x)\phi(Z) & \text{if } \sigma(\mathbf x)>0\\ 0 & \text{if } \sigma(\mathbf x) = 0 \end{cases}$$
$$Z=\frac{\mu(\mathbf x)-f(\mathbf x^+)}{\sigma(\mathbf x)}$$
where $\mu(\mathbf x)$ and $\sigma(\mathbf x)$ are the mean and standard deviation of the GP posterior at $\mathbf x$, and $\Phi$ and $\phi$ are the CDF and PDF of the standard normal distribution.
A parameter $\xi$ can be introduced to control the exploitation-exploration trade-off ($\xi=0.01$ works well in almost all cases (Lizotte, 2008)):
$$EI(\mathbf x)=\begin{cases} \left(\mu(\mathbf x)-f(\mathbf x^+)-\xi\right)\Phi(Z) \;+\; \sigma(\mathbf x)\phi(Z) & \text{if } \sigma(\mathbf x)>0\\ 0 & \text{if } \sigma(\mathbf x) = 0 \end{cases}$$
$$Z=\begin{cases} \frac{\mu(\mathbf x)-f(\mathbf x^+)-\xi}{\sigma(\mathbf x)} & \text{if }\sigma(\mathbf x)>0\\ 0 & \text{if }\sigma(\mathbf x) = 0 \end{cases}$$
Early on in development I was confused by the minimiser: it wasn't catching the peak of the acquisition function every time, and instead appeared to settle randomly somewhere along the $y=0$ line. I no longer think this is the case. The minimiser is a local optimiser, so any peak is a valid result: although there is usually a single high peak, that peak is very narrow, and there are other, much flatter local optima elsewhere in the parameter space. Occasionally choosing samples from those other local optima is actually a good thing, since it explores more of the parameter space.
In [ ]:
def expected_improvement(xs, gp_model, best_cost, maximising_cost=False, xi=0.01):
    ''' expected improvement acquisition function
        xs: array of configurations to evaluate the GP at
        gp_model: the GP fitted to the past configurations
        best_cost: the (actual) cost of the best known configuration (either smallest or largest depending on maximising_cost)
        maximising_cost: True => higher cost is better, False => lower cost is better
        xi: exploitation-exploration trade-off parameter (larger => more exploration)
    '''
    mus, sigmas = gp_model.predict(xs, return_std=True)
    sigmas = make2D(sigmas)

    sf = 1 if maximising_cost else -1  # scaling factor so that a larger 'diff' always means a greater improvement
    diff = sf * (mus - best_cost) - xi  # mu(x) - f(x+) - xi when maximising, f(x+) - mu(x) - xi when minimising

    with np.errstate(divide='ignore', invalid='ignore'):
        Zs = diff / sigmas  # produces inf/nan where sigmas[i] == 0.0
        EIs = diff * norm.cdf(Zs) + sigmas * norm.pdf(Zs)
    EIs[sigmas == 0.0] = 0.0  # replace the infs/nans with 0s
    return EIs
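As a quick sanity check (a minimal sketch using the helpers and objective defined above): for the maximising case the analytic formula should match a Monte Carlo estimate of $\mathbb E\left[\max(0,\; Y - f(\mathbf x^+) - \xi)\right]$ with $Y\sim\mathcal N(\mu(\mathbf x),\sigma(\mathbf x)^2)$, since the GP prediction at a single point is Gaussian.
In [ ]:
# compare the analytic EI with a Monte Carlo estimate under the GP's Gaussian prediction
check_x = make2D(np.array([-1.0, 2.0, 5.0, 9.0]))
check_gp = gp.GaussianProcessRegressor(kernel=gp.kernels.Matern(), alpha=1e-7, normalize_y=True)
check_gp.fit(check_x, make2D(f(check_x.flatten())))

check_query = make2D(np.linspace(x_min, x_max, 5))
check_best = np.max(f(check_x.flatten()))
check_xi = 0.01

analytic = expected_improvement(check_query, check_gp, check_best,
                                maximising_cost=True, xi=check_xi).flatten()

check_mu, check_sigma = check_gp.predict(check_query, return_std=True)
draws = np.random.normal(check_mu.flatten(), check_sigma, size=(100000, len(check_sigma)))
monte_carlo = np.mean(np.maximum(0.0, draws - check_best - check_xi), axis=0)

print(analytic)
print(monte_carlo)  # should agree with the analytic values to roughly 2 decimal places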
In [ ]:
pre_samples = 3 # number of samples to obtain randomly before fitting the GP
max_iterations = 1 # number of configurations to test
num_restarts = 10 # number of restarts of the acquisition function optimisation
maximising_cost = True # True => higher cost is better
acquisition_function = expected_improvement
prob_random = 0.1 # probability of ignoring the Bayesian optimisation and choosing randomly for the next sample
'''
gp_params = dict( # Gaussian process parameters
alpha = 1e-10,
kernel = gp.kernels.Matern(length_scale=1),
n_restarts_optimizer = 10,
#normalize_y = True # make the mean 0
)
'''
gp_params = dict( # Gaussian process parameters
#alpha = 1e-5,
#kernel = gp.kernels.RBF(length_scale=1.0, length_scale_bounds="fixed"), # the default kernel
#kernel = gp.kernels.Matern(length_scale=1.0, length_scale_bounds="fixed"),
#n_restarts_optimizer = 10,
alpha = 1e-7,
#kernel = gp.kernels.Matern(nu=2.5),
kernel = gp.kernels.Matern(),
n_restarts_optimizer = 10,
normalize_y = True # make the mean 0 (theoretically a bad thing, see docs, but can help)
)
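As a quick check of these settings (a minimal sketch), a GP with `gp_params` can be fitted to a few points of the toy objective to inspect the kernel hyperparameters chosen by the internal optimiser (`kernel_` and `log_marginal_likelihood_value_` are attributes of sklearn's GaussianProcessRegressor after fitting).
In [ ]:
# fit a GP with gp_params to a few points of the toy objective and inspect the fitted kernel
gp_check_x = make2D(np.linspace(x_min, x_max, 8))
gp_check_y = make2D(f(gp_check_x.flatten()))
gp_check_model = gp.GaussianProcessRegressor(**gp_params)
gp_check_model.fit(gp_check_x, gp_check_y)
print(gp_check_model.kernel_)                         # Matern kernel with the length_scale found by the optimiser
print(gp_check_model.log_marginal_likelihood_value_)  # log marginal likelihood of the fitted hyperparameters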
In [ ]:
def test_config(config):
    return f(config['x'])

def config_to_input(config):
    '''
    given a config dictionary, return a numpy array containing the parameters that should be used as inputs to
    the Bayesian optimisation algorithm, ie excluding parameters that should be held constant
    (in this example both 'x' and 'z' are passed through; 'z' does not affect the cost and acts as a dummy second dimension)
    '''
    return np.array([[config['x'], config['z']]])

def input_to_config(input_):
    '''
    given a numpy array of parameter values, return a configuration dict with all parameters present
    (ie filling out the constant values)
    '''
    return {'x': input_[0], 'z': input_[1]}
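A quick round-trip check of the two mappings (a small sketch; `example_config` is just an illustrative name):
In [ ]:
# round-trip check: config dict -> input row -> config dict
example_config = {'x': 3.5, 'z': 1.0}
row = config_to_input(example_config)
print(row)                      # [[3.5 1. ]]  (one sample with two attributes)
print(input_to_config(row[0]))  # back to {'x': 3.5, 'z': 1.0}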
First obtain a few randomly selected samples from the objective function to begin fitting the surrogate function to it.
In [ ]:
def random_sample():
    return {'x': sample_from(xs), 'z': 1.0}

# the samples are kept as a list of Sample objects
samples = []
for i in range(pre_samples):
    config = random_sample()
    samples.append(Sample(config, test_config(config)))
In [ ]:
#TODO: document params to GP
gp_model = gp.GaussianProcessRegressor(**gp_params)
# samples stored as numpy arrays for calculations
sx = np.vstack([config_to_input(s.config) for s in samples])
sy = make2D(np.array([s.cost for s in samples]))
# best known configuration and the corresponding cost of that configuration
if maximising_cost:
    best_sample = max(samples, key=lambda s: s.cost)
else:
    best_sample = min(samples, key=lambda s: s.cost)
In [ ]:
def plot_bayes_step_1D(xs, param_name, true_ys, gp_params, samples):
    '''
    xs: possible values along a single parameter
    param_name: the name of the parameter that xs corresponds to
    true_ys: true cost function corresponding to xs (None to omit)
    gp_params: parameters used to construct a fresh GaussianProcessRegressor for this plot
    samples: the list of Sample objects gathered so far
    '''
    plt.figure(figsize=(16, 8))
    plt.margins(0.1, 0.1)

    plt.subplot(2, 1, 1)  # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('cost')

    if true_ys is not None:
        plt.plot(xs, true_ys, 'g-')

    sx = np.array([[s.config[param_name]] for s in samples])
    sy = np.array([[s.cost] for s in samples])
    plt.plot(sx, sy, 'bo')

    gp_model = gp.GaussianProcessRegressor(**gp_params)
    gp_model.fit(sx, sy)
    mu, sigma = gp_model.predict(make2D(xs), return_std=True)
    mu = mu.flatten()
    plt.plot(xs, mu, 'm-')
    plt.fill_between(xs, mu - sigma, mu + sigma, alpha=0.3, color='y')

    # mark the best known sample with a star
    plt.plot(best_sample.config[param_name], best_sample.cost, 'y*', markersize=30, alpha=0.8)

    plt.subplot(2, 1, 2)  # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('EI')
    ac = acquisition_function(make2D(xs), gp_model, best_sample.cost, maximising_cost)
    plt.plot(xs, ac, 'r-')
    plt.show()
In [ ]:
def plot_bayes_step_slice(xs, param_name, true_ys, gp_model, sx, sy, next_x, next_ac):
    '''
    xs: possible values along a single parameter
    param_name: the name of the parameter that xs corresponds to
    true_ys: true cost function corresponding to xs (None to omit)
    gp_model: the Gaussian process used in the Bayesian optimisation (already fitted to the data)
    sx: sample inputs, shape = (num_samples, num_attribs)
    sy: sample objective values, shape = (num_samples, 1)
    next_x: config (dict) for the next sample to take
    next_ac: (positive) acquisition function value for next_x
    '''
    plt.figure(figsize=(16, 8))
    plt.margins(0.1, 0.1)
    plt.subplots_adjust(hspace=0.3)

    plt.subplot(2, 1, 1)  # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('cost')
    plt.title('Surrogate objective function')

    if true_ys is not None:
        plt.plot(xs, true_ys, 'g-')
    plt.plot([input_to_config(x)[param_name] for x in sx], sy, 'bo')

    # take the next_x configuration and perturb the parameter `param_name` while leaving the others constant
    def mks(x):
        c = next_x.copy()
        c[param_name] = x
        return config_to_input(c)
    inputs_ = np.vstack([mks(x) for x in xs])

    mu, sigma = gp_model.predict(inputs_, return_std=True)
    mu = mu.flatten()
    plt.plot(xs, mu, 'm-')
    plt.fill_between(xs, mu - 3*sigma, mu + 3*sigma, alpha=0.3, color='y')
    #plt.plot(sample.config[param_name], sample.cost, 'y*', markersize=30, alpha=0.8)
    plt.axvline(x=next_x[param_name])

    plt.subplot(2, 1, 2)  # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('EI')
    plt.title('Acquisition function')
    ac = acquisition_function(inputs_, gp_model, best_sample.cost, maximising_cost)
    plt.plot(xs, ac, 'r-')
    plt.axvline(x=next_x[param_name])
    plt.plot(next_x[param_name], next_ac, 'b*', markersize=20, alpha=0.8)
    plt.show()
In [ ]:
for i in range(max_iterations):
    gp_model.fit(sx, sy)

    # maximise the acquisition function to obtain the next configuration to test
    # scipy has no maximise function, so instead minimise the negation of the acquisition function
    # reshape(1,-1) => 1 sample (row) with N attributes (cols). Needed because x is passed with shape (N,)
    neg_acquisition_function = lambda x: -acquisition_function(x.reshape(1, -1), gp_model, best_sample.cost, maximising_cost)

    # minimise the negative acquisition function
    best_next_x = None
    best_neg_ac = 0  # negative acquisition function value for best_next_x
    for j in range(num_restarts):
        starting_point = config_to_input(random_sample())

        # result is an OptimizeResult object
        result = scipy.optimize.minimize(
            fun=neg_acquisition_function,
            x0=starting_point,
            bounds=[(x_min, x_max), (0, 2)],
            method='L-BFGS-B',
            options=dict(maxiter=15000)  # maxiter=15000 is the default
        )
        assert result.success

        # result.fun == negative acquisition function evaluated at result.x
        if result.fun < best_neg_ac:
            best_next_x = result.x
            best_neg_ac = result.fun

    # acquisition function optimisation finished:
    # best_next_x = argmax(acquisition_function)

    if np.random.uniform() < prob_random:
        print('choosing random sample because prob_random={}'.format(prob_random))
        best_next_x = random_sample()
        best_neg_ac = 1
    elif best_next_x is None:
        print('choosing random sample because no sample was found with acquisition value >0')
        best_next_x = random_sample()
        best_neg_ac = 1
    elif np.any(np.linalg.norm(sx - best_next_x, axis=1) <= 1e-7):  # distance from each existing sample
        print('choosing random sample to avoid samples being too close')
        best_next_x = random_sample()  # choose randomly instead
        best_neg_ac = 1
    else:
        best_next_x = input_to_config(best_next_x)

    #plot_bayes_step_1D(xs, 'x', ys, gp_params, samples)
    plot_bayes_step_slice(xs, 'x', ys, gp_model, sx, sy, best_next_x, -best_neg_ac)

    # having two samples too close together will 'break' the GP
    # assume that randomly chosen samples and the pre-samples are unlikely to ever be too close
    sample = Sample(best_next_x, test_config(best_next_x))
    samples.append(sample)

    # config and cost are also stored as numpy arrays
    sx = np.append(sx, config_to_input(sample.config), axis=0)
    sy = np.append(sy, make2D(sample.cost), axis=0)

    # update current best
    if ((maximising_cost and sample.cost > best_sample.cost) or
            (not maximising_cost and sample.cost < best_sample.cost)):
        best_sample = sample

    print('iteration done')
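Finally, the best configuration found so far can be reported (a short cell using only names defined above):
In [ ]:
# report the best sample found over the pre-samples and the Bayesian optimisation iterations
print('best configuration: {}'.format(best_sample.config))
print('best cost: {}'.format(best_sample.cost))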
In [ ]: