Assignment 2: Graph Search

In this assignment you will be implementing a variety of graph search algorithms, with the eventual goal of solving tridirectional search.

Before you start, you will need to install:

  1. networkx, which is a package for processing networks. This assignment will be easier if you take some time to test out and get familiar with the basic methods of networkx.

  2. matplotlib for basic network visualization.

We will be using two undirected networks for this assignment: a simplified map of Romania (from Russell and Norvig) and a full street map of Atlanta.


In [532]:
from __future__ import division
import random
import matplotlib.pyplot as plt
import pickle
import sys
sys.path.append('lib')
import networkx

In [533]:
"""Romania map data from Russell and Norvig, Chapter 3."""
romania = pickle.load(open('romania_graph.pickle', 'rb'))

Warmups

We'll start by implementing some simpler search and optimization methods before the real exercises.

Warmup 1: Priority queue

5 points

In all searches that involve calculating path cost or heuristic (e.g. uniform-cost), we have to order our search frontier. It turns out the way that we do this can impact our overall search runtime.

To show this, you'll implement a priority queue and demonstrate its performance benefits. For large graphs, sorting all input to a priority queue is impractical. As such, the data structure you implement should have O(log n) insertion and removal time, which is the bound a binary heap provides. It should do better than the queue you've been provided in InsertionSortQueue().

Hints:

  1. The heapq module has been imported for you.
  2. Each edge has an associated weight.
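As a rough illustration of the heapq hint, the sketch below stores (priority, counter, item) tuples in a binary heap. The class name HeapFrontier and the tie-breaking counter are assumptions made for this example; they are not part of the provided starter code or of search_tests.

```python
import heapq
import itertools

class HeapFrontier(object):
    """Hypothetical minimal frontier backed by a binary heap."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # breaks ties in insertion order

    def push(self, priority, item):
        # O(log n): the new entry sifts up the heap.
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def pop(self):
        # O(log n): removes and returns the lowest-priority entry.
        priority, _, item = heapq.heappop(self._heap)
        return priority, item

    def __len__(self):
        return len(self._heap)

# Road distances from Arad in the Russell and Norvig map.
frontier = HeapFrontier()
for cost, city in [(140, 's'), (75, 'z'), (118, 't')]:
    frontier.push(cost, city)

order = []
while frontier:            # __len__ makes an empty frontier falsy
    order.append(frontier.pop())
```

Entries come back in order of increasing cost regardless of insertion order, which is the property a search frontier needs.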

In [534]:
import heapq

class PriorityQueue():
    """Implementation of a priority queue 
    to store nodes during search."""
    
    # HINT look up the module heapq.

    def __init__(self):
        self.queue = []
        self.current = 0 

    def next(self):
        if self.current >= len(self.queue):
            self.current = 0  # reset so the queue can be iterated again
            raise StopIteration
    
        out = self.queue[self.current]
        self.current += 1

        return out

    def pop(self):
        return heapq.heappop(self.queue)

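    def __iter__(self):
        return self

    def __len__(self):
        # An empty queue is falsy, so `while frontier:` can terminate.
        return len(self.queue)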

    def __str__(self):
        return 'PQ:[%s]'%(', '.join([str(i) for i in self.queue]))

    def append(self, node):
        heapq.heappush(self.queue, node)
        
    #def push(self,cost,node,path):
        # TODO: finish this

    def __contains__(self, key):
        self.current = 0
        return key in [n for v,n in self.queue]

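    def __eq__(self, other):
        # Compare contents; `self == other` here would recurse forever.
        return isinstance(other, PriorityQueue) and self.queue == other.queue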
    
    def clear(self):
        self.queue = []
        
    #def has_element(self, element):
    __next__ = next

In [535]:
from search_tests import priority_queue_tests

priority_queue_tests(PriorityQueue)


PriorityQueue tests passed

Warmup 2: BFS

5 points

To get you started with handling graphs in networkx, implement and test breadth-first search over the test network. You'll do this by writing two methods:

  1. breadth_first_search_goal, which returns the goal node and the set of all nodes explored, but no path.
  2. breadth_first_search, which returns the path and the set of all nodes explored.

Hint: graph.edges(node) returns all edges incident to the given node, as (node, neighbor) pairs.
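To make the two return shapes concrete, here is a minimal sketch of breadth-first search with path reconstruction. It assumes a plain dict of neighbor lists in place of the networkx graph so that it runs without the map data; bfs_path and toy are illustrative names, and the (None, []) result for identical start and goal mirrors the convention in the test paths.

```python
from collections import deque

def bfs_path(adjacency, start, goal):
    """Return (path, explored) for a dict-of-lists graph."""
    if start == goal:
        return None, []                 # same convention as the test data
    parent = {start: None}
    explored = []
    queue = deque([start])
    while queue:
        node = queue.popleft()          # O(1), unlike list.pop(0)
        explored.append(node)
        for neighbor in adjacency[node]:
            if neighbor in parent:      # already discovered
                continue
            parent[neighbor] = node
            if neighbor == goal:        # BFS can stop at discovery time
                path = [goal]
                while parent[path[-1]] is not None:
                    path.append(parent[path[-1]])
                path.reverse()
                return path, explored
            queue.append(neighbor)
    return None, explored               # goal not reachable

# A small piece of the Romania map, as unweighted adjacency lists.
toy = {
    'a': ['s', 't', 'z'],
    's': ['a', 'f', 'o', 'r'],
    't': ['a'],
    'z': ['a', 'o'],
    'o': ['s', 'z'],
    'f': ['s'],
    'r': ['s'],
}
path, explored = bfs_path(toy, 'a', 'o')
```

The parent dictionary doubles as the "already seen" set, so no node enters the queue twice.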


In [536]:
pathes = pickle.load(open('romania_test_paths.pickle', 'rb'))

In [537]:
pathes[('z', 'z')]


Out[537]:
(None, [])

In [538]:
#pq.length

In [539]:
def breadth_first_search_goal(graph, start, goal):
    """Run a breadth-first search from start
    to goal and return the goal as well as
    all nodes explored."""
    
    frontiers = [start]
    explored = []
    while frontiers:
        current = frontiers.pop(0)
        if current == goal:
            return goal, explored
        explored.append(current)
        for i in graph.edges(current):
            if i[1] == goal:
                # Stop the whole search, not just this inner loop.
                return goal, explored
            if i[1] not in explored and i[1] not in frontiers:
                frontiers.append(i[1])
    return None, explored  # goal is not reachable from start

In [540]:
def breadth_first_search(graph, start, goal):
    """Run a breadth-first search from start
    to goal and return the path as well as 
    all nodes explored."""
    
    path = []
    frontiers = [start]
    explored = []
    parent = {}
    flag = 0
    while frontiers and flag==0:
        current = frontiers.pop(0)
        if current == goal:
            flag = 1
            break
        explored.append(current)
        edges = graph.edges(current)
        for i in edges:
            if i[1]!=goal:
                if not(i[1] in explored) and not(i[1] in frontiers):
                    parent[i[1]] = current
                    frontiers.append(i[1])
            else:
                parent[i[1]] = current
                flag=1
                break
    if start!=goal:
        path.append(goal)
    else:
        return None, explored
    while path[-1] != start:
        path.append(parent[path[-1]])
    path.reverse()
    return path, explored

In [541]:
# This function exists to help you visually debug your code.
# Feel free to modify it in any way you like.
# graph should be a networkx graph
# node_positions should be a dictionary mapping nodes to x,y coordinates
def draw_graph(graph, node_positions={}, start=None, goal=None, explored=[], path=[]):

    if not node_positions:
        node_positions = networkx.spring_layout(graph)

    networkx.draw_networkx_nodes(graph, node_positions)
    networkx.draw_networkx_edges(graph, node_positions, style='dashed')
    
    networkx.draw_networkx_nodes(graph, node_positions, nodelist=explored, node_color='g') 

    if path:
        edges = [(path[i], path[i+1]) for i in range(0, len(path)-1)]
        networkx.draw_networkx_edges(graph, node_positions, edgelist=edges, edge_color='b')
   
    if start:
        networkx.draw_networkx_nodes(graph, node_positions, nodelist=[start], node_color='b')
    
    if goal:
        networkx.draw_networkx_nodes(graph, node_positions, nodelist=[goal], node_color='y')
    
    labels={}
    for node in graph.nodes():
        labels[node] = node
    networkx.draw_networkx_labels(graph,node_positions,labels,font_size=16)
    plt.plot()
    plt.show()

In [542]:
%pdb off
#path, explored = breadth_first_search(romania, 'n', 'o') 
#print path
#print explored
#pathes[('n', 'o')]


Automatic pdb calling has been turned OFF

In [543]:
"""Testing and visualizing breadth-first search
in the notebook."""
start = 'n'
goal = 'o'
#%debug
#%debug --breakpoint search_tests.py:58
#%run -d
goal, explored = breadth_first_search_goal(romania, start, goal) 
path, explored = breadth_first_search(romania, start, goal) 
node_locations = {n: romania.node[n]['position'] for n in romania.node.keys()}
draw_graph(romania, node_locations, start, goal, explored, path)

In [544]:
#print explored
#print path

In [545]:
from search_tests import bfs_tests

bfs_tests(breadth_first_search)


BFS tests passed.

Warmup 3: Uniform-cost search

10 points

Implement uniform-cost search using PriorityQueue() as your frontier. From now on, PriorityQueue() should be your default frontier.

uniform_cost_search() should return the same values as breadth-first search (the path to the goal node and the set of all nodes explored), plus the total cost of the path.
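The difference from BFS is easiest to see on a case where the path with the fewest hops is not the cheapest. The sketch below is one way to write uniform-cost search, assuming a dict-of-dicts weighted graph rather than the networkx object; ucs and roads are illustrative names, and the weights are the road distances between Sibiu, Fagaras, Rimnicu Vilcea, Pitesti, and Bucharest from Russell and Norvig.

```python
import heapq

def ucs(weighted, start, goal):
    """Return (path, explored, cost) on a dict-of-dicts graph."""
    frontier = [(0, start, [start])]    # (cost so far, node, path to node)
    explored = []
    while frontier:
        cost, node, path = heapq.heappop(frontier)
        if node in explored:
            continue                    # stale duplicate left in the heap
        if node == goal:                # goal test at expansion, not discovery
            return path, explored, cost
        explored.append(node)
        for neighbor, weight in weighted[node].items():
            if neighbor not in explored:
                heapq.heappush(
                    frontier, (cost + weight, neighbor, path + [neighbor]))
    return None, explored, 0

roads = {
    's': {'f': 99, 'r': 80},
    'f': {'s': 99, 'b': 211},
    'r': {'s': 80, 'p': 97},
    'p': {'r': 97, 'b': 101},
    'b': {'f': 211, 'p': 101},
}
path, explored, cost = ucs(roads, 's', 'b')
```

Here the two-hop route through 'f' costs 310, while the three-hop route through 'r' and 'p' costs 278, so the search returns the longer-looking path. Testing for the goal only when a node is popped is what allows the cheaper route to win.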


In [546]:
def uniform_cost_search(graph, start, goal):
    """Run uniform-cost search from start
    to goal and return the path, the nodes
    explored, and the total cost."""
    
    if start == goal:
        return None, [], 0

    frontiers = PriorityQueue()
    frontiers.append((0, start))
    best_cost = {start: 0}  # cheapest known cost to reach each node
    parent = {}
    explored = []
    while frontiers.queue:
        cost, current = frontiers.pop()
        if current in explored:
            continue  # stale entry; a cheaper one was already expanded
        if current == goal:
            # Test for the goal on expansion so the cheapest path wins.
            path = [goal]
            while path[-1] != start:
                path.append(parent[path[-1]])
            path.reverse()
            return path, explored, cost
        explored.append(current)
        for i in graph.edges(current):
            # Order the frontier by accumulated edge weight, not hop count.
            new_cost = cost + graph[current][i[1]]['weight']
            if i[1] not in best_cost or new_cost < best_cost[i[1]]:
                best_cost[i[1]] = new_cost
                parent[i[1]] = current
                frontiers.append((new_cost, i[1]))
    return None, explored, 0  # goal is not reachable from start

In [547]:
from search_tests import ucs_tests 
ucs_tests(uniform_cost_search)
pathes[('z', 'h')][0]


Uniform cost search tests passed
Out[547]:
['z', 'a', 's', 'f', 'b', 'u', 'h']

Warmup 4: A* search

10 points

Implement A* search using Euclidean distance as your heuristic. You'll need to implement heuristic_euclid() then pass that function to a_star() as the heuristic parameter. We provide null_heuristic() as a baseline heuristic to test against when calling a_star_tests().
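As a small sketch of how the heuristic enters the search, the example below orders the frontier by f = g + h, where g is the cost so far and h is the straight-line distance to the goal. The dict-based graph, the positions, and the names euclid and a_star_sketch are assumptions made for illustration; the edge weights are chosen equal to the straight-line lengths so that the heuristic never overestimates.

```python
import heapq
import math

def euclid(positions, node, goal):
    """Straight-line distance between two nodes' (x, y) positions."""
    (x1, y1), (x2, y2) = positions[node], positions[goal]
    return math.hypot(x1 - x2, y1 - y2)

def a_star_sketch(weighted, positions, start, goal):
    """Return (path, explored); the frontier is ordered by f = g + h."""
    frontier = [(euclid(positions, start, goal), 0, start, [start])]
    explored = []
    while frontier:
        _, g, node, path = heapq.heappop(frontier)
        if node in explored:
            continue                    # stale duplicate
        if node == goal:
            return path, explored
        explored.append(node)
        for neighbor, weight in weighted[node].items():
            if neighbor not in explored:
                new_g = g + weight
                f = new_g + euclid(positions, neighbor, goal)
                heapq.heappush(
                    frontier, (f, new_g, neighbor, path + [neighbor]))
    return None, explored

positions = {'A': (0, 0), 'B': (2, 0), 'C': (0, 3), 'G': (4, 0)}
weighted = {
    'A': {'B': 2, 'C': 3},
    'B': {'A': 2, 'G': 2},
    'C': {'A': 3, 'G': 5},
    'G': {'B': 2, 'C': 5},
}
path, explored = a_star_sketch(weighted, positions, 'A', 'G')
```

Node 'C' is never expanded because its f value (3 + 5) exceeds the cost of the route through 'B'. With a heuristic that always returns 0, the same code behaves like uniform-cost search and would expand 'C' before reaching 'G'.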


In [548]:
print romania.edge['a']['s']['weight']
print romania.node['a']['position']


140
(91, 492)

In [549]:
def null_heuristic(graph, u, v, goal):
    """Return 0 for all nodes."""
    return 0

In [550]:
import numpy 
def heuristic_euclid(graph, u, v, goal):
    """Return the Euclidean distance from
    the node to the goal. u is current node,
    v is node under consideration."""
    
    # The heuristic is only the straight-line estimate from v to the goal.
    # The edge cost u -> v is path cost, so the caller adds it separately
    # (it is stored under the 'weight' key, not 'edge').
    nextPos = numpy.array(graph.node[v]['position'])
    goalPos = numpy.array(graph.node[goal]['position'])
    
    return numpy.linalg.norm(nextPos-goalPos)

In [551]:
def a_star(graph, start, goal, heuristic):
    """Run A* search from the start to
    goal using the specified heuristic
    function, and return the final path
    and the nodes explored."""
    
    if start == goal:
        return None, []

    frontiers = PriorityQueue()
    frontiers.append((0, start))
    cost = {start: 0}  # g: cheapest known path cost to each node
    parent = {}
    explored = []
    while frontiers.queue:
        current = frontiers.pop()[1]
        if current in explored:
            continue  # stale entry; this node was already expanded
        if current == goal:
            path = [goal]
            while path[-1] != start:
                path.append(parent[path[-1]])
            path.reverse()
            return path, explored
        explored.append(current)
        for i in graph.edges(current):
            newCost = cost[current] + graph[current][i[1]]['weight']
            if i[1] not in cost or newCost < cost[i[1]]:
                cost[i[1]] = newCost
                parent[i[1]] = current
                # Priority is f = g + h; the heuristic supplies only h.
                f = newCost + heuristic(graph, current, i[1], goal)
                frontiers.append((f, i[1]))
    return None, explored  # goal is not reachable from start

In [552]:
from search_tests import a_star_tests

#%debug
a_star_tests(a_star, null_heuristic, heuristic_euclid)
pathes[('z', 'h')]



Exercises

The following exercises will require you to implement several kinds of bidirectional and tridirectional searches.

For the following exercises, we will be using Atlanta OpenStreetMap data for our searches. If you want to run tests in the IPython notebook using this data (rather than just calling the tests in search_tests), you'll need to load the data from file in the cell below.


In [ ]:
"""Loading Atlanta map data."""
atlanta = pickle.load(open('atlanta_osm.pickle','rb'))

Visualizing search results

When using a geographic network, you may want to visualize your searches. We can do this by converting the search results to a GeoJSON file which we then visualize on Gist by importing the file.

We provide a method for doing this in visualize_graph.py called plot_search(), which takes as parameters the graph, the name of the file to write, the nodes on the path, and the set of all nodes explored. This produces a GeoJSON file named as specified, which you can upload to Gist to visualize the search path and explored nodes.
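For orientation, the sketch below shows roughly what a GeoJSON file for a search result could contain. It is not the implementation of plot_search(); the function name search_to_geojson, the "kind" property, and the coordinates are made up for illustration. GeoJSON positions are written in longitude, latitude order.

```python
import json

def search_to_geojson(coords, path, explored):
    """Build a GeoJSON FeatureCollection: one LineString for the path and
    one MultiPoint for the explored nodes. coords maps node -> (lon, lat)."""
    features = [
        {
            "type": "Feature",
            "properties": {"kind": "path"},
            "geometry": {
                "type": "LineString",
                "coordinates": [list(coords[n]) for n in path],
            },
        },
        {
            "type": "Feature",
            "properties": {"kind": "explored"},
            "geometry": {
                "type": "MultiPoint",
                "coordinates": [list(coords[n]) for n in sorted(explored)],
            },
        },
    ]
    return {"type": "FeatureCollection", "features": features}

# Made-up coordinates near Atlanta, purely for illustration.
coords = {'n1': (-84.39, 33.75), 'n2': (-84.38, 33.76), 'n3': (-84.37, 33.77)}
doc = search_to_geojson(coords, ['n1', 'n2'], {'n1', 'n2', 'n3'})
text = json.dumps(doc)   # this string is what would be saved to a .json file
```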


In [ ]:
"""Example of how to visualize search results
with two sample nodes in Atlanta."""
from visualize_graph import plot_search
path, (start_explored, goal_explored) = bidirectional_ucs(atlanta, '69244359', '557989279')
explored = set(start_explored) | set(goal_explored)
plot_search(atlanta, 'atlanta_search.json', path, explored)
# then upload 'atlanta_search.json' to Gist

15 points

Implement bidirectional uniform-cost search. Remember that this requires starting your search at both the start and end states.

This function will return the path, the set of explored nodes from the start node's search, and the set of explored nodes from the goal node's search.
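The subtle part of bidirectional search is deciding when to stop: the first node reached by both searches is not necessarily on the cheapest path. The sketch below is one possible approach, assuming an undirected dict-of-dicts graph; bidirectional_ucs_sketch and roads are illustrative names, and the function also returns the cost, which the assignment's signature does not require. It keeps the best meeting cost seen so far and stops once the two frontier minima together can no longer beat it.

```python
import heapq

def bidirectional_ucs_sketch(weighted, start, goal):
    """Return (path, cost, (start_explored, goal_explored))."""
    if start == goal:
        return [start], 0, (set(), set())
    # Index 0 is the search from start, index 1 the search from goal.
    dist = [{start: 0}, {goal: 0}]
    parent = [{start: None}, {goal: None}]
    heaps = [[(0, start)], [(0, goal)]]
    done = [set(), set()]
    best_cost, meeting = float('inf'), None

    while heaps[0] and heaps[1]:
        # Stop once no undiscovered route can beat the best one found.
        if heaps[0][0][0] + heaps[1][0][0] >= best_cost:
            break
        side = 0 if heaps[0][0][0] <= heaps[1][0][0] else 1
        d, node = heapq.heappop(heaps[side])
        if node in done[side]:
            continue                      # stale heap entry
        done[side].add(node)
        for neighbor, weight in weighted[node].items():
            new_d = d + weight
            if new_d < dist[side].get(neighbor, float('inf')):
                dist[side][neighbor] = new_d
                parent[side][neighbor] = node
                heapq.heappush(heaps[side], (new_d, neighbor))
            # A node reached from both sides gives a candidate route.
            if neighbor in dist[1 - side]:
                total = dist[side][neighbor] + dist[1 - side][neighbor]
                if total < best_cost:
                    best_cost, meeting = total, neighbor

    if meeting is None:
        return None, 0, (done[0], done[1])
    # Walk back to start, then forward to goal, from the meeting node.
    path, node = [], meeting
    while node is not None:
        path.append(node)
        node = parent[0][node]
    path.reverse()
    node = parent[1][meeting]
    while node is not None:
        path.append(node)
        node = parent[1][node]
    return path, best_cost, (done[0], done[1])

roads = {
    's': {'f': 99, 'r': 80},
    'f': {'s': 99, 'b': 211},
    'r': {'s': 80, 'p': 97},
    'p': {'r': 97, 'b': 101},
    'b': {'f': 211, 'p': 101},
}
path, cost, (start_explored, goal_explored) = bidirectional_ucs_sketch(
    roads, 's', 'b')
```

On this graph the searches first meet at 'f' with a cost of 310, but the stopping rule lets them continue until the cheaper meeting at 'p' (cost 278) is found.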


In [ ]:
def bidirectional_ucs(graph, start, goal):
    """Run bidirectional uniform-cost search
    between start and goal, and return the path, 
    the nodes explored from start-initial 
    search, and the nodes explored from the 
    goal-initial search."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, (start_explored, goal_explored)

In [ ]:
from search_tests import bidirectional_tests

bidirectional_tests(bidirectional_ucs)

20 points

Implement bidirectional A* search. Remember that you need to calculate a heuristic for both the start-to-goal search and the goal-to-start search.

This function will return the final search path, the set of nodes explored during the start-to-goal search, and the set of nodes explored during the goal-to-start search.
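A minimal sketch of the two heuristics follows, assuming node positions in a plain dict; all function names here are illustrative. The forward search estimates the remaining distance to the goal and the backward search estimates the remaining distance to the start. The second helper shows one commonly described option, averaging the two estimates so that the forward and backward values for any node sum to zero, which keeps the two searches consistent with each other.

```python
import math

def euclid(positions, a, b):
    """Straight-line distance between two nodes' (x, y) positions."""
    (x1, y1), (x2, y2) = positions[a], positions[b]
    return math.hypot(x1 - x2, y1 - y2)

def make_heuristics(positions, start, goal):
    """Return (h_forward, h_backward) for the two searches."""
    def h_forward(node):
        return euclid(positions, node, goal)     # estimate of node -> goal
    def h_backward(node):
        return euclid(positions, node, start)    # estimate of node -> start
    return h_forward, h_backward

def make_balanced_potentials(positions, start, goal):
    """Averaged variant: p_forward(v) + p_backward(v) == 0 for every v."""
    h_f, h_b = make_heuristics(positions, start, goal)
    def p_forward(node):
        return 0.5 * (h_f(node) - h_b(node))
    def p_backward(node):
        return 0.5 * (h_b(node) - h_f(node))
    return p_forward, p_backward

positions = {'A': (0, 0), 'B': (3, 4), 'G': (6, 8)}
h_f, h_b = make_heuristics(positions, 'A', 'G')
p_f, p_b = make_balanced_potentials(positions, 'A', 'G')
```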


In [ ]:
def bidirectional_a_star(graph, start, goal):
    """Run bidirectional A* search between
    start and goal, and return the path from
    start to goal, the nodes explored from
    start-initial search, and the nodes explored
    from the goal-initial search."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, (start_explored, goal_explored)

20 points

Implement tridirectional search in the naive way: starting from each goal node, perform a uniform-cost search and keep expanding until two of the three searches meet.

This will return the path computed and the set of all nodes explored.
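A route through three goals is made of two of the three pairwise paths, so one small building block is joining the two cheapest legs at the goal they share. The sketch below assumes the pairwise paths and costs are already known and that the graph is undirected, so a leg can be reversed; join_pairwise_paths and the sample data are hypothetical.

```python
def join_pairwise_paths(pairwise):
    """pairwise maps frozenset({u, v}) -> (path, cost) for the three goal
    pairs. Return (path, cost) visiting all goals via the two cheapest legs."""
    legs = sorted(pairwise.items(), key=lambda kv: kv[1][1])[:2]
    (pair1, (path1, cost1)), (pair2, (path2, cost2)) = legs
    (shared,) = pair1 & pair2            # the goal both legs touch
    # Orient leg 1 to end at the shared goal and leg 2 to start there.
    if path1[-1] != shared:
        path1 = path1[::-1]
    if path2[0] != shared:
        path2 = path2[::-1]
    return path1 + path2[1:], cost1 + cost2

pairwise = {
    frozenset(['a', 'b']): (['a', 'x', 'b'], 5),
    frozenset(['b', 'c']): (['b', 'y', 'c'], 4),
    frozenset(['a', 'c']): (['a', 'z', 'c'], 9),
}
route, total = join_pairwise_paths(pairwise)
```

The most expensive leg (here 'a' to 'c') is simply dropped, and the shared goal appears only once in the joined route.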


In [ ]:
def tridirectional_search(graph, goals):
    """Run tridirectional uniform-cost search
    between the goals, and return the path 
    and the nodes explored."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, nodes_explored

15 points

This is the heart of the assignment. Implement tridirectional search in such a way as to consistently improve on the performance of your previous implementation. This means consistently exploring fewer nodes during your search in order to reduce runtime.

The specifics are up to you, but we have a few suggestions:

  • Tridirectional A*
  • choosing landmarks and precomputing reach values
  • ALT (A*, landmarks, and triangle inequality)
  • shortcuts (skipping nodes with low reach values)

This function will return the path computed and the set of all nodes explored.
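To illustrate the landmark suggestion, the sketch below precomputes shortest-path costs from a few landmark nodes and uses the triangle inequality to turn them into a lower bound on the distance between any two nodes. It assumes a connected, undirected dict-of-dicts graph; dijkstra_all, make_landmark_heuristic, and the choice of landmarks are illustrative, and how to pick good landmarks is left open.

```python
import heapq

def dijkstra_all(weighted, source):
    """Shortest-path cost from source to every reachable node."""
    dist = {source: 0}
    heap = [(0, source)]
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist[node]:
            continue                      # stale heap entry
        for neighbor, weight in weighted[node].items():
            new_d = d + weight
            if new_d < dist.get(neighbor, float('inf')):
                dist[neighbor] = new_d
                heapq.heappush(heap, (new_d, neighbor))
    return dist

def make_landmark_heuristic(weighted, landmarks):
    """Precompute landmark distances and return h(node, goal)."""
    tables = [dijkstra_all(weighted, lm) for lm in landmarks]
    def h(node, goal):
        # Triangle inequality: |d(L, goal) - d(L, node)| <= d(node, goal).
        return max(abs(t[goal] - t[node]) for t in tables)
    return h

roads = {
    's': {'f': 99, 'r': 80},
    'f': {'s': 99, 'b': 211},
    'r': {'s': 80, 'p': 97},
    'p': {'r': 97, 'b': 101},
    'b': {'f': 211, 'p': 101},
}
h_one = make_landmark_heuristic(roads, ['s'])
h_two = make_landmark_heuristic(roads, ['s', 'b'])
```

With only 's' as a landmark, the bound for 'f' to 'b' is 179, below the true cost of 211; adding 'b' as a second landmark tightens it to exactly 211. Taking the maximum over landmarks can only raise the bound, and each individual bound never exceeds the true distance.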


In [ ]:
def tridirectional_search_advanced(graph, goals):
    """Run an improved tridirectional search between
    goals, and return the path and nodes explored."""
    # TODO: finish this function
    raise NotImplementedError
    #return path, nodes_explored

Race!

Here's your chance to show us your best stuff. This part is mandatory if you want to compete in the race for extra credit. Implement custom_search() using whatever strategy you'd like. Remember that 'goals' will be a list of the three nodes between which you should route.


In [ ]:
def custom_search(graph, goals):
    """Run your best tridirectional search between
    goals, and return the path and nodes explored."""
    raise NotImplementedError
    # return path, nodes_explored