In [1]:
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
In [2]:
class Model(object):
    def __init__(self, algo, algo_kwargs, weights=[0.1, 0.1], size=100, repeats=200):
        self.algo = algo
        self.algo_kwargs = algo_kwargs
        self.weights = weights
        self.size = size
        self.repeats = repeats

    def run_simulation(self):
        """Run a single simulation, recording the performance"""
        algo = self.algo(**self.algo_kwargs)
        arm_choice_record = []
        reward_record = []
        for i in range(self.size):
            arm = algo.choose_arm()
            arm_choice_record.append(arm)
            reward = np.random.random() < self.weights[arm]
            reward_record.append(reward)
            algo.update(arm, reward)
        return arm_choice_record, reward_record

    def repeat_simulation(self):
        """Run multiple simulations, recording the performance of each"""
        arm_choice = []
        reward = []
        for i in range(self.repeats):
            arm_choice_record, reward_record = self.run_simulation()
            arm_choice.append(arm_choice_record)
            reward.append(reward_record)
        self.arm_choice = np.array(arm_choice)
        self.reward = np.array(reward)

    def plot_arm_frequency(self, ax, color='k'):
        """Plot the frequency with which the second arm is chosen
        NOTE: Currently only works for two arms"""
        ax.plot(self.arm_choice.mean(axis=0), color)
        ax.set_title('Frequency of arm choice')
        ax.set_xlabel('Trial')
        ax.set_ylabel('Frequency')
        return ax

    def plot_reward(self, ax, color='-k'):
        """Plot the average reward for each trial across all simulations"""
        ax.plot(self.reward.mean(axis=0), color)
        ax.set_title('Reward')
        ax.set_xlabel('Trial')
        ax.set_ylabel('Reward')
        return ax

    def plot_cumulative_reward(self, ax, color='-k'):
        """Plot the cumulative reward across all simulations"""
        ax.plot(np.cumsum(self.reward, axis=1).mean(axis=0), color)
        ax.set_title('Cumulative Reward')
        ax.set_xlabel('Trial')
        ax.set_ylabel('Cumulative Reward')
        return ax
In [3]:
class RCT(object):
    def __init__(self, n_arms, epsilon):
        self._epsilon = epsilon
        self.counts = [0] * n_arms
        self.values = [0] * n_arms
        self.n_arms = n_arms

    def choose_arm(self):
        """Choose an arm"""
        if np.random.random() > self.epsilon:
            # Exploit: choose uniformly among the arms with the highest estimated value
            weights = np.array(self.values)
            weights = weights == weights.max()
            weights = weights / weights.sum()
            return np.random.choice(np.arange(self.n_arms), p=weights)
        else:
            # Explore: choose an arm at random
            return np.random.randint(self.n_arms)

    def update(self, arm, reward):
        """Update an arm with the reward"""
        self.counts[arm] = self.counts[arm] + 1
        n = self.counts[arm]
        # Running average of the rewards observed for this arm
        self.values[arm] = ((n - 1) / n) * self.values[arm] + (1 / n) * reward

    @property
    def epsilon(self):
        # Pure exploration for the first 100 trials, then pure exploitation
        if sum(self.counts) < 100:
            return 1
        else:
            return 0
In [4]:
model = Model(RCT, {'n_arms': 2, 'epsilon':0.05}, weights=[0.1, 0.2])
model.repeat_simulation()
fig, axes = plt.subplots(1,3, figsize=(18,6))
model.plot_arm_frequency(ax=axes[0])
model.plot_reward(ax=axes[1])
model.plot_cumulative_reward(ax=axes[2])
plt.show()
In [5]:
class EpsilonGreedy(RCT):
    @property
    def epsilon(self):
        return self._epsilon

model = Model(EpsilonGreedy, {'n_arms': 2, 'epsilon':0.05}, weights=[0.1, 0.2])
model.repeat_simulation()
fig, axes = plt.subplots(1,3, figsize=(18,6))
model.plot_arm_frequency(ax=axes[0])
model.plot_reward(ax=axes[1])
model.plot_cumulative_reward(ax=axes[2])
plt.show()
The same code can be used for both #2 and #3 so only #3 will be covered:
In [6]:
def plot_arm_frequency(simulation, ax, marker='.', linestyle='', color='k', label=''):
    """Plot the frequency with which the second arm is chosen
    NOTE: Currently only works for two arms"""
    ax.plot(simulation.arm_choice.mean(axis=0),
            marker=marker, linestyle=linestyle, color=color, label=label)
    ax.set_title('Frequency of arm choice')
    ax.set_xlabel('Trial')
    ax.set_ylabel('Frequency')
    return ax

def plot_reward(simulation, ax, marker='.', linestyle='-', color='k', label=''):
    """Plot the average reward for each trial across all simulations"""
    ax.plot(simulation.reward.mean(axis=0),
            marker=marker, linestyle=linestyle, color=color, label=label)
    ax.set_title('Reward')
    ax.set_xlabel('Trial')
    ax.set_ylabel('Reward')
    return ax

def plot_cumulative_reward(simulation, ax, marker='', linestyle='-', color='k', label=''):
    """Plot the cumulative reward across all simulations"""
    ax.plot(np.cumsum(simulation.reward, axis=1).mean(axis=0),
            marker=marker, linestyle=linestyle, color=color, label=label)
    ax.set_title('Cumulative Reward')
    ax.set_xlabel('Trial')
    ax.set_ylabel('Cumulative Reward')
    return ax

def plot_summary(model, axes, color='', label=''):
    plot_arm_frequency(model, ax=axes[0], color=color, label=label)
    plot_reward(model, ax=axes[1], color=color, label=label)
    plot_cumulative_reward(model, ax=axes[2], color=color, label=label)
    for ax in axes:
        ax.legend(loc=4)
    return axes
fig, axes = plt.subplots(1,3, figsize=(18,6))
model = Model(EpsilonGreedy, {'n_arms': 2, 'epsilon':0.05}, weights=[0.1, 0.2])
model.repeat_simulation()
plot_summary(model, axes, color='k', label='epsilon=0.05')
model = Model(EpsilonGreedy, {'n_arms': 2, 'epsilon':0.25}, weights=[0.1, 0.2])
model.repeat_simulation()
plot_summary(model, axes, color='b', label='epsilon=0.25')
model = Model(EpsilonGreedy, {'n_arms': 2, 'epsilon':0.5}, weights=[0.1, 0.2])
model.repeat_simulation()
plot_summary(model, axes, color='c', label='epsilon=0.5')
plt.show()
The Adaptive Epsilon Greedy algorithm for #4 should change the value of epsilon (and hence the likelihood of choosing randomly) as the number of trials increases. There are many ways this could be implemented:
In [12]:
t = np.arange(1000)
plt.plot(t, (1+t)**-0.5, label='1/sqrt')
plt.plot(t, (1+t)**-0.2, label='1/5th-root')
plt.plot(t, np.exp(-(t/200.)), label='exp^-t/200')
u = np.concatenate((np.ones(100),
                    np.ones(200) * 0.75,
                    np.ones(200) * 0.5,
                    np.ones(200) * 0.25,
                    np.ones(300) * 0.05))
plt.plot(t,u, label='Steps')
plt.legend()
plt.show()
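As a sketch of one of these alternatives (not a cell from the original notebook), the schedule labelled '1/sqrt' above could be implemented by subclassing RCT with a decaying epsilon property; the class name SqrtDecayEpsilonGreedy is illustrative:

class SqrtDecayEpsilonGreedy(RCT):
    @property
    def epsilon(self):
        # Epsilon decays as 1/sqrt of the total number of trials so far
        t = sum(self.counts)
        return self._epsilon * (1 + t) ** -0.5

An instance could be passed to Model in exactly the same way as the AdaptiveEpsilonGreedy class defined in the next cell.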
In [7]:
class AdaptiveEpsilonGreedy(RCT):
    @property
    def epsilon(self):
        return self._epsilon * np.exp(-(sum(self.counts) / 200.))

fig, axes = plt.subplots(1,3, figsize=(18,6))
model = Model(EpsilonGreedy, {'n_arms': 2, 'epsilon':0.05}, weights=[0.1, 0.2], size=1000)
model.repeat_simulation()
plot_summary(model, axes, color='k', label='epsilon=0.05')
model = Model(AdaptiveEpsilonGreedy, {'n_arms': 2, 'epsilon':1.00}, weights=[0.1, 0.2], size=1000)
model.repeat_simulation()
plot_summary(model, axes, color='c', label='AdaptiveEpsilonGreedy')
plt.show()
The falling epsilon value means the AdaptiveEpsilonGreedy algorithm explores less and less of the time, instead exploiting the arm it believes to be best. This has advantages and disadvantages: if the environment is stable, the cumulative reward will be higher, but the algorithm will be very slow to respond to any changes.
For example, if the rewards for the two arms were to switch:
In [13]:
class DynamicModel(object):
    def __init__(self, algo, algo_kwargs, weights=[0.1, 0.1], size=100, repeats=200):
        self.algo = algo
        self.algo_kwargs = algo_kwargs
        self.weights = weights
        self.size = size
        self.repeats = repeats

    def run_simulation(self):
        """Run a single simulation, recording the performance"""
        algo = self.algo(**self.algo_kwargs)
        arm_choice_record = []
        reward_record = []
        weights = self.weights[:]
        for i in range(self.size):
            arm = algo.choose_arm()
            arm_choice_record.append(arm)
            reward = np.random.random() < weights[arm]
            reward_record.append(reward)
            algo.update(arm, reward)
            if i == self.size // 2:
                # Switch the rewards halfway through the simulation
                weights[0], weights[1] = weights[1], weights[0]
        return arm_choice_record, reward_record

    def repeat_simulation(self):
        """Run multiple simulations, recording the performance of each"""
        arm_choice = []
        reward = []
        for i in range(self.repeats):
            arm_choice_record, reward_record = self.run_simulation()
            arm_choice.append(arm_choice_record)
            reward.append(reward_record)
        self.arm_choice = np.array(arm_choice)
        self.reward = np.array(reward)
fig, axes = plt.subplots(1,3, figsize=(18,6))
model = DynamicModel(EpsilonGreedy, {'n_arms': 2, 'epsilon':0.05}, weights=[0.1, 0.2], size=2000)
model.repeat_simulation()
plot_summary(model, axes, color='k', label='epsilon=0.05')
model = DynamicModel(AdaptiveEpsilonGreedy, {'n_arms': 2, 'epsilon':1.00}, weights=[0.1, 0.2], size=2000)
model.repeat_simulation()
plot_summary(model, axes, color='c', label='AdaptiveEpsilonGreedy')
plt.show()
There are two reasons why these algorithms are so slow to respond:
1. Little exploration happens late in the run, because epsilon is either fixed at a low value or has decayed towards zero, so the newly better arm is rarely sampled.
2. The value estimates are running averages over the entire history, so a large number of new observations is needed to outweigh the accumulated past experience.
This second issue can be addressed by giving recent experience greater weight than experience from further in the past.
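One common way to do this (a sketch assuming a fixed step size, not part of the original notebook) is to replace the running average with a constant step-size update, so that past rewards decay exponentially; the class name and the alpha value below are illustrative:

class ConstantStepEpsilonGreedy(AdaptiveEpsilonGreedy):
    def update(self, arm, reward):
        """Update an arm with a constant step size so old rewards decay exponentially"""
        alpha = 0.01  # illustrative step size
        self.counts[arm] = self.counts[arm] + 1
        self.values[arm] = (1 - alpha) * self.values[arm] + alpha * reward

The DynamicEpsilonGreedy class in the next cell achieves a similar effect by capping n at 100, which makes the update step size 1/100 once an arm has been pulled more than 100 times.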
In [14]:
class DynamicEpsilonGreedy(AdaptiveEpsilonGreedy):
    def update(self, arm, reward):
        """Update an arm with the reward"""
        self.counts[arm] = self.counts[arm] + 1
        n = self.counts[arm]
        # Cap n at 100 so each new reward carries at least 1/100th of the weight
        # in the running value estimate
        if n > 100:
            n = 100
        self.values[arm] = ((n - 1) / n) * self.values[arm] + (1 / n) * reward

fig, axes = plt.subplots(1,3, figsize=(18,6))
model = DynamicModel(EpsilonGreedy, {'n_arms': 2, 'epsilon':0.05}, weights=[0.1, 0.2], size=2000)
model.repeat_simulation()
plot_summary(model, axes, color='k', label='epsilon=0.05')
model = DynamicModel(AdaptiveEpsilonGreedy, {'n_arms': 2, 'epsilon':1.00}, weights=[0.1, 0.2], size=2000)
model.repeat_simulation()
plot_summary(model, axes, color='c', label='AdaptiveEpsilonGreedy')
model = DynamicModel(DynamicEpsilonGreedy, {'n_arms': 2, 'epsilon':1.00}, weights=[0.1, 0.2], size=2000)
model.repeat_simulation()
plot_summary(model, axes, color='r', label='DynamicEpsilonGreedy')
plt.show()