If any part of this notebook is used in your research, please cite with the reference found in README.md.
pysal/spaghetti
and pulp for optimal routing
Author: James D. Gaboardi jgaboardi@gmail.com
This notebook provides a use case for:
In [1]:
%load_ext watermark
%watermark
In [2]:
import geopandas
from libpysal import examples
import matplotlib
import matplotlib_scalebar
from matplotlib_scalebar.scalebar import ScaleBar
import numpy
import pulp
import spaghetti
%matplotlib inline
%watermark -w
%watermark -iv
In [3]:
# Best effort: ask IPython for "retina" figure output. Both the import and
# the call stay inside the ``try`` so that an ImportError raised by either one
# is ignored and the notebook simply keeps the default figure format.
# NOTE(review): newer IPython releases reportedly deprecate this helper in
# favour of ``matplotlib_inline`` - confirm against the installed version.
try:
    from IPython.display import set_matplotlib_formats

    set_matplotlib_formats("retina")
except ImportError:
    pass
Detective George B. Königsberg thought he needed to visit 7 crime scenes in one area of City X this afternoon in order to collect evidence. However, his lieutenant, Anna Nagurney, just told him he needs to double that to 14. He really wants to wrap up early so he can get home to watch the 2012 mathematical thriller, Travelling Salesman (by Timothy Lanzone), with his cat and dog, Euler and Hamilton. Therefore, he decides on calculating an optimal route so that he can visit all 14 crime scenes in one tour while covering the shortest distance. Det. Königsberg utilizes an integer linear programming formulation of the traveling salesperson problem (TSP) to find his best route.
$\begin{array}{llllll} \displaystyle \normalsize \textrm{Minimize} & \displaystyle \normalsize \sum_{\substack{0 \leq i \\ i \neq j}}^n \sum_{\substack{j \leq n \\ j \neq i}}^n c_{ij}x_{ij} & & & & \normalsize (1) \\ \normalsize \textrm{Subject To} & \displaystyle \normalsize \sum_{i=0}^n x_{ij}=1 & \normalsize j=1,...,n, & \normalsize j\neq i; & &\normalsize (2)\\ & \displaystyle \normalsize \sum_{j=0}^n x_{ij}=1 & \normalsize i=1,...,n, & \normalsize i\neq j; & &\normalsize (3) \\ & \displaystyle \normalsize u_i - u_j + p x_{ij} \leq p - 1 & \normalsize i=1,...,n, & \normalsize 1 \leq i \neq j \leq n; & &\normalsize (4) \\ & \normalsize x_{ij} \in \{0,1\} & \normalsize i=1,...,n, & \normalsize j=1,...,n; & &\normalsize (5)\\ & \normalsize u_{i} \in \mathbb{R}_{\geq 0} & \normalsize i=1,...,n. & & &\normalsize (6)\\ \end{array}$
$\begin{array}{llllllll} \displaystyle \normalsize \textrm{Where} & \small x_{ij} & \small = & \small \begin{cases} 1, & \textrm{if node } i \textrm{ immediately precedes node } j \textrm{ in the tour}\\ 0, & \textrm{otherwise} \end{cases} &&&&\\ & \small c_{ij} & \small = & \small \textrm{distance matrix between all } i,j \textrm{ pairs} &&&& \\ & \small n & \small = & \small \textrm{the total number of nodes in the tour} &&&&\\ & \small i & \small = & \small \textrm{each potential origin node} &&&&\\ & \small j & \small = & \small \textrm{each potential destination node} &&&&\\ & \small u_i & \small = & \small \textrm{continuous, non-negative real numbers} &&&&\\ & \small p & \small = & \small \textrm{allowed visits prior to return (}n = p \textrm{ in this formulation)} &&&&\\ \end{array}$
References
In [4]:
class MTZ_TSP:
    """Traveling Salesperson Problem (TSP) posed as an integer linear program.

    The model is built, solved, and decoded into a tour at instantiation.
    """

    def __init__(self, nodes, cij, xij_tag="x_%s,%s", ui_tag="u_%s", display=True):
        """Instantiate and solve the Traveling Salesperson Problem (TSP)
        based on the formulation from Miller, Tucker, and Zemlin (1960).

        Parameters
        ----------
        nodes : pandas.Series
            Labels of all nodes to be visited in the tour. The piece of each
            label after its last ``_`` is used to name the decision variables
            and is later parsed back as an integer node ID, so it must equal
            the node's position (0,...,``p``-1) in ``nodes`` and in ``cij``.
        cij : numpy.array
            All-to-all distance matrix for nodes.
        xij_tag : str
            Tour decision variable names within the model. Default is
            'x_%s,%s' where %s indicates string formatting.
        ui_tag : str
            Arbitrary real number decision variable names within the model.
            Default is 'u_%s' where %s indicates string formatting.
        display : bool
            Print out solution results.

        Attributes
        ----------
        nodes : pandas.Series
            See description above.
        p : int
            The number of nodes in the set.
        rp_0n : range
            Range of node IDs in ``nodes`` from 0,...,``p``-1.
        rp_1n : range
            Range of node IDs in ``nodes`` from 1,...,``p``-1.
        id : str
            Column name for ``nodes``.
        cij : numpy.array
            See description above.
        xij_tag : str
            See description above.
        ui_tag : str
            See description above.
        tsp : pulp.LpProblem
            Integer Linear Programming problem instance.
        xij : numpy.array
            Binary tour decision variables (``pulp.LpVariable``).
        ui : numpy.array
            Continuous arbitrary real number decision variables
            (``pulp.LpVariable``).
        cycle_ods : dict
            Cycle origin-destination lookup keyed by origin with
            destination as the value.
        tour_pairs : dict
            Sorted ``[origin, destination]`` pair of each abstract tour arc,
            keyed by the position (leg) of that arc in the tour.
        """
        # all nodes to be visited and the distance matrix
        self.nodes, self.cij = nodes, cij
        # number of nodes in the set
        self.p = self.nodes.shape[0]
        # full and truncated range of nodes (p) in the set
        self.rp_0n, self.rp_1n = range(self.p), range(1, self.p)
        # column name for node IDs
        self.id = self.nodes.name
        # alpha tag for decision and dummy variable prefixes
        self.xij_tag, self.ui_tag = xij_tag, ui_tag

        # instantiate a model
        self.tsp = pulp.LpProblem("MTZ_TSP", pulp.LpMinimize)
        # create and set the tour decision variables
        self.tour_dvs()
        # create and set the arbitrary real number decision variables
        self.arn_dvs()
        # set the objective function
        self.objective_func()
        # node exit constraints, then node entry constraints; this call order
        # keeps the rows of the model in the order they were always added in
        self.entry_exit_constrs(entry=False)
        self.entry_exit_constrs(entry=True)
        # subtour prevention constraints
        self.prevent_subtours()

        # solve
        self.tsp.solve()
        # origin-destination lookup
        self.get_decisions(display=display)
        # extract the sequence of nodes to construct the optimal tour
        self.construct_tour()

    def tour_dvs(self):
        """Create the tour decision variables - eq (5)."""

        def _name(_x):
            """Return the piece of the ``_x``-th node label after the last '_'."""
            # ``rp_0n`` holds positions, hence positional access
            return self.nodes.iloc[_x].split("_")[-1]

        self.xij = numpy.array(
            [
                [
                    pulp.LpVariable(self.xij_tag % (_name(i), _name(j)), cat="Binary")
                    for j in self.rp_0n
                ]
                for i in self.rp_0n
            ]
        )

    def arn_dvs(self):
        """Create arbitrary real number decision variables - eq (6)."""
        self.ui = numpy.array(
            [pulp.LpVariable(self.ui_tag % (i), lowBound=0) for i in self.rp_0n]
        )

    def objective_func(self):
        """Add the objective function - eq (1)."""
        self.tsp += pulp.lpSum(
            [
                self.cij[i, j] * self.xij[i, j]
                for i in self.rp_0n
                for j in self.rp_0n
                if i != j
            ]
        )

    def entry_exit_constrs(self, entry=True):
        """Add entry (eq (2)) or exit (eq (3)) constraints for every node.

        Parameters
        ----------
        entry : bool
            ``True`` adds one constraint per node forcing exactly one selected
            arc to arrive at it; ``False`` adds one constraint per node forcing
            exactly one selected arc to leave it.
        """
        for node in self.rp_0n:
            if entry:
                # eq (2): sum over origins i of x[i, node] equals 1
                arcs = [self.xij[i, node] for i in self.rp_0n if i != node]
            else:
                # eq (3): sum over destinations j of x[node, j] equals 1
                arcs = [self.xij[node, j] for j in self.rp_0n if j != node]
            self.tsp += (pulp.lpSum(arcs) == 1)

    def prevent_subtours(self):
        """Add subtour prevention constraints - eq (4)."""
        # node 0 is left out on purpose (``rp_1n`` starts at 1)
        for i in self.rp_1n:
            for j in self.rp_1n:
                if i != j:
                    self.tsp += (
                        self.ui[i] - self.ui[j] + self.p * self.xij[i, j] <= self.p - 1
                    )

    def get_decisions(self, display=True):
        """Fetch the selected decision variables.

        Stores ``cycle_ods``, mapping each origin node ID to the destination
        node ID of the selected arc leaving it.

        Parameters
        ----------
        display : bool
            Print each selected variable and the solver status.
        """
        # the ``u`` variables are skipped by exact name
        ui_names = {ui.name for ui in self.ui}
        cycle_ods = {}
        for var in self.tsp.variables():
            if var.name in ui_names:
                continue
            value = var.varValue
            # binary variables: anything without a value, or not above one
            # half (i.e. zero up to solver round-off), is not selected
            if value is None or value <= 0.5:
                continue
            if display:
                print("%s: %s" % (var.name, value))
            # variable names end in '<origin>,<destination>'
            od = var.name.split("_")[-1]
            o, d = (int(tf) for tf in od.split(","))
            cycle_ods[o] = d
        if display:
            print("Status: %s" % pulp.LpStatus[self.tsp.status])
        self.cycle_ods = cycle_ods

    def construct_tour(self):
        """Construct the tour.

        Starting from the first node ID in ``rp_0n``, follow ``cycle_ods`` from
        origin to destination once per node and store each visited pair,
        sorted, in ``tour_pairs`` under its position (leg) in the walk.
        """
        tour_pairs = {}
        origin = self.rp_0n[0] if self.rp_0n else None
        for leg in self.rp_0n:
            destination = self.cycle_ods[origin]
            # pairs are stored sorted so they can be compared with path IDs
            tour_pairs[leg] = sorted((origin, destination))
            origin = destination
        self.tour_pairs = tour_pairs

    def extract_tour(self, paths, id_col, leg_label="leg"):
        """Extract the tour (the legs in the journey) as a
        ``geopandas.GeoDataFrame`` of ``shapely.geometry.LineString`` objects.

        Parameters
        ----------
        paths : geopandas.GeoDataFrame
            Shortest-path routes between all observations. It is not modified.
        id_col : str
            ID column name. Its values are compared, as tuples, with the
            sorted origin-destination pairs in ``tour_pairs``.
        leg_label : str
            Column name for the tour sequence. Default is 'leg'.

        Returns
        -------
        tour : geopandas.GeoDataFrame
            Optimal tour of ``self.nodes`` sequenced by ``leg_label`` that
            retains the original index of ``paths``.
        """
        # leg number of each OD pair in the tour
        leg_of = {tuple(pair): leg for leg, pair in self.tour_pairs.items()}
        # leg of every path, ``None`` for paths that are not in the tour
        legs = [leg_of.get(tuple(od)) for od in paths[id_col]]
        in_tour = numpy.array([leg is not None for leg in legs], dtype=bool)

        # extract only paths in the tour
        tour = paths.loc[in_tour].copy()
        # object dtype, as the column always had
        tour[leg_label] = numpy.array(
            [leg for leg in legs if leg is not None], dtype=object
        )
        return tour.sort_values(by=[leg_label])
