In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
A common pattern in reinforcement learning is to execute a policy in an environment for a specified number of steps or episodes. This happens, for example, during data collection, evaluation and generating a video of the agent.
While this is relatively straightforward to write in python, it is much more complex to write and debug in TensorFlow because it involves tf.while
loops, tf.cond
and tf.control_dependencies
. Therefore we abstract this notion of a run loop into a class called driver
, and provide well tested implementations both in Python and TensorFlow.
Additionally, the data encountered by the driver at each step is saved in a named tuple called Trajectory and broadcast to a set of observers such as replay buffers and metrics. This data includes the observation from the environment, the action recommended by the policy, the reward obtained, the type of the current and the next step, etc.
If you haven't installed tf-agents or gym yet, run:
In [0]:
!pip install --pre tf-agents[reverb]
!pip install gym
In [0]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.policies import random_py_policy
from tf_agents.policies import random_tf_policy
from tf_agents.metrics import py_metrics
from tf_agents.metrics import tf_metrics
from tf_agents.drivers import py_driver
from tf_agents.drivers import dynamic_episode_driver
tf.compat.v1.enable_v2_behavior()
The PyDriver
class takes a python environment, a python policy and a list of observers to update at each step. The main method is run()
, which steps the environment using actions from the policy until at least one of the following termination criteria is met: The number of steps reaches max_steps
or the number of episodes reaches max_episodes
.
The implementation is roughly as follows:
class PyDriver(object):
def __init__(self, env, policy, observers, max_steps=1, max_episodes=1):
self._env = env
self._policy = policy
self._observers = observers or []
self._max_steps = max_steps or np.inf
self._max_episodes = max_episodes or np.inf
def run(self, time_step, policy_state=()):
num_steps = 0
num_episodes = 0
while num_steps < self._max_steps and num_episodes < self._max_episodes:
# Compute an action using the policy for the given time_step
action_step = self._policy.action(time_step, policy_state)
# Apply the action to the environment and get the next step
next_time_step = self._env.step(action_step.action)
# Package information into a trajectory
traj = trajectory.Trajectory(
time_step.step_type,
time_step.observation,
action_step.action,
action_step.info,
next_time_step.step_type,
next_time_step.reward,
next_time_step.discount)
for observer in self._observers:
observer(traj)
# Update statistics to check termination
num_episodes += np.sum(traj.is_last())
num_steps += np.sum(~traj.is_boundary())
time_step = next_time_step
policy_state = action_step.state
return time_step, policy_state
Now, let us run through the example of running a random policy on the CartPole environment, saving the results to a replay buffer and computing some metrics.
In [0]:
env = suite_gym.load('CartPole-v0')
policy = random_py_policy.RandomPyPolicy(time_step_spec=env.time_step_spec(),
action_spec=env.action_spec())
replay_buffer = []
metric = py_metrics.AverageReturnMetric()
observers = [replay_buffer.append, metric]
driver = py_driver.PyDriver(
env, policy, observers, max_steps=20, max_episodes=1)
initial_time_step = env.reset()
final_time_step, _ = driver.run(initial_time_step)
print('Replay Buffer:')
for traj in replay_buffer:
print(traj)
print('Average Return: ', metric.result())
We also have drivers in TensorFlow which are functionally similar to Python drivers, but use TF environments, TF policies, TF observers etc. We currently have 2 TensorFlow drivers: DynamicStepDriver
, which terminates after a given number of (valid) environment steps and DynamicEpisodeDriver
, which terminates after a given number of episodes. Let us look at an example of the DynamicEpisode in action.
In [0]:
env = suite_gym.load('CartPole-v0')
tf_env = tf_py_environment.TFPyEnvironment(env)
tf_policy = random_tf_policy.RandomTFPolicy(action_spec=tf_env.action_spec(),
time_step_spec=tf_env.time_step_spec())
num_episodes = tf_metrics.NumberOfEpisodes()
env_steps = tf_metrics.EnvironmentSteps()
observers = [num_episodes, env_steps]
driver = dynamic_episode_driver.DynamicEpisodeDriver(
tf_env, tf_policy, observers, num_episodes=2)
# Initial driver.run will reset the environment and initialize the policy.
final_time_step, policy_state = driver.run()
print('final_time_step', final_time_step)
print('Number of Steps: ', env_steps.result().numpy())
print('Number of Episodes: ', num_episodes.result().numpy())
In [0]:
# Continue running from previous state
final_time_step, _ = driver.run(final_time_step, policy_state)
print('final_time_step', final_time_step)
print('Number of Steps: ', env_steps.result().numpy())
print('Number of Episodes: ', num_episodes.result().numpy())