Introduction

The problem of surrogate modelling is as follows. There are several control variables that can be changed by a decision maker and the goal is to find optimal vector of control variables' values, but a loss to be minimized is hard to measure and so its values are available only on limited amount of control variables' vectors. The approach of surrogate modelling lies in building a based on available data model that approximates the loss function on the whole set of available inputs. Then a minimum point of loss function's approximation is regarded as minimum point of real loss.

Let us provide an example of situation where surrogate modelling is appropriate. Suppose that engineers are going to create new alloy of two metals and these engineers have two control variables: proportion of one metal to the other and intensity of heat treatment. Assume also that a loss is fragility of alloy. Engineers can produce pieces of various alloys that are obtained with several values of control variables and test them empirically, but this procedure is expensive. After some data are gathered, it is possible to build an approximation of fragility function and then test an alloy that corresponds to its minimum point.

In this notebook, machine learning approach is used for surrogate modelling of some functions of two variables. Note that usage of tree-based ensembles is not interesting in this context, because they can not make a prediction that is below target's minimum over training sample or above target's maximum over training sample and, in particular, they can not extrapolate trends. Neural networks are a more intriguing choice.

General Preparations


In [1]:
from functools import partial

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score

import keras
from keras.layers import Dense, Dropout
from keras.layers.advanced_activations import LeakyReLU
from keras.initializers import RandomNormal
from keras.callbacks import EarlyStopping

from tensorflow import set_random_seed as set_random_seed_for_tf

from tqdm import tqdm


Using TensorFlow backend.

In [2]:
sns.set()  # Make all graphs prettier.

In [3]:
np.random.seed(361)
set_random_seed_for_tf(361)

Template of Experiments

Designs of Experiments

Since functions of just two arguments are studied here, there is no curse of dimensionality. Hence, complex designs such as Latin hypercubes, Sobol sequences, or Halton sequences are not necessary. Only two designs are used:

  • Random sampling from uniform distribution on a constrained segment of domain;
  • Grid of particular size and equal steps.

In [4]:
def draw_from_uniform_distribution(n_samples, x_borders, y_borders):
    """
    Draws `n_samples` from a uniform
    distribution on a rectangle that is
    a Cartesian product of intervals with
    ends represented by `x_borders` and
    `y_borders` respectively.
    
    @type n_samples: int
    @type x_borders: tuple(float)
    @type y_borders: tuple(float)
    @rtype: numpy.ndarray
    """
    xs = np.random.uniform(x_borders[0], x_borders[1], n_samples).reshape((-1, 1))
    ys = np.random.uniform(y_borders[0], y_borders[1], n_samples).reshape((-1, 1))
    return np.hstack((xs, ys))

In [5]:
def create_grid(step, left_bottom_corner, x_n_steps, y_n_steps):
    """
    Returns array of points from a rectangular
    grid with `step` as vertical or horizontal
    distance between adjacent nodes. Size of
    grid is determined by `x_n_steps` and
    `y_n_steps`, while its location is determined
    via `left_bottom_corner`.
    
    @type step: float
    @type left_bottom_corner: tuple(float)
    @type x_n_steps: int
    @type y_n_steps: int
    @rtype: numpy.ndarray
    """
    xs = [left_bottom_corner[0] + i * step for i in range(x_n_steps)]
    ys = [left_bottom_corner[1] + i * step for i in range(y_n_steps)]
    return np.transpose([np.tile(xs, len(ys)), np.repeat(ys, len(xs))])

Actually, brute force over points from above designs is used for surrogate loss optimization. More complex techniques (e.g. gradient-based optimization or Nelder-Mead method) are not used, because brute force is computationally feasible due to low dimensionality.

Runner


In [6]:
class RunnerOfSurrogateModelling(object):
    """
    Structurizes utilities that are useful
    for the following experiment. Given a
    function of two real-valued arguments
    `func` (actually, `func` must have also
    an optional argument `noise_stddev`),
    learning sample for this function recovery
    is drawn from an experiment that is
    designed in accordance with `measurement_doe`.
    Then a neural network returned by
    `build_model` is trained to approximate `func`
    and its optimum is found in accordance
    with `optimization_doe`.
    """
    
    def __init__(self, func, noise_stddev, build_model,
                 measurement_doe, optimization_doe):
        """
        @type func: function
        @type noise_stddev: float
        @type build_model: function
        @type measurement_doe: str
        @type optimization_doe: str
        """
        self.func = func
        self.noise_stddev = noise_stddev
        self.build_model = build_model
        
        if measurement_doe == 'random':
            self.draw_measurements = draw_from_uniform_distribution
        elif measurement_doe == 'grid':
            self.draw_measurements = create_grid
        else:
            raise ValueError("Unknown `measurement_doe`: {}".format(measurement_doe))
            
        if optimization_doe == 'random':
            self.draw_candidates = draw_from_uniform_distribution
        elif optimization_doe == 'grid':
            self.draw_candidates = create_grid
        else:
            raise ValueError("Unknown `optimization_doe`: {}".format(optimization_doe))
        
        self.train_data = None
        self.model = None
        self.goodness_of_fit = None
        self.minimum_point = None
        self.actual_value_at_minimum_point = None
        self.benchmark = None
        
    def _draw_train_data(self, **kwargs):
        """
        Draws data according to design of
        experiment specified by `kwargs` and
        `self.draw_measurements`.
        
        @rtype: NoneType
        """
        train_inputs = self.draw_measurements(**kwargs)
        self.train_data = pd.DataFrame(train_inputs, columns=['x', 'y'])
        self.train_data['target'] = self.train_data.apply(
            lambda row: self.func(row['x'], row['y'], self.noise_stddev), axis=1)
        
    def _train_surrogate_model(self, **kwargs):
        """
        Fits `self.model` to `self.train_data`
        with specified hyperparameters.
        
        @rtype: NoneType
        """
        # In Keras, there is no easy way to reset weights without recompiling a model.
        self.model = self.build_model()
        X_train, X_test, y_train, y_test = \
            train_test_split(self.train_data[['x', 'y']].as_matrix(),
                             self.train_data['target'].as_matrix(),
                             random_state=361)
        hst = self.model.fit(X_train, y_train, validation_data=(X_test, y_test),
                             callbacks=[EarlyStopping(patience=10)], verbose=0,
                             **kwargs)
        self.goodness_of_fit = {'train_mse': hst.history['loss'][-1],
                                'val_mse': hst.history['val_loss'][-1]}
        
    def _compare_target_vs_surrogate(self, **kwargs):
        """
        Plots two graphs near each other
        and computes R^2 coefficient of
        determination.
        
        @rtype: NoneType
        """
        df = pd.DataFrame(create_grid(**kwargs), columns=['x', 'y'])
        df['target'] = df.apply(
            lambda row: self.func(row['x'], row['y'], noise_stddev=0),
            axis=1)
        df['surrogate'] = df.apply(
            lambda row: self.model.predict(row[['x', 'y']].as_matrix().reshape((1, 2)))[0, 0],
            axis=1)
        self.goodness_of_fit['r2'] = r2_score(df['target'], df['surrogate'])
        tqdm.write("Evaluation: overall R^2 is {:1.3f}".format(self.goodness_of_fit['r2']))
        
        fig = plt.figure(figsize=(15, 7))
        
        ax_one = fig.add_subplot(121)
        ax_one.scatter(df['x'], df['y'], c=df['target'], cmap='coolwarm')
        ax_one.set_title('Actual function')
        ax_one.set_xlabel('x')
        ax_one.set_ylabel('y')
        ax_one.set_aspect('equal')
        
        # TODO: unify colormap scale.
        ax_two = fig.add_subplot(122)
        ax_two.scatter(df['x'], df['y'], c=df['surrogate'], cmap='coolwarm')
        ax_two.set_title('Surrogate function')
        ax_two.set_xlabel('x')
        ax_two.set_ylabel('y')
        ax_two.set_aspect('equal')
        
    def _compute_benchmark(self):
        """
        Computes benchmark that is a score at
        a minimum point of (in general, noisy)
        measured target. Ties in values of the
        measured target are broken at random.
        
        @rtype: NoneType
        """
        cond = self.train_data['target'] == self.train_data['target'].min()
        potential_inputs = self.train_data.loc[cond, ['x', 'y']]
        inputs = potential_inputs.sample(random_state=361).iloc[0, :]
        self.benchmark = self.func(inputs['x'], inputs['y'], noise_stddev=0)
    
    def _find_minimum_of_surrogate_target(self, **kwargs):
        """
        Finds a point with the lowest prediction
        of `self.model` amongst points that are
        obtained in accordance with
        `self.optimization_doe`.
        Ties are broken by random choice.
        
        @rtype: NoneType
        """
        candidates = self.draw_candidates(**kwargs)
        candidates = pd.DataFrame(candidates, columns=['x', 'y'])
        candidates['surrogate_target'] = candidates.apply(
            lambda row: self.model.predict(row.as_matrix().reshape((1, 2)))[0, 0],
            axis=1)
        
        cond = candidates['surrogate_target'] == candidates['surrogate_target'].min()
        minimum_points = candidates.loc[cond, ['x', 'y']]
        self.minimum_point = minimum_points.sample(random_state=361).iloc[0, :]
        self.minimum_point = self.minimum_point.as_matrix().reshape((1, 2))
        self.actual_value_at_minimum_point = self.func(
            self.minimum_point[0, 0], self.minimum_point[0, 1], noise_stddev=0)
        
    def run_experiment(self, n_runs, runs_to_be_evaluated, evaluation_settings,
                       measurement_settings, train_settings, optimization_settings,
                       same_train_sample_for_all_runs=True, verbose=False):
        """
        Launchs `n_runs` experiments defined by
        `measurement_settings`, `train_settings`,
        and `optimization_settings`.
        For experiments that are in
        `runs_to_be_evaluated`, learnt surrogate
        function is plotted against the actual
        function with 'evaluation_settings`
        and R^2 score is computed. To avoid
        granularity of graphs, one should pass
        fine grid and it makes computations
        hard, so it is recommended to include
        in `runs_to_be_evaluated` only small
        proportion of runs.
        
        @type n_runs: int
        @type runs_to_be_evaluated: list(int)
        @type evaluation_settings: dict(str -> any)
        @type measurement_settings: dict(str -> any)
        @type train_settings: dict(str -> any)
        @type optimization_settings: dict(str -> any)
        @type same_train_sample_for_all_runs: bool
        @type verbose: bool
        @rtype: pandas.DataFrame
        """
        results = []
        runs = tqdm(range(n_runs)) if verbose else range(n_runs)
        for i in runs:
            if self.train_data is None:
                self._draw_train_data(**measurement_settings)
            
            # Even if `same_train_sample_for_all_runs` is `True`,
            # benchmark can vary due to ties.
            self._compute_benchmark()
            
            self._train_surrogate_model(**train_settings)
            if i in runs_to_be_evaluated:
                self._compare_target_vs_surrogate(**evaluation_settings)
            self._find_minimum_of_surrogate_target(**optimization_settings)
            
            benchmark = self.benchmark
            score = self.actual_value_at_minimum_point
            minimum_point_x = self.minimum_point[0, 0]
            minimum_point_y = self.minimum_point[0, 1]
            results.append([benchmark, score, minimum_point_x, minimum_point_y])
            
            if not same_train_sample_for_all_runs:
                self.train_data = None
        result = pd.DataFrame(results, columns=['benchmark', 'score', 'x', 'y'])
        return result

In [7]:
def build_mlp(hidden_layers_widths, keep_prob, learning_rate):
    """
    Builds Multi-Layer Perceptron (MLP).
    
    @type hidden_layers_widths: list(int)
    @type keep_prob: float
    @type learning_rate: float
    @rtype: keras.Model
    """
    model = keras.models.Sequential()
    model.add(
        Dense(hidden_layers_widths[0], input_dim=2,
              kernel_initializer=RandomNormal(stddev=0.01, seed=361)))
    model.add(LeakyReLU(alpha=0.003))
    model.add(Dropout(keep_prob))
    for i in range(1, len(hidden_layers_widths)):
        model.add(
            Dense(hidden_layers_widths[i],
                  kernel_initializer=RandomNormal(stddev=0.01, seed=361)))
        model.add(LeakyReLU(alpha=0.001))
        model.add(Dropout(keep_prob))
    model.add(
        Dense(1, kernel_initializer=RandomNormal(stddev=0.01, seed=361)))
    model.compile(loss='mean_squared_error',
                  optimizer=keras.optimizers.Adam(lr=learning_rate))
    return model

Experiments

Himmelblau's Function

As an example of non-convex function with multiple global minima, Himmelblau's function is chosen here.

It is: $$f(x, y) = (x^2+y-11)^2 + (x+y^2-7)^2.$$

Himmelblau's function is of particular interest, because there is a region where it looks like plateau.


In [8]:
def noisy_himmelblaus_function(x, y, noise_stddev=0):
    """
    Computes Himmelblau's function at
    point (`x`, `y`) and adds some
    Gaussian noise.
    
    @type x: float
    @type y: float
    @type noise_stddev: float
    @rtype: float
    """
    himmelblaus_value = (x**2 + y - 11)**2 + (x + y**2 - 7)**2
    noise = np.random.normal(scale=noise_stddev, size=1)[0]
    return himmelblaus_value + noise

In [9]:
# Target definition.
func = noisy_himmelblaus_function
noise_stddev = 0.01

# Model settings.
hidden_layers_widths = [400, 200]
keep_prob = 0.95
learning_rate = 0.001
train_settings = {'epochs': 100,
                  'batch_size': 32}

# Design of experiment.
measurement_doe = 'random'
measurement_settings = {'n_samples': 20000,
                        'x_borders': (-7, 7),
                        'y_borders': (-7, 7)}
optimization_doe = 'grid'
optimization_settings = {'step': 0.1,
                         'left_bottom_corner': (-7, -7),
                         'x_n_steps': 141,
                         'y_n_steps': 141}

In [10]:
build_model = partial(build_mlp, *[hidden_layers_widths, keep_prob, learning_rate])
runner = RunnerOfSurrogateModelling(
    func, noise_stddev, build_model, measurement_doe, optimization_doe)
df = runner.run_experiment(n_runs=20, runs_to_be_evaluated=[0],
                           evaluation_settings=optimization_settings,
                           measurement_settings=measurement_settings,
                           train_settings=train_settings,
                           optimization_settings=optimization_settings)


Evaluation: overall R^2 is 0.884

Surrogate function has low variance within a region that is coloured in deep blue on the right graph. Does it result in wrong placement of minimum points?


In [11]:
df[['benchmark', 'score']].describe()


Out[11]:
benchmark score
count 2.000000e+01 20.000000
mean 5.799082e-03 31.099370
std 8.898944e-19 18.078997
min 5.799082e-03 5.801600
25% 5.799082e-03 23.049700
50% 5.799082e-03 26.027050
75% 5.799082e-03 35.289100
max 5.799082e-03 89.880200

In [12]:
fig = plt.figure(figsize=(9, 9))
ax = fig.add_subplot(111)
ax.set_xlim(-7.5, 7.5)
ax.set_ylim(-7.5, 7.5)
ax.set_title('Surrogate minimum points and actual minimum points')
ax.set_xlabel('x')
ax.set_ylabel('y')

sns.kdeplot(df['x'], df['y'], cmap='Reds', shade=True, shade_lowest=False, ax=ax)
ax.scatter(df['x'], df['y'])

himmelblaus_roots = pd.DataFrame([[3, 2],
                                  [-2.805118, 3.131312],
                                  [-3.77931, -3.283186],
                                  [3.584428, -1.848126]],
                                  columns=['x', 'y'])
_ = ax.scatter(himmelblaus_roots['x'], himmelblaus_roots['y'], marker='x', s=50, c='black')


Surrogate minimum points (blue points) are far enough from the nearest to them actual minimum point (represented as a black cross).

Rosenbrock Function

Now let us try another non-convex function. Namely, it is Rosenbrock function.

Its definition is: $$f(x, y) = (a - x)^2 + b(y - x^2)^2.$$

Rosenbrock function is of particular interest, because it has a valley where its values are close to its global minimum.


In [13]:
def noisy_rosenbrock_function(x, y, noise_stddev=0, a=1, b=100):
    """
    Computes Rosenbrock function
    with parameters `a` and `b`at
    point (`x`, `y`) and adds some
    Gaussian noise.
    
    @type x: float
    @type y: float
    @type noise_stddev: float
    @type a: float
    @type b: float
    @rtype: float
    """
    rosenbrock_value = (a - x)**2 + b * (y - x**2)**2
    noise = np.random.normal(scale=noise_stddev, size=1)[0]
    return rosenbrock_value + noise

In [14]:
# Target definition.
func = noisy_rosenbrock_function
noise_stddev = 0.01

# Model settings.
hidden_layers_widths = [400, 200]
keep_prob = 0.95
learning_rate = 0.001
train_settings = {'epochs': 100,
                  'batch_size': 32}

# Design of experiment.
measurement_doe = 'random'
measurement_settings = {'n_samples': 20000,
                       'x_borders': (-2, 2),
                       'y_borders': (-1, 3)}
optimization_doe = 'grid'
optimization_settings = {'step': 0.05,
                         'left_bottom_corner': (-2, -1),
                         'x_n_steps': 81,
                         'y_n_steps': 81}

# Making plots less granular.
evaluation_settings = {'step': 0.025,
                       'left_bottom_corner': (-2, -1),
                       'x_n_steps': 161,
                       'y_n_steps': 161}

In [15]:
build_model = partial(build_mlp, *[hidden_layers_widths, keep_prob, learning_rate])
runner = RunnerOfSurrogateModelling(
    func, noise_stddev, build_model, measurement_doe, optimization_doe)
df = runner.run_experiment(n_runs=20, runs_to_be_evaluated=[0],
                           evaluation_settings=evaluation_settings,
                           measurement_settings=measurement_settings,
                           train_settings=train_settings,
                           optimization_settings=optimization_settings)


Evaluation: overall R^2 is 0.803

In [16]:
df[['benchmark', 'score']].describe()


Out[16]:
benchmark score
count 2.000000e+01 20.000000
mean 1.857491e-03 126.321094
std 2.224736e-19 14.071122
min 1.857491e-03 91.220000
25% 1.857491e-03 120.770000
50% 1.857491e-03 128.361562
75% 1.857491e-03 134.434844
max 1.857491e-03 150.173125

In [17]:
fig = plt.figure(figsize=(9, 9))
ax = fig.add_subplot(111)
ax.set_xlim(-2.5, 2.5)
ax.set_ylim(-1.5, 3.5)
ax.set_title('Surrogate minimum points and actual minimum point')
ax.set_xlabel('x')
ax.set_ylabel('y')

sns.kdeplot(df['x'], df['y'], cmap='Reds', shade=True, shade_lowest=False, ax=ax)
ax.scatter(df['x'], df['y'])

rosenbrock_roots = pd.DataFrame([[1, 1]],
                                 columns=['x', 'y'])
_ = ax.scatter(rosenbrock_roots['x'], rosenbrock_roots['y'], marker='x', s=50, c='black')


It looks like surrogate minimum points are less stable than in case of Himmelblau's function's modelling, does not it?

Conclusion

One might think that surrogate modelling does not give any advantages over naive approach at least in 2D case. Although surrogate modelling is an approach suitable for high-dimensional data where grids' sizes grow too fast to allow having fine enough steps, there is an obvious improvement of surrogate modelling's scores. Himmelblau's function is a polynomial and Rosenbrock function is a polynomial as well. Thus, linear regression with squared terms included can model these functions better than the neural networks that are trained above. If you are wondering why neural networks are chosen here as a tool, please note that this notebook is placed in a directory named hard_use_and_abuse. Besides that, probably, functions that are not polynomials will be studied here in the future.

The last, but not the least, all above experiments are run with quite low standard deviation of noise. An interested reader can increase this parameter and look how performances of both benchmarks and surrogate models degrade.