In [5]:
# Read the "streets.shp" example dataset located through libpysal's examples.
streets = geopandas.read_file(examples.get_path("streets.shp"))
# Declare the CRS of the data as EPSG:2223, then reproject it to EPSG:2762.
streets.crs = "epsg:2223"
streets = streets.to_crs("epsg:2762")
# Last expression of the cell: preview the first rows.
streets.head()
Out[5]:
In [6]:
# Read the "crimes.shp" example dataset located through libpysal's examples.
all_crimes = geopandas.read_file(examples.get_path("crimes.shp"))
# Same CRS handling as the streets: declare EPSG:2223, reproject to EPSG:2762.
all_crimes.crs = "epsg:2223"
all_crimes = all_crimes.to_crs("epsg:2762")
# Last expression of the cell: preview the first rows.
all_crimes.head()
Out[6]:
In [7]:
# Seed NumPy's global random state so the draw below is repeatable.
numpy.random.seed(1960)
# Number of crime scenes to visit: the original 7, doubled.
koenigsberg_cases = 7 * 2
# Draw that many index labels from ``all_crimes`` without replacement ...
subset_idx = numpy.random.choice(all_crimes.index, koenigsberg_cases, replace=False)
# ... and keep a copy of the matching rows (rows stay in ``all_crimes`` order,
# not in draw order, because they are selected with ``isin``).
crimes_scenes = all_crimes[all_crimes.index.isin(subset_idx)].copy()
# Last expression of the cell: show the selected scenes.
crimes_scenes
Out[7]:
In [8]:
# Build a spaghetti network from the reprojected street data.
ntw = spaghetti.Network(in_data=streets)
# Export the network's vertices and arcs as two separate GeoDataFrames.
vertices, arcs = spaghetti.element_as_gdf(ntw, vertices=True, arcs=True)
# Last expression of the cell: preview the vertices.
vertices.head()
Out[8]:
In [9]:
# Preview the first rows of the network arcs.
arcs.head()
Out[9]:
In [10]:
# Overview map. The arcs are drawn first and return the axes (``base``) that
# every later layer is drawn on; ``zorder`` controls the stacking.
base = arcs.plot(linewidth=3, alpha=0.25, color="k", zorder=0, figsize=(10, 10))
vertices.plot(ax=base, markersize=2, color="red", zorder=1)
# all crimes as small black markers, the selected scenes as large blue ones
all_crimes.plot(ax=base, markersize=5, color="k", zorder=2)
crimes_scenes.plot(ax=base, markersize=100, alpha=0.25, color="blue", zorder=2)
# add scale bar
# NOTE(review): the first ScaleBar argument is 3 with units="m" - confirm that
# this matches the units of the plotted (EPSG:2762) coordinates.
scalebar = ScaleBar(3, units="m", location="lower left")
base.add_artist(scalebar);
In [11]:
# Snap the selected crime scenes to the network and register them on it as
# the point pattern named "crime_scenes".
ntw.snapobservations(crimes_scenes, "crime_scenes")
# Export that point pattern twice: as is, and with ``snapped=True``.
pp_obs = spaghetti.element_as_gdf(ntw, pp_name="crime_scenes")
pp_obs_snapped = spaghetti.element_as_gdf(ntw, pp_name="crime_scenes", snapped=True)
# Last expression of the cell: show the snapped observations.
pp_obs_snapped
Out[11]:
In [12]:
# Map of the observations against the network: arcs first (returns the axes
# ``base``), then vertices, then both exports of the point pattern.
base = arcs.plot(linewidth=3, alpha=0.25, color="k", zorder=0, figsize=(10, 10))
vertices.plot(ax=base, markersize=5, color="r", zorder=1)
# unsnapped export with the default marker, snapped export with "x" markers
pp_obs.plot(ax=base, markersize=20, color="k", zorder=2)
pp_obs_snapped.plot(ax=base, markersize=20, marker="x", color="k", zorder=2)
# add scale bar
# NOTE(review): the first ScaleBar argument is 3 with units="m" - confirm that
# this matches the units of the plotted (EPSG:2762) coordinates.
scalebar = ScaleBar(3, units="m", location="lower left")
base.add_artist(scalebar);
In [13]:
# Distances between all pairs of "crime_scenes" observations. With
# ``gen_tree=True`` a second object, ``tree``, is returned as well; it is
# passed to ``ntw.shortest_paths`` further down.
d2d_dist, tree = ntw.allneighbordistances("crime_scenes", gen_tree=True)
# Last expression of the cell: the top-left 3x3 corner of the distance matrix.
d2d_dist[:3, :3]
Out[13]:
In [14]:
# Peek at the first four and the last four (key, value) items of ``tree``.
list(tree.items())[:4], list(tree.items())[-4:]
Out[14]:
In [15]:
# Label every observation "x_<id>". ``MTZ_TSP`` names its decision variables
# from the piece of this label that follows the last underscore.
pp_obs["dv"] = pp_obs["id"].apply(lambda _id: "x_%s" % _id)
# Last expression of the cell: show the observations with the new column.
pp_obs
Out[15]:
In [16]:
# Build and solve the TSP for the labelled observations using the pairwise
# distance matrix. The constructor runs the solver itself and, because
# ``display`` defaults to True, prints the selected variables and the status.
mtz_tsp = MTZ_TSP(pp_obs["dv"], d2d_dist)
In [17]:
# Shortest paths for the "crime_scenes" point pattern, derived from ``tree``.
paths = ntw.shortest_paths(tree, "crime_scenes")
# Export those routes as a GeoDataFrame.
paths_gdf = spaghetti.element_as_gdf(ntw, routes=paths)
# Last expression of the cell: preview the first rows.
paths_gdf.head()
Out[17]:
In [18]:
# Keep only the routes that are legs of the solved tour, matched on the "id"
# column. A copy is passed so that ``paths_gdf`` itself is left untouched.
tour = mtz_tsp.extract_tour(paths_gdf.copy(), "id")
# Last expression of the cell: preview the first legs.
tour.head()
Out[18]:
In [19]:
def tour_labels(t, b):
    """Label each leg of the tour.

    Parameters
    ----------
    t : geopandas.GeoDataFrame
        Tour legs; the ``leg`` and ``geometry`` columns are read.
    b : matplotlib axes
        Axes to annotate (anything providing ``annotate``).
    """
    kws = {"size": 20, "ha": "center", "va": "bottom", "weight": "bold"}
    for leg, geom in zip(t["leg"], t["geometry"]):
        # place the label at the point halfway along the leg
        midpoint = geom.interpolate(0.5, normalized=True).coords[0]
        # The label is passed positionally: the keyword for this argument
        # was renamed from ``s`` to ``text`` in matplotlib 3.3.
        b.annotate(leg, xy=midpoint, **kws)
def obs_labels(o, b):
    """Label each point pattern observation.

    Parameters
    ----------
    o : geopandas.GeoDataFrame
        Observations; the ``id`` and ``geometry`` columns are read.
    b : matplotlib axes
        Axes to annotate (anything providing ``annotate``).
    """
    kws = {"size": 14, "ha": "left", "va": "bottom", "style": "oblique", "color": "k"}
    for obs_id, geom in zip(o["id"], o["geometry"]):
        # The label is passed positionally: the keyword for this argument
        # was renamed from ``s`` to ``text`` in matplotlib 3.3.
        b.annotate(obs_id, xy=geom.coords[0], **kws)
In [20]:
# Final map: faint network arcs underneath (this call returns the axes
# ``base``), then the tour legs coloured by their "leg" value.
base = arcs.plot(alpha=0.2, linewidth=1, color="k", figsize=(10, 10), zorder=0)
tour.plot(ax=base, column="leg", cmap="tab20", alpha=0.50, linewidth=10, zorder=1)
vertices.plot(ax=base, markersize=1, color="r", zorder=2)
# observations with the default marker, their snapped export with "x" markers
pp_obs.plot(ax=base, markersize=20, color="k", zorder=3)
pp_obs_snapped.plot(ax=base, markersize=20, color="k", marker="x", zorder=2)
# tour leg labels
tour_labels(tour, base)
# crime scene labels
obs_labels(pp_obs, base)
# add scale bar
# NOTE(review): the first ScaleBar argument is 3 with units="m" - confirm that
# this matches the units of the plotted (EPSG:2762) coordinates.
scalebar = ScaleBar(3, units="m", location="lower left")
base.add_artist(scalebar);