Tutorial 10: Custom Controllers

This tutorial walks through defining controllers for the lateral and longitudinal movement of human-driven vehicles within a network. Such controllers may be needed to model types of human behavior that SUMO does not already support. Custom controllers are added alongside the existing ones in the directory flow/controllers/.

Here, we will discuss Flow's BaseController class and then build two controllers: a longitudinal Intelligent Driver Model controller [CITE] and a lateral controller that attempts to move all vehicles into the same lane.

When adding a custom controller, also register it in flow/controllers/__init__.py: add it to the import statements and to the list __all__.

1 Longitudinal Controller

1.1 BaseController

Flow's BaseController class is an abstract class to use when implementing longitudinal controllers. It includes failsafe methods and the get_action method called by Flow's core.base_env module. get_action adds noise to actions and runs failsafes, if specified. BaseController does not implement get_accel; that method should be implemented in any controllers that are subclasses of BaseController.

As such, any longitudinal controller must import BaseController. We also import NumPy in order to use some mathematical functions.


In [ ]:
import numpy as np

from flow.controllers.base_controller import BaseController
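
The division of labor described above (the base class supplies get_action, subclasses supply get_accel) can be sketched without Flow installed. The classes below are hypothetical toys, not Flow's BaseController: the names ToyBaseController and ConstantAccelController, the max_accel clipping used as a stand-in failsafe, and the use of Python's random module for noise are all assumptions made for illustration.

```python
import random
from abc import ABC, abstractmethod


class ToyBaseController(ABC):
    """Hypothetical stand-in for a base controller; not Flow's BaseController."""

    def __init__(self, veh_id, noise=0.0, max_accel=None):
        self.veh_id = veh_id
        self.noise = noise          # std dev of Gaussian noise added to the action
        self.max_accel = max_accel  # toy "failsafe": clip the magnitude if set

    @abstractmethod
    def get_accel(self, env):
        """Return the raw acceleration; every subclass must implement this."""

    def get_action(self, env):
        """Wrap get_accel with optional noise and the toy failsafe."""
        accel = self.get_accel(env)
        if self.noise > 0:
            accel += random.gauss(0, self.noise)
        if self.max_accel is not None:
            accel = max(-self.max_accel, min(self.max_accel, accel))
        return accel


class ConstantAccelController(ToyBaseController):
    """Toy subclass that always requests the same acceleration."""

    def get_accel(self, env):
        return 5.0


controller = ConstantAccelController("veh_0", noise=0.0, max_accel=3.0)
action = controller.get_action(env=None)  # raw 5.0 is clipped by the toy failsafe
```

Because get_accel is abstract, ToyBaseController cannot be instantiated directly. This mirrors the requirement that every subclass of BaseController provide its own get_accel.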

1.2 Controller Initialization

Here we define the IDMController class and its __init__ method, which stores the model parameters as attributes.

The Intelligent Driver Model is a car-following model specifying vehicle dynamics by a differential equation for acceleration $\dot{v}$. The differential equation follows:

$$\dot{v} = a \left[ 1- \left( \frac{v}{v_0} \right)^\delta -\left( \frac{s^*}{h} \right)^2 \right] \textbf{, with } \ s^* := s_0 + \max \left( 0, vT + \frac{v\Delta v}{2\sqrt{ab}} \right)$$

The IDM parameters are: desired speed $v_0$, time gap $T$, minimum gap $s_0$, acceleration exponent $\delta$, maximum acceleration $a$, and comfortable deceleration $b$. $h$ is the vehicle headway (the distance to the vehicle ahead) and $\Delta v$ is the velocity difference relative to the lead vehicle (current velocity - lead velocity).
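
To get a feel for the equation before wiring it into Flow, it can be evaluated as a plain function. The sketch below is standalone and uses only the standard library; the function names idm_accel and equilibrium_headway are hypothetical, and the default parameter values are copied from the controller defined in this tutorial. The equilibrium headway follows from setting $\dot{v} = 0$ and $\Delta v = 0$ in the equation above, which gives $h = (s_0 + vT) / \sqrt{1 - (v/v_0)^\delta}$ for $v < v_0$.

```python
import math


def idm_accel(v, lead_v, h, v0=30.0, T=1.0, a=1.0, b=1.5, delta=4, s0=2.0):
    """IDM acceleration for speed v, lead speed lead_v, and headway h (SI units)."""
    dv = v - lead_v  # positive when closing in on the leader
    s_star = s0 + max(0.0, v * T + v * dv / (2 * math.sqrt(a * b)))
    return a * (1 - (v / v0) ** delta - (s_star / h) ** 2)


def equilibrium_headway(v, v0=30.0, T=1.0, delta=4, s0=2.0):
    """Headway at which idm_accel is zero when both cars travel at speed v < v0."""
    return (s0 + v * T) / math.sqrt(1 - (v / v0) ** delta)


h_eq = equilibrium_headway(15.0)
accel_at_eq = idm_accel(15.0, 15.0, h_eq)     # zero up to rounding error
accel_closing = idm_accel(15.0, 10.0, h_eq)   # slower leader: negative (braking)
accel_standstill = idm_accel(0.0, 0.0, 2.0)   # stopped exactly at the minimum gap
```

At the equilibrium headway the two negative terms cancel the leading 1, so the vehicle neither accelerates nor brakes. With a slower leader at the same headway, the $v\Delta v$ term inflates $s^*$ and the result becomes negative.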


In [ ]:
class IDMController(BaseController):
    def __init__(self, veh_id, v0=30, T=1, a=1, b=1.5,
                 delta=4, s0=2, s1=0, time_delay=0.0,
                 dt=0.1, noise=0, fail_safe=None, car_following_params=None):
        """Instantiate an IDM controller.

        veh_id: str
            unique vehicle identifier
        car_following_params: SumoCarFollowingParams
            see parent class
        v0: float, optional
            desired velocity, in m/s (default: 30)
        T: float, optional
            safe time headway, in s (default: 1)
        a: float, optional
            maximum acceleration, in m/s^2 (default: 1)
        b: float, optional
            comfortable deceleration, in m/s^2 (default: 1.5)
        delta: float, optional
            acceleration exponent (default: 4)
        s0: float, optional
            linear jam distance, in m (default: 2)
        s1: float, optional
            nonlinear jam distance, in m (default: 0)
        time_delay: float, optional
            delay forwarded to the parent class as `delay` (default: 0.0)
        dt: float, optional
            timestep, in s (default: 0.1)
        noise: float, optional
            std dev of normal perturbation to the acceleration (default: 0)
        fail_safe: str, optional
            type of flow-imposed failsafe the vehicle should possess, defaults
            to no failsafe (None)
        """

        BaseController.__init__(self, veh_id, car_following_params,
                                delay=time_delay, fail_safe=fail_safe,
                                noise=noise)
        self.v0 = v0
        self.T = T
        self.a = a
        self.b = b
        self.delta = delta
        self.s0 = s0
        self.s1 = s1
        self.dt = dt

1.3 Acceleration Command

Next, we implement the acceleration equation specified by IDM:

$$\dot{v} = a \left[ 1- \left( \frac{v}{v_0} \right)^\delta -\left( \frac{s^*}{h} \right)^2 \right] \textbf{, with } \ s^* := s_0 + \max \left( 0, vT + \frac{v\Delta v}{2\sqrt{ab}} \right)$$

The vehicle's velocity v is fetched with the get_speed method of the environment's vehicles object (env.k.vehicle). The id of the lead vehicle lead_id and the headway h are fetched from the same object with get_leader and get_headway.

The method guards against near-zero headways, which would otherwise cause a division by zero, and sets $s^*$ to 0 when no car is ahead of the vehicle being controlled. If there is a lead vehicle, $s^*$ is calculated as described. In both cases the IDM acceleration is then returned.


In [ ]:
class IDMController(BaseController):
    def __init__(self, veh_id, v0=30, T=1, a=1, b=1.5, 
                 delta=4, s0=2, s1=0, time_delay=0.0, 
                 dt=0.1, noise=0, fail_safe=None, car_following_params=None):
        """Docstring eliminated here for brevity"""
        BaseController.__init__(self, veh_id, car_following_params,
                                delay=time_delay, fail_safe=fail_safe,
                                noise=noise)
        self.v0 = v0
        self.T = T
        self.a = a
        self.b = b
        self.delta = delta
        self.s0 = s0
        self.s1 = s1
        self.dt = dt

        
    ##### Below this is new code #####
    def get_accel(self, env):
        v = env.k.vehicle.get_speed(self.veh_id)
        lead_id = env.k.vehicle.get_leader(self.veh_id)
        h = env.k.vehicle.get_headway(self.veh_id)

        # Negative headways may be registered by sumo at intersections/
        # junctions. Setting them to 0 causes vehicles to not move; therefore,
        # we maintain these negative headways to let sumo control the dynamics
        # as it sees fit at these points. Only headways with a magnitude below
        # 1e-3 are clamped, to avoid dividing by (nearly) zero below.
        if abs(h) < 1e-3:
            h = 1e-3

        if lead_id is None or lead_id == '':  # no car ahead
            s_star = 0
        else:
            lead_vel = env.k.vehicle.get_speed(lead_id)
            s_star = self.s0 + max(
                0,
                v * self.T + v*(v-lead_vel) / (2*np.sqrt(self.a*self.b)))

        return self.a * (1 - (v/self.v0)**self.delta - (s_star/h)**2)

2 Lateral Controller

2.1 BaseLaneChangeController

In this section we will implement a lane-change controller that sends lane-change commands to move a vehicle into lane 0. Flow includes a BaseLaneChangeController abstract class that functions similarly to the BaseController class, implementing safety-checking utility methods for control.

First, we import the BaseLaneChangeController class and define a lane-change controller class, leaving its method to be defined in the next step.


In [ ]:
from flow.controllers.base_lane_changing_controller import BaseLaneChangeController

class LaneZeroController(BaseLaneChangeController):
    """A lane-changing model used to move vehicles into lane 0."""
    pass

2.2 Lane-Change Command

Lane-change controllers must implement the method get_lane_change_action. Lane-change actions in Flow are specified as directions, each one of -1, 0, or 1. Lane 0 is the rightmost lane, so the direction -1 is a lane change to the right, 1 is a lane change to the left, and 0 keeps the current lane.

This get_lane_change_action implementation fetches the vehicle's current lane using the get_lane method of the vehicles object, passing in self.veh_id. If the vehicle is not in lane 0, its lane number must be above 0, since lane numbers in SUMO are non-negative. In that case, a lane change to the right is specified by returning the direction -1. If the vehicle is already in lane 0, the direction 0 is returned.


In [ ]:
class LaneZeroController(BaseLaneChangeController):
    """A lane-changing model used to move vehicles into lane 0."""

    ##### Below this is new code #####
    def get_lane_change_action(self, env):
        current_lane = env.k.vehicle.get_lane(self.veh_id)
        if current_lane > 0:
            return -1
        else:
            return 0
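
The decision rule can be checked without launching SUMO by feeding it a stub environment. This is a minimal sketch under stated assumptions: StubVehicles and lane_zero_action are hypothetical names, the stub exposes only the get_lane call used above, and the free function repeats the controller's logic rather than importing Flow.

```python
from types import SimpleNamespace


class StubVehicles:
    """Hypothetical stand-in for env.k.vehicle; only get_lane is provided."""

    def __init__(self, lanes):
        self.lanes = lanes  # maps veh_id -> current lane index

    def get_lane(self, veh_id):
        return self.lanes[veh_id]


def lane_zero_action(env, veh_id):
    """Same decision rule as LaneZeroController.get_lane_change_action."""
    current_lane = env.k.vehicle.get_lane(veh_id)
    return -1 if current_lane > 0 else 0


# Vehicle "a" starts in lane 2 and vehicle "b" is already in lane 0.
env = SimpleNamespace(k=SimpleNamespace(vehicle=StubVehicles({"a": 2, "b": 0})))
actions = {veh_id: lane_zero_action(env, veh_id) for veh_id in ("a", "b")}
```

Here the vehicle in lane 2 is told to move right, and the vehicle already in lane 0 is told to stay. Because each call requests a change of at most one lane, a vehicle in lane 2 needs at least two successful lane changes to reach lane 0.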