In [ ]:
%load_ext autoreload
%autoreload 2

In [ ]:
from IPython.core.debugger import Tracer # debugging
from IPython.display import clear_output

import time

%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns # prettify matplotlib
from mpl_toolkits.mplot3d import Axes3D # for matplotlib 3D plots

import numpy as np
import sklearn.gaussian_process as gp
import scipy.optimize
from scipy.stats import norm # Gaussian/normal distribution

In [ ]:
# make deterministic
np.random.seed(100)

In [ ]:
#f = lambda x: x * np.cos(x)
#x_min = 0
#x_max = 12

f = lambda x: np.exp(-(x - 2)**2) + np.exp(-(x - 6)**2/10) + 1/ (x**2 + 1)
x_min = -2
x_max = 10

xs = np.linspace(x_min, x_max, 100)
ys = f(xs)

In [ ]:
plt.figure(figsize=(16,8))
plt.plot(xs, ys, 'g-')
plt.margins(0.1, 0.1)
plt.xlabel('x')
plt.ylabel('cost')
plt.show()

Utilities


In [ ]:
inf = float('inf')

def make2D(arr):
    ''' convert a numpy array with shape (l,) into an array with shape (l,1)
        (np.atleast_2d behaves similarly but would give shape (1,l) instead)
    '''
    return arr.reshape(-1, 1)

def log_uniform(low, high):
    ''' sample a random number in the interval [low, high] distributed logarithmically within that space '''
    return np.exp(np.random.uniform(np.log(low), np.log(high)))

def range_type(range_):
    ''' determine whether the range is 'constant', 'linear', 'logarithmic' or 'arbitrary'
    range_: must be a numpy array
    note: range_ must be sorted either ascending or descending to be detected as linear or logarithmic
    '''
    if len(range_) == 1:
        return 'constant'
    # 'i' => integer, 'u' => unsigned integer, 'f' => floating point
    elif len(range_) < 2 or range_.dtype.kind not in 'iuf':
        return 'arbitrary'
    else:
        tmp = range_[1:] - range_[:-1] # differences between element i and element i+1
        is_lin = np.all(np.isclose(tmp[0], tmp)) # same difference between each element
        if is_lin:
            return 'linear'
        else:
            tmp = np.log(range_)
            tmp = tmp[1:] - tmp[:-1]
            is_log = np.all(np.isclose(tmp[0], tmp))
            if is_log:
                return 'logarithmic'
            else:
                return 'arbitrary'
        
def sample_from(range_):
    ''' draw a random sample from the given range in an appropriate manner
    ie if the range is linear: sample uniformly, if the range is logarithmic: sample log-uniformly
    note: range_ can be in ascending or descending order
    '''
    assert len(range_) > 0, 'range_ must not be empty'
    t = range_type(range_)
    if t == 'linear':
        return np.random.uniform(range_[0], range_[-1]) # doesn't matter if ascending order or descending
    elif t == 'logarithmic':
        return log_uniform(range_[0], range_[-1]) # doesn't matter if ascending order or descending
    elif t == 'arbitrary' or t == 'constant':
        return np.random.choice(range_)
    else:
        assert False, 'Invalid range type'
        
class Sample(object):
    def __init__(self, config, cost):
        self.config = config
        self.cost = cost
    def __repr__(self):
        return '(config={}, cost={})'.format(self.config, self.cost)
    def __iter__(self):
        ''' so you can write my_config, my_cost = my_sample '''
        yield self.config
        yield self.cost
    def __eq__(self, other):
        return self.config == other.config and self.cost == other.cost

In [ ]:
a = np.linspace(0,10, num=11)
t = a[1:] - a[:-1] # differences between element i and element i+1
np.all(np.isclose(t[0], t)) # same difference

b = np.logspace(1,10, num=11, base=3) # base=3, showing that base doesn't matter
t = np.log(b)
t = t[1:] - t[:-1]
np.all(np.isclose(t[0], t))

print(range_type(np.linspace(1, 10, num=12)) == 'linear')
print(range_type(np.linspace(10, 1, num=12)) == 'linear') # can be in descending order
print(range_type(np.array([1,2,3])) == 'linear')
print(range_type(np.array([0,10])) == 'linear') # can be 2 points
print(range_type(np.logspace(1, 10, num=2)) == 'linear') # need at least 3 points to determine logarithmic

print(range_type(np.logspace(1, 10, num=12)) == 'logarithmic')
print(range_type(np.logspace(10, 1, num=12)) == 'logarithmic') # can be in descending order
print(range_type(np.logspace(1, 10, num=12, base=14)) == 'logarithmic') # can be any base

print(range_type(np.array([12, 1, 10])) == 'arbitrary')
print(range_type(np.array([])) == 'arbitrary')
print(range_type(np.array(["hi", "there"])) == 'arbitrary')

print(range_type(np.array(["hi"])) == 'constant')
print(range_type(np.array([1])) == 'constant')

Bayesian Optimisation

Resources

  • https://thuijskens.github.io/2016/12/29/bayesian-optimisation/
  • https://github.com/thuijskens/bayesian-optimization/blob/master/python/gp.py
  • https://github.com/fmfn/BayesianOptimization/blob/master/examples/visualization.ipynb
  • Lizotte, 2008
  • https://github.com/fmfn/BayesianOptimization
    • has unique_rows function

The acquisition function is maximised using 'L-BFGS-B', which stands for 'Limited-memory Broyden-Fletcher-Goldfarb-Shanno with Bound constraints' (ie it supports box constraints on each parameter): https://en.wikipedia.org/wiki/Limited-memory_BFGS
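
Since scipy only provides minimisers, the maximisation is done by minimising the negation of the acquisition function (the same trick is used in the optimisation loop below). A minimal sketch of this, using a made-up toy objective rather than the real acquisition function:


In [ ]:
# sketch: maximise a toy function g by minimising -g with the box-bounded L-BFGS-B method
g = lambda x: -(x - 3.0)**2 + 5.0  # toy objective with its maximum at x = 3

result = scipy.optimize.minimize(
    fun=lambda x: -g(x[0]),   # negate so that minimising -g maximises g
    x0=np.array([0.0]),       # starting point
    bounds=[(-10.0, 10.0)],   # box constraints handled by L-BFGS-B
    method='L-BFGS-B'
)
print(result.x[0], -result.fun)  # should be close to x=3 and g(x)=5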

Acquisition Function

Expected improvement $EI$ is one possible acquisition function: $$EI(\mathbf x)=\mathbb E\left[\max(0,\; f(\mathbf x)-f(\mathbf x^+))\right]$$ where $f$ is the surrogate objective function and $\mathbf x^+=$ the best known configuration so far.

Maximising the expected improvement chooses the next configuration to test ($\mathbf x$) as the one expected to improve the most on $f(\mathbf x^+)$ (but note that $f$ is only an approximation to the real objective function). $$\mathbf x_{\mathrm{next}}=\arg\max_{\mathbf x}EI(\mathbf x)$$

If $f$ is a Gaussian Process (which it is in this case) then $EI$ can be calculated analytically:

$$EI(\mathbf x)=\begin{cases} \left(\mu(\mathbf x)-f(\mathbf x^+)\right)\mathbf\Phi(Z) \;+\; \sigma(\mathbf x)\phi(Z) & \text{if } \sigma(\mathbf x)>0\\ 0 & \text{if } \sigma(\mathbf x) = 0 \end{cases}$$$$Z=\frac{\mu(\mathbf x)-f(\mathbf x^+)}{\sigma(\mathbf x)}$$

Where

  • $\phi(\cdot)=$ standard normal distribution PDF (ie $\mu=0$, $\sigma=1$, since $Z$ is a scalar)
  • $\Phi(\cdot)=$ standard normal distribution CDF

A parameter $\xi$ can be introduced to control the exploitation-exploration trade-off; $\xi=0.01$ works well in almost all cases (Lizotte, 2008):

$$EI(\mathbf x)=\begin{cases} \left(\mu(\mathbf x)-f(\mathbf x^+)-\xi\right)\mathbf\Phi(Z) \;+\; \sigma(\mathbf x)\phi(Z) & \text{if } \sigma(\mathbf x)>0\\ 0 & \text{if } \sigma(\mathbf x) = 0 \end{cases}$$$$Z=\begin{cases} \frac{\mu(\mathbf x)-f(\mathbf x^+)-\xi}{\sigma(\mathbf x)} & \text{if }\sigma(\mathbf x)>0\\ 0 & \text{if }\sigma(\mathbf x) = 0 \end{cases}$$
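
As a quick sanity check of the closed form, the EI at a single point can be computed directly from made-up values for $\mu(\mathbf x)$, $\sigma(\mathbf x)$ and $f(\mathbf x^+)$ (assuming the cost is being maximised):


In [ ]:
# evaluate the closed-form EI at one hypothetical point (maximisation, made-up numbers)
mu, sigma = 1.2, 0.4   # surrogate mean and standard deviation at x
best, xi = 1.0, 0.01   # best cost seen so far and exploration parameter

if sigma > 0:
    Z = (mu - best - xi) / sigma
    EI = (mu - best - xi) * norm.cdf(Z) + sigma * norm.pdf(Z)
else:
    EI = 0.0
print(EI)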

Note

Early on in development I was confused by the minimiser: it wasn't catching the peak of the acquisition function every time and instead appeared to settle randomly somewhere along the y=0 line. I no longer think this is the case. The minimiser is a local optimiser, so converging to any local optimum is acceptable behaviour. Although there is usually a single high peak, that peak is very thin, and there are other, much flatter, local optima elsewhere in the parameter space. Occasionally choosing samples from those other local optima is actually a good thing, since it explores more of the parameter space.


In [ ]:
def expected_improvement(xs, gp_model, best_cost, maximising_cost=False, xi=0.01):
    ''' expected improvement acquisition function
    xs: array of configurations to evaluate the GP at
    gp_model: the GP fitted to the past configurations
    best_cost: the (actual) cost of the best known configuration (either smallest or largest depending on maximising_cost)
    maximising_cost: True => higher cost is better, False => lower cost is better
    xi: exploitation-exploration trade-off parameter (larger => favour exploration)
    '''
    mus, sigmas = gp_model.predict(xs, return_std=True)
    sigmas = make2D(sigmas)
    
    sf = 1 if maximising_cost else -1   # scaling factor
    diff = sf * (mus - best_cost) - xi  # mu(x) - f(x+) - xi when maximising, f(x+) - mu(x) - xi when minimising
    
    with np.errstate(divide='ignore', invalid='ignore'):
        Zs = diff / sigmas # produces inf or nan where sigmas[i] == 0.0
        
    EIs = diff * norm.cdf(Zs)  +  sigmas * norm.pdf(Zs)
    EIs[sigmas == 0.0] = 0.0 # replace the infs/nans with 0s
    
    return EIs

Parameters


In [ ]:
pre_samples = 3 # number of samples to obtain randomly before fitting the GP
max_iterations = 1 # number of configurations to test
num_restarts = 10 # number of restarts of the acquisition function optimisation
maximising_cost = True # True => higher cost is better
acquisition_function = expected_improvement
prob_random = 0.1 # probability of ignoring the Bayesian optimisation and choosing randomly for the next sample
'''
gp_params = dict( # Gaussian process parameters
    alpha = 1e-10,
    kernel = gp.kernels.Matern(length_scale=1),
    n_restarts_optimizer = 10,
    #normalize_y = True # make the mean 0
)
'''

gp_params = dict( # Gaussian process parameters
    #alpha = 1e-5,
    #kernel = gp.kernels.RBF(length_scale=1.0, length_scale_bounds="fixed"), # the default kernel
    #kernel = gp.kernels.Matern(length_scale=1.0, length_scale_bounds="fixed"),
    #n_restarts_optimizer = 10,
    
    alpha = 1e-7,
    #kernel = gp.kernels.Matern(nu=2.5),
    kernel = gp.kernels.Matern(),
    n_restarts_optimizer = 10,
    
    normalize_y = True # make the mean 0 (theoretically a bad thing, see docs, but can help)
)

In [ ]:
def test_config(config):
    return f(config['x'])

def config_to_input(config):
    '''
    given a config dictionary, return a numpy array containing the parameters that should be used as inputs to
    the Bayesian optimisation algorithm, ie excluding parameters that are constant
    '''
    return np.array([[config['x'], config['z']]])

def input_to_config(input_):
    '''
    given a numpy array of parameter values, return a configuration dict with all parameters present
    (ie filling out the constant values)
    '''
    return {'x' : input_[0], 'z' : input_[1]}

First obtain a few randomly selected samples from the objective function to begin fitting the surrogate function to it.


In [ ]:
def random_sample():
    return {'x' : sample_from(xs), 'z' : 1.0}

# samples stored as Sample objects for storage
samples = []
for i in range(pre_samples):
    config = random_sample()
    samples.append(Sample(config, test_config(config)))

In [ ]:
#TODO: document params to GP
gp_model = gp.GaussianProcessRegressor(**gp_params)

# samples stored as numpy arrays for calculations
sx = np.vstack([config_to_input(s.config) for s in samples])
sy = make2D(np.array([s.cost for s in samples]))

# best known configuration and the corresponding cost of that configuration
if maximising_cost:
    best_sample = max(samples, key=lambda s: s.cost)
else:
    best_sample = min(samples, key=lambda s: s.cost)

In [ ]:
def plot_bayes_step_1D(xs, param_name, true_ys, gp_params, samples):
    '''
    xs: possible values along a single parameter
    param_name: the name of the parameter that xs corresponds to
    true_ys: true cost function corresponding to xs (None to omit)
    gp_params: parameters used to construct a fresh GaussianProcessRegressor for this plot
    samples: list of Sample objects gathered so far
    '''
    plt.figure(figsize=(16,8))
    
    plt.margins(0.1, 0.1)

    plt.subplot(2, 1, 1) # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('cost')

    if true_ys is not None:
        plt.plot(xs, true_ys, 'g-')
        
    sx = np.array([[s.config[param_name]] for s in samples])
    sy = np.array([[s.cost] for s in samples])
    plt.plot(sx, sy, 'bo')

    gp_model = gp.GaussianProcessRegressor(**gp_params)
    gp_model.fit(sx, sy)
    mu, sigma = gp_model.predict(make2D(xs), return_std=True)
    mu = mu.flatten()
    
    plt.plot(xs, mu, 'm-')
    plt.fill_between(xs, mu - sigma, mu + sigma, alpha=0.3, color='y')
    # highlight the most recently taken sample
    latest = samples[-1]
    plt.plot(latest.config[param_name], latest.cost, 'y*', markersize=30, alpha=0.8)

    plt.subplot(2, 1, 2) # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('EI')
    ac = acquisition_function(make2D(xs), gp_model, best_sample.cost, maximising_cost)
    plt.plot(xs, ac, 'r-')

    plt.show()

In [ ]:
def plot_bayes_step_slice(xs, param_name, true_ys, gp_model, sx, sy, next_x, next_ac):
    '''
    xs: possible values along a single parameter
    param_name: the name of the parameter that xs corresponds to
    true_ys: true cost function corresponding to xs (None to omit)
    gp_model: the gaussian process used in the Bayesian optimisation (already fitted to the data) 
    sx: sample inputs, shape = (num_samples, num_attribs)
    sy: sample objective values, shape = (num_samples, 1)
    next_x: config (dict) for the next sample to take
    next_ac: (positive) acquisition function value for next_x 
    '''
    plt.figure(figsize=(16,8))
    plt.margins(0.1, 0.1)
    plt.subplots_adjust(hspace=0.3)

    plt.subplot(2, 1, 1) # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('cost')
    plt.title('Surrogate objective function')

    if true_ys is not None:
        plt.plot(xs, true_ys, 'g-')
        
    plt.plot([input_to_config(x)[param_name] for x in sx], sy, 'bo')

    # take the next_x configuration and sweep the parameter `param_name` across xs while leaving the others constant
    def mks(x):
        c = next_x.copy()
        c[param_name] = x
        return config_to_input(c)
    inputs_ = np.vstack([mks(x) for x in xs])
    
    mu, sigma = gp_model.predict(inputs_, return_std=True)
    mu = mu.flatten()
    
    plt.plot(xs, mu, 'm-')
    plt.fill_between(xs, mu - 3*sigma, mu + 3*sigma, alpha=0.3, color='y')
    #plt.plot(sample.config[param_name], sample.cost, 'y*', markersize=30, alpha=0.8)
    plt.axvline(x=next_x[param_name])

    plt.subplot(2, 1, 2) # nrows, ncols, plot_number
    plt.xlabel('parameter: ' + param_name)
    plt.ylabel('EI')
    plt.title('acquisition function')
    ac = acquisition_function(inputs_, gp_model, best_sample.cost, maximising_cost)
    plt.plot(xs, ac, 'r-')
    plt.axvline(x=next_x[param_name])
    plt.plot(next_x[param_name], next_ac, 'b*', markersize=20, alpha=0.8)

    plt.show()

In [ ]:
for i in range(max_iterations):
    gp_model.fit(sx, sy)
    
    # maximise the acquisition function to obtain the next configuration to test
    # scipy has no maximise function, so instead minimise the negation of the acquisition function 
    # reshape(1,-1) => 1 sample (row) with N attributes (cols). Needed because x is passed as shape (N,)
    neg_acquisition_function = lambda x: -acquisition_function(x.reshape(1,-1), gp_model, best_sample.cost, maximising_cost)
    
    # minimise the negative acquisition function
    best_next_x = None
    best_neg_ac = 0 # negative acquisition function value for best_next_x
    for j in range(num_restarts):
        starting_point = config_to_input(random_sample())
        
        # result is an OptimizeResult object
        result = scipy.optimize.minimize(
            fun=neg_acquisition_function,
            x0=starting_point,
            bounds=[(x_min, x_max), (0, 2)],
            method='L-BFGS-B',
            options=dict(maxiter=15000) # maxiter=15000 is default
        )
        assert result.success
        
        # result.fun == negative acquisition function evaluated at result.x
        if result.fun < best_neg_ac:
            best_next_x = result.x
            best_neg_ac = result.fun
    
    # acquisition function optimisation finished:
    # best_next_x = argmax(acquisition_function)
    
    if np.random.uniform() < prob_random:
        print('choosing random sample because prob_random={}'.format(prob_random))
        best_next_x = random_sample()
        best_neg_ac = 1
    elif best_next_x is None:
        print('choosing random sample because no sample was found with acquisition value >0')
        best_next_x = random_sample()
        best_neg_ac = 1
    elif np.any(np.linalg.norm(sx - best_next_x, axis=1) <= 1e-7): # is best_next_x too close to an existing sample?
        print('choosing random sample to avoid samples being too close')
        best_next_x = random_sample() # choose randomly instead
        best_neg_ac = 1
    else:
        best_next_x = input_to_config(best_next_x)
    
    #plot_bayes_step_1D(xs, 'x', ys, gp_params, samples)
    plot_bayes_step_slice(xs, 'x', ys, gp_model, sx, sy, best_next_x, -best_neg_ac)
    
    
    # having two samples too close together will 'break' the GP
    # will assume that randomly chosen samples and the pre-samples are unlikely to ever be too close
        
    sample = Sample(best_next_x, test_config(best_next_x))
    samples.append(sample)
    # config and cost should be numpy arrays
    sx = np.append(sx, config_to_input(sample.config), axis=0)
    sy = np.append(sy, make2D(sample.cost), axis=0)
    
    # update current best
    if ((maximising_cost and sample.cost > best_sample.cost) or
       (not maximising_cost and sample.cost < best_sample.cost)):
        best_sample = sample
    
    print('iteration done')

In [ ]: