In [1]:
!pip3 install bayesian-optimization
In [2]:
def black_box_function(x, y):
return -x ** 2 - (y - 1) ** 2 + 1
In [4]:
from bayes_opt import BayesianOptimization
In [6]:
# 파라미터 경계 정의
pbounds = {'x': (2, 4), 'y': (-3, 3)}
In [7]:
optimizer = BayesianOptimization(
f=black_box_function,
pbounds=pbounds,
verbose=2, # verbose = 1 prints only when a maximum is observed, verbose = 0 is silent
random_state=1,
)
n_iter
: 수행하려는 베이지안 최적화 단계. 더 많은 단계를 거치면 더 좋은 최대치 얻음init_points
: 수행할 무작위 탐색 단계
In [8]:
optimizer.maximize(
init_points=2,
n_iter=3,
)
In [9]:
optimizer.maximize(
init_points=10,
n_iter=3,
)
In [10]:
optimizer.maximize(
init_points=2,
n_iter=9,
)
optimizer.max
로 확인 가능
In [11]:
print(optimizer.max)
In [12]:
optimizer.set_bounds(new_bounds={"x": (-2, 3)})
In [13]:
optimizer.maximize(
init_points=0,
n_iter=5,
)
In [14]:
optimizer.probe(
params={"x": 0.5, "y": 0.7},
lazy=True,
)
In [15]:
print(optimizer.space.keys)
In [16]:
optimizer.probe(
params=[-0.3, 0.1],
lazy=True,
)
In [17]:
optimizer.maximize(init_points=0, n_iter=0)
In [19]:
from bayes_opt.observer import JSONLogger
from bayes_opt.event import Events
In [20]:
logger = JSONLogger(path="./bayesian_logs.json")
optimizer.subscribe(Events.OPTMIZATION_STEP, logger)
In [21]:
optimizer.maximize(
init_points=2,
n_iter=3,
)
In [22]:
from bayes_opt.util import load_logs
In [23]:
new_optimizer = BayesianOptimization(
f=black_box_function,
pbounds={"x": (-2, 2), "y": (-2, 2)},
verbose=2,
random_state=7,
)
print(len(new_optimizer.space))
In [24]:
load_logs(new_optimizer, logs=["./bayesian_logs.json"]);
In [25]:
print("New optimizer is now aware of {} points.".format(len(new_optimizer.space)))
In [26]:
new_optimizer.maximize(
init_points=0,
n_iter=10,
)
In [27]:
optimizer = BayesianOptimization(
f=black_box_function,
pbounds={'x': (-2, 2), 'y': (-3, 3)},
verbose=2,
random_state=1,
)
In [28]:
from bayes_opt import UtilityFunction
utility = UtilityFunction(kind="ucb", kappa=2.5, xi=0.0)
suggest
는 언제나 호출할 수 있음
In [32]:
next_point_to_probe = optimizer.suggest(utility)
print("Next point to probe is:", next_point_to_probe)
In [33]:
target = black_box_function(**next_point_to_probe)
print("Found the target value to be:", target)
In [34]:
optimizer.register(
params=next_point_to_probe,
target=target,
)
In [35]:
for _ in range(5):
next_point = optimizer.suggest(utility)
target = black_box_function(**next_point)
optimizer.register(params=next_point, target=target)
print(target, next_point)
print(optimizer.max)
In [36]:
def func_with_discrete_params(x, y, d):
# Simulate necessity of having d being discrete.
assert type(d) == int
return ((x + y + d) // (1 + d)) / (1 + (x + y) ** 2)
In [37]:
def function_to_be_optimized(x, y, w):
d = int(w)
return func_with_discrete_params(x, y, d)
In [38]:
optimizer = BayesianOptimization(
f=function_to_be_optimized,
pbounds={'x': (-10, 10), 'y': (-10, 10), 'w': (0, 5)},
verbose=2,
random_state=1,
)
In [39]:
optimizer.maximize(alpha=1e-3)
In [40]:
optimizer = BayesianOptimization(
f=black_box_function,
pbounds={'x': (-2, 2), 'y': (-3, 3)},
verbose=2,
random_state=1,
)
optimizer.maximize(
init_points=1,
n_iter=5,
# What follows are GP regressor parameters
alpha=1e-3,
n_restarts_optimizer=5
)
set_gp_params
In [41]:
optimizer.set_gp_params(normalize_y=True)
In [43]:
from bayes_opt.event import DEFAULT_EVENTS, Events
In [44]:
optimizer = BayesianOptimization(
f=black_box_function,
pbounds={'x': (-2, 2), 'y': (-3, 3)},
verbose=2,
random_state=1,
)
In [45]:
class BasicObserver:
def update(self, event, instance):
"""Does whatever you want with the event and `BayesianOptimization` instance."""
print("Event `{}` was observed".format(event))
In [46]:
my_observer = BasicObserver()
optimizer.subscribe(
event=Events.OPTMIZATION_STEP,
subscriber=my_observer,
callback=None, # Will use the `update` method as callback
)
In [47]:
def my_callback(event, instance):
print("Go nuts here!")
optimizer.subscribe(
event=Events.OPTMIZATION_START,
subscriber="Any hashable object",
callback=my_callback,
)
In [48]:
optimizer.maximize(init_points=1, n_iter=2)
In [49]:
DEFAULT_EVENTS
Out[49]:
In [50]:
from bayes_opt import BayesianOptimization
from bayes_opt import UtilityFunction
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import gridspec
%matplotlib inline
In [51]:
def target(x):
return np.exp(-(x - 2)**2) + np.exp(-(x - 6)**2/10) + 1/ (x**2 + 1)
In [52]:
x = np.linspace(-2, 10, 10000).reshape(-1, 1)
y = target(x)
plt.plot(x, y);
In [53]:
optimizer = BayesianOptimization(target, {'x': (-2, 10)}, random_state=27)
In [54]:
optimizer.maximize(init_points=2, n_iter=0, kappa=5)
In [55]:
def posterior(optimizer, x_obs, y_obs, grid):
optimizer._gp.fit(x_obs, y_obs)
mu, sigma = optimizer._gp.predict(grid, return_std=True)
return mu, sigma
def plot_gp(optimizer, x, y):
fig = plt.figure(figsize=(16, 10))
steps = len(optimizer.space)
fig.suptitle(
'Gaussian Process and Utility Function After {} Steps'.format(steps),
fontdict={'size':30}
)
gs = gridspec.GridSpec(2, 1, height_ratios=[3, 1])
axis = plt.subplot(gs[0])
acq = plt.subplot(gs[1])
x_obs = np.array([[res["params"]["x"]] for res in optimizer.res])
y_obs = np.array([res["target"] for res in optimizer.res])
mu, sigma = posterior(optimizer, x_obs, y_obs, x)
axis.plot(x, y, linewidth=3, label='Target')
axis.plot(x_obs.flatten(), y_obs, 'D', markersize=8, label=u'Observations', color='r')
axis.plot(x, mu, '--', color='k', label='Prediction')
axis.fill(np.concatenate([x, x[::-1]]),
np.concatenate([mu - 1.9600 * sigma, (mu + 1.9600 * sigma)[::-1]]),
alpha=.6, fc='c', ec='None', label='95% confidence interval')
axis.set_xlim((-2, 10))
axis.set_ylim((None, None))
axis.set_ylabel('f(x)', fontdict={'size':20})
axis.set_xlabel('x', fontdict={'size':20})
utility_function = UtilityFunction(kind="ucb", kappa=5, xi=0)
utility = utility_function.utility(x, optimizer._gp, 0)
acq.plot(x, utility, label='Utility Function', color='purple')
acq.plot(x[np.argmax(utility)], np.max(utility), '*', markersize=15,
label=u'Next Best Guess', markerfacecolor='gold', markeredgecolor='k', markeredgewidth=1)
acq.set_xlim((-2, 10))
acq.set_ylim((0, np.max(utility) + 0.5))
acq.set_ylabel('Utility', fontdict={'size':20})
acq.set_xlabel('x', fontdict={'size':20})
axis.legend(loc=2, bbox_to_anchor=(1.01, 1), borderaxespad=0.)
acq.legend(loc=2, bbox_to_anchor=(1.01, 1), borderaxespad=0.)
In [56]:
plot_gp(optimizer, x, y)
In [57]:
optimizer.maximize(init_points=0, n_iter=1, kappa=5)
plot_gp(optimizer, x, y)
In [58]:
optimizer.maximize(init_points=0, n_iter=1, kappa=5)
plot_gp(optimizer, x, y)
In [59]:
optimizer.maximize(init_points=0, n_iter=1, kappa=5)
plot_gp(optimizer, x, y)
In [60]:
optimizer.maximize(init_points=0, n_iter=1, kappa=5)
plot_gp(optimizer, x, y)
In [61]:
optimizer.maximize(init_points=0, n_iter=1, kappa=5)
plot_gp(optimizer, x, y)
In [62]:
optimizer.maximize(init_points=0, n_iter=1, kappa=5)
plot_gp(optimizer, x, y)
In [63]:
optimizer.maximize(init_points=0, n_iter=1, kappa=5)
plot_gp(optimizer, x, y)
In [ ]:
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier as RFC
from sklearn.svm import SVC
from bayes_opt import BayesianOptimization
from bayes_opt.util import Colours
def get_data():
"""Synthetic binary classification dataset."""
data, targets = make_classification(
n_samples=1000,
n_features=45,
n_informative=12,
n_redundant=7,
random_state=134985745,
)
return data, targets
def svc_cv(C, gamma, data, targets):
"""SVC cross validation.
This function will instantiate a SVC classifier with parameters C and
gamma. Combined with data and targets this will in turn be used to perform
cross validation. The result of cross validation is returned.
Our goal is to find combinations of C and gamma that maximizes the roc_auc
metric.
"""
estimator = SVC(C=C, gamma=gamma, random_state=2)
cval = cross_val_score(estimator, data, targets, scoring='roc_auc', cv=4)
return cval.mean()
def rfc_cv(n_estimators, min_samples_split, max_features, data, targets):
"""Random Forest cross validation.
This function will instantiate a random forest classifier with parameters
n_estimators, min_samples_split, and max_features. Combined with data and
targets this will in turn be used to perform cross validation. The result
of cross validation is returned.
Our goal is to find combinations of n_estimators, min_samples_split, and
max_features that minimzes the log loss.
"""
estimator = RFC(
n_estimators=n_estimators,
min_samples_split=min_samples_split,
max_features=max_features,
random_state=2
)
cval = cross_val_score(estimator, data, targets,
scoring='neg_log_loss', cv=4)
return cval.mean()
def optimize_svc(data, targets):
"""Apply Bayesian Optimization to SVC parameters."""
def svc_crossval(expC, expGamma):
"""Wrapper of SVC cross validation.
Notice how we transform between regular and log scale. While this
is not technically necessary, it greatly improves the performance
of the optimizer.
"""
C = 10 ** expC
gamma = 10 ** expGamma
return svc_cv(C=C, gamma=gamma, data=data, targets=targets)
optimizer = BayesianOptimization(
f=svc_crossval,
pbounds={"expC": (-3, 2), "expGamma": (-4, -1)},
random_state=1234,
verbose=2
)
optimizer.maximize(n_iter=10)
print("Final result:", optimizer.max)
def optimize_rfc(data, targets):
"""Apply Bayesian Optimization to Random Forest parameters."""
def rfc_crossval(n_estimators, min_samples_split, max_features):
"""Wrapper of RandomForest cross validation.
Notice how we ensure n_estimators and min_samples_split are casted
to integer before we pass them along. Moreover, to avoid max_features
taking values outside the (0, 1) range, we also ensure it is capped
accordingly.
"""
return rfc_cv(
n_estimators=int(n_estimators),
min_samples_split=int(min_samples_split),
max_features=max(min(max_features, 0.999), 1e-3),
data=data,
targets=targets,
)
optimizer = BayesianOptimization(
f=rfc_crossval,
pbounds={
"n_estimators": (10, 250),
"min_samples_split": (2, 25),
"max_features": (0.1, 0.999),
},
random_state=1234,
verbose=2
)
optimizer.maximize(n_iter=10)
print("Final result:", optimizer.max)
if __name__ == "__main__":
data, targets = get_data()
print(Colours.yellow("--- Optimizing SVM ---"))
optimize_svc(data, targets)
print(Colours.green("--- Optimizing Random Forest ---"))
optimize_rfc(data, targets)
In [ ]:
import time
import random
from bayes_opt import BayesianOptimization
from bayes_opt.util import UtilityFunction, Colours
import asyncio
import threading
try:
import json
import tornado.ioloop
import tornado.httpserver
from tornado.web import RequestHandler
import requests
except ImportError:
raise ImportError(
"In order to run this example you must have the libraries: " +
"`tornado` and `requests` installed."
)
def black_box_function(x, y):
"""Function with unknown internals we wish to maximize.
This is just serving as an example, however, for all intents and
purposes think of the internals of this function, i.e.: the process
which generates its outputs values, as unknown.
"""
time.sleep(random.randint(1, 7))
return -x ** 2 - (y - 1) ** 2 + 1
class BayesianOptimizationHandler(RequestHandler):
"""Basic functionality for NLP handlers."""
_bo = BayesianOptimization(
f=black_box_function,
pbounds={"x": (-4, 4), "y": (-3, 3)}
)
_uf = UtilityFunction(kind="ucb", kappa=3, xi=1)
def post(self):
"""Deal with incoming requests."""
body = tornado.escape.json_decode(self.request.body)
try:
self._bo.register(
params=body["params"],
target=body["target"],
)
print("BO has registered: {} points.".format(len(self._bo.space)), end="\n\n")
except KeyError:
pass
finally:
suggested_params = self._bo.suggest(self._uf)
self.write(json.dumps(suggested_params))
def run_optimization_app():
asyncio.set_event_loop(asyncio.new_event_loop())
handlers = [
(r"/bayesian_optimization", BayesianOptimizationHandler),
]
server = tornado.httpserver.HTTPServer(
tornado.web.Application(handlers)
)
server.listen(9009)
tornado.ioloop.IOLoop.instance().start()
def run_optimizer():
global optimizers_config
config = optimizers_config.pop()
name = config["name"]
colour = config["colour"]
register_data = {}
max_target = None
for _ in range(10):
status = name + " wants to register: {}.\n".format(register_data)
resp = requests.post(
url="http://localhost:9009/bayesian_optimization",
json=register_data,
).json()
target = black_box_function(**resp)
register_data = {
"params": resp,
"target": target,
}
if max_target is None or target > max_target:
max_target = target
status += name + " got {} as target.\n".format(target)
status += name + " will to register next: {}.\n".format(register_data)
print(colour(status), end="\n")
global results
results.append((name, max_target))
print(colour(name + " is done!"), end="\n\n")
if __name__ == "__main__":
ioloop = tornado.ioloop.IOLoop.instance()
optimizers_config = [
{"name": "optimizer 1", "colour": Colours.red},
{"name": "optimizer 2", "colour": Colours.green},
{"name": "optimizer 3", "colour": Colours.blue},
]
app_thread = threading.Thread(target=run_optimization_app)
app_thread.daemon = True
app_thread.start()
targets = (
run_optimizer,
run_optimizer,
run_optimizer
)
optimizer_threads = []
for target in targets:
optimizer_threads.append(threading.Thread(target=target))
optimizer_threads[-1].daemon = True
optimizer_threads[-1].start()
results = []
for optimizer_thread in optimizer_threads:
optimizer_thread.join()
for result in results:
print(result[0], "found a maximum value of: {}".format(result[1]))
ioloop.stop()