This notebook builds upon qlearning.ipynb, or to be exact, your implementation of QLearningAgent.
The policy we're gonna use is the epsilon-greedy policy, where the agent takes the optimal action with probability $(1-\epsilon)$ and otherwise samples an action at random. Note that the agent can occasionally sample the optimal action during random sampling by pure chance.
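To make that last remark concrete, here is a minimal sketch of the action probabilities an epsilon-greedy policy induces. The helper name `epsilon_greedy_probs` and the example Q-values are hypothetical, and the sketch assumes uniform random exploration and a single greedy action (ties are resolved by `argmax`).

```python
import numpy as np

def epsilon_greedy_probs(q_values, epsilon):
    """Return pi(a|s) for an epsilon-greedy policy over the given Q-values."""
    q_values = np.asarray(q_values, dtype=float)
    n_actions = len(q_values)
    # Every action receives an equal share of the exploration mass.
    probs = np.full(n_actions, epsilon / n_actions)
    # The greedy action additionally receives the exploitation mass.
    probs[np.argmax(q_values)] += 1.0 - epsilon
    return probs

probs = epsilon_greedy_probs([1.0, 3.0, 2.0, 0.0], epsilon=0.2)
print(probs)
```

With four actions and epsilon = 0.2, the greedy action ends up being chosen with probability 1 - 0.2 + 0.2/4 = 0.85, slightly more than $(1-\epsilon)$.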
In [ ]:
import sys, os
if 'google.colab' in sys.modules and not os.path.exists('.setup_complete'):
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/setup_colab.sh -O- | bash
    !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'
In [ ]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
You can copy your QLearningAgent implementation from the previous notebook.
In [ ]:
from collections import defaultdict
import random
import math
import numpy as np


class QLearningAgent:
    def __init__(self, alpha, epsilon, discount, get_legal_actions):
        """
        Q-Learning Agent
        based on https://inst.eecs.berkeley.edu/~cs188/sp19/projects.html
        Instance variables you have access to
          - self.epsilon (exploration prob)
          - self.alpha (learning rate)
          - self.discount (discount rate aka gamma)
        Functions you should use
          - self.get_legal_actions(state) {state, hashable -> list of actions, each is hashable}
            which returns legal actions for a state
          - self.get_qvalue(state,action)
            which returns Q(state,action)
          - self.set_qvalue(state,action,value)
            which sets Q(state,action) := value
        !!!Important!!!
        Note: please avoid using self._qvalues directly.
            There's a special self.get_qvalue/set_qvalue for that.
        """

        self.get_legal_actions = get_legal_actions
        self._qvalues = defaultdict(lambda: defaultdict(lambda: 0))
        self.alpha = alpha
        self.epsilon = epsilon
        self.discount = discount

    def get_qvalue(self, state, action):
        """ Returns Q(state,action) """
        return self._qvalues[state][action]

    def set_qvalue(self, state, action, value):
        """ Sets the Qvalue for [state,action] to the given value """
        self._qvalues[state][action] = value

    #---------------------START OF YOUR CODE---------------------#

    def get_value(self, state):
        """
        Compute your agent's estimate of V(s) using current q-values
        V(s) = max_over_action Q(state,action) over possible actions.
        Note: please take into account that q-values can be negative.
        """
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return 0.0
        if len(possible_actions) == 0:
            return 0.0

        <YOUR CODE>

        return value

    def update(self, state, action, reward, next_state):
        """
        You should do your Q-Value update here:
           Q(s,a) := (1 - alpha) * Q(s,a) + alpha * (r + gamma * V(s'))
        """

        # agent parameters
        gamma = self.discount
        learning_rate = self.alpha

        <YOUR CODE>

        self.set_qvalue(state, action, <YOUR CODE: Q-value> )

    def get_best_action(self, state):
        """
        Compute the best action to take in a state (using current q-values).
        """
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        <YOUR CODE>

        return best_action

    def get_action(self, state):
        """
        Compute the action to take in the current state, including exploration.
        With probability self.epsilon, we should take a random action.
            otherwise - the best policy action (self.get_best_action).
        Note: To pick randomly from a list, use random.choice(list).
              To pick True or False with a given probability, generate a uniform number in [0, 1]
              and compare it with your probability
        """

        # Pick Action
        possible_actions = self.get_legal_actions(state)
        action = None

        # If there are no legal actions, return None
        if len(possible_actions) == 0:
            return None

        # agent parameters:
        epsilon = self.epsilon

        <YOUR CODE>

        return chosen_action
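If the update rule in the `update` docstring feels abstract, the following worked example applies it once by hand. It is only an illustrative sketch on a plain dictionary, not the reference solution: the state and action names, the helper `td_update`, and the values of `alpha` and `gamma` are all made up for this example.

```python
from collections import defaultdict

# Hypothetical toy Q-table: q[state][action], defaulting to 0 like the agent's table.
q = defaultdict(lambda: defaultdict(float))
q["s1"]["left"], q["s1"]["right"] = 1.0, 2.0

alpha, gamma = 0.5, 0.9  # example learning rate and discount

def td_update(q, state, action, reward, next_state, legal_actions):
    """One tabular Q-learning step: blend the old estimate with the TD target."""
    # V(s') is the best Q-value over legal actions, or 0 when there are none.
    next_value = max((q[next_state][a] for a in legal_actions), default=0.0)
    target = reward + gamma * next_value
    q[state][action] = (1 - alpha) * q[state][action] + alpha * target
    return q[state][action]

new_q = td_update(q, "s0", "right", reward=-1.0, next_state="s1",
                  legal_actions=["left", "right"])
print(new_q)
```

Here V(s1) = 2.0, so the target is -1 + 0.9 * 2.0 = 0.8, and the new estimate is 0.5 * 0 + 0.5 * 0.8 = 0.4.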
Now we're gonna implement Expected Value SARSA on top of it.
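For the epsilon-greedy policy used here, the expectation that Expected Value SARSA takes over next actions can be written out explicitly. The expansion below is a worked illustration under two assumptions: exploration is uniform over the legal actions, and there is a single greedy action. The symbols $a^*$ (greedy action) and $A(s)$ (set of legal actions) are notation introduced only for this example.

$$\pi(a \mid s) = \frac{\epsilon}{|A(s)|} + (1-\epsilon)\,\mathbb{1}[a = a^*], \qquad V_{\pi}(s) = \sum_{a \in A(s)} \pi(a \mid s)\, Q(s,a) = (1-\epsilon)\, Q(s, a^*) + \frac{\epsilon}{|A(s)|} \sum_{a \in A(s)} Q(s,a)$$

For instance, with two actions, Q-values 1 and 3, and $\epsilon = 0.2$, this gives $0.8 \cdot 3 + 0.1 \cdot (1 + 3) = 2.8$. Setting $\epsilon = 0$ recovers the max over actions used by Q-learning.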
In [ ]:
class EVSarsaAgent(QLearningAgent):
    """
    An agent that changes some of q-learning functions to implement Expected Value SARSA.
    Note: this demo assumes that your implementation of QLearningAgent.update uses get_value(next_state).
    If it doesn't, please add
        def update(self, state, action, reward, next_state):
            and implement it for Expected Value SARSA's V(s')
    """

    def get_value(self, state):
        """
        Returns Vpi for current state under epsilon-greedy policy:
          V_{pi}(s) = sum _{over a_i} {pi(a_i | s) * Q(s, a_i)}
        Hint: all other methods from QLearningAgent are still accessible.
        """
        epsilon = self.epsilon
        possible_actions = self.get_legal_actions(state)

        # If there are no legal actions, return 0.0
        if len(possible_actions) == 0:
            return 0.0

        <YOUR CODE: see docstring>

        return state_value
In [ ]:
import gym
import gym.envs.toy_text
env = gym.envs.toy_text.CliffWalkingEnv()
n_actions = env.action_space.n
print(env.__doc__)
In [ ]:
# Our cliffworld has one difference from what's on the image: there is no wall.
# Agent can choose to go as close to the cliff as it wishes. x:start, T:exit, C:cliff, o: flat ground
env.render()
In [ ]:
def play_and_train(env, agent, t_max=10**4):
    """This function should
    - run a full game, actions given by agent.get_action(s)
    - train agent using agent.update(...) whenever possible
    - return total reward"""
    total_reward = 0.0
    s = env.reset()

    for t in range(t_max):
        a = agent.get_action(s)

        next_s, r, done, _ = env.step(a)
        agent.update(s, a, r, next_s)

        s = next_s
        total_reward += r
        if done:
            break

    return total_reward
In [ ]:
agent_sarsa = EVSarsaAgent(alpha=0.25, epsilon=0.2, discount=0.99,
                           get_legal_actions=lambda s: range(n_actions))

agent_ql = QLearningAgent(alpha=0.25, epsilon=0.2, discount=0.99,
                          get_legal_actions=lambda s: range(n_actions))
In [ ]:
from IPython.display import clear_output
import pandas as pd

def moving_average(x, span=100):
    return pd.DataFrame({'x': np.asarray(x)}).x.ewm(span=span).mean().values

rewards_sarsa, rewards_ql = [], []

for i in range(5000):
    rewards_sarsa.append(play_and_train(env, agent_sarsa))
    rewards_ql.append(play_and_train(env, agent_ql))
    # Note: agent.epsilon stays constant

    if i % 100 == 0:
        clear_output(True)
        print('EVSARSA mean reward =', np.mean(rewards_sarsa[-100:]))
        print('QLEARNING mean reward =', np.mean(rewards_ql[-100:]))
        plt.title("epsilon = %s" % agent_ql.epsilon)
        plt.plot(moving_average(rewards_sarsa), label='ev_sarsa')
        plt.plot(moving_average(rewards_ql), label='qlearning')
        plt.grid()
        plt.legend()
        plt.ylim(-500, 0)
        plt.show()
Let's now see what the algorithms learned by visualizing their actions at every state.
In [ ]:
def draw_policy(env, agent):
    """ Prints CliffWalkingEnv policy with arrows. Hard-coded. """
    n_rows, n_cols = env._cliff.shape

    actions = '^>v<'

    for yi in range(n_rows):
        for xi in range(n_cols):
            if env._cliff[yi, xi]:
                print(" C ", end='')
            elif (yi * n_cols + xi) == env.start_state_index:
                print(" X ", end='')
            elif (yi * n_cols + xi) == n_rows * n_cols - 1:
                print(" T ", end='')
            else:
                print(" %s " %
                      actions[agent.get_best_action(yi * n_cols + xi)], end='')
        print()
In [ ]:
print("Q-Learning")
draw_policy(env, agent_ql)
print("SARSA")
draw_policy(env, agent_sarsa)
Here are some of the things you can do if you feel like it:
There's a powerful technique that you can use to improve sample efficiency for off-policy algorithms: [spoiler] Experience replay :)
The catch is that you can train Q-learning and EV-SARSA on <s,a,r,s'> tuples even if they aren't sampled under the current agent's policy. So here's what we're gonna do:
1. Play the game and sample a <s,a,r,s'> transition.
2. Update Q-values based on <s,a,r,s'>.
3. Store the <s,a,r,s'> transition in a buffer. If the buffer is full, drop the oldest data.
4. Sample a batch of such transitions from that buffer and update Q-values based on them.
To enable such training, first we must implement a memory structure that would act like such a buffer.
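As a warm-up for the buffer, here is a small standalone sketch of the two behaviours it needs: first-in-first-out eviction and random sampling. It uses `collections.deque` with `maxlen` as one possible storage choice, which is an assumption of this sketch rather than a requirement of the assignment; the transition values are dummies.

```python
import random
from collections import deque

# A bounded deque drops its oldest item automatically once maxlen is reached.
buffer = deque(maxlen=3)
for step in range(5):
    # Hypothetical transition tuple (s, a, r, s', done) with dummy values.
    buffer.append((step, 0, -1.0, step + 1, False))

print(list(buffer))  # only the three most recent transitions remain

# Sample stored transitions uniformly at random, with replacement,
# so the batch may be larger than the buffer itself.
batch = random.choices(list(buffer), k=4)
print(batch)
```

Sampling with replacement matters when the requested batch is larger than the number of stored transitions, which happens early in training.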
In [ ]:
import sys, os
if 'google.colab' in sys.modules and not os.path.exists('.setup_complete'):
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/setup_colab.sh -O- | bash
    !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'
In [ ]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from IPython.display import clear_output
In [ ]:
import random


class ReplayBuffer(object):
    def __init__(self, size):
        """
        Create Replay buffer.
        Parameters
        ----------
        size: int
            Max number of transitions to store in the buffer. When the buffer
            overflows the old memories are dropped.

        Note: for this assignment you can pick any data structure you want.
              If you want to keep it simple, you can store a list of tuples of (s, a, r, s') in self._storage
              However you may find out there are faster and/or more memory-efficient ways to do so.
        """
        self._storage = []
        self._maxsize = size

        # OPTIONAL: YOUR CODE

    def __len__(self):
        return len(self._storage)

    def add(self, obs_t, action, reward, obs_tp1, done):
        '''
        Make sure that _storage will not exceed _maxsize.
        Make sure that the FIFO rule is followed: the oldest examples have to be removed first.
        '''
        data = (obs_t, action, reward, obs_tp1, done)

        # add data to storage
        <YOUR CODE>

    def sample(self, batch_size):
        """Sample a batch of experiences.
        Parameters
        ----------
        batch_size: int
            How many transitions to sample.
        Returns
        -------
        obs_batch: np.array
            batch of observations
        act_batch: np.array
            batch of actions executed given obs_batch
        rew_batch: np.array
            rewards received as results of executing act_batch
        next_obs_batch: np.array
            next set of observations seen after executing act_batch
        done_mask: np.array
            done_mask[i] = 1 if executing act_batch[i] resulted in
            the end of an episode and 0 otherwise.
        """
        idxes = <YOUR CODE: randomly generate batch_size integers to be used as indexes of samples>

        # collect <s,a,r,s',done> for each index
        <YOUR CODE>

        return (
            np.array( <YOUR CODE: states> ),
            np.array( <YOUR CODE: actions> ),
            np.array( <YOUR CODE: rewards> ),
            np.array( <YOUR CODE: next_states> ),
            np.array( <YOUR CODE: is_done> ),
        )
Some tests to make sure your buffer works right
In [ ]:
def obj2arrays(obj):
    for x in obj:
        yield np.array([x])

def obj2sampled(obj):
    return tuple(obj2arrays(obj))

replay = ReplayBuffer(2)
obj1 = (0, 1, 2, 3, True)
obj2 = (4, 5, 6, 7, False)
replay.add(*obj1)
assert replay.sample(1) == obj2sampled(obj1), \
    "If there's just one object in buffer, it must be retrieved by buf.sample(1)"
replay.add(*obj2)
assert len(replay) == 2, "Please make sure the __len__ method works as intended."
replay.add(*obj2)
assert len(replay) == 2, "When buffer is at max capacity, replace objects instead of adding new ones."
assert tuple(np.unique(a) for a in replay.sample(100)) == obj2sampled(obj2)
replay.add(*obj1)
assert max(len(np.unique(a)) for a in replay.sample(100)) == 2
replay.add(*obj1)
assert tuple(np.unique(a) for a in replay.sample(100)) == obj2sampled(obj1)
print("Success!")
Now let's use this buffer to improve training:
In [ ]:
import gym
env = gym.make("Taxi-v3")
n_actions = env.action_space.n
In [ ]:
def play_and_train_with_replay(env, agent, replay=None,
                               t_max=10**4, replay_batch_size=32):
    """
    This function should
    - run a full game, actions given by agent.get_action(s)
    - train agent using agent.update(...) whenever possible
    - return total reward
    :param replay: ReplayBuffer where agent can store and sample (s,a,r,s',done) tuples.
        If None, do not use experience replay
    """
    total_reward = 0.0
    s = env.reset()

    for t in range(t_max):
        # get agent to pick action given state s
        a = <YOUR CODE>

        next_s, r, done, _ = env.step(a)

        # update agent on current transition. Use agent.update
        <YOUR CODE>

        if replay is not None:
            # store current <s,a,r,s'> transition in buffer
            <YOUR CODE>

            # sample replay_batch_size random transitions from replay,
            # then update agent on each of them in a loop
            s_, a_, r_, next_s_, done_ = replay.sample(replay_batch_size)
            for i in range(replay_batch_size):
                <YOUR CODE>

        s = next_s
        total_reward += r
        if done:
            break

    return total_reward
In [ ]:
# Create two agents: first will use experience replay, second will not.

agent_baseline = QLearningAgent(
    alpha=0.5, epsilon=0.25, discount=0.99,
    get_legal_actions=lambda s: range(n_actions))

agent_replay = QLearningAgent(
    alpha=0.5, epsilon=0.25, discount=0.99,
    get_legal_actions=lambda s: range(n_actions))

replay = ReplayBuffer(1000)
In [ ]:
from IPython.display import clear_output
import pandas as pd

def moving_average(x, span=100):
    return pd.DataFrame({'x': np.asarray(x)}).x.ewm(span=span).mean().values

rewards_replay, rewards_baseline = [], []

for i in range(1000):
    rewards_replay.append(
        play_and_train_with_replay(env, agent_replay, replay))
    rewards_baseline.append(
        play_and_train_with_replay(env, agent_baseline, replay=None))

    agent_replay.epsilon *= 0.99
    agent_baseline.epsilon *= 0.99

    if i % 100 == 0:
        clear_output(True)
        # Each label reports the epsilon of the agent it names.
        print('Baseline : eps =', agent_baseline.epsilon,
              'mean reward =', np.mean(rewards_baseline[-10:]))
        print('ExpReplay: eps =', agent_replay.epsilon,
              'mean reward =', np.mean(rewards_replay[-10:]))
        plt.plot(moving_average(rewards_replay), label='exp. replay')
        plt.plot(moving_average(rewards_baseline), label='baseline')
        plt.grid()
        plt.legend()
        plt.show()
Experience replay, if implemented correctly, will improve the algorithm's initial convergence a lot, but it shouldn't affect the final performance.
We will use the code you just wrote extensively in the next week of our course. If you feel that you need more examples to understand how experience replay works, try using it for binarized state spaces (CartPole or other classic control envs).
Next week we're gonna explore how Q-learning and similar algorithms can be applied to large state spaces, with deep learning models to approximate the Q function.
However, the code you've written for this week is already capable of solving many RL problems, and as an added benefit, it is very easy to detach. You can use Q-learning, SARSA and Experience Replay for any RL problems you want to solve: just throw 'em into a file and import the stuff you need.
There are a number of advanced algorithms you can find in the week 3 materials (Silver lecture II and/or reading about eligibility traces). One such algorithm is TD(lambda), which is based on the idea of eligibility traces. You can also view it as a combination of N-step updates for all N.
Here's a practical algorithm you can start with: url
Implementing this algorithm will prove more challenging than Q-learning or SARSA, but doing so will earn you a deeper understanding of how value-based methods work [in addition to some bonus points].
More kudos for comparing and analyzing TD($\lambda$) against Q-learning and EV-SARSA in different setups (taxi vs cartpole, constant epsilon vs decreasing epsilon).
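To give a feel for eligibility traces before you dive into the materials, here is a minimal sketch of tabular TD($\lambda$) state-value prediction with accumulating traces, run over a single recorded episode. It is not the algorithm behind the link above and not a control method like Q-learning: the function name `td_lambda_episode`, the episode format, and the default hyperparameters are assumptions made purely for illustration.

```python
import numpy as np

def td_lambda_episode(values, episode, alpha=0.5, gamma=1.0, lam=0.5):
    """Run backward-view TD(lambda) over one episode and return updated values.

    episode: list of (state, reward, next_state); next_state is None at the end.
    """
    values = np.array(values, dtype=float)
    traces = np.zeros_like(values)
    for state, reward, next_state in episode:
        # Decay all traces, then mark the state that was just visited.
        traces *= gamma * lam
        traces[state] += 1.0
        # One-step TD error; a terminal next state contributes zero value.
        next_value = 0.0 if next_state is None else values[next_state]
        delta = reward + gamma * next_value - values[state]
        # Every recently visited state gets a share of the same TD error.
        values += alpha * delta * traces
    return values

# Two-state chain: 0 -> 1 -> terminal, with reward 1 on the final step.
updated = td_lambda_episode([0.0, 0.0], [(0, 0.0, 1), (1, 1.0, None)])
print(updated)
```

In this run the final reward updates state 1 by 0.5 and also reaches state 0 through its decayed trace, giving it 0.25. With `lam=0.0` the traces vanish after one step and only state 1 is updated, which is ordinary one-step TD.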
In [ ]: