This tutorial walks you through the process of creating custom environments in Flow. Custom environments contain specific methods that define the problem space of a task, such as the state and action spaces of the RL agent and the signal (or reward) that the RL algorithm will optimize over. By specifying a few methods within a custom environment, individuals can use Flow to design traffic control tasks of various types, such as optimal traffic light signal timing and flow regulation via mixed autonomy traffic (see the figures below). Finally, these environments are compatible with OpenAI Gym.
The rest of the tutorial is organized as follows: section 1 walks through the process of creating an environment for mixed-autonomy vehicle control in which the autonomous vehicles perceive all vehicles in the network, and section 2 runs the environment in simulation.
In this exercise we will create an environment in which the accelerations of a handful of vehicles in the network are specified by a single centralized agent, with the objective of the agent being to improve the average speed of all vehicles in the network. In order to create this environment, we begin by inheriting the base environment class located in flow.envs:
In [1]:
# import the base environment class
from flow.envs import Env

# define the environment class, and inherit properties from the base environment class
class myEnv(Env):
    pass
Env provides the interface for running and modifying a SUMO simulation. Using this class, we are able to start SUMO, provide a scenario to specify a configuration and controllers, perform simulation steps, and reset the simulation to an initial configuration.
By inheriting Flow's base environment, a custom environment for varying control tasks can be created by adding the following methods to the child class: action_space, observation_space, _apply_rl_actions, get_state, and compute_reward. Each of these components is covered in the next few subsections; a skeleton of the full class is sketched below for orientation.
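The skeleton below is only a sketch of the class we will build; each method is implemented step by step in the following subsections, so there is no need to run this cell.
In [ ]:
# skeleton of the custom environment; the body of each method is filled in
# over the course of this tutorial
class myEnv(Env):

    @property
    def action_space(self):
        pass

    @property
    def observation_space(self):
        pass

    def _apply_rl_actions(self, rl_actions):
        pass

    def get_state(self, **kwargs):
        pass

    def compute_reward(self, rl_actions, **kwargs):
        pass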
The features used to parametrize components of the state/action space as well as the reward function are specified within the EnvParams input, as discussed in tutorial 1. Specifically, for the sake of our environment, the additional_params attribute within EnvParams will be responsible for storing information on the maximum possible accelerations and decelerations by the autonomous vehicles in the network. Accordingly, for this problem, we define an ADDITIONAL_ENV_PARAMS variable of the form:
In [2]:
ADDITIONAL_ENV_PARAMS = {
    "max_accel": 1,
    "max_decel": 1,
}
All environments presented in Flow provide a unique ADDITIONAL_ENV_PARAMS component containing the information needed to properly define some environment-specific parameters. We assume that these values are always provided by the user, and accordingly can be called from env_params. For example, if we would like to call the "max_accel" parameter, we simply type:
max_accel = env_params.additional_params["max_accel"]
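To make this access pattern concrete, the sketch below constructs the EnvParams object with the dictionary defined in section 1.1 (the same construction is used again in section 2) and then reads the parameter back out:
In [ ]:
from flow.core.params import EnvParams

# build the environment parameters from the dictionary defined above
env_params = EnvParams(additional_params=ADDITIONAL_ENV_PARAMS)

# retrieve the maximum acceleration (1 m/s^2 in this example)
max_accel = env_params.additional_params["max_accel"]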
The action_space method defines the number and bounds of the actions provided by the RL agent. In order to define these bounds within an OpenAI Gym setting, we use several objects located within gym.spaces. For instance, the Box object is used to define a bounded array of values in $\mathbb{R}^n$.
In [3]:
from gym.spaces.box import Box
In addition, Tuple objects (not used in this exercise) allow users to combine multiple Box elements together.
In [4]:
from gym.spaces.tuple_space import Tuple
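As a brief illustration of how these objects compose (this combination is hypothetical and not used in the rest of the tutorial), two Box elements can be joined into a single Tuple space:
In [ ]:
from gym.spaces.box import Box
from gym.spaces.tuple_space import Tuple

# hypothetical example: a Tuple space combining two bounded 1-d Box elements
example_space = Tuple((Box(low=-1, high=1, shape=(1,)),
                       Box(low=0, high=10, shape=(1,))))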
Once we have imported the above objects, we are ready to define the bounds of our action space. Given that our actions consist of a list of n real numbers (where n is the number of autonomous vehicles), bounded above by "max_accel" and below by the negative of "max_decel" (see section 1.1), we can define our action space as follows:
In [5]:
class myEnv(myEnv):

    @property
    def action_space(self):
        num_actions = self.initial_vehicles.num_rl_vehicles
        accel_ub = self.env_params.additional_params["max_accel"]
        accel_lb = - abs(self.env_params.additional_params["max_decel"])

        return Box(low=accel_lb,
                   high=accel_ub,
                   shape=(num_actions,))
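As a quick illustration (the bounds mirror the ADDITIONAL_ENV_PARAMS defined in section 1.1, and a single autonomous vehicle is assumed), sampling from such a Box shows what a valid action looks like:
In [ ]:
from gym.spaces.box import Box

# illustrative only: the action space for one RL vehicle with max_accel = max_decel = 1
example_action_space = Box(low=-1, high=1, shape=(1,))

# a random acceleration within [-1, 1] m/s^2
print(example_action_space.sample())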
The observation space of an environment represents the number and types of observations that are provided to the reinforcement learning agent. For this example, we will observe two values for each vehicle: its position and its speed. Accordingly, we need an observation space that is twice the size of the number of vehicles in the network.
In [6]:
class myEnv(myEnv):  # update my environment class

    @property
    def observation_space(self):
        return Box(
            low=0,
            high=float("inf"),
            shape=(2*self.initial_vehicles.num_vehicles,),
        )
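For reference (the 22-vehicle count is taken from the ring-road setup used in section 2), the resulting space for that network is a 44-dimensional Box; a quick sketch:
In [ ]:
from gym.spaces.box import Box

# illustrative only: with 22 vehicles, the observation space has 2 * 22 = 44 entries
example_obs_space = Box(low=0, high=float("inf"), shape=(2 * 22,))
print(example_obs_space.shape)  # (44,)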
The function apply_rl_actions is responsible for transforming commands specified by the RL agent into actual actions performed within the simulator. The vehicle kernel within the environment class contains several helper methods that may be of use to facilitate this process; one such method, apply_acceleration, is used in the example below.
For our example we consider a situation where the RL agent can only specify accelerations for the RL vehicles; accordingly, the actuation method for the RL agent is defined as follows:
In [7]:
class myEnv(myEnv):  # update my environment class

    def _apply_rl_actions(self, rl_actions):
        # the names of all autonomous (RL) vehicles in the network
        rl_ids = self.k.vehicle.get_rl_ids()

        # use the base environment method to convert actions into accelerations for the rl vehicles
        self.k.vehicle.apply_acceleration(rl_ids, rl_actions)
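If desired, the requested accelerations can also be clipped to the bounds of the action space before they are applied. This refinement is not part of the exercise; the class name below is hypothetical and the snippet is only a sketch:
In [ ]:
import numpy as np

class myEnvClipped(myEnv):  # hypothetical variant, for illustration only

    def _apply_rl_actions(self, rl_actions):
        # the names of all autonomous (RL) vehicles in the network
        rl_ids = self.k.vehicle.get_rl_ids()

        # clip the requested accelerations to the bounds of the action space
        clipped_actions = np.clip(rl_actions,
                                  a_min=self.action_space.low,
                                  a_max=self.action_space.high)

        # apply the clipped accelerations to the RL vehicles
        self.k.vehicle.apply_acceleration(rl_ids, clipped_actions)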
The get_state method extracts features from within the environment and provides them as inputs to the policy of the RL agent. Several helper methods exist within Flow to help facilitate this process; some useful ones can be accessed from the vehicle kernel (self.k.vehicle), as demonstrated below.
In order to model global observability within the network, our state space consists of the speeds and positions of all vehicles (as mentioned in section 1.3). This is implemented as follows:
In [8]:
import numpy as np

class myEnv(myEnv):  # update my environment class

    def get_state(self, **kwargs):
        # the get_ids() method is used to get the names of all vehicles in the network
        ids = self.k.vehicle.get_ids()

        # we use the get_x_by_id method to get the positions of all vehicles
        pos = [self.k.vehicle.get_x_by_id(veh_id) for veh_id in ids]

        # we use the get_speed method to get the velocities of all vehicles
        vel = [self.k.vehicle.get_speed(veh_id) for veh_id in ids]

        # the speeds and positions are concatenated to produce the state
        return np.concatenate((pos, vel))
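To make the layout of the state vector concrete, here is a small standalone sketch with made-up numbers: the first half of the vector holds positions and the second half holds speeds.
In [ ]:
import numpy as np

# made-up positions (m) and speeds (m/s) for three vehicles, for illustration only
pos = [10.0, 35.5, 61.2]
vel = [4.8, 5.1, 5.0]

# a 6-dimensional vector: positions followed by speeds
state = np.concatenate((pos, vel))
print(state)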
The compute_reward method returns the reward associated with any given state. This value may encompass quantities within the state space (defined in section 1.5) or may contain information provided by the environment but not immediately available within the state, as is the case in partially observable tasks (or POMDPs).
For this exercise, we choose the reward function to be the average speed of all vehicles currently in the network. In order to extract this information from the environment, we use the get_speed method within the vehicle kernel class to collect the current speed of all vehicles in the network, and return the average of these speeds as the reward. This is done as follows:
In [9]:
import numpy as np

class myEnv(myEnv):  # update my environment class

    def compute_reward(self, rl_actions, **kwargs):
        # the get_ids() method is used to get the names of all vehicles in the network
        ids = self.k.vehicle.get_ids()

        # we next get a list of the speeds of all vehicles in the network
        speeds = self.k.vehicle.get_speed(ids)

        # finally, we return the average of all these speeds as the reward
        return np.mean(speeds)
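Since rl_actions is passed to compute_reward as well, the reward could be extended, for example, to discourage large accelerations. The variant below is purely illustrative and not part of this exercise; the class name and the 0.1 weight are arbitrary choices:
In [ ]:
import numpy as np

class myEnvPenalized(myEnv):  # hypothetical variant, for illustration only

    def compute_reward(self, rl_actions, **kwargs):
        # average speed of all vehicles, as in the original reward
        ids = self.k.vehicle.get_ids()
        speeds = self.k.vehicle.get_speed(ids)
        reward = np.mean(speeds)

        # subtract a small penalty on the magnitude of the requested accelerations
        if rl_actions is not None:
            reward -= 0.1 * np.mean(np.abs(rl_actions))

        return reward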
Now that we have successfully created our new environment, we are ready to test it in simulation. We begin by running this environment in a non-RL simulation. The return provided at the end of the simulation is indicative of the cumulative expected reward when jam-like behavior exists within the network.
In [10]:
from flow.controllers import IDMController, ContinuousRouter
from flow.core.experiment import Experiment
from flow.core.params import SumoParams, EnvParams, \
    InitialConfig, NetParams
from flow.core.params import VehicleParams
from flow.scenarios.loop import LoopScenario, ADDITIONAL_NET_PARAMS

sumo_params = SumoParams(sim_step=0.1, render=True)

vehicles = VehicleParams()
vehicles.add(veh_id="idm",
             acceleration_controller=(IDMController, {}),
             routing_controller=(ContinuousRouter, {}),
             num_vehicles=22)

env_params = EnvParams(additional_params=ADDITIONAL_ENV_PARAMS)

additional_net_params = ADDITIONAL_NET_PARAMS.copy()
net_params = NetParams(additional_params=additional_net_params)

initial_config = InitialConfig(bunching=20)

scenario = LoopScenario(name="sugiyama",
                        vehicles=vehicles,
                        net_params=net_params,
                        initial_config=initial_config)

#############################################################
######## using my new environment for the simulation ########
#############################################################
env = myEnv(env_params, sumo_params, scenario)
#############################################################

exp = Experiment(env)

_ = exp.run(1, 1500)
Next, we wish to train this environment in the presence of the autonomous vehicle agent to reduce the formation of waves in the network, thereby pushing the performance of vehicles in the network past the above expected return.
In order for an environment to be trainable in either RLlib or rllab (as we have shown in tutorials 2 and 3), the environment must be accessible via import from flow.envs. In order to do so, copy the above environment into a .py file and import the environment in flow.envs.__init__.py, as sketched below.
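For example, if the environment were saved to a file such as flow/envs/my_env.py (the file and module names here are hypothetical), the line added to flow/envs/__init__.py might look like the following sketch:
# in flow/envs/__init__.py (the module name my_env is hypothetical)
from flow.envs.my_env import myEnv

# if the package keeps an explicit export list, append the new class to it as well,
# e.g. __all__ = [..., "myEnv"]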
You can ensure that the process was successful by running the following command:
In [ ]:
# NOTE: only runs if the above procedure has been performed
from flow.envs import myEnv
Once this is done, the code block below may be used to train the above environment using the Trust Region Policy Optimization (TRPO) algorithm provided by rllab. We do not recommend training this environment to completion within a Jupyter notebook; however, once training is complete, visualization of the resulting policy should show that the autonomous vehicle learns to dissipate the formation and propagation of waves in the network.
In [ ]:
from rllab.envs.normalized_env import normalize
from rllab.misc.instrument import run_experiment_lite
from rllab.algos.trpo import TRPO
from rllab.baselines.linear_feature_baseline import LinearFeatureBaseline
from rllab.policies.gaussian_gru_policy import GaussianGRUPolicy
from flow.scenarios.loop import LoopScenario
from flow.controllers import RLController, IDMController, ContinuousRouter
from flow.core.params import VehicleParams
from flow.core.params import SumoParams, EnvParams, NetParams, InitialConfig
from rllab.envs.gym_env import GymEnv

HORIZON = 1500


def run_task(*_):
    sumo_params = SumoParams(sim_step=0.1, render=False)

    vehicles = VehicleParams()
    vehicles.add(veh_id="rl",
                 acceleration_controller=(RLController, {}),
                 routing_controller=(ContinuousRouter, {}),
                 num_vehicles=1)
    vehicles.add(veh_id="idm",
                 acceleration_controller=(IDMController, {}),
                 routing_controller=(ContinuousRouter, {}),
                 num_vehicles=21)

    env_params = EnvParams(horizon=HORIZON,
                           additional_params=ADDITIONAL_ENV_PARAMS)

    additional_net_params = ADDITIONAL_NET_PARAMS.copy()
    net_params = NetParams(additional_params=additional_net_params)

    initial_config = InitialConfig(bunching=20)

    scenario = LoopScenario(name="sugiyama-training",
                            vehicles=vehicles,
                            net_params=net_params,
                            initial_config=initial_config)

    #######################################################
    ######## using my new environment for training ########
    #######################################################
    env_name = "myEnv"
    #######################################################

    pass_params = (env_name, sumo_params, vehicles, env_params, net_params,
                   initial_config, scenario)

    env = GymEnv(env_name, record_video=False, register_params=pass_params)
    horizon = env.horizon
    env = normalize(env)

    policy = GaussianGRUPolicy(
        env_spec=env.spec,
        hidden_sizes=(5,),
    )

    baseline = LinearFeatureBaseline(env_spec=env.spec)

    algo = TRPO(
        env=env,
        policy=policy,
        baseline=baseline,
        batch_size=30000,
        max_path_length=horizon,
        n_itr=500,
        discount=0.999,
    )
    algo.train()


exp_tag = "stabilizing-the-ring"

for seed in [5]:  # , 20, 68]:
    run_experiment_lite(
        run_task,
        n_parallel=1,
        snapshot_mode="all",
        seed=seed,
        mode="local",
        exp_prefix=exp_tag,
    )