This colab provides an overview of how Acme's modules can be stacked together to create reinforcement learning agents. It shows how to fit networks to environment specs and how to create actors, learners, replay buffers, datasets, adders, and full agents. It also highlights where you can swap out certain modules to create your own Acme-based agents.
In [0]:
#@title Install necessary dependencies.
!sudo apt-get install -y xvfb ffmpeg
!pip install 'gym==0.10.11'
!pip install imageio
!pip install PILLOW
!pip install 'pyglet==1.3.2'
!pip install pyvirtualdisplay
!pip install dm-acme
!pip install dm-acme[reverb]
!pip install dm-acme[tf]
!pip install dm-acme[envs]
from IPython.display import clear_output
clear_output()
The next cell will install environments provided by dm_control if you have an institutional MuJoCo license. This is not necessary, but without it you won't be able to use the dm_cartpole environment below; you can instead follow this colab using gym environments. To install dm_control, simply expand the following cell, paste in your license file, and run the cell.
Alternatively, Colab supports using a Jupyter kernel on your local machine which can be accomplished by following the guidelines here: https://research.google.com/colaboratory/local-runtimes.html. This will allow you to install dm_control
by following instructions in https://github.com/deepmind/dm_control and using a personal MuJoCo license.
In [0]:
#@title Add your License
#@test {"skip": true}
mjkey = """
""".strip()
mujoco_dir = "$HOME/.mujoco"
# Install OpenGL dependencies
!apt-get update && apt-get install -y --no-install-recommends \
libgl1-mesa-glx libosmesa6 libglew2.0
# Get MuJoCo binaries
!wget -q https://www.roboti.us/download/mujoco200_linux.zip -O mujoco.zip
!unzip -o -q mujoco.zip -d "$mujoco_dir"
# Copy over MuJoCo license
!echo "$mjkey" > "$mujoco_dir/mjkey.txt"
# Install dm_control
!pip install dm_control
# Configure dm_control to use the OSMesa rendering backend
%env MUJOCO_GL=osmesa
# Check that the installation succeeded
try:
  from dm_control import suite
  env = suite.load('cartpole', 'swingup')
  pixels = env.physics.render()
except Exception as e:
  raise RuntimeError(
      'Something went wrong during installation. Check the shell output above '
      'for more information.') from e
else:
  from IPython.display import clear_output
  clear_output()
  del suite, env, pixels
In [0]:
#@title Import modules.
#python3
%%capture
import copy
import pyvirtualdisplay
import imageio
import base64
import IPython
from acme import environment_loop
from acme.tf import networks
from acme.adders import reverb as adders
from acme.agents.tf import actors as actors
from acme.datasets import reverb as datasets
from acme.wrappers import gym_wrapper
from acme import specs
from acme import wrappers
from acme.agents.tf import d4pg
from acme.agents import agent
from acme.tf import utils as tf2_utils
from acme.utils import loggers
import gym
import dm_env
import matplotlib.pyplot as plt
import numpy as np
import reverb
import sonnet as snt
import tensorflow as tf
# Import dm_control if it exists.
try:
from dm_control import suite
except (ModuleNotFoundError, OSError):
pass
# Set up a virtual display for rendering OpenAI gym environments.
display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()
In [0]:
environment_name = 'dm_cartpole' # @param ['dm_cartpole', 'gym_mountaincar']
# task_name = 'balance' # @param ['swingup', 'balance']
def make_environment(domain_name='cartpole', task='balance'):
  env = suite.load(domain_name, task)
  env = wrappers.SinglePrecisionWrapper(env)
  return env

if 'dm_cartpole' in environment_name:
  environment = make_environment('cartpole')
  def render(env):
    return env._physics.render(camera_id=0)  # pylint: disable=protected-access
elif 'gym_mountaincar' in environment_name:
  environment = gym_wrapper.GymWrapper(gym.make('MountainCarContinuous-v0'))
  environment = wrappers.SinglePrecisionWrapper(environment)
  def render(env):
    return env.environment.render(mode='rgb_array')
else:
  raise ValueError('Unknown environment: {}.'.format(environment_name))
# Show the frame.
frame = render(environment)
plt.imshow(frame)
plt.axis('off')
We will later interact with the environment in a loop corresponding to the following diagram:
But before we start building an agent to interact with this environment, let's first look at the types of objects the environment either returns (e.g. observations) or consumes (e.g. actions). The environment_spec
will show you the form of the observations, rewards and discounts that the environment exposes and the form of the actions that can be taken.
In [0]:
environment_spec = specs.make_environment_spec(environment)
print('actions:\n', environment_spec.actions, '\n')
print('observations:\n', environment_spec.observations, '\n')
print('rewards:\n', environment_spec.rewards, '\n')
print('discounts:\n', environment_spec.discounts, '\n')
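These specs come from dm_env, and they can do more than describe shapes: dm_env specs provide generate_value() and validate() methods. As a quick sanity check, here is a minimal sketch assuming the standard dm_env spec interface:
In [0]:
# Generate a dummy action that conforms to the action spec and check that it validates.
dummy_action = environment_spec.actions.generate_value()
environment_spec.actions.validate(dummy_action)
print('dummy action:', dummy_action)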
Arguably the most important part of a reinforcement learning algorithm is the policy, which maps environment observations to actions. We can use a simple neural network to create a policy, in this case a feedforward MLP with layer norm. For our TensorFlow agents we make use of the sonnet library to specify networks or modules; all of the networks we will work with also have an initial batch dimension, which allows for batched inference/learning.
It is possible that the observations returned by the environment are nested in some way: e.g. environments from the dm_control suite frequently return dictionaries containing position and velocity entries. Our network is allowed to map this dictionary to an action in an arbitrary way, but in this case we will simply concatenate these observations before feeding them through the MLP. We can do so using Acme's batch_concat utility to flatten the nested observation into a single dimension for each batch element. If the observation is already flat this is a no-op.
Similarly, the output of the MLP may have a different range of values than the action spec dictates. For this we can use Acme's TanhToSpec module to rescale our actions to meet the spec.
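To make batch_concat concrete, here is a small sketch on a toy nested observation (the dictionary keys and shapes are made up for illustration):
In [0]:
# A toy nested observation with a batch dimension of 2.
toy_observation = {'position': tf.zeros([2, 3]), 'velocity': tf.zeros([2, 2])}
# batch_concat flattens each entry to [batch, -1] and concatenates along the last axis.
print(tf2_utils.batch_concat(toy_observation).shape)  # Expected: (2, 5)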
In [0]:
# Calculate how big the last layer should be based on total # of actions.
action_spec = environment_spec.actions
action_size = np.prod(action_spec.shape, dtype=int)
exploration_sigma = 0.3
# In order the following modules:
# 1. Flatten the observations to be [B, ...] where B is the batch dimension.
# 2. Define a simple MLP which is the guts of this policy.
# 3. Make sure the output action matches the spec of the actions.
policy_modules = [
    tf2_utils.batch_concat,
    networks.LayerNormMLP(layer_sizes=(300, 200, action_size)),
    networks.TanhToSpec(spec=environment_spec.actions)]
policy_network = snt.Sequential(policy_modules)

# We will also create a version of this policy that uses exploratory noise.
behavior_network = snt.Sequential(
    policy_modules + [networks.ClippedGaussian(exploration_sigma),
                      networks.ClipToSpec(action_spec)])
An Actor
is the part of our framework that directly interacts with an environment by generating actions. In more detail the earlier diagram can be expanded to show exactly how this interaction occurs:
While you can always write your own actor, in Acme we also provide a number of useful premade versions. For the network we specified above we will make use of a FeedForwardActor
that wraps a single feed forward network and knows how to do things like handle any batch dimensions or record observed transitions.
In [0]:
actor = actors.FeedForwardActor(policy_network)
All actors have the following public methods and attributes:
In [0]:
[method_or_attr for method_or_attr in dir(actor) # pylint: disable=expression-not-assigned
if not method_or_attr.startswith('_')]
Although we have instantiated an actor with a policy, the policy has not yet learned to achieve any task reward, and is essentially just acting randomly. However this is a perfect opportunity to see how the actor and environment interact. Below we define a simple helper function to display a video given frames from this interaction, and we show 500 steps of the actor taking actions in the world.
In [0]:
def display_video(frames, filename='temp.mp4'):
  """Save and display video."""
  # Write video.
  with imageio.get_writer(filename, fps=60) as video:
    for frame in frames:
      video.append_data(frame)
  # Read the video back in and display it.
  video = open(filename, 'rb').read()
  b64_video = base64.b64encode(video)
  video_tag = ('<video width="320" height="240" controls alt="test" '
               'src="data:video/mp4;base64,{0}">').format(b64_video.decode())
  return IPython.display.HTML(video_tag)
In [0]:
# Run the actor in the environment for desired number of steps.
frames = []
num_steps = 500
timestep = environment.reset()
for _ in range(num_steps):
  frames.append(render(environment))
  action = actor.select_action(timestep.observation)
  timestep = environment.step(action)
# Save video of the behaviour.
display_video(np.array(frames))
Many RL agents utilize a data structure such as a replay buffer to store data from the environment (e.g. observations) along with actions taken by the actor. This data will later be fed into a learning process in order to update the policy. Again we can expand our earlier diagram to include this step:
In order to make this possible, Acme leverages Reverb which is an efficient and easy-to-use data storage and transport system designed for Machine Learning research. Below we will create the replay buffer before interacting with it.
In [0]:
# Create a table with the following attributes:
# 1. when replay is full we remove the oldest entries first.
# 2. to sample from replay we will do so uniformly at random.
# 3. before allowing sampling to proceed we make sure there is at least
# one sample in the replay table.
# 4. we use a default table name so we don't have to repeat it many times below;
# if we left this off we'd need to feed it into adders/actors/etc. below.
replay_buffer = reverb.Table(
    name=adders.DEFAULT_PRIORITY_TABLE,
    max_size=1000000,
    remover=reverb.selectors.Fifo(),
    sampler=reverb.selectors.Uniform(),
    rate_limiter=reverb.rate_limiters.MinSize(min_size_to_sample=1))
# Get the server and address so we can give it to the modules such as our actor
# that will interact with the replay buffer.
replay_server = reverb.Server([replay_buffer], port=None)
replay_server_address = 'localhost:%d' % replay_server.port
We could interact directly with Reverb in order to add data to replay. However, in Acme we have an additional layer on top of this data storage that allows us to use the same interface no matter what kind of data we are inserting.
This layer in Acme corresponds to an Adder, which adds experience to a data table. We provide several adders that differ based on the type of information to be stored in the table; in this case we will make use of an NStepTransitionAdder, which either stores simple transitions (if N=1) or accumulates N steps to form an aggregated transition.
In [0]:
# Create a 5-step transition adder where in between those steps a discount of
# 0.99 is used (which should be the same discount used for learning).
adder = adders.NStepTransitionAdder(
    client=reverb.Client(replay_server_address),
    n_step=5,
    discount=0.99)
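To get an intuition for what this adder will write, the following is a rough, purely illustrative sketch of how a 5-step reward and discount get accumulated (the actual adder also folds in the environment's own discounts and handles episode boundaries internally):
In [0]:
# Illustrative only: accumulate a 5-step return the way an N-step adder roughly does.
example_rewards = np.array([0.1, 0.2, 0.0, 0.3, 0.1])  # Hypothetical per-step rewards.
gamma = 0.99
n_step_reward = sum(gamma**i * r for i, r in enumerate(example_rewards))
n_step_discount = gamma**len(example_rewards)  # Discount applied to the bootstrap value.
print(n_step_reward, n_step_discount)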
We can use the adder to add transitions to replay directly via its add() and add_first() methods as follows:
In [0]:
num_episodes = 2 #@param
for episode in range(num_episodes):
  timestep = environment.reset()
  adder.add_first(timestep)
  while not timestep.last():
    action = actor.select_action(timestep.observation)
    timestep = environment.step(action)
    adder.add(action=action, next_timestep=timestep)
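We can quickly confirm that these transitions made it into the replay table by querying the Reverb server. Here is a minimal sketch using Reverb's server_info(); the exact fields printed may vary with the Reverb version:
In [0]:
# Inspect the replay table; after the episodes above it should contain transitions.
replay_client = reverb.Client(replay_server_address)
print(replay_client.server_info()[adders.DEFAULT_PRIORITY_TABLE])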
Since this is a common enough way to observe data, Actors in Acme generally take an Adder instance that they use to define their observation methods. We saw earlier that the FeedForwardActor, like all Actors, defines observe and observe_first methods. If we give the actor an Adder instance at init then it will use this adder to make observations.
In [0]:
actor = actors.FeedForwardActor(policy_network=behavior_network, adder=adder)
Below we repeat the same process, but this time using the actor and its observe methods. We note these subtle changes below.
In [0]:
num_episodes = 2 #@param
for episode in range(num_episodes):
  timestep = environment.reset()
  actor.observe_first(timestep)  # Note: observe_first.
  while not timestep.last():
    action = actor.select_action(timestep.observation)
    timestep = environment.step(action)
    actor.observe(action=action, next_timestep=timestep)  # Note: observe.
In [0]:
# This connects to the created Reverb server. Also note that we used a transition
# adder above, so we tell the dataset constructor this so it knows the type of
# data that's coming out.
dataset = datasets.make_dataset(
    client=reverb.TFClient(server_address=replay_server_address),
    batch_size=256,
    environment_spec=environment_spec,
    transition_adder=True)
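Before handing this dataset to a learner it can be useful to pull a single batch and look at its shapes. A minimal sketch, assuming make_dataset yields Reverb ReplaySample structures with info and data fields, and that the adder calls above have already populated the table (otherwise this will block until data is available):
In [0]:
# Pull one batch from the dataset and print the shape of each element.
sample = next(iter(dataset))
print(tf.nest.map_structure(lambda t: t.shape, sample.data))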
In what follows we'll make use of D4PG, an actor-critic learning algorithm. D4PG is a somewhat complicated algorithm, so we'll leave a full explanation of this method to the accompanying paper (see the documentation).
However, since D4PG is an actor-critic algorithm we will have to specify a critic for it (a value function). In this case D4PG uses a distributional critic as well. D4PG also makes use of online and target networks so we need to create copies of both the policy_network (from earlier) and the new critic network we are about to create.
To build our critic networks, we use a multiplexer, which is simply a neural network module that takes multiple inputs and processes them in different ways before combining them and processing them further. In the case of Acme's CriticMultiplexer, the inputs are observations and actions, each with their own network torso. A critic network module then processes the outputs of the observation network and the action network and outputs a tensor.
Finally, in order to optimize these networks the learner must receive networks with the variables created. We have utilities in Acme to handle exactly this, and we do so in the final lines of the following code block.
In [0]:
critic_network = snt.Sequential([
    networks.CriticMultiplexer(
        observation_network=tf2_utils.batch_concat,
        action_network=tf.identity,
        critic_network=networks.LayerNormMLP(
            layer_sizes=(400, 300),
            activate_final=True)),
    # Value-head gives a 51-atomed delta distribution over state-action values.
    networks.DiscreteValuedHead(vmin=-150., vmax=150., num_atoms=51)])
# Create the target networks
target_policy_network = copy.deepcopy(policy_network)
target_critic_network = copy.deepcopy(critic_network)
# We must create the variables in the networks before passing them to learner.
tf2_utils.create_variables(network=policy_network,
                           input_spec=[environment_spec.observations])
tf2_utils.create_variables(network=critic_network,
                           input_spec=[environment_spec.observations,
                                       environment_spec.actions])
tf2_utils.create_variables(network=target_policy_network,
                           input_spec=[environment_spec.observations])
tf2_utils.create_variables(network=target_critic_network,
                           input_spec=[environment_spec.observations,
                                       environment_spec.actions])
We can now create a learner that uses these networks. Note that here we're using the same discount factor as was used in the transition adder. The rest of the parameters are reasonable defaults.
Note, however, that we will log output to the terminal at regular intervals. We have also turned off checkpointing of the network weights (i.e. saving them); checkpointing is usually enabled by default but can cause issues with interactive colab sessions.
In [0]:
learner = d4pg.D4PGLearner(policy_network=policy_network,
                           critic_network=critic_network,
                           target_policy_network=target_policy_network,
                           target_critic_network=target_critic_network,
                           dataset=dataset,
                           discount=0.99,
                           target_update_period=100,
                           policy_optimizer=snt.optimizers.Adam(1e-4),
                           critic_optimizer=snt.optimizers.Adam(1e-4),
                           # Log learner updates to console every 10 seconds.
                           logger=loggers.TerminalLogger(time_delta=10.),
                           checkpoint=False)
Inspecting the learner's public methods, we see that it primarily exists to expose its variables and update them; in other words, this looks remarkably similar to supervised learning.
In [0]:
[method_or_attr for method_or_attr in dir(learner) # pylint: disable=expression-not-assigned
if not method_or_attr.startswith('_')]
The learner's step() method samples a batch of data from the replay dataset and performs an optimization step, logging loss metrics along the way. Note: in order to sample from the replay dataset there must be data available in the replay buffer, which should already be there thanks to the experience the actor added above.
In [0]:
learner.step()
Finally, we can put all of the pieces together and run some training steps in the environment, alternating the actor's experience gathering with the learner's learning.
This is a simple training loop that runs for num_training_episodes episodes, where the actor and learner take turns generating and learning from experience respectively:
- Actor: acts in the environment, adding every step of experience to replay.
- Learner: once min_actor_steps_before_learning actor steps have been taken, performs num_learner_steps_per_iteration learner steps after every num_actor_steps_per_iteration actor steps.
Note: since the learner and actor share a policy network, any learning done by the learner is automatically transferred to the actor's policy.
In [0]:
num_training_episodes = 10 # @param {type: "integer"}
min_actor_steps_before_learning = 1000 # @param {type: "integer"}
num_actor_steps_per_iteration = 100 # @param {type: "integer"}
num_learner_steps_per_iteration = 1 # @param {type: "integer"}
learner_steps_taken = 0
actor_steps_taken = 0
for episode in range(num_training_episodes):
  timestep = environment.reset()
  actor.observe_first(timestep)
  episode_return = 0

  while not timestep.last():
    # Get an action from the agent and step in the environment.
    action = actor.select_action(timestep.observation)
    next_timestep = environment.step(action)

    # Record the transition.
    actor.observe(action=action, next_timestep=next_timestep)

    # Book-keeping.
    episode_return += next_timestep.reward
    actor_steps_taken += 1
    timestep = next_timestep

    # See if we have some learning to do.
    if (actor_steps_taken >= min_actor_steps_before_learning and
        actor_steps_taken % num_actor_steps_per_iteration == 0):
      # Learn.
      for learner_step in range(num_learner_steps_per_iteration):
        learner.step()
      learner_steps_taken += num_learner_steps_per_iteration

  # Log quantities.
  print('Episode: %d | Return: %f | Learner steps: %d | Actor steps: %d' % (
      episode, episode_return, learner_steps_taken, actor_steps_taken))
Now that we've used all of the pieces and seen how they can interact, there's one more way we can put it all together. In the Acme design scheme, an agent is an entity with both a learner and an actor component that pieces together their interactions internally. An agent handles the interchange between the actor adding experiences to the replay buffer and the learner sampling from it and learning, and in turn shares the updated weights back with the actor.
Similar to how we used the num_actor_steps_per_iteration and num_learner_steps_per_iteration parameters in our custom training loop above, the agent parameters min_observations and observations_per_step specify the structure of the agent's training loop:
- min_observations specifies how many actor steps need to have happened before learning starts.
- observations_per_step specifies how many actor steps should occur between each learner step.
For example, with min_observations=1000 and observations_per_step=8, the first learner step happens after 1000 actor steps, and thereafter one learner step is taken for every 8 actor steps.
In [0]:
d4pg_agent = agent.Agent(actor=actor,
                         learner=learner,
                         min_observations=1000,
                         observations_per_step=8.)
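The resulting agent implements the same Actor interface we used above (select_action, observe_first, observe, update), so it can drive the same kind of manual loop. A minimal sketch of a single interaction step (the partial episode started here is cleaned up by the adder.reset() call below):
In [0]:
# Interact with the environment through the agent for a single step.
timestep = environment.reset()
d4pg_agent.observe_first(timestep)
action = d4pg_agent.select_action(timestep.observation)
next_timestep = environment.step(action)
d4pg_agent.observe(action, next_timestep=next_timestep)
d4pg_agent.update()  # Runs learner steps once enough observations have been gathered.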
Of course we could have just used the agents.D4PG agent directly, which sets all of this up for us. We'll stick with the agent we've just created, but most of the steps outlined in this tutorial can be skipped by just making use of a prebuilt agent and the environment loop.
To simplify collecting and storing experiences, you can also directly use Acme's EnvironmentLoop, which runs the environment loop for a specified number of episodes. Each episode is itself a loop which first interacts with the environment to get an observation and then gives that observation to the agent in order to retrieve an action. Upon termination of an episode a new episode is started. If the number of episodes is not given, this will interact with the environment indefinitely.
In [0]:
# This may be necessary if any of the episodes were cancelled above.
adder.reset()
# We also want to make sure the logger doesn't write to disk because that can
# cause issues in colab on occasion.
logger = loggers.TerminalLogger(time_delta=10.)
In [0]:
loop = environment_loop.EnvironmentLoop(environment, d4pg_agent, logger=logger)
loop.run(num_episodes=50)
We can now evaluate the agent. Note that this will use the noisy behavior policy, so the result won't quite be optimal. If we wanted to be precise we could easily replace this with the noise-free policy (see the sketch below). Note that the optimal policy can get about 1000 reward in this environment. D4PG should generally get there within 50-100 learner steps; we've cut training off at 50 episodes and kept the behavior noise just to simplify this tutorial.
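If you do want a noise-free evaluation, a minimal sketch is to wrap the learned policy_network (without the exploration-noise modules) in a fresh FeedForwardActor and use it in place of d4pg_agent in the rollout below:
In [0]:
# An evaluation actor that uses the deterministic policy (no exploration noise).
eval_actor = actors.FeedForwardActor(policy_network)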
In [0]:
# Run the actor in the environment for desired number of steps.
frames = []
num_steps = 500
timestep = environment.reset()
for _ in range(num_steps):
  frames.append(render(environment))
  action = d4pg_agent.select_action(timestep.observation)
  timestep = environment.step(action)
# Save video of the behaviour.
display_video(np.array(frames))