This tutorial is a Jupyter notebook version of PyTorch's original example code, made to run inside a Docker container.
The agent has to decide between two actions - moving the cart left or right - so that the pole attached to it stays upright. You can find an official leaderboard with various algorithms and visualizations at the Gym website.
As the agent observes the current state of the environment and chooses an action, the environment transitions to a new state, and also returns a reward that indicates the consequences of the action. In this task, the environment terminates if the pole falls over too far.
The CartPole task is designed so that the inputs to the agent are 4 real values representing the environment state (position, velocity, etc.). This is a much simpler task than one where the input is the raw game screen, which lets us watch the agent improve quickly.
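To get a feel for this 4-dimensional state, you can query the environment directly. A quick check (assuming the classic gym API used throughout this notebook, where env.reset() returns just the observation):

```python
import gym

env = gym.make('CartPole-v0')
print(env.observation_space)  # Box(4,): cart position, cart velocity, pole angle, pole tip velocity
print(env.action_space)       # Discrete(2): push the cart left or right
state = env.reset()           # e.g. array([ 0.03, -0.02,  0.01,  0.04])
print(state.shape)            # (4,)
```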
Nothing really interesting here; we just import the packages we need:
- torch is the main PyTorch module
- torch.nn for neural networks
- torch.nn.functional for the functional interface (F.relu, F.softmax below)
- torch.optim is an optimization package
- torch.autograd for automatic differentiation
- torch.autograd.Variable, an auto-differentiable Variable (Tensor)
In [16]:
import argparse
import gym
import numpy as np
from itertools import count
from collections import namedtuple
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.autograd as autograd
from torch.autograd import Variable
# Now the jupyter/gym render part comes in
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
# iPython
from IPython import display, get_ipython
In [17]:
def parse_args():
    # argparse doesn't play nicely inside a notebook, so the CLI flags of the
    # original script are kept only as a reference and the defaults are hard-coded below.
    # parser = argparse.ArgumentParser(description='PyTorch REINFORCE example')
    # parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
    #                     help='discount factor (default: 0.99)')
    # parser.add_argument('--seed', type=int, default=543, metavar='N',
    #                     help='random seed (default: 543)')
    # parser.add_argument('--render', action='store_true',
    #                     help='render the environment', default=False)
    # parser.add_argument('--log_interval', type=int, default=10, metavar='N',
    #                     help='interval between training status logs (default: 10)')
    # args = parser.parse_args()
    dictionary = {'gamma': 0.99, 'seed': 543, 'render': False, 'log_interval': 10}
    args = namedtuple('GenericDict', dictionary.keys())(**dictionary)
    return args
In [18]:
class Policy(nn.Module):
    """
    This defines the Policy Network
    """
    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.affine2 = nn.Linear(128, 2)
        self.saved_actions = []
        self.rewards = []

    def forward(self, x):
        """
        This is our network's forward pass; the backward pass is created implicitly by autograd
        :param x: a (1, 4) state tensor
        :return: a (1, 2) tensor of action probabilities
        """
        x = F.relu(self.affine1(x))
        action_scores = self.affine2(x)
        return F.softmax(action_scores)
A state is given, for which the network (the policy) computes a probability distribution over the next action. From this distribution a new action is sampled, and the action is also appended to saved_actions.
reward will denote a vector $\in\mathbb{R}^T$, such that $\mathrm{reward}_t$ is the normalized, discounted sum of rewards from timestep $t$ onward. The discount factor $\gamma$ is taken into account.
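Concretely, for per-step rewards $r_t$, the loop in finish_episode below computes

$$
R_t = r_t + \gamma R_{t+1} = \sum_{k=t}^{T} \gamma^{\,k-t}\, r_k,
\qquad
\mathrm{reward}_t = \frac{R_t - \mu_R}{\sigma_R + \epsilon},
$$

where $\mu_R$ and $\sigma_R$ are the mean and standard deviation of the returns over the episode, and $\epsilon$ is machine epsilon (rewards.mean(), rewards.std() and np.finfo(np.float32).eps in the code).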
Following this, the REINFORCE update is applied to every action. As we've seen in the theoretical part, this is an accepted way of differentiating through stochastic units. reinforce is a method of torch.autograd.Variable. Note that each saved action is a sample drawn from the policy's probability distribution, and reinforce attaches its corresponding (discounted, normalized) reward, which is then used as the gradient signal during the backward pass.
Following this, the optimizer takes a step, as is usual when training neural networks.
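A note for readers on newer PyTorch versions: Variable and its reinforce method were removed in later releases. Roughly the same update can be written explicitly with torch.distributions; the sketch below only illustrates the idea under that newer API (the helper name and tensor shapes are assumptions) and is not part of the original notebook.

```python
# Minimal sketch of the same REINFORCE update with a modern PyTorch API.
# Assumes states: [T, 4] float tensor, actions: [T] long tensor,
# returns: [T] float tensor already discounted and normalized as above.
import torch
from torch.distributions import Categorical

def reinforce_update(policy, optimizer, states, actions, returns):
    probs = policy(states)                             # [T, 2] action probabilities
    log_probs = Categorical(probs).log_prob(actions)   # log pi(a_t | s_t)
    loss = -(log_probs * returns).sum()                # gradient estimator of -E[return]
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
```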
In [19]:
def select_action(state, policy):
    # Turn the (4,) numpy state into a (1, 4) float tensor (a batch of one)
    state = torch.from_numpy(state).float().unsqueeze(0)
    probs = policy(Variable(state))
    # Sample an action from the categorical distribution given by the policy
    action = probs.multinomial()
    # Keep the stochastic Variable around for the REINFORCE update later
    policy.saved_actions.append(action)
    return action.data
In [20]:
def finish_episode(policy, optimizer, gamma):
    R = 0
    rewards = []
    # Walk backwards through the episode to accumulate discounted returns
    for r in policy.rewards[::-1]:
        R = r + gamma * R
        rewards.insert(0, R)
    rewards = torch.Tensor(rewards)
    # Standardize the returns to reduce the variance of the gradient estimate
    rewards = (rewards - rewards.mean()) / (rewards.std() + np.finfo(np.float32).eps)
    # Attach each return to its stochastic action node (old-style REINFORCE API)
    for action, r in zip(policy.saved_actions, rewards):
        action.reinforce(r)
    optimizer.zero_grad()
    # Backpropagate through the stochastic graph; the saved actions carry the rewards
    autograd.backward(policy.saved_actions, [None for _ in policy.saved_actions])
    optimizer.step()
    # Clear the episode buffers
    del policy.rewards[:]
    del policy.saved_actions[:]
We first load the gym environment, initialize a Policy instance and create an optimizer for it.
Following this, we iterate for a number of episodes. As we've seen before, episodes are standalone interactions with the environment, each composed of $T$ timesteps. An environment interaction roughly follows the sketch below:
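(A simplified view of the loop inside main() below, using the same helper functions and the old-style gym step API:)

```python
state = env.reset()                        # start a new episode
for t in range(10000):                     # run until the pole falls or a step limit is hit
    action = select_action(state, policy)  # sample an action from the policy
    state, reward, done, _ = env.step(action[0, 0])
    policy.rewards.append(reward)          # remember the reward for the update
    if done:
        break
finish_episode(policy, optimizer, args.gamma)  # one REINFORCE update per episode
```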
We will stop when 'running_reward' rises above a predefined threshold (env.spec.reward_threshold).
In [21]:
def main():
    args = parse_args()
    env = gym.make('CartPole-v0')
    env.seed(args.seed)
    torch.manual_seed(args.seed)
    policy = Policy()
    optimizer = optim.Adam(policy.parameters(), lr=1e-2)

    def show_state(env, step=0, episode=0, info=""):
        plt.figure(3)
        plt.clf()
        plt.imshow(env.render(mode='rgb_array'))
        plt.title("{} | Episode: {:3d}, Step: {:4d}\n{}".format(env.spec.id, episode, step, info))
        plt.axis('off')
        display.clear_output(wait=True)
        display.display(plt.gcf())

    running_reward = 10
    msgs = ['']
    for i_episode in count(1):
        state = env.reset()
        frames = []
        for t in range(10000):  # Don't infinite loop while learning
            action = select_action(state, policy)
            state, reward, done, _ = env.step(action[0, 0])
            if args.render:
                show_state(env, step=t, episode=i_episode, info='\n'.join(msgs))
            policy.rewards.append(reward)
            if done:
                break
        running_reward = running_reward * 0.99 + t * 0.01
        finish_episode(policy, optimizer, args.gamma)
        if i_episode % args.log_interval == 0:
            msgs.append('Episode {}\tLast length: {:5d}\tAverage length: {:.2f}\n'.format(
                i_episode, t, running_reward))
            if not args.render:
                print(msgs[-1])
        if running_reward > env.spec.reward_threshold:
            print("Solved! Running reward is now {} and "
                  "the last episode runs to {} time steps!".format(running_reward, t))
            break
    env.render(close=True)
    env.close()


if __name__ == '__main__':
    main()