In [1]:
    
import gym
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
# This code creates a virtual display to draw game images on. 
# If you are running locally, just ignore it
import os
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'
    
    
In [2]:
    
from collections import namedtuple
from pickle import dumps, loads
from gym.core import Wrapper
# a container for the get_result function below. Works just like a tuple, but prettier
ActionResult = namedtuple(
    "action_result", ("snapshot", "observation", "reward", "is_done", "info"))
class WithSnapshots(Wrapper):
    """
    Creates a wrapper that supports saving and loading environment states.
    Required for planning algorithms.
    This class will have access to the core environment as self.env, e.g.:
    - self.env.reset()           #reset original env
    - self.env.ale.cloneState()  #make snapshot for atari. load with .restoreState()
    - ...
    You can also use reset, step and render directly for convenience.
    - s, r, done, _ = self.step(action)   #step, same as self.env.step(action)
    - self.render(close=True)             #close window, same as self.env.render(close=True)
    """
    def get_snapshot(self, render=False):
        """
        :returns: environment state that can be loaded with load_snapshot 
        Snapshots guarantee same env behaviour each time they are loaded.
        Warning! Snapshots can be arbitrary things (strings, integers, json, tuples)
        Don't count on them being pickle strings when implementing MCTS.
        Developer Note: Make sure the object you return will not be affected by 
        anything that happens to the environment after it's saved.
        You shouldn't, for example, return self.env. 
        In case of doubt, use pickle.dumps or deepcopy.
        """
        if render:
            self.render()  # close popup windows since we can't pickle them
            self.close()
        if self.unwrapped.viewer is not None:
            self.unwrapped.viewer.close()
            self.unwrapped.viewer = None
        return dumps(self.env)
    def load_snapshot(self, snapshot, render=False):
        """
        Loads snapshot as current env state.
        Should not change snapshot inplace (in case of doubt, deepcopy).
        """
        assert not hasattr(self, "_monitor") or hasattr(
            self.env, "_monitor"), "can't backtrack while recording"
        if render:
            self.render()  # close popup windows since we can't load into them
            self.close()
        self.env = loads(snapshot)
    def get_result(self, snapshot, action):
        """
        A convenience function that 
        - loads snapshot, 
        - commits action via self.step,
        - and takes snapshot again :)
        :returns: next snapshot, next_observation, reward, is_done, info
        Basically it returns next snapshot and everything that env.step would have returned.
        """
        # load the snapshot, commit the action, then take a new snapshot
        self.load_snapshot(snapshot, render=False)
        next_observation, reward, is_done, info = self.step(action)
        next_snapshot = self.get_snapshot()
        
        return ActionResult(next_snapshot, next_observation, reward, is_done, info)
    
In [3]:
    
# make env
env = WithSnapshots(gym.make("CartPole-v0"))
env.reset()
n_actions = env.action_space.n
    
In [4]:
    
print("initial_state:")
plt.imshow(env.render('rgb_array'))
env.close()
# create first snapshot
snap0 = env.get_snapshot()
    
    
    
In [5]:
    
# play without making snapshots (faster)
while True:
    is_done = env.step(env.action_space.sample())[2]
    if is_done:
        print("Whoops! We died!")
        break
print("final state:")
plt.imshow(env.render('rgb_array'))
env.close()
    
    
    
In [6]:
    
# reload initial state
env.load_snapshot(snap0)
print("\n\nAfter loading snapshot")
plt.imshow(env.render('rgb_array'))
env.close()
    
    
    
In [7]:
    
# get outcome (snapshot, observation, reward, is_done, info)
res = env.get_result(snap0, env.action_space.sample())
snap1, observation, reward = res[:3]
# second step
res2 = env.get_result(snap1, env.action_space.sample())
    
In this section, we'll implement the vanilla MCTS algorithm with UCB1-based node selection.
We will start by implementing the Node class - a simple class that acts as an MCTS node and supports some of the MCTS algorithm steps.
This MCTS implementation makes some assumptions about the environment; you can find those in the notes section at the end of the notebook.
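For reference, node selection relies on the standard UCB-1 score: the node's mean value plus an exploration bonus that grows when a node has been visited rarely relative to its parent. A minimal standalone sketch of that formula (the name ucb1 and the scale default are just illustrative; the Node.ucb_score method below is the actual implementation used):

import numpy as np

def ucb1(mean_value, n_visits, n_parent_visits, scale=10):
    # unvisited nodes get effectively infinite priority
    if n_visits == 0:
        return float("inf")
    # Hoeffding-style exploration bonus: sqrt(2 * ln(N_parent) / n)
    exploration = np.sqrt(2 * np.log(n_parent_visits) / n_visits)
    return mean_value + scale * exploration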
In [8]:
    
assert isinstance(env,WithSnapshots)
    
In [24]:
    
class Node:
    """ a tree node for MCTS """
    
    #metadata:
    parent = None          #parent Node
    value_sum = 0.         #sum of state values from all visits (numerator)
    times_visited = 0      #counter of visits (denominator)
    
    def __init__(self, parent, action):
        """
        Creates an empty node with no children.
        Does so by committing an action and recording the outcome.
        
        :param parent: parent Node
        :param action: action to commit from parent Node
        
        """
        
        self.parent = parent
        self.action = action        
        self.children = set()       #set of child nodes
        #get action outcome and save it
        res = env.get_result(parent.snapshot, action)
        self.snapshot, self.observation, self.immediate_reward, self.is_done,_ = res
        
        
    def is_leaf(self):
        return len(self.children)==0
    
    def is_root(self):
        return self.parent is None
    
    def get_mean_value(self):
        return self.value_sum / self.times_visited if self.times_visited !=0 else 0
    
    def ucb_score(self, scale=10, max_value=1e100):
        """
        Computes the UCB-1 upper bound using the current value and visit counts for the node and its parent.
        
        :param scale: multiplier for the exploration bonus. Following Hoeffding's inequality, assumes the reward range to be [0, scale].
        :param max_value: a value that represents infinity (for unvisited nodes)
        
        """
        
        if self.times_visited == 0:
            return max_value
        
        # compute the UCB-1 exploration bonus (to be added to the mean value):
        # N = self.parent.times_visited (times the parent was considered),
        # n = self.times_visited (times this node was visited)
        U = np.sqrt(2 * np.log(self.parent.times_visited) / self.times_visited)
        
        return self.get_mean_value() + scale * U
    
    
    #MCTS steps
    
    def select_best_leaf(self):
        """
        Picks the leaf with the highest priority to expand.
        Does so by recursively picking the child with the best UCB-1 score until it reaches a leaf.
        
        """
        if self.is_leaf():
            return self
        
        # select the best child in terms of node.ucb_score()
        children = list(self.children)
        best_i = np.argmax([child.ucb_score() for child in children])
        best_child = children[best_i]
        
        return best_child.select_best_leaf()
    
    def expand(self):
        """
        Expands the current node by creating all possible child nodes.
        Then returns one of those children.
        """
        
        assert not self.is_done, "can't expand from terminal state"
        for action in range(n_actions):
            self.children.add(Node(self, action))
        
        return self.select_best_leaf()
    
    def rollout(self, t_max=10**4):
        """
        Play the game from this state to the end (done) or for t_max steps.
        
        On each step, pick an action at random (hint: env.action_space.sample()).
        
        Compute the sum of rewards from the current state until the episode ends or t_max is reached.
        Note 1: use env.action_space.sample() for random actions
        Note 2: if the node is terminal (self.is_done is True), just return 0
        
        """
            
        #set env into the appropriate state
        env.load_snapshot(self.snapshot)
        obs = self.observation
        is_done = self.is_done
        # if the node is terminal, return 0
        if is_done:
            return 0
        
        # play a uniform random policy, accumulating rewards until done or t_max steps
        rollout_reward = 0
        for _ in range(t_max):
            obs, reward, is_done, _ = env.step(env.action_space.sample())
            rollout_reward += reward
            if is_done:
                break
        
        return rollout_reward
    
    def propagate(self, child_value):
        """
        Uses child value (sum of rewards) to update parents recursively.
        """
        #compute node value
        my_value = self.immediate_reward + child_value
        
        #update value_sum and times_visited
        self.value_sum+=my_value
        self.times_visited+=1
        
        #propagate upwards
        if not self.is_root():
            self.parent.propagate(my_value)
        
    def safe_delete(self):
        """safe delete to prevent memory leak in some python versions"""
        del self.parent
        for child in self.children:
            child.safe_delete()
            del child
    
In [25]:
    
class Root(Node):
    def __init__(self,snapshot,observation):
        """
        creates a special node that acts as the tree root
        :snapshot: snapshot (from env.get_snapshot) to start planning from
        :observation: last environment observation
        """
        
        self.parent = self.action = None
        self.children = set()       #set of child nodes
        
        #root: load snapshot and observation
        self.snapshot = snapshot
        self.observation = observation
        self.immediate_reward = 0
        self.is_done=False
    
    @staticmethod
    def from_node(node):
        """initializes node as root"""
        root = Root(node.snapshot,node.observation)
        #copy data
        copied_fields = ["value_sum","times_visited","children","is_done"]
        for field in copied_fields:
            setattr(root,field,getattr(node,field))
        return root
    
In [26]:
    
def plan_mcts(root, n_iters=10):
    """
    builds the tree with Monte Carlo tree search for n_iters iterations
    :param root: tree node to plan from
    :param n_iters: how many select-expand-simulate-propagate loops to make
    """
    for _ in range(n_iters):
        node = root.select_best_leaf()  # select the most promising leaf
        if node.is_done:
            node.propagate(0)
        else:  # node is not terminal: expand, simulate, propagate
            node.expand()
            reward = node.rollout()
            node.propagate(reward)
    
In [31]:
    
env = WithSnapshots(gym.make("CartPole-v0"))
root_observation = env.reset()
root_snapshot = env.get_snapshot()
root = Root(root_snapshot, root_observation)
    
In [32]:
    
#plan from root:
plan_mcts(root,n_iters=1000)
    
In [33]:
    
from IPython.display import clear_output
from itertools import count
from gym.wrappers import Monitor
total_reward = 0                #sum of rewards
test_env = loads(root_snapshot) #env used to show progress
for i in count():
    
    # get the best child: the one with the highest mean reward
    children = list(root.children)
    best_i = np.argmax([child.get_mean_value() for child in children])
    best_child = children[best_i]
    
    #take action
    s,r,done,_ = test_env.step(best_child.action)
    
    #show image
    clear_output(True)
    plt.title("step %i"%i)
    plt.imshow(test_env.render('rgb_array'))
    plt.show()
    total_reward += r
    if done:
        print("Finished with reward = ",total_reward)
        break
    
    #discard unrealized part of the tree [because not every child matters :(]
    for child in root.children:
        if child != best_child:
            child.safe_delete()
    #declare best child a new root
    root = Root.from_node(best_child)
    
    assert not root.is_leaf(), "We ran out of tree! Need more planning! Try growing tree right inside the loop."
    
    # grow the tree from the new root before choosing the next action
    plan_mcts(root, n_iters=100)
    
    
    
In [34]:
    
from submit import submit_mcts
submit_mcts(total_reward, "tonatiuh_rangel@hotmail.com", "pfV2eFsrnmEfih1c")
    
    
There are a few things you might want to try if you want to dig deeper:
"Analyze this" assignment
UCB-1 is a weak bound as it relies on a very general inequality (Hoeffding's inequality, to be exact). One thing to try is expanding not all actions but only a few random ones per expand call; see the notes below for details. The goal is to find out what gives the optimal performance for CartPole-v0 under different time budgets (i.e. different n_iters in plan_mcts); a comparison sketch follows below.
Evaluate your results on Acrobot-v1 - do the results change and, if so, how can you explain it?
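For the time-budget comparison, a hypothetical sketch (the budget values are arbitrary): rebuild the root, plan with different n_iters, then reuse the greedy play loop from cell In [33] to measure total reward.

for budget in [30, 100, 300, 1000]:
    env = WithSnapshots(gym.make("CartPole-v0"))
    root_observation = env.reset()
    root_snapshot = env.get_snapshot()
    root = Root(root_snapshot, root_observation)
    plan_mcts(root, n_iters=budget)
    # ...then run the play loop from In [33] and record total_reward for this budget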
"Build this" assignment
Apply MCTS to play atari games. In particular, let's start with gym.make("MsPacman-ramDeterministic-v0").
This requires two things:
Slightly modify the WithSnapshots wrapper to work with Atari (a sketch follows after this list).
snapshot = self.env.ale.cloneState()
...
self.env.ale.restoreState(snapshot)
Run MCTS on the game above.
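A minimal sketch of such a wrapper, assuming the ALE-based gym Atari envs where the unwrapped env exposes ale.cloneState() and ale.restoreState(); the class name AtariSnapshots is illustrative and the code is untested:

class AtariSnapshots(WithSnapshots):
    def get_snapshot(self, render=False):
        # clone the emulator state instead of pickling the whole env
        return self.unwrapped.ale.cloneState()

    def load_snapshot(self, snapshot, render=False):
        # restore the emulator state saved by get_snapshot
        self.unwrapped.ale.restoreState(snapshot)

get_result from WithSnapshots should then work unchanged, since it only calls load_snapshot, step and get_snapshot.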
Planning on each iteration is a costly thing to do. You can speed things up drastically if you train a classifier to predict which action will turn out to be best according to MCTS.
To do so, just record which action the MCTS agent took on each step and fit a classifier to the resulting [state, mcts_optimal_action] pairs (a sketch follows below).
While CartPole is glorious enough, try expanding this to gym.make("MsPacmanDeterministic-v0").
See the previous section on how to wrap Atari.
Also consider what AlphaGo Zero did in this area.
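A sketch of that distillation idea, assuming scikit-learn is available (the names fit_mcts_policy, states, best_actions are illustrative): log the root observation and the chosen action on every step of the play loop, then fit a classifier on those pairs.

from sklearn.linear_model import LogisticRegression
import numpy as np

def fit_mcts_policy(states, best_actions):
    # states: observations seen at the root; best_actions: the actions MCTS chose there
    clf = LogisticRegression(max_iter=1000)
    clf.fit(np.array(states), np.array(best_actions))
    return clf

# usage sketch: inside the play loop, log
#     states.append(root.observation); best_actions.append(best_child.action)
# afterwards, clf = fit_mcts_policy(states, best_actions)
# and clf.predict(observation[None, :])[0] gives the imitation action.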
Incorporate planning into the agent architecture (this will likely take a long time; better to treat it as a side project once all other deadlines are met).
The goal is to implement Value Iteration Networks.
For starters, remember the week5 assignment? If not, use this instead.
You will need to switch it to a maze-like game; consider MsPacman or the games from week7 (Bonus: Neural Maps).
You will need to implement a special layer that performs a value iteration-like update to a recurrent memory. This can be implemented the same way you did attention in week7 or week8 (a toy sketch follows below).
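As a toy illustration of what such a layer computes, here is a numpy sketch of one value-iteration-like update over a 2D value map with four move actions (an assumption for illustration only, not the actual VIN layer, which is learned and convolutional):

import numpy as np

def value_iteration_update(V, R, gamma=0.99):
    # look up neighbour values by shifting the map in each of the 4 directions
    neighbours = np.stack([
        np.roll(V, 1, axis=0),   # up
        np.roll(V, -1, axis=0),  # down
        np.roll(V, 1, axis=1),   # left
        np.roll(V, -1, axis=1),  # right
    ])
    # Bellman backup per cell: best action value given the immediate reward map R
    return np.max(R + gamma * neighbours, axis=0)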
The full list of assumptions is:
Finite action space: we enumerate all possible actions in expand.
Episodic (finite) MDP: rollouts run for at most t_max steps; if your environment is effectively infinite, set t_max to something more reasonable.
Undiscounted rewards: we assume gamma = 1. If that is not the case, discount the returns in rollout and use my_R = r + gamma*child_R in propagate (a sketch follows below).
Pickleable environment: snapshots rely on pickling self.env; for other environments you may need to modify get_snapshot/load_snapshot in WithSnapshots.
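A sketch of that discounted backup, kept outside the Node class for illustration (the name propagate_discounted and gamma=0.99 are assumptions):

def propagate_discounted(node, child_value, gamma=0.99):
    # like Node.propagate, but the child's return is discounted: my_R = r + gamma*child_R
    my_value = node.immediate_reward + gamma * child_value
    node.value_sum += my_value
    node.times_visited += 1
    if not node.is_root():
        propagate_discounted(node.parent, my_value, gamma)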
On the select_best_leaf and expand functions: this MCTS implementation only selects leaf nodes for expansion. This doesn't break things because expand adds all possible actions; hence, all non-leaf nodes are by design fully expanded and shouldn't be selected.
If you want to add only a few random actions on each expand, you will also have to modify select_best_leaf to consider returning non-leaf nodes.
We use a simple uniform policy for rollouts. This introduces a negative bias toward good situations that can be ruined completely by a single random bad action. As a simple analogy: if you tend to act uniformly at random, you'd better not use sharp knives or walk near cliffs.
You can improve on that by integrating a reinforcement learning algorithm with a computationally light agent, and you can even train this agent on the optimal policy found by the tree search (a sketch follows below).
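A sketch of how a lightweight learned policy could replace uniform sampling in rollouts (policy_rollout and the policy(observation) -> action interface are illustrative; like Node.rollout, it uses the global env):

def policy_rollout(node, policy, t_max=10**4):
    env.load_snapshot(node.snapshot)
    if node.is_done:
        return 0
    obs = node.observation
    rollout_reward = 0
    for _ in range(t_max):
        obs, reward, is_done, _ = env.step(policy(obs))  # the policy picks each action
        rollout_reward += reward
        if is_done:
            break
    return rollout_reward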
In [ ]: