Tutorial 11: Traffic Lights

This tutorial walks through how to add traffic lights to experiments. It uses the following files:

  • Experiment script for RL: examples/rllab/green_wave.py
  • Experiment script for non-RL: examples/sumo/grid.py
  • Scenario: grid.py (class SimpleGridScenario)
  • Environment for RL: green_wave_env.py (class TrafficLightGridEnv)
  • Environment for non-RL: loop_accel.py (class AccelEnv)

There are two main classes of traffic lights that Sumo supports: actuated and static traffic lights. This tutorial will cover both types. Over the course of this tutorial, we'll discuss 4 different types of traffic lights to introduce into your road network:

  1. Static Traffic Lights
  2. Actuated (baseline) Traffic Lights
  3. Actuated Traffic Lights
  4. RL Traffic Lights

Let's begin!

First, import all necessary classes.


In [ ]:
from flow.core.params import (EnvParams, InFlows, InitialConfig, NetParams,
                              SumoCarFollowingParams, SumoParams,
                              TrafficLightParams, VehicleParams)
from flow.scenarios.grid import SimpleGridScenario
import numpy as np

1. New parameters in additional_net_params

There are a few unique additions for the grid envs to additional_net_params to be aware of.

grid_array

grid_array passes information on the road network to the scenario, specifying the following parameters: row_num, col_num, inner_length, short_length, long_length, cars_top, cars_bot, cars_left, cars_right. This is required for any grid experiment.

tl_logic

tl_logic is intended for users who want to exert more control over individual traffic lights. It tells the env whether the traffic lights are controlled by RL, follow a default pattern, or are actuated by SUMO. Use "actuated" if you want SUMO to control the traffic lights.

For this tutorial, we will assume the following parameters for the grid_array, which specifies a grid network with 2 rows and 3 columns. traffic_lights should be set to True for every experiment in this tutorial.


In [ ]:
inner_length = 300
long_length = 500
short_length = 300
n = 2 # rows
m = 3 # columns
num_cars_left = 20
num_cars_right = 20
num_cars_top = 20
num_cars_bot = 20
tot_cars = (num_cars_left + num_cars_right) * m \
    + (num_cars_top + num_cars_bot) * n

grid_array = {"short_length": short_length, "inner_length": inner_length,
              "long_length": long_length, "row_num": n, "col_num": m,
              "cars_left": num_cars_left, "cars_right": num_cars_right,
              "cars_top": num_cars_top, "cars_bot": num_cars_bot}
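
Since grid_array is required for every grid experiment, it can help to check it before building the scenario. The following is a minimal sketch, not part of Flow: missing_grid_keys is a hypothetical helper, and the key list is simply the one given in this section.

```python
# Keys this tutorial lists as part of grid_array.
REQUIRED_GRID_KEYS = (
    "row_num", "col_num", "inner_length", "short_length", "long_length",
    "cars_top", "cars_bot", "cars_left", "cars_right",
)


def missing_grid_keys(grid_array):
    """Return the required keys absent from grid_array (hypothetical helper)."""
    return [key for key in REQUIRED_GRID_KEYS if key not in grid_array]


grid_array = {"short_length": 300, "inner_length": 300, "long_length": 500,
              "row_num": 2, "col_num": 3,
              "cars_left": 20, "cars_right": 20,
              "cars_top": 20, "cars_bot": 20}

missing = missing_grid_keys(grid_array)

# Same arithmetic as tot_cars above: left/right counts are multiplied by the
# number of columns, top/bottom counts by the number of rows.
tot_cars = ((grid_array["cars_left"] + grid_array["cars_right"])
            * grid_array["col_num"]
            + (grid_array["cars_top"] + grid_array["cars_bot"])
            * grid_array["row_num"])
```

With the values used in this tutorial, no keys are missing and tot_cars works out to 200.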

2. Defining Traffic Light Phases

To start off, we define how SUMO represents traffic light phases. A phase is defined as the states that the traffic lights around an intersection can take. The phase is modeled by a string with one character per controlled connection, so its length depends on the structure of the intersection: a typical four-way intersection with separate left, straight, and right movements on each approach needs a string of length 12, while the single-lane grid intersections used in this tutorial need only 4 characters. Consider the phase "GrGr". Every letter in this phase string ("G", "r", "G", "r") corresponds to an edge in the intersection, in clockwise order. Explicitly, the northern and southern edges of the intersection both have a state of "G", while the eastern and western edges of the intersection both have a state of "r".
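
To make the encoding concrete, here is a small sketch that decodes a 4-character phase string into per-approach states. It assumes the clockwise order starts at the northern edge (north, east, south, west), which matches the "GrGr" description above; decode_phase is a hypothetical helper, not a Flow or SUMO function.

```python
# Assumed clockwise order of approaches, starting from the north.
APPROACHES = ("north", "east", "south", "west")


def decode_phase(state):
    """Map each character of a 4-character phase string to its approach."""
    if len(state) != len(APPROACHES):
        raise ValueError("expected a phase string of length 4")
    return dict(zip(APPROACHES, state))


decoded = decode_phase("GrGr")
```

Here decoded maps "north" and "south" to "G", and "east" and "west" to "r".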

SUMO traffic lights are initialized to a default set of phases and will not deviate from the phases provided in their configuration files. This section describes how to define custom phases for traffic lights.

NOTE: If the API is used at any point to modify the traffic light state, i.e. functions such as setRedYellowGreenState, this will override the traffic light's default phase.

To do anything with traffic lights, you should interface with Flow's TrafficLightParams class.

Once the TrafficLightParams class is instantiated, traffic lights can be added via the class's add function. One prerequisite of using this function is knowing the node id of any node you intend to manipulate. This information is baked into the experiment's scenario class, as well as the experiment's nod.xml file. For the experiment we are using, with 2 rows and 3 columns, there are 6 nodes: "center0" to "center5".

In this particular example, each of the 6 traffic light nodes corresponds to the same set of possible phases; in other words, at any time, each node will be at the same phase. You can, however, customize each traffic light node to have different phases.
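
For larger grids, writing the node ids by hand becomes tedious. A minimal sketch for generating them follows; it assumes only the "center<i>" naming pattern mentioned above, with one node per intersection, and grid_node_ids is a hypothetical helper.

```python
def grid_node_ids(rows, cols):
    """Return one "center<i>" id per intersection of a rows x cols grid."""
    return ["center{}".format(i) for i in range(rows * cols)]


nodes = grid_node_ids(2, 3)
```

For the 2 by 3 grid of this tutorial, this yields the same six ids that are listed explicitly in the next cell.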


In [ ]:
tl_logic = TrafficLightParams()

nodes = ["center0", "center1", "center2", "center3", "center4", "center5"]
phases = [{"duration": "31", "state": "GrGr"},
          {"duration": "6", "state": "yryr"},
          {"duration": "31", "state": "rGrG"},
          {"duration": "6", "state": "ryry"}]
for node_id in nodes:
    tl_logic.add(node_id, tls_type="static", programID="1", offset=None, phases=phases)

Following this step, the TrafficLightParams object should be passed into your scenario as the traffic_lights argument.


In [ ]:
additional_net_params = {"grid_array": grid_array, "speed_limit": 35,
                         "horizontal_lanes": 1, "vertical_lanes": 1,
                         "traffic_lights": True}
net_params = NetParams(additional_params=additional_net_params)

scenario = SimpleGridScenario(name="grid",
                              vehicles=VehicleParams(),
                              net_params=net_params,
                              initial_config=InitialConfig(),
                              traffic_lights=tl_logic)

That's it! The traffic light logic will be passed into Flow's internals, which will generate an additional-file containing all of the information needed to generate the traffic lights you specified in the simulation.
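
To give a rough idea of what such an additional-file contains, the sketch below builds a tlLogic element with the standard library. The element and attribute names follow SUMO's tlLogic format; whether Flow emits exactly this structure is an assumption, and build_tl_logic is a hypothetical helper.

```python
import xml.etree.ElementTree as ET


def build_tl_logic(node_id, phases, tls_type="static", program_id="1",
                   offset="0"):
    """Build a SUMO-style <tlLogic> element (illustrative only)."""
    logic = ET.Element("tlLogic", id=node_id, type=tls_type,
                       programID=program_id, offset=offset)
    for phase in phases:
        # Each phase becomes a <phase duration="..." state="..."/> child.
        ET.SubElement(logic, "phase", duration=phase["duration"],
                      state=phase["state"])
    return logic


phases = [{"duration": "31", "state": "GrGr"},
          {"duration": "6", "state": "yryr"},
          {"duration": "31", "state": "rGrG"},
          {"duration": "6", "state": "ryry"}]

additional = ET.Element("additional")
additional.append(build_tl_logic("center0", phases))
xml_text = ET.tostring(additional, encoding="unicode")
```

Printing xml_text shows one tlLogic entry for "center0" with four phase children, one per entry of phases.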

3. Using the Default Traffic Light Baseline

We have developed a traffic light baseline that can be used for any experiments on a grid. This baseline uses actuated traffic lights (section 5), and has been fine-tuned on many iterations of experiments with varying parameters. The actual parameters are located in the TrafficLightParams class under the getter function actuated_default(). For reference, these values are:


In [ ]:
tl_type = "actuated"
program_id = 1
max_gap = 3.0
detector_gap = 0.8
show_detectors = True
phases = [{"duration": "31", "minDur": "8", "maxDur": "45", "state": "GrGr"},
          {"duration": "6", "minDur": "3", "maxDur": "6", "state": "yryr"},
          {"duration": "31", "minDur": "8", "maxDur": "45", "state": "rGrG"},
          {"duration": "6", "minDur": "3", "maxDur": "6", "state": "ryry"}]

To view the baseline in action, simply initialize the TrafficLightParams class with the baseline argument set to True, and pass it into your additional_net_params. Nothing else needs to be done; no traffic lights need to be added.


In [ ]:
tl_logic = TrafficLightParams(baseline=True)
additional_net_params = {"grid_array": grid_array, 
                         "speed_limit": 35,
                         "horizontal_lanes": 1, 
                         "vertical_lanes": 1,
                         "traffic_lights": True, 
                         "tl_logic": tl_logic}

4. Static Traffic Lights

Static traffic lights are traffic lights with pre-defined phases. They cannot dynamically adjust according to traffic needs; they simply follow the same pattern repeatedly. To see static traffic lights in action, the TrafficLightParams object should be instantiated with baseline=False.

When adding individual traffic lights, the following parameters in addition to node_id are involved:

  • tls_type: [optional] Specifies actuated or static traffic lights, defaults to static
  • programID: [optional] The name for this traffic light program. It cannot be the same as the ID of the base program, which is 0. Defaults to 10
  • offset: [optional] The initial time offset of the program

An example of adding one static traffic light to our system is as follows:


In [ ]:
tl_logic = TrafficLightParams(baseline=False)
phases = [{"duration": "31", "state": "GrGr"},
          {"duration": "6", "state": "yryr"},
          {"duration": "31", "state": "rGrG"},
          {"duration": "6", "state": "ryry"}]
tl_logic.add("center0", phases=phases, programID=1)
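
Because a static program simply repeats, the phase shown at any time follows directly from the durations. The sketch below illustrates this, assuming durations are in seconds and ignoring any offset; cycle_length and phase_at are hypothetical helpers.

```python
def cycle_length(phases):
    """Total duration of one full pass through the phases."""
    return sum(int(phase["duration"]) for phase in phases)


def phase_at(phases, t):
    """Return the state string shown t seconds after the program starts."""
    t = t % cycle_length(phases)
    for phase in phases:
        duration = int(phase["duration"])
        if t < duration:
            return phase["state"]
        t -= duration
    raise AssertionError("unreachable: t is always inside the cycle")


phases = [{"duration": "31", "state": "GrGr"},
          {"duration": "6", "state": "yryr"},
          {"duration": "31", "state": "rGrG"},
          {"duration": "6", "state": "ryry"}]

total = cycle_length(phases)
```

For these phases the cycle lasts 31 + 6 + 31 + 6 = 74 seconds, after which the pattern starts over with "GrGr".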

5. Actuated Traffic Lights

For more flexibility than the static traffic lights defined above, and more control than RL-controlled traffic lights, actuated traffic lights are a good option to consider.

As an excerpt from SUMO's documentation: "SUMO supports gap-based actuated traffic control. This control scheme is common in Germany and works by prolonging traffic phases whenever a continuous stream of traffic is detected. It switches to the next phase after detecting a sufficient time gap between successive vehicles. This allows for better distribution of green-time among phases and also affects cycle duration in response to dynamic traffic conditions."

The difference between phases for static and actuated traffic lights is that actuated traffic light phases take in two additional parameters, minDur and maxDur, which describe the allowed range of time durations for each phase.

In addition to these sub-parameters of phases and all the required parameters of static traffic lights, the following optional parameters are involved; they default to values set by SUMO:

  • maxGap: [optional] float, describes the maximum time gap between successive vehicles that will cause the current phase to be prolonged
  • detectorGap: [optional] float, determines the time distance between the (automatically generated) detector and the stop line in seconds (at each lane's maximum speed)
  • showDetectors: [optional] bool, toggles whether or not detectors are shown in sumo-gui
  • file: [optional] str, which file the detector shall write results into
  • freq: [optional] int, the period over which collected values shall be aggregated
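
The interplay of minDur, maxDur, and maxGap can be sketched as a single decision rule. This is a deliberately simplified illustration of gap-based actuation, not SUMO's actual controller (which works with per-lane detectors); should_switch and its arguments are hypothetical.

```python
def should_switch(elapsed, time_gap, min_dur, max_dur, max_gap):
    """Decide whether an actuated phase should end.

    elapsed:  seconds the current phase has been active
    time_gap: most recent time gap (seconds) between successive vehicles
    """
    if elapsed < min_dur:
        return False  # the phase must run for at least minDur
    if elapsed >= max_dur:
        return True   # the phase may never exceed maxDur
    # In between, keep the phase while traffic is still arriving closely.
    return time_gap > max_gap


# A green phase with minDur=8, maxDur=45, and maxGap=3.0.
too_early = should_switch(5, 10.0, 8, 45, 3.0)
dense_traffic = should_switch(20, 1.0, 8, 45, 3.0)
gap_detected = should_switch(20, 4.0, 8, 45, 3.0)
max_reached = should_switch(45, 0.5, 8, 45, 3.0)
```

Only gap_detected and max_reached evaluate to True: the phase ends either when a sufficiently large gap appears after minDur, or when maxDur is reached regardless of traffic.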

An example of adding two actuated traffic lights to our system is as follows. The first specifies more custom control, while the second specifies minimal control.


In [ ]:
tl_logic = TrafficLightParams(baseline=False)
phases = [{"duration": "31", "minDur": "8", "maxDur": "45", "state": "GrGr"},
          {"duration": "6", "minDur": "3", "maxDur": "6", "state": "yryr"},
          {"duration": "31", "minDur": "8", "maxDur": "45", "state": "rGrG"},
          {"duration": "6", "minDur": "3", "maxDur": "6", "state": "ryry"}]

tl_logic.add("center1", 
             tls_type="actuated", 
             programID="1", 
             phases=phases, 
             maxGap=5.0, 
             detectorGap=0.9, 
             showDetectors=False)

tl_logic.add("center2",
             tls_type="actuated")

6. Controlling Your Traffic Lights via RL

This is where we switch from the grid.py experiment script to green_wave.py.

To control traffic lights via RL, no tl_logic element is necessary. This is because rllab is controlling all the parameters you were able to customize in the prior sections. Your additional_net_params should look something like this:


In [ ]:
additional_net_params = {"speed_limit": 35, "grid_array": grid_array,
                         "horizontal_lanes": 1, "vertical_lanes": 1,
                         "traffic_lights": True}

This will enable the program to recognize all nodes as traffic lights. The experiment then gives control to the environment; we are using TrafficLightGridEnv, which is an environment created for RL that applies RL-specified traffic light actions (e.g. change the state) via TraCI at each timestep.

This is all you need to run an RL experiment! It is worth taking a look at the TrafficLightGridEnv class to deepen your understanding of the experiment internals. The rest of this tutorial is an optional walkthrough of the various components of TrafficLightGridEnv:

Keeping Track of Traffic Light State

Flow keeps track of the traffic light states (i.e. for each intersection, time elapsed since the last change, which direction traffic is flowing, and whether or not the traffic light is currently displaying yellow) in the following variables:


In [ ]:
# keeps track of the last time the traffic lights in an intersection were
# allowed to change from a red-green state to a red-yellow state
self.last_change = np.zeros((self.rows * self.cols, 1))
# keeps track of the direction that is currently being allowed to flow:
# 0 indicates flow from top to bottom, 1 indicates flow from left to right
self.direction = np.zeros((self.rows * self.cols, 1))
# 1 indicates that the intersection is in a red-yellow state,
# 0 indicates that the intersection is in a red-green state
self.currently_yellow = np.zeros((self.rows * self.cols, 1))

  • The variable self.last_change indicates the last time the lights were allowed to change from a red-green state to a red-yellow state.
  • The variable self.direction indicates the direction of the intersection, i.e. the direction that is currently being allowed to flow. 0 indicates flow from top to bottom, and 1 indicates flow from left to right.
  • The variable self.currently_yellow with a value of 1 indicates that the traffic light is in a red-yellow state. 0 indicates that the traffic light is in a red-green state.

self.last_change is contingent on an instance variable self.min_switch_time. This is a variable that can be set in additional_env_params with the key name "switch_time". Setting switch_time enables more control over the RL experiment by preventing traffic lights from switching until switch_time timesteps have occurred. In practice, this can be used to prevent flickering.


In [ ]:
additional_env_params = {"target_velocity": 50, "switch_time": 3.0}
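
To illustrate how a minimum switch time prevents flickering, here is a toy model of a single intersection. It is a hypothetical sketch and not the logic implemented in TrafficLightGridEnv: the Intersection class, the step function, and the choice to apply the same minimum time to green and yellow states are all assumptions made for the example.

```python
GREEN = {0: "GrGr", 1: "rGrG"}   # direction 0: top-bottom, 1: left-right
YELLOW = {0: "yryr", 1: "ryry"}


class Intersection:
    """Toy state holder mirroring the three variables described above."""

    def __init__(self):
        self.direction = 0
        self.currently_yellow = 0
        self.last_change = 0.0  # time elapsed since the last change

    def state(self):
        table = YELLOW if self.currently_yellow else GREEN
        return table[self.direction]


def step(light, wants_switch, sim_step, min_switch_time):
    """Advance one step; switch requests are ignored until enough time passed."""
    light.last_change += sim_step
    if light.currently_yellow:
        if light.last_change >= min_switch_time:
            # Yellow is over: give green to the other direction.
            light.direction = 1 - light.direction
            light.currently_yellow = 0
            light.last_change = 0.0
    elif wants_switch and light.last_change >= min_switch_time:
        light.currently_yellow = 1
        light.last_change = 0.0
    return light.state()


light = Intersection()
# The agent requests a switch at every step, yet the light cannot flicker.
states = [step(light, True, 1.0, 3.0) for _ in range(6)]
```

Even though a switch is requested at every step, the light stays green for three steps, shows yellow for three steps, and only then lets the other direction flow.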

Action Space

The action space for RL-controlled traffic lights directly matches the number of traffic lights in the system. Each traffic light node corresponds to one action. The action space is thus defined as:


In [ ]:
@property
def action_space(self):
    return Box(low=0, high=1, shape=(self.k.traffic_light.num_traffic_lights,),
               dtype=np.float32)
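
Since each action is a continuous value between 0 and 1, the environment has to turn it into a discrete decision for each traffic light. One plausible way to do this is a simple threshold, sketched below; both the helper to_switch_requests and the threshold of 0.5 are assumptions for illustration and may not match what TrafficLightGridEnv does.

```python
def to_switch_requests(rl_actions, threshold=0.5):
    """Interpret each action above the threshold as a request to switch."""
    return [action > threshold for action in rl_actions]


# One action per traffic light node.
requests = to_switch_requests([0.1, 0.9, 0.5, 0.51])
```

With these sample actions, only the second and fourth traffic lights would be asked to switch.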

Observation Space

The observation space for our existing traffic light experiments is designed to be fully observable. For each vehicle, we want to know its velocity, its distance (in [unit]) from the next intersection, and the unique edge it is traveling on. For each traffic light, we want to know its current state (i.e. which direction is flowing), when it last changed, and whether it is yellow. Recall that the traffic light states are encoded in self.direction, self.last_change, and self.currently_yellow.


In [ ]:
@property
def observation_space(self):
    speed = Box(low=0, high=1, shape=(self.initial_vehicles.num_vehicles,),
                dtype=np.float32)
    dist_to_intersec = Box(low=0., high=np.inf,
                           shape=(self.initial_vehicles.num_vehicles,),
                           dtype=np.float32)
    edge_num = Box(low=0., high=1, shape=(self.initial_vehicles.num_vehicles,),
                   dtype=np.float32)
    traffic_lights = Box(low=0., high=np.inf,
                         shape=(3 * self.rows * self.cols,),
                         dtype=np.float32)
    return Tuple((speed, dist_to_intersec, edge_num, traffic_lights))
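
As a quick sanity check, the total number of observed values implied by these shapes can be worked out by hand: three entries per vehicle and three per intersection. The helper below is a hypothetical sketch of that arithmetic.

```python
def observation_size(num_vehicles, rows, cols):
    """Count the entries across the four Box spaces defined above."""
    per_vehicle = 3 * num_vehicles   # speed, distance, edge number
    per_light = 3 * rows * cols      # three values per intersection
    return per_vehicle + per_light


# 200 vehicles on the 2 x 3 grid used throughout this tutorial.
size = observation_size(200, 2, 3)
```

For this tutorial's grid, that is 3 * 200 + 3 * 6 = 618 values.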

State Space

The state space collects the information that the observation_space specifies. TrafficLightGridEnv provides helper functions to construct it.


In [ ]:
def get_state(self):
    # compute the normalizers
    max_speed = self.k.scenario.max_speed()
    grid_array = self.net_params.additional_params["grid_array"]
    max_dist = max(grid_array["short_length"], 
                   grid_array["long_length"],
                   grid_array["inner_length"])

    # get the state arrays
    speeds = [self.k.vehicle.get_speed(veh_id) / max_speed for veh_id in
              self.k.vehicle.get_ids()]
    dist_to_intersec = [self.get_distance_to_intersection(veh_id)/max_dist
                        for veh_id in self.k.vehicle.get_ids()]
    edges = [self._convert_edge(self.k.vehicle.get_edge(veh_id)) / (
        self.k.scenario.num_edges - 1) for veh_id in self.k.vehicle.get_ids()]

    state = [speeds, dist_to_intersec, edges,
             self.last_change.flatten().tolist()]
    return np.array(state)
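
The normalization in get_state can be tried out in isolation. The sketch below repeats the same three divisions for a single vehicle using plain numbers; normalize_vehicle is a hypothetical helper, and the speed, distance, edge index, and edge count are made-up sample values.

```python
def normalize_vehicle(speed, dist_to_intersection, edge_index,
                      max_speed, max_dist, num_edges):
    """Scale one vehicle's raw readings the way get_state does."""
    return (speed / max_speed,
            dist_to_intersection / max_dist,
            edge_index / (num_edges - 1))


# max_dist is the longest of short_length, long_length, and inner_length.
max_dist = max(300, 500, 300)

# Sample values (assumed): 17.5 speed units, 250 distance units from the
# next intersection, on the edge with index 3 out of 13 edges.
obs = normalize_vehicle(17.5, 250.0, 3,
                        max_speed=35.0, max_dist=max_dist, num_edges=13)
```

Each entry of obs ends up between 0 and 1 as long as the raw readings do not exceed their normalizers.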
