In [ ]:
import sys, os
if 'google.colab' in sys.modules and not os.path.exists('.setup_complete'):
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/setup_colab.sh -O- | bash
    !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'
In [ ]:
import gym
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
In [ ]:
env = gym.make("CartPole-v0").env
env.reset()
n_actions = env.action_space.n
state_dim = env.observation_space.shape
plt.imshow(env.render("rgb_array"))
env.close()
To train a neural network policy, one must first have a neural network policy. Let's build it.
Since we're working with pre-extracted features (cart position, angle and their velocities), we don't need a complicated network yet. In fact, let's build something like this for starters:
For your first run, please use only linear layers (nn.Linear) and activations. Stuff like batch normalization or dropout may ruin everything if used haphazardly.
Also, please avoid nonlinearities like sigmoid and tanh: since the agent's observations are not normalized, these can saturate right at initialization. Instead, use non-saturating nonlinearities like ReLU.
Ideally, start small: 1-2 hidden layers with fewer than 200 neurons each, and increase the network size only if the agent doesn't beat the target score (a sketch of one possible architecture is given right after the next cell).
In [ ]:
import torch
import torch.nn as nn
import torch.nn.functional as F
In [ ]:
network = nn.Sequential()
network.add_module('layer1', <YOUR CODE>)
<YOUR CODE: stack layers!!!1>
# hint: use state_dim[0] as input size
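For reference, here is a minimal sketch of one architecture that satisfies the constraints above: two ReLU hidden layers of 64 units each, feeding a final linear layer that outputs one q-value per action. The layer names and sizes are an arbitrary reasonable choice, not the required solution.

network = nn.Sequential()
network.add_module('layer1', nn.Linear(state_dim[0], 64))   # input: observation vector
network.add_module('relu1', nn.ReLU())
network.add_module('layer2', nn.Linear(64, 64))
network.add_module('relu2', nn.ReLU())
network.add_module('q_values', nn.Linear(64, n_actions))    # output: Q(s, a) for every action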
In [ ]:
def get_action(state, epsilon=0):
    """
    Sample actions with epsilon-greedy policy.
    Recap: with p = epsilon pick a random action, else pick the action with the highest Q(s,a).
    """
    state = torch.tensor(state[None], dtype=torch.float32)
    q_values = network(state).detach().numpy()

    <YOUR CODE>

    return int( <YOUR CODE: epsilon-greedily selected action> )
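If you want something to check your work against, here is one possible completion (a sketch of plain epsilon-greedy selection, assuming q_values has shape [1, n_actions]):

def get_action(state, epsilon=0):
    """Sample actions with epsilon-greedy policy: random with p = epsilon, greedy otherwise."""
    state = torch.tensor(state[None], dtype=torch.float32)
    q_values = network(state).detach().numpy()

    if np.random.rand() < epsilon:
        action = np.random.choice(n_actions)       # explore: uniform random action
    else:
        action = q_values.argmax(axis=-1)[0]       # exploit: action with the highest Q(s,a)

    return int(action)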
In [ ]:
s = env.reset()
assert tuple(network(torch.tensor([s] * 3, dtype=torch.float32)).size()) == (3, n_actions), \
    "please make sure your model maps state s -> [Q(s,a0), ..., Q(s, a_last)]"
assert isinstance(list(network.modules())[-1], nn.Linear), \
    "please make sure you predict q-values without nonlinearity (ignore if you know what you're doing)"
assert isinstance(get_action(s), int), \
    "get_action(s) must return int, not %s. try int(action)" % (type(get_action(s)))

# test epsilon-greedy exploration
for eps in [0., 0.1, 0.5, 1.0]:
    state_frequencies = np.bincount(
        [get_action(s, epsilon=eps) for i in range(10000)], minlength=n_actions)
    best_action = state_frequencies.argmax()
    assert abs(state_frequencies[best_action] - 10000 * (1 - eps + eps / n_actions)) < 200
    for other_action in range(n_actions):
        if other_action != best_action:
            assert abs(state_frequencies[other_action] - 10000 * (eps / n_actions)) < 200
    print('e=%.1f tests passed' % eps)
We shall now train our agent's Q-function by minimizing the TD loss: $$ L = \frac{1}{N} \sum_i \left( Q_{\theta}(s,a) - \left[ r(s,a) + \gamma \cdot \max_{a'} Q_{-}(s', a') \right] \right)^2 $$
where $r(s,a)$ is the one-step reward, $\gamma$ is the discount factor, and the sum runs over a batch of $N$ transitions $(s, a, r, s')$.
The tricky part is $Q_{-}(s',a')$. From an engineering standpoint, it's the same as $Q_{\theta}$, the output of your neural network policy. However, when doing gradient descent, we won't propagate gradients through it, which makes training more stable (see lectures).
To do so, we shall use the x.detach() method, which basically says "consider this thing constant when doing backprop".
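As a quick illustration of what detach does (a toy example, not part of the assignment): the detached branch contributes nothing to the gradient.

x = torch.ones(3, requires_grad=True)
y = (x ** 2).sum()            # gradients flow through this branch
z = (x.detach() ** 2).sum()   # treated as a constant: no gradients flow back to x
(y + z).backward()
print(x.grad)                 # tensor([2., 2., 2.]) - only y contributed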
In [ ]:
def compute_td_loss(states, actions, rewards, next_states, is_done, gamma=0.99, check_shapes=False):
    """ Compute td loss using torch operations only. Use the formula above. """
    states = torch.tensor(states, dtype=torch.float32)             # shape: [batch_size, state_size]
    actions = torch.tensor(actions, dtype=torch.long)              # shape: [batch_size]
    rewards = torch.tensor(rewards, dtype=torch.float32)           # shape: [batch_size]
    next_states = torch.tensor(next_states, dtype=torch.float32)   # shape: [batch_size, state_size]
    is_done = torch.tensor(is_done, dtype=torch.bool)              # shape: [batch_size], bool for torch.where

    # get q-values for all actions in current states
    predicted_qvalues = network(states)

    # select q-values for chosen actions
    predicted_qvalues_for_actions = predicted_qvalues[range(states.shape[0]), actions]

    # compute q-values for all actions in next states
    predicted_next_qvalues = <YOUR CODE>

    # compute V*(next_states) using predicted next q-values
    next_state_values = <YOUR CODE>
    assert next_state_values.dtype == torch.float32

    # compute "target q-values" for the loss - it's what's inside the square brackets in the formula above
    target_qvalues_for_actions = <YOUR CODE>

    # at the last state we shall use the simplified formula Q(s,a) = r(s,a), since s' doesn't exist
    target_qvalues_for_actions = torch.where(is_done, rewards, target_qvalues_for_actions)

    # mean squared error loss to minimize
    loss = torch.mean((predicted_qvalues_for_actions - target_qvalues_for_actions.detach()) ** 2)

    if check_shapes:
        assert predicted_next_qvalues.data.dim() == 2, \
            "make sure you predicted q-values for all actions in next state"
        assert next_state_values.data.dim() == 1, \
            "make sure you computed V(s') as maximum over just the actions axis and not all axes"
        assert target_qvalues_for_actions.data.dim() == 1, \
            "there's something wrong with target q-values, they must be a vector"

    return loss
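If you get stuck, here is one possible way to fill the three placeholders above (a sketch; variable names match the skeleton, and the max over actions implements the target from the formula):

# one possible completion of the placeholders in compute_td_loss
predicted_next_qvalues = network(next_states)                       # Q_(s', a') for every action a'
next_state_values = predicted_next_qvalues.max(dim=1)[0]            # V*(s') = max_a' Q_(s', a')
target_qvalues_for_actions = rewards + gamma * next_state_values    # r(s,a) + gamma * V*(s')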
In [ ]:
# sanity checks
s = env.reset()
a = env.action_space.sample()
next_s, r, done, _ = env.step(a)
loss = compute_td_loss([s], [a], [r], [next_s], [done], check_shapes=True)
loss.backward()
assert len(loss.size()) == 0, "you must return scalar loss - mean over batch"
assert np.any(next(network.parameters()).grad.detach().numpy() != 0), \
    "loss must be differentiable w.r.t. network weights"
In [ ]:
opt = torch.optim.Adam(network.parameters(), lr=1e-4)
In [ ]:
def generate_session(env, t_max=1000, epsilon=0, train=False):
    """play env with approximate q-learning agent and train it at the same time"""
    total_reward = 0
    s = env.reset()

    for t in range(t_max):
        a = get_action(s, epsilon=epsilon)
        next_s, r, done, _ = env.step(a)

        if train:
            opt.zero_grad()
            compute_td_loss([s], [a], [r], [next_s], [done]).backward()
            opt.step()

        total_reward += r
        s = next_s
        if done:
            break

    return total_reward
In [ ]:
epsilon = 0.5
In [ ]:
for i in range(1000):
    session_rewards = [generate_session(env, epsilon=epsilon, train=True) for _ in range(100)]
    print("epoch #{}\tmean reward = {:.3f}\tepsilon = {:.3f}".format(i, np.mean(session_rewards), epsilon))

    epsilon *= 0.99
    assert epsilon >= 1e-4, "Make sure epsilon is always nonzero during training"

    if np.mean(session_rewards) > 300:
        print("You Win!")
        break
Welcome to the f.. world of deep f...n reinforcement learning. Don't expect the agent's reward to go up smoothly. Hope for it to increase eventually. If it deems you worthy.
Seriously though: the learning curve is noisy, so don't panic if the mean reward plateaus or even dips for a while before climbing.
As usual, we now use gym.wrappers.Monitor to record a video of our agent playing the game. Unlike our previous attempts with state binarization, this time we expect the agent to act (or fail) more smoothly, since there is no binarization error at play anymore.
As you already did with tabular q-learning, we set epsilon=0 for the final evaluation to prevent the agent from exploring itself to death.
In [ ]:
# Record sessions
import gym.wrappers
with gym.wrappers.Monitor(gym.make("CartPole-v0"), directory="videos", force=True) as env_monitor:
    sessions = [generate_session(env_monitor, epsilon=0, train=False) for _ in range(100)]
In [ ]:
# Show video. This may not work in some setups. If it doesn't
# work for you, you can download the videos and view them locally.
from pathlib import Path
from IPython.display import HTML
video_names = sorted([s for s in Path('videos').iterdir() if s.suffix == '.mp4'])
HTML("""
<video width="640" height="480" controls>
<source src="{}" type="video/mp4">
</video>
""".format(video_names[-1])) # You can also try other indices