In this assignment you will be implementing a variety of graph search algorithms, with the eventual goal of solving tridirectional search.
Before you start, you will need to install:
networkx, which is a package for processing networks. This assignment will be easier if you take some time to test out and get familiar with the basic methods of networkx.
matplotlib for basic network visualization.
We will be using two undirected networks for this assignment: a simplified map of Romania (from Russell and Norvig) and a full street map of Atlanta.
In [532]:
from __future__ import division
import random
import matplotlib.pyplot as plt
import pickle
import sys
sys.path.append('lib')
import networkx
In [533]:
"""Romania map data from Russell and Norvig, Chapter 3."""
romania = pickle.load(open('romania_graph.pickle', 'rb'))
5 points
In all searches that involve calculating path cost or heuristic (e.g. uniform-cost), we have to order our search frontier. It turns out the way that we do this can impact our overall search runtime.
To show this, you'll implement a priority queue and demonstrate its performance benefits. For large graphs, sorting all input to a priority queue is impractical. As such, the data structure you implement should have O(log n) insertion and removal time, which is what a binary heap provides. It should do better than the queue you've been provided in InsertionSortQueue().
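A minimal sketch of a heap-backed frontier, assuming entries are (cost, node) pairs. The `push` and `pop` helpers and the counter tie-breaker are illustrative names, not part of the provided code; the point is that heapq keeps the cheapest entry at the front without re-sorting the whole list on each insertion.

```python
import heapq
import itertools

frontier = []
counter = itertools.count()  # tie-breaker: equal costs come out in insertion order

def push(cost, node):
    """Add a node with the given path cost to the frontier."""
    heapq.heappush(frontier, (cost, next(counter), node))

def pop():
    """Remove and return the cheapest (cost, node) pair."""
    cost, _, node = heapq.heappop(frontier)
    return cost, node

# Hypothetical costs, inserted in no particular order.
for cost, node in [(140, 's'), (75, 'z'), (118, 't'), (75, 'a')]:
    push(cost, node)

order = [pop() for _ in range(4)]
print(order)
```

The counter is a design choice rather than a requirement: it keeps the heap from ever comparing two node objects directly when their costs tie.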
Hints:
In [534]:
import heapq
class PriorityQueue():
    """Implementation of a priority queue
    to store nodes during search."""
    # Backed by a binary heap from the heapq module.

    def __init__(self):
        self.queue = []
        self.current = 0

    def next(self):
        # Iterates over the underlying heap array (not in priority order).
        if self.current >= len(self.queue):
            self.current = 0  # reset so the queue can be iterated again
            raise StopIteration
        out = self.queue[self.current]
        self.current += 1
        return out

    __next__ = next  # Python 3 spelling of the iterator protocol

    def pop(self):
        return heapq.heappop(self.queue)

    def __iter__(self):
        return self

    def __len__(self):
        # Lets an empty queue evaluate as False in "while frontier:".
        return len(self.queue)

    def __str__(self):
        return 'PQ:[%s]' % (', '.join([str(i) for i in self.queue]))

    def append(self, node):
        heapq.heappush(self.queue, node)

    def __contains__(self, key):
        # Entries are sequences whose last item is the node.
        return any(entry[-1] == key for entry in self.queue)

    def __eq__(self, other):
        # Compare contents; "self == other" here would recurse forever.
        return isinstance(other, PriorityQueue) and self.queue == other.queue

    def clear(self):
        self.queue = []
In [535]:
from search_tests import priority_queue_tests
priority_queue_tests(PriorityQueue)
5 points
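To get you started with handling graphs in networkx, implement and test breadth-first search over the test network. You'll do this by writing two functions: breadth_first_search_goal(), which returns the goal and all nodes explored, and breadth_first_search(), which returns the path and all nodes explored.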
Hint: networkx.edges() will return all edges connected to a given node.
In [536]:
pathes = pickle.load(open('romania_test_paths.pickle', 'rb'))
In [537]:
pathes[('z', 'z')]
Out[537]:
In [538]:
#pq.length
In [539]:
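def breadth_first_search_goal(graph, start, goal):
    """Run a breadth-first search from start
    to goal and return the goal as well as
    all nodes explored."""
    frontier = [start]
    explored = []
    while frontier:
        current = frontier.pop(0)
        if current == goal:
            return goal, explored
        explored.append(current)
        # graph.edges(n) yields (n, neighbor) pairs.
        for _, neighbor in graph.edges(current):
            if neighbor == goal:
                # Stop the whole search as soon as the goal is generated.
                return goal, explored
            if neighbor not in explored and neighbor not in frontier:
                frontier.append(neighbor)
    # The goal is not reachable from start.
    return None, explored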
In [540]:
def breadth_first_search(graph, start, goal):
    """Run a breadth-first search from start
    to goal and return the path as well as
    all nodes explored."""
    explored = []
    if start == goal:
        return None, explored
    frontier = [start]
    parent = {}
    found = False
    while frontier and not found:
        current = frontier.pop(0)
        explored.append(current)
        for _, neighbor in graph.edges(current):
            if neighbor == goal:
                parent[neighbor] = current
                found = True
                break
            if neighbor not in explored and neighbor not in frontier:
                parent[neighbor] = current
                frontier.append(neighbor)
    if not found:
        # The goal is not reachable from start.
        return None, explored
    # Walk the parent links back from the goal, then reverse.
    path = [goal]
    while path[-1] != start:
        path.append(parent[path[-1]])
    path.reverse()
    return path, explored
In [541]:
# This function exists to help you visually debug your code.
# Feel free to modify it in any way you like.
# graph should be a networkx graph
# node_positions should be a dictionary mapping nodes to x,y coordinates
def draw_graph(graph, node_positions=None, start=None, goal=None, explored=None, path=None):
    # Avoid mutable default arguments; fall back to empty containers.
    explored = explored or []
    path = path or []
    if not node_positions:
        node_positions = networkx.spring_layout(graph)
    networkx.draw_networkx_nodes(graph, node_positions)
    networkx.draw_networkx_edges(graph, node_positions, style='dashed')
    networkx.draw_networkx_nodes(graph, node_positions, nodelist=explored, node_color='g')
    if path:
        edges = [(path[i], path[i+1]) for i in range(0, len(path)-1)]
        networkx.draw_networkx_edges(graph, node_positions, edgelist=edges, edge_color='b')
    if start:
        networkx.draw_networkx_nodes(graph, node_positions, nodelist=[start], node_color='b')
    if goal:
        networkx.draw_networkx_nodes(graph, node_positions, nodelist=[goal], node_color='y')
    # Label the nodes of the graph being drawn, not the global romania graph.
    labels = {node: node for node in graph.nodes()}
    networkx.draw_networkx_labels(graph, node_positions, labels, font_size=16)
    plt.show()
In [542]:
%pdb off
#path, explored = breadth_first_search(romania, 'n', 'o')
#print path
#print explored
#pathes[('n', 'o')]
In [543]:
"""Testing and visualizing breadth-first search
in the notebook."""
start = 'n'
goal = 'o'
#%debug
#%debug --breakpoint search_tests.py:58
#%run -d
goal, explored = breadth_first_search_goal(romania, start, goal)
path, explored = breadth_first_search(romania, start, goal)
node_locations = {n: romania.node[n]['position'] for n in romania.node.keys()}
draw_graph(romania, node_locations, start, goal, explored, path)
In [544]:
#print explored
#print path
In [545]:
from search_tests import bfs_tests
bfs_tests(breadth_first_search)
10 points
Implement uniform-cost search using PriorityQueue() as your frontier. From now on, PriorityQueue() should be your default frontier.
uniform_cost_search() should return the same values as breadth-first search (the path to the goal node and the set of all nodes explored), plus the total cost of the path.
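To see why ordering the frontier by path cost matters, here is a small sketch on a made-up weighted graph stored as a plain dict (not the Romania data). The function name `toy_ucs` and the graph are assumptions for illustration; the route with the fewest hops is not the cheapest one.

```python
import heapq

# Hypothetical weighted graph: the two-hop route a-b-d costs 10,
# while the three-hop route a-c-e-d costs only 6.
toy = {
    'a': {'b': 1, 'c': 2},
    'b': {'a': 1, 'd': 9},
    'c': {'a': 2, 'e': 2},
    'd': {'b': 9, 'e': 2},
    'e': {'c': 2, 'd': 2},
}

def toy_ucs(graph, start, goal):
    """Return (path, cost) of a cheapest path, or (None, None)."""
    frontier = [(0, start, [start])]
    expanded = set()
    while frontier:
        cost, node, path = heapq.heappop(frontier)
        if node == goal:
            # The goal is only final once it is popped, not when first seen.
            return path, cost
        if node in expanded:
            continue  # a cheaper entry for this node was already expanded
        expanded.add(node)
        for neighbor, weight in graph[node].items():
            if neighbor not in expanded:
                heapq.heappush(frontier, (cost + weight, neighbor, path + [neighbor]))
    return None, None

path, cost = toy_ucs(toy, 'a', 'd')
print(path, cost)
```

Carrying the whole path in each frontier entry keeps the sketch short; a parent dictionary is the more memory-friendly choice on large graphs.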
In [546]:
def uniform_cost_search(graph, start, goal):
    """Run uniform-cost search from start
    to goal and return the path, the nodes
    explored, and the total cost."""
    explored = []
    if start == goal:
        return None, explored, 0
    frontier = PriorityQueue()
    frontier.append([0, start])
    parent = {}
    best_cost = {start: 0}
    while frontier.queue:
        cost, current = frontier.pop()
        if current == goal:
            # The goal's cost is final only once it is popped.
            path = [goal]
            while path[-1] != start:
                path.append(parent[path[-1]])
            path.reverse()
            return path, explored, cost
        if current in explored:
            continue  # stale entry; a cheaper one was already expanded
        explored.append(current)
        for _, neighbor in graph.edges(current):
            # Order the frontier by accumulated edge weight, not by hop count.
            new_cost = cost + graph[current][neighbor]['weight']
            if neighbor not in best_cost or new_cost < best_cost[neighbor]:
                best_cost[neighbor] = new_cost
                parent[neighbor] = current
                frontier.append([new_cost, neighbor])
    # The goal is not reachable from start.
    return None, explored, 0
In [547]:
from search_tests import ucs_tests
ucs_tests(uniform_cost_search)
pathes[('z', 'h')][0]
Out[547]:
In [548]:
print romania.edge['a']['s']['weight']
print romania.node['a']['position']
In [549]:
def null_heuristic(graph, u, v, goal):
    """Return 0 for all nodes."""
    return 0
In [550]:
import numpy
def heuristic_euclid(graph, u, v, goal):
    """Return the Euclidean distance from
    the node to the goal. u is current node,
    v is node under consideration."""
    next_pos = numpy.array(graph.node[v]['position'])
    goal_pos = numpy.array(graph.node[goal]['position'])
    # Estimate only the remaining distance from v to the goal; the cost
    # of reaching v is tracked by the search, not by the heuristic.
    return numpy.linalg.norm(next_pos - goal_pos)
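A straight-line estimate never overestimates as long as every edge weight is at least the straight-line distance between its endpoints. The check below is a sketch under that assumption, using made-up positions and road lengths rather than the Romania data; `positions`, `road_length`, and `straight_line` are hypothetical names.

```python
import math

# Hypothetical positions and road lengths; not taken from the Romania data.
positions = {'a': (0.0, 0.0), 'b': (3.0, 0.0), 'c': (3.0, 4.0)}
road_length = {('a', 'b'): 3.5, ('b', 'c'): 4.2}

def straight_line(u, v):
    """Euclidean distance between the positions of u and v."""
    (x1, y1), (x2, y2) = positions[u], positions[v]
    return math.hypot(x2 - x1, y2 - y1)

goal = 'c'
h_a = straight_line('a', goal)  # estimate from a to the goal
true_cost = road_length[('a', 'b')] + road_length[('b', 'c')]

# Consistency: h(u) <= weight(u, v) + h(v) for every edge (u, v).
consistent = all(
    straight_line(u, goal) <= w + straight_line(v, goal)
    for (u, v), w in road_length.items()
)
print(h_a, true_cost, consistent)
```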
In [551]:
def a_star(graph, start, goal, heuristic):
    """Run A* search from the start to
    goal using the specified heuristic
    function, and return the final path
    and the nodes explored."""
    explored = []
    if start == goal:
        return None, explored
    frontier = PriorityQueue()
    # Entries are [f, g, node], where f = g + h orders the frontier.
    frontier.append([heuristic(graph, start, start, goal), 0, start])
    parent = {}
    best_g = {start: 0}
    while frontier.queue:
        _, g, current = frontier.pop()
        if current == goal:
            path = [goal]
            while path[-1] != start:
                path.append(parent[path[-1]])
            path.reverse()
            return path, explored
        if current in explored:
            continue  # stale entry; assumes a consistent heuristic
        explored.append(current)
        for _, neighbor in graph.edges(current):
            new_g = g + graph[current][neighbor]['weight']
            if neighbor not in best_g or new_g < best_g[neighbor]:
                best_g[neighbor] = new_g
                parent[neighbor] = current
                f = new_g + heuristic(graph, current, neighbor, goal)
                frontier.append([f, new_g, neighbor])
    # The goal is not reachable from start.
    return None, explored
In [552]:
from search_tests import a_star_tests
#%debug
a_star_tests(a_star, null_heuristic, heuristic_euclid)
pathes[('z', 'h')]
The following exercises will require you to implement several kinds of bidirectional and tridirectional searches.
For the following exercises, we will be using Atlanta OpenStreetMap data for our searches. If you want to run tests in iPython notebook using this data (rather than just calling the tests in search_tests), you'll need to load the data from file in the cell below.
In [ ]:
"""Loading Atlanta map data."""
atlanta = pickle.load(open('atlanta_osm.pickle', 'rb'))
When using a geographic network, you may want to visualize your searches. We can do this by converting the search results to a GeoJSON file which we then visualize on Gist by importing the file.
We provide a method for doing this in visualize_graph.py called plot_search(), which takes as parameters the graph, the name of the file to write, the nodes on the path, and the set of all nodes explored. This produces a GeoJSON file named as specified, which you can upload to Gist to visualize the search path and explored nodes.
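For readers curious about what such a file contains, the sketch below builds a GeoJSON FeatureCollection by hand. It is not the code in visualize_graph.py; `to_geojson` and the coordinates are hypothetical. GeoJSON positions are written in longitude, latitude order.

```python
import json

def to_geojson(path_coords, explored_coords):
    """Build a GeoJSON FeatureCollection: one LineString for the path
    and one Point per explored node. Coordinates are (lon, lat) pairs."""
    features = [{
        'type': 'Feature',
        'properties': {'kind': 'path'},
        'geometry': {
            'type': 'LineString',
            'coordinates': [list(c) for c in path_coords],
        },
    }]
    for lon, lat in explored_coords:
        features.append({
            'type': 'Feature',
            'properties': {'kind': 'explored'},
            'geometry': {'type': 'Point', 'coordinates': [lon, lat]},
        })
    return {'type': 'FeatureCollection', 'features': features}

# Made-up coordinates near Atlanta, for illustration only.
demo = to_geojson([(-84.39, 33.75), (-84.38, 33.76)], [(-84.40, 33.74)])
text = json.dumps(demo, indent=2)
print(text)
```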
In [ ]:
"""Example of how to visualize search results
with two sample nodes in Atlanta."""
from visualize_graph import plot_search
path, explored = bidirectional_ucs(atlanta, '69244359', '557989279')
plot_search(atlanta, 'atlanta_search.json', path, explored)
# then upload 'atlanta_search.json' to Gist
15 points
Implement bidirectional uniform-cost search. Remember that this requires starting your search at both the start and end states.
This function will return the path, the set of explored nodes from the start node's search, and the set of explored nodes from the goal node's search.
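The subtle part of bidirectional search is the stopping rule: the first node reached by both searches is not necessarily on a cheapest path. The sketch below, on a made-up dict graph, stops only when the two frontier minima add up to at least the best meeting cost found so far. `toy_bidirectional_ucs` is a hypothetical name, and it returns (path, cost) rather than the explored sets the assignment asks for.

```python
import heapq

toy = {
    'a': {'b': 1, 'c': 2},
    'b': {'a': 1, 'd': 9},
    'c': {'a': 2, 'e': 2},
    'd': {'b': 9, 'e': 2},
    'e': {'c': 2, 'd': 2},
}

def toy_bidirectional_ucs(graph, start, goal):
    """Return (path, cost) on an undirected dict graph, or (None, None)."""
    if start == goal:
        return [start], 0
    inf = float('inf')
    dist = [{start: 0}, {goal: 0}]          # side 0 = from start, 1 = from goal
    parent = [{start: None}, {goal: None}]
    done = [set(), set()]
    heaps = [[(0, start)], [(0, goal)]]
    best, meet = inf, None
    while heaps[0] and heaps[1]:
        # No undiscovered route can beat `best` once this holds.
        if heaps[0][0][0] + heaps[1][0][0] >= best:
            break
        side = 0 if heaps[0][0][0] <= heaps[1][0][0] else 1
        d, node = heapq.heappop(heaps[side])
        if node in done[side]:
            continue  # stale heap entry
        done[side].add(node)
        for nbr, w in graph[node].items():
            if d + w < dist[side].get(nbr, inf):
                dist[side][nbr] = d + w
                parent[side][nbr] = node
                heapq.heappush(heaps[side], (d + w, nbr))
            other = 1 - side
            if nbr in dist[other] and dist[side][nbr] + dist[other][nbr] < best:
                best = dist[side][nbr] + dist[other][nbr]
                meet = nbr
    if meet is None:
        return None, None
    # Stitch start -> meet (reversed parent chain) with meet -> goal.
    path, n = [], meet
    while n is not None:
        path.append(n)
        n = parent[0][n]
    path.reverse()
    n = parent[1][meet]
    while n is not None:
        path.append(n)
        n = parent[1][n]
    return path, best

path, cost = toy_bidirectional_ucs(toy, 'a', 'd')
print(path, cost)
```

In this example the searches first meet at b with cost 10, and only later find the cheaper meeting point e with cost 6, which is why stopping at the first meeting would be wrong.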
In [ ]:
def bidirectional_ucs(graph, start, goal):
    """Run bidirectional uniform-cost search
    between start and goal, and return the path,
    the nodes explored from start-initial
    search, and the nodes explored from the
    goal-initial search."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, (start_explored, goal_explored)
In [ ]:
from search_tests import bidirectional_tests
bidirectional_tests(bidirectional_ucs)
20 points
Implement bidirectional A* search. Remember that you need to calculate a heuristic for both the start-to-goal search and the goal-to-start search.
This function will return the final search path, the set of nodes explored during the start-to-goal search, and the set of nodes explored during the goal-to-start search.
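One common way to make the two heuristics agree with each other is to average them, so that both searches see the same adjusted weight on every edge. This is an assumption about approach, not something the assignment prescribes, and all names and coordinates below are hypothetical.

```python
import math

# Hypothetical coordinates; edge weights are assumed to be at least the
# straight-line distance between their endpoints.
pos = {'s': (0.0, 0.0), 'm': (3.0, 4.0), 't': (6.0, 0.0)}

def dist(u, v):
    """Straight-line distance between two nodes."""
    return math.hypot(pos[u][0] - pos[v][0], pos[u][1] - pos[v][1])

def forward_potential(v, start, goal):
    """Half of (estimate to goal minus estimate from start)."""
    return 0.5 * (dist(v, goal) - dist(v, start))

def backward_potential(v, start, goal):
    """Mirror image used by the goal-to-start search."""
    return -forward_potential(v, start, goal)

def reduced_weight(u, v, weight, potential, start, goal):
    """Edge weight as seen by a search that steps from u to v."""
    return weight - potential(u, start, goal) + potential(v, start, goal)

w_sm = dist('s', 'm')
# Forward search steps s -> m; backward search crosses the same edge m -> s.
fwd = reduced_weight('s', 'm', w_sm, forward_potential, 's', 't')
bwd = reduced_weight('m', 's', w_sm, backward_potential, 's', 't')
print(fwd, bwd)
```

Because the two potentials sum to zero at every node, the adjusted weights match in both directions and stay non-negative under the stated assumption, so the bidirectional stopping rule from uniform-cost search carries over to the adjusted graph.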
In [ ]:
def bidirectional_a_star(graph, start, goal):
    """Run bidirectional A* search between
    start and goal, and return the path from
    start to goal, the nodes explored from
    start-initial search, and the nodes explored
    from the goal-initial search."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, (start_explored, goal_explored)
In [ ]:
def tridirectional_search(graph, goals):
    """Run tridirectional uniform-cost search
    between the goals, and return the path
    and the nodes explored."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, nodes_explored
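A simple baseline to compare against, assuming the desired result is a single path that connects all three goals on a connected undirected graph: compute the three pairwise cheapest paths, drop the most expensive one, and join the other two at the goal they share. The names `shortest` and `connect_three` and the toy graph are hypothetical; this runs three separate searches rather than one interleaved tridirectional search.

```python
import heapq

toy = {
    'a': {'b': 1, 'c': 2},
    'b': {'a': 1, 'd': 9},
    'c': {'a': 2, 'e': 2},
    'd': {'b': 9, 'e': 2},
    'e': {'c': 2, 'd': 2},
}

def shortest(graph, start, goal):
    """Plain uniform-cost search returning (cost, path)."""
    frontier = [(0, start, [start])]
    expanded = set()
    while frontier:
        cost, node, path = heapq.heappop(frontier)
        if node == goal:
            return cost, path
        if node in expanded:
            continue
        expanded.add(node)
        for nbr, w in graph[node].items():
            if nbr not in expanded:
                heapq.heappush(frontier, (cost + w, nbr, path + [nbr]))
    return float('inf'), None

def connect_three(graph, goals):
    """Join the two cheapest pairwise paths among three distinct goals."""
    a, b, c = goals
    legs = [shortest(graph, a, b), shortest(graph, b, c), shortest(graph, a, c)]
    legs.sort(key=lambda leg: leg[0])
    (cost1, path1), (cost2, path2) = legs[0], legs[1]
    # Any two of the three legs share exactly one endpoint.
    shared = ({path1[0], path1[-1]} & {path2[0], path2[-1]}).pop()
    if path1[-1] != shared:
        path1 = path1[::-1]  # valid because the graph is undirected
    if path2[0] != shared:
        path2 = path2[::-1]
    return path1 + path2[1:], cost1 + cost2

route, total = connect_three(toy, ('a', 'd', 'e'))
print(route, total)
```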
15 points
This is the heart of the assignment. Implement tridirectional search in such a way as to consistently improve on the performance of your previous implementation. This means consistently exploring fewer nodes during your search in order to reduce runtime.
The specifics are up to you, but we have a few suggestions:
This function will return the path computed and the set of all nodes explored.
In [ ]:
def tridirectional_search_advanced(graph, goals):
    """Run an improved tridirectional search between
    goals, and return the path and nodes explored."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, nodes_explored
In [ ]:
def custom_search(graph, goals):
    """Run your best tridirectional search between
    goals, and return the path and nodes explored."""
    raise NotImplementedError
    # return path, nodes_explored