Tutorial 09: Creating Custom Environments

This tutorial walks you through the process of creating custom environments in Flow. Custom environments contain specific methods that define the problem space of a task, such as the state and action spaces of the RL agent and the signal (or reward) that the RL algorithm will optimize over. By specifying a few methods within a custom environment, individuals can use Flow to design traffic control tasks of various types, such as optimal traffic light signal timing and flow regulation via mixed autonomy traffic (see the figures below). Finally, these environments are compatible with OpenAI Gym.

The rest of the tutorial is organized as follows: in section 1 walks through the process of creating an environment for mixed autonomy vehicle control where the autonomous vehicles perceive all vehicles in the network, and section two implements the environment in simulation.

1. Creating an Environment Class

In this exercise we will create an environment in which the accelerations of a handful of vehicles in the network are specified by a single centralized agent, with the objective of the agent being to improve the average speed of all vehicle in the network. In order to create this environment, we begin by inheriting the base environment class located in flow.envs:


In [1]:
# import the base environment class
from flow.envs import Env

# define the environment class, and inherit properties from the base environment class
class myEnv(Env):
    pass

Env provides the interface for running and modifying a SUMO simulation. Using this class, we are able to start sumo, provide a scenario to specify a configuration and controllers, perform simulation steps, and reset the simulation to an initial configuration.

By inheriting Flow's base environment, a custom environment for varying control tasks can be created by adding the following functions to the child class:

  • action_space
  • observation_space
  • apply_rl_actions
  • get_state
  • compute_reward

Each of these components are covered in the next few subsections.

1.1 ADDITIONAL_ENV_PARAMS

The features used to parametrize components of the state/action space as well as the reward function are specified within the EnvParams input, as discussed in tutorial 1. Specifically, for the sake of our environment, the additional_params attribute within EnvParams will be responsible for storing information on the maximum possible accelerations and decelerations by the autonomous vehicles in the network. Accordingly, for this problem, we define an ADDITIONAL_ENV_PARAMS variable of the form:


In [2]:
ADDITIONAL_ENV_PARAMS = {
    "max_accel": 1,
    "max_decel": 1,
}

All environments presented in Flow provide a unique ADDITIONAL_ENV_PARAMS component containing the information needed to properly define some environment-specific parameters. We assume that these values are always provided by the user, and accordingly can be called from env_params. For example, if we would like to call the "max_accel" parameter, we simply type:

max_accel = env_params.additional_params["max_accel"]

1.2 action_space

The action_space method defines the number and bounds of the actions provided by the RL agent. In order to define these bounds with an OpenAI gym setting, we use several objects located within gym.spaces. For instance, the Box object is used to define a bounded array of values in $\mathbb{R}^n$.


In [3]:
from gym.spaces.box import Box

In addition, Tuple objects (not used by this exercise) allow users to combine multiple Box elements together.


In [4]:
from gym.spaces.tuple_space import Tuple

Once we have imported the above objects, we are ready to define the bounds of our action space. Given that our actions consist of a list of n real numbers (where n is the number of autonomous vehicles) bounded from above and below by "max_accel" and "max_decel" respectively (see section 1.1), we can define our action space as follows:


In [5]:
class myEnv(myEnv):

    @property
    def action_space(self):
        num_actions = self.initial_vehicles.num_rl_vehicles
        accel_ub = self.env_params.additional_params["max_accel"]
        accel_lb = - abs(self.env_params.additional_params["max_decel"])

        return Box(low=accel_lb,
                   high=accel_ub,
                   shape=(num_actions,))

1.3 observation_space

The observation space of an environment represents the number and types of observations that are provided to the reinforcement learning agent. For this example, we will be observe two values for each vehicle: its position and speed. Accordingly, we need a observation space that is twice the size of the number of vehicles in the network.


In [6]:
class myEnv(myEnv):  # update my environment class

    @property
    def observation_space(self):
        return Box(
            low=0,
            high=float("inf"),
            shape=(2*self.initial_vehicles.num_vehicles,),
        )

1.4 apply_rl_actions

The function apply_rl_actions is responsible for transforming commands specified by the RL agent into actual actions performed within the simulator. The vehicle kernel within the environment class contains several helper methods that may be of used to facilitate this process. These functions include:

  • apply_acceleration (list of str, list of float) -> None: converts an action, or a list of actions, into accelerations to the specified vehicles (in simulation)
  • apply_lane_change (list of str, list of {-1, 0, 1}) -> None: converts an action, or a list of actions, into lane change directions for the specified vehicles (in simulation)
  • choose_route (list of str, list of list of str) -> None: converts an action, or a list of actions, into rerouting commands for the specified vehicles (in simulation)

For our example we consider a situation where the RL agent can only specify accelerations for the RL vehicles; accordingly, the actuation method for the RL agent is defined as follows:


In [7]:
class myEnv(myEnv):  # update my environment class

    def _apply_rl_actions(self, rl_actions):
        # the names of all autonomous (RL) vehicles in the network
        rl_ids = self.k.vehicle.get_rl_ids()

        # use the base environment method to convert actions into accelerations for the rl vehicles
        self.k.vehicle.apply_acceleration(rl_ids, rl_actions)

1.5 get_state

The get_state method extracts features from within the environments and provides then as inputs to the policy provided by the RL agent. Several helper methods exist within flow to help facilitate this process. Some useful helper method can be accessed from the following objects:

  • self.k.vehicle: provides current state information for all vehicles within the network
  • self.k.traffic_light: provides state information on the traffic lights
  • self.k.scenario: information on the scenario, which unlike the vehicles and traffic lights is static
  • More accessor objects and methods can be found within the Flow documentation at: http://berkeleyflow.readthedocs.io/en/latest/

In order to model global observability within the network, our state space consists of the speeds and positions of all vehicles (as mentioned in section 1.3). This is implemented as follows:


In [8]:
import numpy as np

class myEnv(myEnv):  # update my environment class

    def get_state(self, **kwargs):
        # the get_ids() method is used to get the names of all vehicles in the network
        ids = self.k.vehicle.get_ids()

        # we use the get_absolute_position method to get the positions of all vehicles
        pos = [self.k.vehicle.get_x_by_id(veh_id) for veh_id in ids]

        # we use the get_speed method to get the velocities of all vehicles
        vel = [self.k.vehicle.get_speed(veh_id) for veh_id in ids]

        # the speeds and positions are concatenated to produce the state
        return np.concatenate((pos, vel))

1.6 compute_reward

The compute_reward method returns the reward associated with any given state. These value may encompass returns from values within the state space (defined in section 1.5) or may contain information provided by the environment but not immediately available within the state, as is the case in partially observable tasks (or POMDPs).

For this exercise, we choose the reward function to be the average speed of all vehicles currently in the network. In order to extract this information from the environment, we use the get_speed method within the Vehicle kernel class to collect the current speed of all vehicles in the network, and return the average of these speeds as the reward. This is done as follows:


In [9]:
import numpy as np

class myEnv(myEnv):  # update my environment class

    def compute_reward(self, rl_actions, **kwargs):
        # the get_ids() method is used to get the names of all vehicles in the network
        ids = self.k.vehicle.get_ids()

        # we next get a list of the speeds of all vehicles in the network
        speeds = self.k.vehicle.get_speed(ids)

        # finally, we return the average of all these speeds as the reward
        return np.mean(speeds)

2. Testing the New Environment

2.1 Testing in Simulation

Now that we have successfully created our new environment, we are ready to test this environment in simulation. We begin by running this environment in a non-RL based simulation. The return provided at the end of the simulation is indicative of the cumulative expected reward when jam-like behavior exists within the netowrk.


In [10]:
from flow.controllers import IDMController, ContinuousRouter
from flow.core.experiment import Experiment
from flow.core.params import SumoParams, EnvParams, \
    InitialConfig, NetParams
from flow.core.params import VehicleParams
from flow.scenarios.loop import LoopScenario, ADDITIONAL_NET_PARAMS

sumo_params = SumoParams(sim_step=0.1, render=True)

vehicles = VehicleParams()
vehicles.add(veh_id="idm",
             acceleration_controller=(IDMController, {}),
             routing_controller=(ContinuousRouter, {}),
             num_vehicles=22)

env_params = EnvParams(additional_params=ADDITIONAL_ENV_PARAMS)

additional_net_params = ADDITIONAL_NET_PARAMS.copy()
net_params = NetParams(additional_params=additional_net_params)

initial_config = InitialConfig(bunching=20)

scenario = LoopScenario(name="sugiyama",
                        vehicles=vehicles,
                        net_params=net_params,
                        initial_config=initial_config)

#############################################################
######## using my new environment for the simulation ########
#############################################################
env = myEnv(env_params, sumo_params, scenario)
#############################################################

exp = Experiment(env)

_ = exp.run(1, 1500)


Round 0, return: 4253.8940561797535
Average, std return: 4253.8940561797535, 0.0
Average, std speed: 2.8359293707865048, 0.0
Closing connection to TraCI and stopping simulation.
Note, this may print an error message when it closes.

2.2 Training the New Environment

Next, we wish to train this environment in the presence of the autonomous vehicle agent to reduce the formation of waves in the network, thereby pushing the performance of vehicles in the network past the above expected return.

In order for an environment to be trainable in either RLLib for rllab (as we have shown in tutorials 2 and 3), the environment must be acccessable via import from flow.envs. In order to do so, copy the above envrionment onto a .py and import the environment in flow.envs.__init__.py. You can ensure that the process was successful by running the following command:


In [ ]:
# NOTE: only runs if the above procedure have been performed
from flow.envs import myEnv

Once this is done, the below code block may be used to train the above environment using the Trust Region Policy Optimization (TRPO) algorithm provided by rllab. We do not recommend training this environment to completion within a jupyter notebook setting; however, once training is complete, visualization of the resulting policy should show that the autonomous vehicle learns to dissipate the formation and propagation of waves in the network.


In [ ]:
from rllab.envs.normalized_env import normalize
from rllab.misc.instrument import run_experiment_lite
from rllab.algos.trpo import TRPO
from rllab.baselines.linear_feature_baseline import LinearFeatureBaseline
from rllab.policies.gaussian_gru_policy import GaussianGRUPolicy

from flow.scenarios.loop import LoopScenario
from flow.controllers import RLController, IDMController, ContinuousRouter
from flow.core.params import VehicleParams
from flow.core.params import SumoParams, EnvParams, NetParams, InitialConfig
from rllab.envs.gym_env import GymEnv

HORIZON = 1500


def run_task(*_):
    sumo_params = SumoParams(sim_step=0.1, render=False)

    vehicles = VehicleParams()
    vehicles.add(veh_id="rl",
                 acceleration_controller=(RLController, {}),
                 routing_controller=(ContinuousRouter, {}),
                 num_vehicles=1)
    vehicles.add(veh_id="idm",
                 acceleration_controller=(IDMController, {}),
                 routing_controller=(ContinuousRouter, {}),
                 num_vehicles=21)

    env_params = EnvParams(horizon=HORIZON,
                           additional_params=ADDITIONAL_ENV_PARAMS)

    additional_net_params = ADDITIONAL_NET_PARAMS.copy()
    net_params = NetParams(additional_params=additional_net_params)

    initial_config = InitialConfig(bunching=20)

    scenario = LoopScenario(name="sugiyama-training",
                            vehicles=vehicles,
                            net_params=net_params,
                            initial_config=initial_config)

    #######################################################
    ######## using my new environment for training ########
    #######################################################
    env_name = "myEnv"
    #######################################################
    pass_params = (env_name, sumo_params, vehicles, env_params, net_params,
                   initial_config, scenario)

    env = GymEnv(env_name, record_video=False, register_params=pass_params)
    horizon = env.horizon
    env = normalize(env)

    policy = GaussianGRUPolicy(
        env_spec=env.spec,
        hidden_sizes=(5,),
    )

    baseline = LinearFeatureBaseline(env_spec=env.spec)

    algo = TRPO(
        env=env,
        policy=policy,
        baseline=baseline,
        batch_size=30000,
        max_path_length=horizon,
        n_itr=500,
        discount=0.999,
    )
    algo.train(),


exp_tag = "stabilizing-the-ring"

for seed in [5]:  # , 20, 68]:
    run_experiment_lite(
        run_task,
        n_parallel=1,
        snapshot_mode="all",
        seed=seed,
        mode="local",
        exp_prefix=exp_tag,
    )

In [ ]: