In [1]:
from abc import ABCMeta, abstractmethod
import numpy as np
np.set_printoptions(precision=3)
np.set_printoptions(suppress=True)
from matplotlib import pyplot as plt
%matplotlib inline
We are going to implement several exploration strategies for the simplest bandit problem: the Bernoulli bandit.
The bandit has $K$ actions. Action $k$ produces a reward $r = 1.0$ with probability $0 \le \theta_k \le 1$, which is unknown to the agent but fixed over time. The agent's objective is to minimize regret over a fixed number $T$ of action selections:
$$\rho = T\theta^* - \sum_{t=1}^T r_t,$$
where $\theta^* = \max_k\{\theta_k\}$.
Real-world analogy:
Clinical trials: we have $K$ pills and $T$ ill patients. After taking a pill, a patient is cured with probability $\theta_k$. The task is to find the most effective pill.
Research on bandits in clinical trials: https://arxiv.org/pdf/1507.08025.pdf
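To make the regret definition concrete, here is a tiny sketch (toy numbers of our own, not part of the assignment) that evaluates the formula for a hand-picked reward sequence:
In [ ]:
# Toy illustration of the regret formula (assumed numbers, not assignment code):
# regret after T steps is T * theta_star minus the sum of observed rewards.
theta_star = 0.7                      # best arm's success probability (assumed)
rewards = np.array([1, 0, 1, 1, 0])   # rewards observed over T = 5 pulls (assumed)
T = len(rewards)
regret = T * theta_star - rewards.sum()
print(regret)  # 3.5 - 3 = 0.5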
In [2]:
class BernoulliBandit:
    def __init__(self, n_actions=5):
        np.random.seed(1234)  # fix the seed so that the arm probabilities are reproducible
        self._probs = np.random.random(n_actions)

    @property
    def action_count(self):
        return len(self._probs)

    def pull(self, action):
        # Bernoulli reward: 1.0 with probability _probs[action], otherwise 0.0
        if np.random.random() > self._probs[action]:
            return 0.0
        return 1.0

    def optimal_reward(self):
        """ Used for regret calculation
        """
        return np.max(self._probs)

    def step(self):
        """ Used in nonstationary version
        """
        pass

    def reset(self):
        """ Used in nonstationary version
        """
        pass
In [3]:
class AbstractAgent(metaclass=ABCMeta):
    def init_actions(self, n_actions):
        self._successes = np.zeros(n_actions)
        self._failures = np.zeros(n_actions)
        self._total_pulls = 0

    @abstractmethod
    def get_action(self):
        """
        Get current best action
        :rtype: int
        """
        pass

    def update(self, action, reward):
        """
        Observe reward from action and update agent's internal parameters
        :type action: int
        :type reward: float
        """
        self._total_pulls += 1
        if reward == 1:
            self._successes[action] += 1
        else:
            self._failures[action] += 1

    @property
    def name(self):
        return self.__class__.__name__


class RandomAgent(AbstractAgent):
    def get_action(self):
        return np.random.randint(0, len(self._successes))
The epsilon-greedy agent tracks, for every action $k$, the number of observed successes $\alpha_k$ and failures $\beta_k$, and acts greedily on the empirical success rate except for an $\epsilon$-fraction of purely random exploratory actions:
for $t = 1,2,...$ do
  for $k = 1,...,K$ do
    $\hat\theta_k \leftarrow \alpha_k / (\alpha_k + \beta_k)$
  end for
  $x_t \leftarrow \operatorname{argmax}_k \hat\theta_k$ with probability $1 - \epsilon$, or a random action with probability $\epsilon$
  Apply $x_t$ and observe $r_t$
  $(\alpha_{x_t}, \beta_{x_t}) \leftarrow (\alpha_{x_t}, \beta_{x_t}) + (r_t, 1-r_t)$
end for
Implement the algorithm above in the cell below:
In [4]:
class EpsilonGreedyAgent(AbstractAgent):
    def __init__(self, epsilon=0.01):
        self._epsilon = epsilon

    def get_action(self):
        # YOUR CODE HERE
        n_actions = len(self._successes)  # K
        # With probability epsilon, explore: pick a uniformly random action.
        if np.random.random() < self._epsilon:
            return np.random.randint(n_actions)
        # Otherwise exploit: pick the action with the highest empirical success rate.
        # A small constant in the denominator avoids division by zero for unvisited actions.
        theta = self._successes / (self._successes + self._failures + 1e-6)
        return np.argmax(theta)

    @property
    def name(self):
        return self.__class__.__name__ + "(epsilon={})".format(self._epsilon)
The epsilon-greedy strategy explores all actions indiscriminately. It would be better to concentrate exploration on actions that are still uncertain or have the potential to be optimal. One idea is to assign each action an index that captures both its estimated optimality and its uncertainty. An efficient algorithm of this kind is UCB1:
for $t = 1,2,...$ do
  for $k = 1,...,K$ do
    $w_k \leftarrow \alpha_k / (\alpha_k + \beta_k) + \sqrt{2\log t \,/\, (\alpha_k + \beta_k)}$
  end for
  $x_t \leftarrow \operatorname{argmax}_k w_k$
  Apply $x_t$ and observe $r_t$
  $(\alpha_{x_t}, \beta_{x_t}) \leftarrow (\alpha_{x_t}, \beta_{x_t}) + (r_t, 1-r_t)$
end for
Note: in practice, one can multiply the exploration bonus $\sqrt{2\log t \,/\, (\alpha_k + \beta_k)}$ by a tunable parameter to regulate the agent's optimism and willingness to abandon non-promising actions (a sketch of such a variant follows the reference below).
More versions and optimality analysis: https://homes.di.unimi.it/~cesabian/Pubblicazioni/ml-02.pdf
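A minimal sketch of the tunable variant mentioned in the note above; the class and parameter names (`TunableUCBAgent`, `exploration_coef`) are our own and not part of the assignment:
In [ ]:
# Sketch of a UCB variant with a tunable exploration coefficient (assumed name: exploration_coef).
# exploration_coef = 1 recovers UCB1; smaller values make the agent less optimistic.
class TunableUCBAgent(AbstractAgent):
    def __init__(self, exploration_coef=1.0):
        self._exploration_coef = exploration_coef

    def get_action(self):
        pulls = self._successes + self._failures
        # Try every action at least once so that the index is well defined.
        if np.any(pulls == 0):
            return np.argmin(pulls)
        mean = self._successes / pulls
        bonus = np.sqrt(2 * np.log(self._total_pulls) / pulls)
        return np.argmax(mean + self._exploration_coef * bonus)

    @property
    def name(self):
        return self.__class__.__name__ + "(c={})".format(self._exploration_coef)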
In [5]:
class UCBAgent(AbstractAgent):
    def get_action(self):
        # YOUR CODE HERE
        pulls = self._successes + self._failures
        # Try every action at least once so that the index is well defined.
        if np.any(pulls == 0):
            return np.argmin(pulls)
        # UCB1 index: empirical mean plus an optimism bonus that shrinks with the pull count.
        w = self._successes / pulls + np.sqrt(2 * np.log(self._total_pulls) / pulls)
        return np.argmax(w)

    @property
    def name(self):
        return self.__class__.__name__
The UCB1 algorithm does not take the actual distribution of rewards into account. If we know the reward distribution family, we can do much better by using Thompson sampling:
for $t = 1,2,...$ do
  for $k = 1,...,K$ do
    Sample $\hat\theta_k \sim \mathrm{Beta}(\alpha_k, \beta_k)$
  end for
  $x_t \leftarrow \operatorname{argmax}_k \hat\theta_k$
  Apply $x_t$ and observe $r_t$
  $(\alpha_{x_t}, \beta_{x_t}) \leftarrow (\alpha_{x_t}, \beta_{x_t}) + (r_t, 1-r_t)$
end for
More on Thompson sampling: https://web.stanford.edu/~bvr/pubs/TS_Tutorial.pdf
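To see why sampling from the Beta posterior balances exploration and exploitation, here is a small illustration with toy counts of our own choosing: a barely explored arm produces widely spread samples (so it still gets selected occasionally), while a well-explored arm produces samples concentrated near its empirical mean.
In [ ]:
# Toy illustration (assumed counts): posterior samples for a barely-explored arm vs a well-explored arm.
few = np.random.beta(2 + 1, 1 + 1, size=5)      # 2 successes, 1 failure -> wide spread
many = np.random.beta(60 + 1, 40 + 1, size=5)   # 60 successes, 40 failures -> concentrated near 0.6
print("barely explored arm:", np.round(few, 3))
print("well explored arm:  ", np.round(many, 3))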
In [10]:
class ThompsonSamplingAgent(AbstractAgent):
    def get_action(self):
        # YOUR CODE HERE
        # Sample each action's success probability from its Beta posterior.
        # The +1 corresponds to a uniform Beta(1, 1) prior and keeps both parameters
        # positive even for actions that have not been tried yet.
        theta = np.random.beta(self._successes + 1, self._failures + 1)
        return np.argmax(theta)

    @property
    def name(self):
        return self.__class__.__name__
In [11]:
from collections import OrderedDict
def get_regret(env, agents, n_steps=5000, n_trials=50):
scores = OrderedDict({
agent.name: [0.0 for step in range(n_steps)] for agent in agents
})
for trial in range(n_trials):
env.reset()
for a in agents:
a.init_actions(env.action_count)
for i in range(n_steps):
optimal_reward = env.optimal_reward()
for agent in agents:
action = agent.get_action()
reward = env.pull(action)
agent.update(action, reward)
scores[agent.name][i] += optimal_reward - reward
            env.step()  # change bandit's state if it is nonstationary
for agent in agents:
scores[agent.name] = np.cumsum(scores[agent.name]) / n_trials
return scores
def plot_regret(agents, scores):
for agent in agents:
plt.plot(scores[agent.name])
plt.legend([agent.name for agent in agents])
plt.ylabel("regret")
plt.xlabel("steps")
plt.show()
In [12]:
# Agents to compare
agents = [
EpsilonGreedyAgent(),
UCBAgent(),
ThompsonSamplingAgent()
]
regret = get_regret(BernoulliBandit(), agents, n_steps=10000, n_trials=10)
plot_regret(agents, regret)
In [13]:
from submit import submit_bandits
submit_bandits(agents, regret, "tonatiuh_rangel@hotmail.com", "mPIwUddSXNqZ1NsH")
In [ ]: