This tutorial walks you through the process of generating custom scenarios. Scenarios define the network geometry of a task, as well as the constituents of the network, e.g. vehicles, traffic lights, etc... Various scenarios are available in Flow, depicting a diverse set of open and closed traffic networks such as ring roads, intersections/grids, straight highway merges, and more.
In this exercise, we will recreate the ring road network, seen in the figure below.
In order to recreate this scenario, we will design a scenario class. This class creates the configuration files needed to produce a transportation network within the simulator. It also specifies the location of edge nodes in the network, as well as the positioning of vehicles at the start of a run.
We begin by creating a class that inherits the methods of Flow's base scenario class. The separate methods are filled in in later sections.
In [ ]:
# import Flow's base scenario class
from flow.scenarios import Scenario
# define the scenario class, and inherit properties from the base scenario class
class myScenario(Scenario):
pass
The rest of the tutorial is organized as follows: sections 1 and 2 walk through the steps needed to specify custom traffic network geometry features and auxiliary features, respectively, while section 3 implements the new scenario in a simulation for visualization and testing purposes.
One of the core responsibilities of the scenario class is to to generate the necessary xml files needed to initialize a sumo instance. These xml files describe specific network features such as the position and directions of nodes and edges (see the figure above). Once the base scenario has been inherited, specifying these features becomes very systematic. All child classes are required to define at least the following three methods:
Additionally, the following optional functions may also be defined:
All of the functions mentioned above paragraph take in as input net_params
, and output a list of dictionary elements, with each element providing the attributes of the component to be specified.
This tutorial will cover the first three methods. For examples of specify_types
and specify_routes
, refer to source code located in flow/scenarios/loop.py
and flow/scenarios/bridge_toll.py
, respectively.
The features used to parametrize the network are specified within the NetParams
input, as discussed in tutorial 1. Specifically, for the sake of our network, the additional_params
attribute within NetParams
will be responsible for storing information on the radius, number of lanes, and speed limit within each lane, as seen in the figure above. Accordingly, for this problem, we define an ADDITIONAL_NET_PARAMS
variable of the form:
In [ ]:
ADDITIONAL_NET_PARAMS = {
"radius": 40,
"num_lanes": 1,
"speed_limit": 30,
}
All scenarios presented in Flow provide a unique ADDITIONAL_NET_PARAMS
component containing the information needed to properly define the network parameters of the scenario. We assume that these values are always provided by the user, and accordingly can be called from net_params
. For example, if we would like to call the "radius" parameter, we simply type:
radius = net_params.additional_params["radius"]
The nodes of a network are the positions of a select few points in the network. These points are connected together using edges (see section 1.4). In order to specify the location of the nodes that will be placed in the network, the function specify_nodes
is used. This method returns a list of dictionary elements, where each dictionary depicts the attributes of a single node. These node attributes include:
Refering to the figure at the top of this tutorial, we specify four nodes at the bottom (0,-r), top (0,r), left (-r,0), and right (0,r) of the ring. This is done as follows:
In [ ]:
class myScenario(myScenario): # update my scenario class
def specify_nodes(self, net_params):
# one of the elements net_params will need is a "radius" value
r = net_params.additional_params["radius"]
# specify the name and position (x,y) of each node
nodes = [{"id": "bottom", "x": 0, "y": -r},
{"id": "right", "x": r, "y": 0},
{"id": "top", "x": 0, "y": r},
{"id": "left", "x": -r, "y": 0}]
return nodes
Once the nodes are specified, the nodes are linked together using directed edges. This done through the specify_edges
method which, similar to specify_nodes
, returns a list of dictionary elements, with each dictionary specifying the attributes of a single edge. The attributes include:
One useful additional attribute is shape, which specifies the shape of the edge connecting the two nodes. The shape consists of a series of subnodes (internal to sumo) that are connected together by straight lines to create a curved edge. If no shape is specified, the nodes are connected by a straight line. This attribute will be needed to create the circular arcs between the nodes in the system.
We now create four arcs connected the nodes specified in section 1.2, with the direction of the edges directed counter-clockwise:
In [ ]:
# some mathematical operations that may be used
from numpy import pi, sin, cos, linspace
class myScenario(myScenario): # update my scenario class
def specify_edges(self, net_params):
r = net_params.additional_params["radius"]
edgelen = r * pi / 2
# this will let us control the number of lanes in the network
lanes = net_params.additional_params["num_lanes"]
# speed limit of vehicles in the network
speed_limit = net_params.additional_params["speed_limit"]
edges = [
{
"id": "edge0",
"numLanes": lanes,
"speed": speed_limit,
"from": "bottom",
"to": "right",
"length": edgelen,
"shape": [(r*cos(t), r*sin(t)) for t in linspace(-pi/2, 0, 40)]
},
{
"id": "edge1",
"numLanes": lanes,
"speed": speed_limit,
"from": "right",
"to": "top",
"length": edgelen,
"shape": [(r*cos(t), r*sin(t)) for t in linspace(0, pi/2, 40)]
},
{
"id": "edge2",
"numLanes": lanes,
"speed": speed_limit,
"from": "top",
"to": "left",
"length": edgelen,
"shape": [(r*cos(t), r*sin(t)) for t in linspace(pi/2, pi, 40)]},
{
"id": "edge3",
"numLanes": lanes,
"speed": speed_limit,
"from": "left",
"to": "bottom",
"length": edgelen,
"shape": [(r*cos(t), r*sin(t)) for t in linspace(pi, 3*pi/2, 40)]
}
]
return edges
The routes are the sequence of edges vehicles traverse given their current position. For example, a vehicle beginning in the edge titled "edge0" (see section 1.3) must traverse, in sequence, the edges "edge0", "edge1", "edge2", and "edge3", before restarting its path.
In order to specify the routes a vehicle may take, the function specify_routes
is used. The routes in this method can be specified in one of three ways:
1. Single route per edge:
In this case of deterministic routes (as is the case in the ring road scenario), the routes can be specified as dictionary where the key element represents the starting edge and the element is a single list of edges the vehicle must traverse, with the first edge corresponding to the edge the vehicle begins on. Note that the edges must be connected for the route to be valid.
For this network, the available routes under this setting can be defined as follows:
In [ ]:
class myScenario(myScenario): # update my scenario class
def specify_routes(self, net_params):
rts = {"edge0": ["edge0", "edge1", "edge2", "edge3"],
"edge1": ["edge1", "edge2", "edge3", "edge0"],
"edge2": ["edge2", "edge3", "edge0", "edge1"],
"edge3": ["edge3", "edge0", "edge1", "edge2"]}
return rts
2. Multiple routes per edge:
Alternatively, if the routes are meant to be stochastic, each element can consist of a list of (route, probability) tuples, where the first element in the tuple is one of the routes a vehicle can take from a specific starting edge, and the second element is the probability that vehicles will choose that route. Note that, in this case, the sum of probability values for each dictionary key must sum up to one.
For example, modifying the code snippet we presented above, another valid way of representing the route in a more probabilistic setting is:
In [ ]:
class myScenario(myScenario): # update my scenario class
def specify_routes(self, net_params):
rts = {"edge0": [(["edge0", "edge1", "edge2", "edge3"], 1)],
"edge1": [(["edge1", "edge2", "edge3", "edge0"], 1)],
"edge2": [(["edge2", "edge3", "edge0", "edge1"], 1)],
"edge3": [(["edge3", "edge0", "edge1", "edge2"], 1)]}
return rts
3. Per-vehicle routes:
Finally, if you would like to assign a specific starting route to a vehicle with a specific ID, you can do so by adding a element into the dictionary whose key is the name of the vehicle and whose content is the list of edges the vehicle is meant to traverse as soon as it is introduced to the network.
As an example, assume we have a vehicle named "human_0" in the network (as we will in the later sections), and it is initialized in the edge names "edge_0". Then, the route for this edge specifically can be added through the specify_routes
method as follows:
In [ ]:
class myScenario(myScenario): # update my scenario class
def specify_routes(self, net_params):
rts = {"edge0": ["edge0", "edge1", "edge2", "edge3"],
"edge1": ["edge1", "edge2", "edge3", "edge0"],
"edge2": ["edge2", "edge3", "edge0", "edge1"],
"edge3": ["edge3", "edge0", "edge1", "edge2"],
"human_0": ["edge0", "edge1", "edge2", "edge3"]}
return rts
In all three cases, the routes are ultimately represented in the class in the form described under the multiple routes setting, i.e.
>>> print(scenario.rts)
{
"edge0": [
(["edge0", "edge1", "edge2", "edge3"], 1)
],
"edge1": [
(["edge1", "edge2", "edge3", "edge0"], 1)
],
"edge2": [
(["edge2", "edge3", "edge0", "edge1"], 1)
],
"edge3": [
(["edge3", "edge0", "edge1", "edge2"], 1)
],
"human_0": [
(["edge0", "edge1", "edge2", "edge3"], 1)
]
}
where the vehicle-specific route is only included in the third case.
Other auxiliary methods exist within the base scenario class to help support vehicle state initialization and acquisition. Of these methods, the only required abstract method is:
Other optional abstract methods within the base scenario class include:
All of the above functions starting with "specify" receive no inputs, and return a list of tuples in which the first element of the tuple is the name of the edge/intersection/internal_link, and the second value is the distance of the link from some global reference, i.e. [(link_0, pos_0), (link_1, pos_1), ...].
The data specified in specify_edge_starts
is used to provide a "global" sense of the location of vehicles, in one dimension. This is done either through the get_x_by_id
method within an environment, or the get_absolute_position
method in the Vehicles
object within an environment. The specify_internal_edge_starts
allows us to do the same to junctions/internal links when they are also located within the network (this is not the case for the ring road).
In section 1, we created a network with 4 edges named: "edge0", "edge1", "edge2", and "edge3". We assume that the edge titled "edge0" is the origin, and accordingly the position of the edge start of "edge0" is 0. The next edge, "edge1", begins a quarter of the length of the network from the starting point of edge "edge0", and accordingly the position of its edge start is radius * pi/2. This process continues for each of the edges. We can then define the starting position of the edges as follows:
In [ ]:
# import some math functions we may use
from numpy import pi
class myScenario(myScenario): # update my scenario class
def specify_edge_starts(self):
r = self.net_params.additional_params["radius"]
edgestarts = [("edge0", 0),
("edge1", r * 1/2 * pi),
("edge2", r * pi),
("edge3", r * 3/2 * pi)]
return edgestarts
In [ ]:
from flow.core.params import VehicleParams
from flow.controllers import IDMController, ContinuousRouter
from flow.core.params import SumoParams, EnvParams, InitialConfig, NetParams
vehicles = VehicleParams()
vehicles.add(veh_id="human",
acceleration_controller=(IDMController, {}),
routing_controller=(ContinuousRouter, {}),
num_vehicles=22)
sumo_params = SumoParams(sim_step=0.1, render=True)
initial_config = InitialConfig(bunching=40)
For visualizing purposes, we use the environment AccelEnv
, as it works on any given scenario.
In [ ]:
from flow.envs.loop.loop_accel import AccelEnv, ADDITIONAL_ENV_PARAMS
env_params = EnvParams(additional_params=ADDITIONAL_ENV_PARAMS)
Next, using the ADDITIONAL_NET_PARAMS
component see created in section 1.1, we prepare the NetParams
component.
In [ ]:
additional_net_params = ADDITIONAL_NET_PARAMS.copy()
net_params = NetParams(additional_params=additional_net_params)
We are ready now to create and run our scenario. Using the newly defined scenario classes, we create a scenario object and feed it into a Experiment
simulation. Finally, we are able to visually confirm that are network has been properly generated.
In [ ]:
from flow.core.experiment import Experiment
scenario = myScenario( # we use the newly defined scenario class
name="test_scenario",
vehicles=vehicles,
net_params=net_params,
initial_config=initial_config
)
# AccelEnv allows us to test any newly generated scenario quickly
env = AccelEnv(env_params, sumo_params, scenario)
exp = Experiment(env)
# run the sumo simulation for a set number of time steps
_ = exp.run(1, 1500)
In [ ]: