In [ ]:
import sys, os
if 'google.colab' in sys.modules and not os.path.exists('.setup_complete'):
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/setup_colab.sh -O- | bash
    !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'
In [ ]:
import gym
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
A caveat: we have received reports that the following cell may crash with `NameError: name 'base' is not defined`. The suggested workaround is to install `gym==0.14.0` and `pyglet==1.3.2`.
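If you run into that error, a quick fix might look like the cell below (the version pins are taken from the note above; restart the kernel after installing):
In [ ]:
# Possible workaround for the `NameError: name 'base' is not defined` crash:
# pin the gym and pyglet versions mentioned above, then restart the runtime.
!pip install --quiet gym==0.14.0 pyglet==1.3.2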
In [ ]:
env = gym.make("CartPole-v0")
# gym compatibility: unwrap TimeLimit
if hasattr(env, '_max_episode_steps'):
    env = env.env
env.reset()
n_actions = env.action_space.n
state_dim = env.observation_space.shape
plt.imshow(env.render("rgb_array"))
For the REINFORCE algorithm, we'll need a model that predicts action probabilities given states.
For numerical stability, please do not include a softmax layer in your network architecture. We'll apply softmax or log-softmax where appropriate.
In [ ]:
import torch
import torch.nn as nn
In [ ]:
# Build a simple neural network that predicts policy logits.
# Keep it simple: CartPole isn't worth deep architectures.
model = nn.Sequential(
    <YOUR CODE: define a neural network that predicts policy logits>
)
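For reference, one possible architecture is a small two-layer MLP that outputs raw logits; the hidden sizes below are an arbitrary assumption, not part of the assignment:
In [ ]:
# A minimal sketch of one possible policy network (hidden sizes are arbitrary).
# Note: no softmax at the end -- the network outputs raw logits.
model = nn.Sequential(
    nn.Linear(state_dim[0], 64),
    nn.ReLU(),
    nn.Linear(64, 64),
    nn.ReLU(),
    nn.Linear(64, n_actions),
)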
Note: the output of this function is a numpy array, not a torch tensor, so gradient calculation is not needed here. Use `torch.no_grad` to suppress gradient calculation.
Alternatively, `.detach()` (or the legacy `.data` property) can be used instead, but there is a difference: with `.detach()` the computational graph is built and then disconnected from a particular tensor, so `.detach()` should be used if that graph is needed for backprop via some other (non-detached) tensor. In contrast, no graph is built by any operation inside a `no_grad()` context, so it's preferable here.
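A small standalone illustration of the difference (this toy cell is not part of the assignment code):
In [ ]:
# Sketch: no_grad() vs .detach() on a toy tensor.
x = torch.ones(3, requires_grad=True)

with torch.no_grad():
    y = x * 2           # no computational graph is built at all
print(y.requires_grad)  # False

z = (x * 2).detach()    # the graph for x * 2 is built, then z is disconnected from it
print(z.requires_grad)  # False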
In [ ]:
def predict_probs(states):
    """
    Predict action probabilities given states.
    :param states: numpy array of shape [batch, state_shape]
    :returns: numpy array of shape [batch, n_actions]
    """
    # convert states, compute logits, use softmax to get probability
    <YOUR CODE>
    return <YOUR CODE>
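A minimal sketch of what this function might look like, assuming the `model` above outputs logits (one possible implementation, not the only one):
In [ ]:
# Sketch of one possible predict_probs implementation.
def predict_probs(states):
    states = torch.tensor(states, dtype=torch.float32)
    with torch.no_grad():
        logits = model(states)
        probs = nn.functional.softmax(logits, dim=-1)
    return probs.numpy()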
In [ ]:
test_states = np.array([env.reset() for _ in range(5)])
test_probas = predict_probs(test_states)
assert isinstance(test_probas, np.ndarray), \
    "you must return np array and not %s" % type(test_probas)
assert tuple(test_probas.shape) == (test_states.shape[0], env.action_space.n), \
    "wrong output shape: %s" % np.shape(test_probas)
assert np.allclose(np.sum(test_probas, axis=1), 1), "probabilities do not sum to 1"
In [ ]:
def generate_session(env, t_max=1000):
    """
    Play a full session with REINFORCE agent.
    Returns sequences of states, actions, and rewards.
    """
    # arrays to record session
    states, actions, rewards = [], [], []
    s = env.reset()

    for t in range(t_max):
        # action probabilities array aka pi(a|s)
        action_probs = predict_probs(np.array([s]))[0]

        # Sample action with given probabilities.
        a = <YOUR CODE>
        new_s, r, done, info = env.step(a)

        # record session history to train later
        states.append(s)
        actions.append(a)
        rewards.append(r)

        s = new_s
        if done:
            break

    return states, actions, rewards
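For the sampling step, one common choice is `np.random.choice` with the predicted probabilities. A quick sketch of that idea (assumes `env`, `n_actions` and a working `predict_probs` from the cells above):
In [ ]:
# Sketch: sampling an action index according to pi(a|s).
s = env.reset()
action_probs = predict_probs(np.array([s]))[0]
a = np.random.choice(n_actions, p=action_probs)
print(a, action_probs)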
In [ ]:
# test it
states, actions, rewards = generate_session(env)
In [ ]:
def get_cumulative_rewards(rewards,  # rewards at each step
                           gamma=0.99  # discount for reward
                           ):
    """
    Take a list of immediate rewards r(s,a) for the whole session
    and compute cumulative returns (a.k.a. G(s,a) in Sutton '16):
    G_t = r_t + gamma*r_{t+1} + gamma^2*r_{t+2} + ...

    A simple way to compute cumulative rewards is to iterate from the last
    timestep to the first and compute G_t = r_t + gamma*G_{t+1} recurrently.

    You must return an array/list of cumulative rewards with as many elements
    as in the initial rewards.
    """
    <YOUR CODE>

    return <YOUR CODE: array of cumulative rewards>
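One way to implement this backward recurrence (a reference sketch, not necessarily the intended solution):
In [ ]:
# Sketch: backward recurrence G_t = r_t + gamma * G_{t+1}.
def get_cumulative_rewards(rewards, gamma=0.99):
    cumulative = [0.0] * len(rewards)
    running = 0.0
    for t in reversed(range(len(rewards))):
        running = rewards[t] + gamma * running
        cumulative[t] = running
    return cumulative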
In [ ]:
get_cumulative_rewards(rewards)
assert len(get_cumulative_rewards(list(range(100)))) == 100
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, 0, 0, 1, 0], gamma=0.9),
    [1.40049, 1.5561, 1.729, 0.81, 0.9, 1.0, 0.0])
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, -2, 3, -4, 0], gamma=0.5),
    [0.0625, 0.125, 0.25, -1.5, 1.0, -4.0, 0.0])
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, 2, 3, 4, 0], gamma=0),
    [0, 0, 1, 2, 3, 4, 0])
print("looks good!")
We now need to define the objective and the update for the policy gradient.
Our objective function is

$$ J \approx { 1 \over N } \sum_{s_i,a_i} G(s_i,a_i) $$

REINFORCE defines a way to compute the gradient of the expected reward with respect to policy parameters. The formula is as follows:

$$ \nabla_\theta \hat J(\theta) \approx { 1 \over N } \sum_{s_i, a_i} \nabla_\theta \log \pi_\theta (a_i \mid s_i) \cdot G_t(s_i, a_i) $$

We can abuse PyTorch's capabilities for automatic differentiation by defining our objective function as follows:

$$ \hat J(\theta) \approx { 1 \over N } \sum_{s_i, a_i} \log \pi_\theta (a_i \mid s_i) \cdot G_t(s_i, a_i) $$

When you compute the gradient of that function with respect to the network weights $\theta$, it becomes exactly the policy gradient.
In [ ]:
def to_one_hot(y_tensor, ndims):
    """ helper: take an integer vector and convert it to 1-hot matrix. """
    y_tensor = y_tensor.type(torch.LongTensor).view(-1, 1)
    y_one_hot = torch.zeros(
        y_tensor.size()[0], ndims).scatter_(1, y_tensor, 1)
    return y_one_hot
In [ ]:
# Your code: define optimizers
optimizer = torch.optim.Adam(model.parameters(), 1e-3)


def train_on_session(states, actions, rewards, gamma=0.99, entropy_coef=1e-2):
    """
    Takes a sequence of states, actions and rewards produced by generate_session.
    Updates agent's weights by following the policy gradient above.
    Please use Adam optimizer with default parameters.
    """
    # cast everything into torch tensors
    states = torch.tensor(states, dtype=torch.float32)
    actions = torch.tensor(actions, dtype=torch.int32)
    cumulative_returns = np.array(get_cumulative_rewards(rewards, gamma))
    cumulative_returns = torch.tensor(cumulative_returns, dtype=torch.float32)

    # predict logits, probas and log-probas using an agent.
    logits = model(states)
    probs = nn.functional.softmax(logits, -1)
    log_probs = nn.functional.log_softmax(logits, -1)

    assert all(isinstance(v, torch.Tensor) for v in [logits, probs, log_probs]), \
        "please compute using torch tensors and don't use the predict_probs function"

    # select log-probabilities for chosen actions, log pi(a_i|s_i)
    log_probs_for_actions = torch.sum(
        log_probs * to_one_hot(actions, env.action_space.n), dim=1)

    # Compute loss here. Don't forget entropy regularization with `entropy_coef`.
    entropy = <YOUR CODE>
    loss = <YOUR CODE>

    # Gradient descent step
    <YOUR CODE>

    # technical: return session rewards to print them later
    return np.sum(rewards)
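For reference, the missing pieces might be filled in as sketched below, following the surrogate objective $\hat J$ above; the sign convention and the form of the entropy bonus are the usual ones, not a unique answer. These lines would live inside train_on_session and use the tensors defined there.
In [ ]:
# Sketch: entropy-regularized surrogate loss and one optimizer step.
# Assumes probs, log_probs, log_probs_for_actions, cumulative_returns,
# entropy_coef and optimizer from the cell above.

# entropy H(pi(.|s)) = -sum_a pi(a|s) * log pi(a|s), averaged over the batch
entropy = -(probs * log_probs).sum(dim=1).mean()

# minimize -(J_hat + entropy bonus), i.e. ascend the policy gradient
loss = -(log_probs_for_actions * cumulative_returns).mean() - entropy_coef * entropy

optimizer.zero_grad()
loss.backward()
optimizer.step()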
In [ ]:
for i in range(100):
    rewards = [train_on_session(*generate_session(env)) for _ in range(100)]  # generate new sessions

    print("mean reward: %.3f" % (np.mean(rewards)))

    if np.mean(rewards) > 500:
        print("You Win!")  # but you can train even further
        break
In [ ]:
# Record sessions
import gym.wrappers
with gym.wrappers.Monitor(gym.make("CartPole-v0"), directory="videos", force=True) as env_monitor:
    sessions = [generate_session(env_monitor) for _ in range(100)]
In [ ]:
# Show video. This may not work in some setups. If it doesn't
# work for you, you can download the videos and view them locally.
from pathlib import Path
from IPython.display import HTML
video_names = sorted([s for s in Path('videos').iterdir() if s.suffix == '.mp4'])
HTML("""
<video width="640" height="480" controls>
<source src="{}" type="video/mp4">
</video>
""".format(video_names[-1])) # You can also try other indices