In [ ]:
import sys, os
if 'google.colab' in sys.modules:
    %tensorflow_version 1.x

    if not os.path.exists('.setup_complete'):
        !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/spring20/setup_colab.sh -O- | bash
        !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'
In [ ]:
import gym
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
A caveat: we have received reports that the following cell may crash with NameError: name 'base' is not defined. The suggested workaround is to install gym==0.14.0 and pyglet==1.3.2.
In [ ]:
env = gym.make("CartPole-v0")
# gym compatibility: unwrap TimeLimit
if hasattr(env, '_max_episode_steps'):
    env = env.env
env.reset()
n_actions = env.action_space.n
state_dim = env.observation_space.shape
plt.imshow(env.render("rgb_array"))
For the REINFORCE algorithm, we'll need a model that predicts action probabilities given states.
For numerical stability, please do not include the softmax layer in your network architecture. We'll use softmax or log-softmax where appropriate.
In [ ]:
import tensorflow as tf
sess = tf.InteractiveSession()
In [ ]:
# create input variables. We only need <s, a, r> for REINFORCE
ph_states = tf.placeholder('float32', (None,) + state_dim, name="states")
ph_actions = tf.placeholder('int32', name="action_ids")
ph_cumulative_rewards = tf.placeholder('float32', name="cumulative_returns")
In [ ]:
from keras.models import Sequential
from keras.layers import Dense
<YOUR CODE: define network graph using raw TF, Keras, or any other library you prefer>
logits = <YOUR CODE: symbolic outputs of your network _before_ softmax>
policy = tf.nn.softmax(logits)
log_policy = tf.nn.log_softmax(logits)
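If you are unsure where to start, below is a minimal sketch of one possible way to fill in the two placeholders above. It assumes the Keras TensorFlow backend so that a Sequential model can be called on ph_states directly; any architecture without a final softmax works, and the layer sizes here are arbitrary.
In [ ]:
# A possible network (a sketch, not the only valid choice): two hidden layers
# and a linear output of size n_actions. No softmax here -- policy and
# log_policy above are computed from these raw logits.
network = Sequential()
network.add(Dense(64, activation='relu', input_shape=state_dim))
network.add(Dense(64, activation='relu'))
network.add(Dense(n_actions, activation=None))

logits = network(ph_states)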
In [ ]:
# Initialize model parameters
sess.run(tf.global_variables_initializer())
In [ ]:
def predict_probs(state):
    """
    Predict action probabilities for a single state.
    :param state: numpy array with one observation, shape state_dim
    :returns: numpy array of shape [n_actions] with action probabilities
    """
    return policy.eval({ph_states: [state]})[0]
In [ ]:
def generate_session(env, t_max=1000):
    """
    Play a full session with the REINFORCE agent.
    Returns sequences of states, actions, and rewards.
    """
    # arrays to record session
    states, actions, rewards = [], [], []
    s = env.reset()

    for t in range(t_max):
        # action probabilities array aka pi(a|s)
        action_probs = predict_probs(s)

        # Sample action with the given probabilities.
        a = <YOUR CODE>
        new_s, r, done, info = env.step(a)

        # record session history to train later
        states.append(s)
        actions.append(a)
        rewards.append(r)

        s = new_s
        if done:
            break

    return states, actions, rewards
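For the sampling placeholder above, one common option (a sketch, not the only valid one) is np.random.choice with the predicted probabilities. The throwaway cell below shows the idea; inside the loop the line would simply read a = np.random.choice(n_actions, p=action_probs).
In [ ]:
# A sketch of the sampling step: draw an action index in {0, ..., n_actions - 1}
# with probabilities pi(a|s) predicted for the current state.
example_probs = predict_probs(env.reset())
example_action = np.random.choice(n_actions, p=example_probs)
print("sampled action:", example_action, "from probabilities:", example_probs)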
In [ ]:
# test it
states, actions, rewards = generate_session(env)
In [ ]:
def get_cumulative_rewards(rewards,    # rewards at each step
                           gamma=0.99  # discount for reward
                           ):
    """
    Take a list of immediate rewards r(s,a) for the whole session
    and compute cumulative returns (a.k.a. G(s,a) in Sutton '16):

    G_t = r_t + gamma*r_{t+1} + gamma^2*r_{t+2} + ...

    A simple way to compute cumulative rewards is to iterate from the last
    to the first timestep and compute G_t = r_t + gamma*G_{t+1} recurrently.

    You must return an array/list of cumulative rewards with as many elements
    as in the initial rewards.
    """
    <YOUR CODE>

    return <YOUR CODE: array of cumulative rewards>
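As a sketch of the backwards recurrence described in the docstring (your own version may be vectorized or written differently), the helper below accumulates G_t from the last step to the first. The name get_cumulative_rewards_sketch is illustrative, not part of the assignment.
In [ ]:
# A sketch of the recurrence G_t = r_t + gamma * G_{t+1}, iterating backwards
# through the rewards and then reversing the result.
def get_cumulative_rewards_sketch(rewards, gamma=0.99):
    cumulative = []
    running = 0.0
    for r in reversed(list(rewards)):
        running = r + gamma * running
        cumulative.append(running)
    return cumulative[::-1]

assert np.allclose(
    get_cumulative_rewards_sketch([0, 0, 1, 0, 0, 1, 0], gamma=0.9),
    [1.40049, 1.5561, 1.729, 0.81, 0.9, 1.0, 0.0])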
In [ ]:
assert len(get_cumulative_rewards(range(100))) == 100
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, 0, 0, 1, 0], gamma=0.9),
    [1.40049, 1.5561, 1.729, 0.81, 0.9, 1.0, 0.0])
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, -2, 3, -4, 0], gamma=0.5),
    [0.0625, 0.125, 0.25, -1.5, 1.0, -4.0, 0.0])
assert np.allclose(
    get_cumulative_rewards([0, 0, 1, 2, 3, 4, 0], gamma=0),
    [0, 0, 1, 2, 3, 4, 0])
print("looks good!")
We now need to define the objective and the update for the policy gradient.
Our objective function is

$$ J \approx { 1 \over N } \sum_{s_i,a_i} G(s_i,a_i) $$

REINFORCE defines a way to compute the gradient of the expected reward with respect to policy parameters. The formula is as follows:

$$ \nabla_\theta \hat J(\theta) \approx { 1 \over N } \sum_{s_i, a_i} \nabla_\theta \log \pi_\theta (a_i \mid s_i) \cdot G(s_i, a_i) $$

We can abuse TensorFlow's capabilities for automatic differentiation by defining our objective function as follows:

$$ \hat J(\theta) \approx { 1 \over N } \sum_{s_i, a_i} \log \pi_\theta (a_i \mid s_i) \cdot G(s_i, a_i) $$

When you compute the gradient of that function with respect to the network weights $\theta$, it becomes exactly the policy gradient.
In [ ]:
# This code selects the log-probabilities (log pi(a_i|s_i)) for those actions that were actually played.
indices = tf.stack([tf.range(tf.shape(log_policy)[0]), ph_actions], axis=-1)
log_policy_for_actions = tf.gather_nd(log_policy, indices)
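For intuition, the throwaway check below (the probabilities are made up purely for illustration) shows that tf.gather_nd with such 2-D indices picks log_policy[i, actions[i]] for each row i of the batch.
In [ ]:
# Illustration of tf.gather_nd: pairing row indices [0, 1] with actions [1, 0]
# should return [log(0.1), log(0.2)] for the made-up probabilities below.
example_log_policy = tf.constant(np.log([[0.9, 0.1],
                                         [0.2, 0.8]]).astype('float32'))
example_actions = tf.constant([1, 0], dtype='int32')
example_indices = tf.stack([tf.range(2), example_actions], axis=-1)
print(sess.run(tf.gather_nd(example_log_policy, example_indices)))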
In [ ]:
# Policy objective as in the last formula. Please use reduce_mean, not reduce_sum.
# You may use log_policy_for_actions to get log probabilities for actions taken.
# Also recall that we defined ph_cumulative_rewards earlier.
J = <YOUR CODE>
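As a sketch, the surrogate objective from the last formula is a batch mean of log pi(a_i|s_i) weighted by the returns. J_sketch below is an illustrative name; your J above should compute the equivalent expression.
In [ ]:
# Sketch of the surrogate objective: mean over the batch of
# log pi(a_i|s_i) * G(s_i, a_i), using the tensors defined above.
J_sketch = tf.reduce_mean(log_policy_for_actions * ph_cumulative_rewards)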
As a reminder, for a discrete probability distribution (like the one our policy outputs), entropy is defined as:
$$ \operatorname{entropy}(p) = -\sum_{i = 1}^n p_i \cdot \log p_i $$
In [ ]:
# Entropy regularization. If you don't add it, the policy will quickly collapse
# to a deterministic one, which harms exploration.
entropy = <YOUR CODE: compute entropy. Do not forget the sign!>
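A sketch of that formula in terms of the policy and log_policy tensors, averaged over the batch (entropy_sketch is an illustrative name; note the leading minus sign):
In [ ]:
# Sketch of the entropy bonus: -sum_a pi(a|s) * log pi(a|s), averaged over states.
entropy_sketch = tf.reduce_mean(-tf.reduce_sum(policy * log_policy, axis=-1))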
In [ ]:
# Maximizing X is the same as minimizing -X, hence the sign.
loss = -(J + 0.1 * entropy)
update = tf.train.AdamOptimizer().minimize(loss)
In [ ]:
def train_on_session(states, actions, rewards, t_max=1000):
    """Given a full session, train the agent with the policy gradient."""
    cumulative_rewards = get_cumulative_rewards(rewards)
    update.run({
        ph_states: states,
        ph_actions: actions,
        ph_cumulative_rewards: cumulative_rewards,
    })
    return sum(rewards)
In [ ]:
# Initialize optimizer parameters
sess.run(tf.global_variables_initializer())
In [ ]:
for i in range(100):
    rewards = [train_on_session(*generate_session(env)) for _ in range(100)]  # generate new sessions

    print("mean reward: %.3f" % (np.mean(rewards)))

    if np.mean(rewards) > 300:
        print("You Win!")  # but you can train even further
        break
In [ ]:
# Record sessions
import gym.wrappers
with gym.wrappers.Monitor(gym.make("CartPole-v0"), directory="videos", force=True) as env_monitor:
    sessions = [generate_session(env_monitor) for _ in range(100)]
In [ ]:
# Show video. This may not work in some setups. If it doesn't
# work for you, you can download the videos and view them locally.
from pathlib import Path
from IPython.display import HTML
video_names = sorted([s for s in Path('videos').iterdir() if s.suffix == '.mp4'])
HTML("""
<video width="640" height="480" controls>
<source src="{}" type="video/mp4">
</video>
""".format(video_names[-1])) # You can also try other indices