Note: This is not yet ready, but shows the direction I'm leaning in for Fourth Edition Search.
This notebook describes several state-space search algorithms, and how they can be used to solve a variety of problems. We start with a simple algorithm and a simple domain: finding a route from city to city. Later we will explore other algorithms and domains.
Like all state-space search problems, in a route-finding problem you will be given:
- A start state (for example, 'A' for the city Arad).
- A goal state (for example, 'B' for the city Bucharest).
- Actions that can change state (for example, driving from 'A' to 'S').

You will be asked to find:

- A path: a sequence of actions that leads from the start state to the goal state.
We'll use this map:
A state-space search problem can be represented by a graph, where the vertexes of the graph are the states of the problem (in this case, cities) and the edges of the graph are the actions (in this case, driving along a road).
We'll represent a city by its single initial letter.
We'll represent the graph of connections as a dict
that maps each city to a list of the neighboring cities (connected by a road). For now we don't explicitly represent the actions, nor the distances
between cities.
In [1]:
romania = {
'A': ['Z', 'T', 'S'],
'B': ['F', 'P', 'G', 'U'],
'C': ['D', 'R', 'P'],
'D': ['M', 'C'],
'E': ['H'],
'F': ['S', 'B'],
'G': ['B'],
'H': ['U', 'E'],
'I': ['N', 'V'],
'L': ['T', 'M'],
'M': ['L', 'D'],
'N': ['I'],
'O': ['Z', 'S'],
'P': ['R', 'C', 'B'],
'R': ['S', 'C', 'P'],
'S': ['A', 'O', 'F', 'R'],
'T': ['A', 'L'],
'U': ['B', 'V', 'H'],
'V': ['U', 'I'],
'Z': ['O', 'A']}
Suppose we want to get from A
to B
. Where can we go from the start state, A
?
In [2]:
romania['A']
Out[2]:
['Z', 'T', 'S']
We see that from A
we can get to any of the three cities ['Z', 'T', 'S']
. Which should we choose? We don't know. That's the whole point of search: we don't know which immediate action is best, so we'll have to explore, until we find a path that leads to the goal.
How do we explore? We'll start with a simple algorithm that will get us from A
to B
. We'll keep a frontier—a collection of not-yet-explored states—and expand the frontier outward until it reaches the goal. To be more precise:

1. Initially, the only state in the frontier is the start state, 'A'.
2. Remove a state from the frontier; call it s.
3. If s is the goal, we're done. Return the path to s.
4. Otherwise, look at the neighbors of s. For each one: if it has not been seen before, add it to the frontier and remember that we reached it from s.

The function breadth_first implements this strategy:
In [3]:
from collections import deque  # A double-ended queue: pop from the left, append to the right.

def breadth_first(start, goal, neighbors):
    "Find a shortest sequence of states from start to the goal."
    frontier = deque([start])  # A queue of states
    previous = {start: None}   # start has no previous state; other states will
    while frontier:
        s = frontier.popleft()
        if s == goal:
            return path(previous, s)
        for s2 in neighbors[s]:
            if s2 not in previous:
                frontier.append(s2)
                previous[s2] = s

def path(previous, s):
    "Return a list of states that lead to state s, according to the previous dict."
    return [] if (s is None) else path(previous, previous[s]) + [s]
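To see how the path reconstruction works, here is a tiny hand-built previous dict (made up for illustration; it is not output from the notebook). Following the chain of pointers from 'F' back to the start recovers the path:

toy_previous = {'A': None, 'S': 'A', 'F': 'S'}   # we reached 'S' from 'A', and 'F' from 'S'
path(toy_previous, 'F')   # => ['A', 'S', 'F']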
A couple of things to note:

- The dict previous is a map of {state: previous_state}. When the search reaches the goal, we reconstruct the path to the end state by following the trail of previous[end] pointers, all the way back to start.
- When we pop a state s that is the goal state, we know we have found a shortest path, because any other state in the queue must correspond to a path that is as long or longer.
- The dict previous contains all the states that are currently in frontier as well as all the states that were in frontier in the past.
- If no path to the goal exists, breadth_first returns None. If a path is found, it returns the sequence of states on the path.

Some examples:
In [4]:
breadth_first('A', 'B', romania)
Out[4]:
In [5]:
breadth_first('L', 'N', romania)
Out[5]:
In [6]:
breadth_first('N', 'L', romania)
Out[6]:
In [7]:
breadth_first('E', 'E', romania)
Out[7]:
['E']
Now let's try a different kind of problem that can be solved with the same search function.
A word ladder problem is this: given a start word and a goal word, find the shortest way to transform the start word into the goal word by changing one letter at a time, such that each change results in a word. For example, starting with green
we can reach grass
in 7 steps:
green
→ greed
→ treed
→ trees
→ tress
→ cress
→ crass
→ grass
We will need a dictionary of words. We'll use 5-letter words from the Stanford GraphBase project for this purpose. Let's get that file from aima-data.
In [8]:
from search import *
sgb_words = DataFile("EN-text/sgb-words.txt")
We can assign WORDS
to be the set of all the words in this file:
In [9]:
WORDS = set(sgb_words.read().split())
len(WORDS)
Out[9]:
5757
And define neighboring_words
to return the set of all words that are a one-letter change away from a given word
:
In [10]:
def neighboring_words(word):
    "All words that are one letter away from this word."
    neighbors = {word[:i] + c + word[i+1:]
                 for i in range(len(word))
                 for c in 'abcdefghijklmnopqrstuvwxyz'
                 if c != word[i]}
    return neighbors & WORDS
For example:
In [11]:
neighboring_words('hello')
Out[11]:
In [12]:
neighboring_words('world')
Out[12]:
Now we can create word_neighbors
as a dict of {word: {neighboring_word, ...}}
:
In [13]:
word_neighbors = {word: neighboring_words(word)
                  for word in WORDS}
Now the breadth_first
function can be used to solve a word ladder problem:
In [14]:
breadth_first('green', 'grass', word_neighbors)
Out[14]:
In [15]:
breadth_first('smart', 'brain', word_neighbors)
Out[15]:
In [16]:
breadth_first('frown', 'smile', word_neighbors)
Out[16]:
Now we'll embellish the breadth_first algorithm to make a family of search algorithms with more capabilities. Here's how we do these things:

- We define a class, Problem, with a method Problem.actions(state) to return a collection of the actions that are allowed in a state, and another method, Problem.result(state, action), that says what happens when you take an action.
- We keep a set, explored, of states that have already been explored. We also have a class, Frontier, that makes it efficient to ask if a state is on the frontier.
- Each Frontier class acts as a priority queue, allowing the "best" state to be explored next.
- We represent a sequence of actions and resulting states as a linked list of Node objects.

The algorithm breadth_first_search is basically the same as breadth_first, but using our new conventions:
In [17]:
def breadth_first_search(problem):
    "Search for goal; paths with least number of steps first."
    if problem.is_goal(problem.initial):
        return Node(problem.initial)
    frontier = FrontierQ(Node(problem.initial), LIFO=False)
    explored = set()
    while frontier:
        node = frontier.pop()
        explored.add(node.state)
        for action in problem.actions(node.state):
            child = node.child(problem, action)
            if child.state not in explored and child.state not in frontier:
                if problem.is_goal(child.state):
                    return child
                frontier.add(child)
Next is uniform_cost_search
, in which each step can have a different cost, and we still consider first the states with minimum path cost so far.
In [18]:
def uniform_cost_search(problem, costfn=lambda node: node.path_cost):
    "Search for goal; expand the frontier node with minimum cost (according to costfn) first."
    frontier = FrontierPQ(Node(problem.initial), costfn)
    explored = set()
    while frontier:
        node = frontier.pop()
        if problem.is_goal(node.state):
            return node
        explored.add(node.state)
        for action in problem.actions(node.state):
            child = node.child(problem, action)
            if child.state not in explored and child.state not in frontier:
                frontier.add(child)
            elif child.state in frontier and child.path_cost < frontier[child.state].path_cost:
                frontier.replace(child)
Finally, astar_search, in which the cost includes an estimate of the distance to the goal as well as the distance travelled so far.
In [19]:
def astar_search(problem, heuristic):
    costfn = lambda node: node.path_cost + heuristic(node.state)
    return uniform_cost_search(problem, costfn)
The solution to a search problem is now a linked list of Node
s, where each Node
includes a state
and the path_cost
of getting to the state. In addition, for every Node
except for the first (root) Node
, there is a previous Node
(indicating the state that led to this Node
) and an action
(indicating the action taken to get here).
In [20]:
class Node(object):
    """A node in a search tree. A search tree is a spanning tree over states.
    A Node contains a state, the previous node in the tree, the action that
    takes us from the previous state to this state, and the path cost to get to
    this state. If a state is arrived at by two paths, then there are two nodes
    with the same state."""

    def __init__(self, state, previous=None, action=None, step_cost=1):
        "Create a search tree Node, derived from a previous Node by an action."
        self.state = state
        self.previous = previous
        self.action = action
        self.path_cost = 0 if previous is None else (previous.path_cost + step_cost)

    def __repr__(self): return "<Node {}: {}>".format(self.state, self.path_cost)

    def __lt__(self, other): return self.path_cost < other.path_cost

    def child(self, problem, action):
        "The Node you get by taking an action from this Node."
        result = problem.result(self.state, action)
        return Node(result, self, action,
                    problem.step_cost(self.state, action, result))
A frontier is a collection of Nodes that acts like both a Queue and a Set. A frontier, f
, supports these operations:
f.add(node)
: Add a node to the Frontier.
f.pop()
: Remove and return the "best" node from the frontier.
f.replace(node)
: add this node and remove a previous node with the same state.
state in f
: Test if some node in the frontier has arrived at state.
f[state]
: returns the node corresponding to this state in frontier.
len(f)
: The number of Nodes in the frontier. When the frontier is empty, f
is false.
We provide two kinds of frontiers: One for "regular" queues, either first-in-first-out (for breadth-first search) or last-in-first-out (for depth-first search), and one for priority queues, where you can specify what cost function on nodes you are trying to minimize.
In [21]:
from collections import OrderedDict
import heapq
class FrontierQ(OrderedDict):
    "A Frontier that supports FIFO or LIFO Queue ordering."

    def __init__(self, initial, LIFO=False):
        """Initialize Frontier with an initial Node.
        If LIFO is True, pop from the end first; otherwise from front first."""
        self.LIFO = LIFO
        self.add(initial)

    def add(self, node):
        "Add a node to the frontier."
        self[node.state] = node

    def pop(self):
        "Remove and return the next Node in the frontier."
        (state, node) = self.popitem(self.LIFO)
        return node

    def replace(self, node):
        "Make this node replace the old node with the same state."
        del self[node.state]
        self.add(node)
In [22]:
class FrontierPQ:
    "A Frontier ordered by a cost function; a Priority Queue."

    def __init__(self, initial, costfn=lambda node: node.path_cost):
        "Initialize Frontier with an initial Node, and specify a cost function."
        self.heap = []
        self.states = {}
        self.costfn = costfn
        self.add(initial)

    def add(self, node):
        "Add node to the frontier."
        cost = self.costfn(node)
        heapq.heappush(self.heap, (cost, node))
        self.states[node.state] = node

    def pop(self):
        "Remove and return the Node with minimum cost."
        (cost, node) = heapq.heappop(self.heap)
        self.states.pop(node.state, None)  # remove state
        return node

    def replace(self, node):
        "Make this node replace a previous node with the same state."
        if node.state not in self:
            raise ValueError('{} not there to replace'.format(node.state))
        for (i, (cost, old_node)) in enumerate(self.heap):
            if old_node.state == node.state:
                self.heap[i] = (self.costfn(node), node)
                heapq._siftdown(self.heap, 0, i)
                return

    def __getitem__(self, state):
        "The frontier Node with this state (the f[state] operation listed above)."
        return self.states[state]

    def __contains__(self, state): return state in self.states

    def __len__(self): return len(self.heap)
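As a quick sanity check of the priority ordering (hand-built nodes, added here for illustration; the test_frontier function near the end of this notebook exercises the frontier classes more thoroughly):

f = FrontierPQ(Node('A'))                # default cost function is path_cost
f.add(Node('Z', Node('A'), 'A->Z', 75))
f.add(Node('S', Node('A'), 'A->S', 140))
assert 'S' in f and len(f) == 3
assert f.pop().state == 'A'              # path_cost 0 is cheapest
assert f.pop().state == 'Z'              # 75 beats 140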
Problem
is the abstract class for all search problems. You can define your own class of problems as a subclass of Problem
. You will need to override the actions
and result
methods to describe how your problem works. You will also have to either override is_goal
or pass a collection of goal states to the initialization method. If actions have different costs, you should override the step_cost
method.
In [23]:
class Problem(object):
    """The abstract class for a search problem."""

    def __init__(self, initial=None, goals=(), **additional_keywords):
        """Provide an initial state and optional goal states.
        A subclass can have additional keyword arguments."""
        self.initial = initial  # The initial state of the problem.
        self.goals = goals      # A collection of possible goal states.
        self.__dict__.update(**additional_keywords)

    def actions(self, state):
        "Return a list of actions executable in this state."
        raise NotImplementedError  # Override this!

    def result(self, state, action):
        "The state that results from executing this action in this state."
        raise NotImplementedError  # Override this!

    def is_goal(self, state):
        "True if the state is a goal."
        return state in self.goals  # Optionally override this!

    def step_cost(self, state, action, result=None):
        "The cost of taking this action from this state."
        return 1  # Override this if actions have different costs
In [24]:
def action_sequence(node):
    "The sequence of actions to get to this node."
    actions = []
    while node.previous:
        actions.append(node.action)
        node = node.previous
    return actions[::-1]

def state_sequence(node):
    "The sequence of states to get to this node."
    states = [node.state]
    while node.previous:
        node = node.previous
        states.append(node.state)
    return states[::-1]
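For example, chaining a few hand-built nodes (made up for illustration) shows how the two functions recover the sequences:

n = Node('F', Node('S', Node('A'), 'A->S', 140), 'S->F', 99)
action_sequence(n)   # => ['A->S', 'S->F']
state_sequence(n)    # => ['A', 'S', 'F']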
In [25]:
dirt = '*'
clean = ' '
class TwoLocationVacuumProblem(Problem):
    """A Vacuum in a world with two locations, and dirt.
    Each state is a tuple of (location, dirt_in_W, dirt_in_E)."""

    def actions(self, state): return ('W', 'E', 'Suck')

    def is_goal(self, state): return dirt not in state

    def result(self, state, action):
        "The state that results from executing this action in this state."
        (loc, dirtW, dirtE) = state
        if action == 'W':                     return ('W', dirtW, dirtE)
        elif action == 'E':                   return ('E', dirtW, dirtE)
        elif action == 'Suck' and loc == 'W': return (loc, clean, dirtE)
        elif action == 'Suck' and loc == 'E': return (loc, dirtW, clean)
        else: raise ValueError('unknown action: ' + action)
In [26]:
problem = TwoLocationVacuumProblem(initial=('W', dirt, dirt))
result = uniform_cost_search(problem)
result
Out[26]:
In [27]:
action_sequence(result)
Out[27]:
In [28]:
state_sequence(result)
Out[28]:
In [29]:
problem = TwoLocationVacuumProblem(initial=('E', clean, dirt))
result = uniform_cost_search(problem)
action_sequence(result)
Out[29]:
Here is another problem domain, to show you how to define one. The idea is that we have a number of water jugs and a water tap, and the goal is to measure out a specific amount of water (in, say, ounces or liters). You can completely fill or empty a jug, but because the jugs don't have markings on them, you can't partially fill them to a specific level. You can, however, pour one jug into another, stopping when the second is full or the first is empty.
In [30]:
class PourProblem(Problem):
    """Problem about pouring water between jugs to achieve some water level.
    Each state is a tuple of levels. In the initialization, provide a tuple of
    capacities, e.g. PourProblem(capacities=(8, 16, 32), initial=(2, 4, 3), goals={7}),
    which means three jugs of capacity 8, 16, 32, currently filled with 2, 4, 3 units of
    water, respectively, and the goal is to get a level of 7 in any one of the jugs."""

    def actions(self, state):
        """The actions executable in this state."""
        jugs = range(len(state))
        return ([('Fill', i) for i in jugs if state[i] != self.capacities[i]] +
                [('Dump', i) for i in jugs if state[i] != 0] +
                [('Pour', i, j) for i in jugs for j in jugs if i != j])

    def result(self, state, action):
        """The state that results from executing this action in this state."""
        result = list(state)
        act, i, j = action[0], action[1], action[-1]
        if act == 'Fill':    # Fill i to capacity
            result[i] = self.capacities[i]
        elif act == 'Dump':  # Empty i
            result[i] = 0
        elif act == 'Pour':
            a, b = state[i], state[j]
            result[i], result[j] = ((0, a + b)
                                    if (a + b <= self.capacities[j]) else
                                    (a + b - self.capacities[j], self.capacities[j]))
        else:
            raise ValueError('unknown action', action)
        return tuple(result)

    def is_goal(self, state):
        """True if any of the jugs has a level equal to one of the goal levels."""
        return any(level in self.goals for level in state)
In [31]:
p7 = PourProblem(initial=(2, 0), capacities=(5, 13), goals={7})
p7.result((2, 0), ('Fill', 1))
Out[31]:
(2, 13)
In [32]:
result = uniform_cost_search(p7)
action_sequence(result)
Out[32]:
In [33]:
def showpath(searcher, problem):
    "Show what happens when searcher solves problem."
    problem = Instrumented(problem)
    print('\n{}:'.format(searcher.__name__))
    result = searcher(problem)
    if result:
        actions = action_sequence(result)
        state = problem.initial
        path_cost = 0
        for steps, action in enumerate(actions, 1):
            path_cost += problem.step_cost(state, action, 0)
            result = problem.result(state, action)
            print(' {} =={}==> {}; cost {} after {} steps{}'
                  .format(state, action, result, path_cost, steps,
                          '; GOAL!' if problem.is_goal(result) else ''))
            state = result
    msg = 'GOAL FOUND' if result else 'no solution'
    print('{} after {} results and {} goal checks'
          .format(msg, problem._counter['result'], problem._counter['is_goal']))

from collections import Counter

class Instrumented:
    "Instrument an object to count all the attribute accesses in _counter."
    def __init__(self, obj):
        self._object = obj
        self._counter = Counter()
    def __getattr__(self, attr):
        self._counter[attr] += 1
        return getattr(self._object, attr)
In [34]:
showpath(uniform_cost_search, p7)
In [35]:
p = PourProblem(initial=(0, 0), capacities=(7, 13), goals={2})
showpath(uniform_cost_search, p)
In [36]:
class GreenPourProblem(PourProblem):
    def step_cost(self, state, action, result=None):
        "The cost is the amount of water used in a fill."
        if action[0] == 'Fill':
            i = action[1]
            return self.capacities[i] - state[i]
        return 0
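For example, filling jug 0 (capacity 7) when it currently holds 2 units should cost 5, the amount of water added. A quick check (not from the original notebook):

g = GreenPourProblem(initial=(2, 0), capacities=(7, 13), goals={5})
g.step_cost((2, 0), ('Fill', 0))   # => 7 - 2 == 5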
In [37]:
p = GreenPourProblem(initial=(0, 0), capacities=(7, 13), goals={2})
showpath(uniform_cost_search, p)
In [38]:
def compare_searchers(problem, searchers=None):
    "Apply each of the search algorithms to the problem, and show results"
    if searchers is None:
        searchers = (breadth_first_search, uniform_cost_search)
    for searcher in searchers:
        showpath(searcher, problem)
In [39]:
compare_searchers(p)
In [40]:
import random

N, S, E, W = DIRECTIONS = [(0, 1), (0, -1), (1, 0), (-1, 0)]

def Grid(width, height, obstacles=0.1):
    """A 2-D grid, width x height, with obstacles that are either a collection of points,
    or a fraction between 0 and 1 indicating the density of obstacles, chosen at random."""
    grid = {(x, y) for x in range(width) for y in range(height)}
    if isinstance(obstacles, (float, int)):
        # random.sample requires a sequence, not a set, in recent Python versions.
        obstacles = random.sample(sorted(grid), int(width * height * obstacles))
    def neighbors(x, y):
        for (dx, dy) in DIRECTIONS:
            (nx, ny) = (x + dx, y + dy)
            if (nx, ny) not in obstacles and 0 <= nx < width and 0 <= ny < height:
                yield (nx, ny)
    return {(x, y): list(neighbors(x, y))
            for x in range(width) for y in range(height)}

Grid(5, 5)
Out[40]:
In [41]:
class GridProblem(Problem):
    "Create with a call like GridProblem(grid=Grid(10, 10), initial=(0, 0), goals={(9, 9)})"
    def actions(self, state): return DIRECTIONS
    def result(self, state, action):
        #print('ask for result of', state, action)
        (x, y) = state
        (dx, dy) = action
        r = (x + dx, y + dy)
        return r if r in self.grid[state] else state
In [42]:
gp = GridProblem(grid=Grid(5, 5, 0.3), initial=(0, 0), goals={(4, 4)})
showpath(uniform_cost_search, gp)
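Since grid states are (x, y) points, we can also run astar_search on the same problem with a Manhattan-distance heuristic toward the goal (4, 4). The helper below is a sketch added for illustration; it is not part of the original notebook:

def astar_manhattan_search(problem):
    "A* search with Manhattan distance to the goal (4, 4) as the heuristic."
    heuristic = lambda state: abs(state[0] - 4) + abs(state[1] - 4)
    return astar_search(problem, heuristic)

showpath(astar_manhattan_search, gp)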
In [43]:
def hardness(problem):
    "The number of steps in a shortest solution to problem (0 if there is no solution)."
    L = breadth_first_search(problem)
    #print('hardness', problem.initial, problem.capacities, problem.goals, L)
    return len(action_sequence(L)) if (L is not None) else 0
In [44]:
hardness(p7)
Out[44]:
In [45]:
action_sequence(breadth_first_search(p7))
Out[45]:
In [46]:
C = 9  # Maximum capacity to consider

phard = max((PourProblem(initial=(a, b), capacities=(A, B), goals={goal})
             for A in range(C+1) for B in range(C+1)
             for a in range(A) for b in range(B)
             for goal in range(max(A, B))),
            key=hardness)

phard.initial, phard.capacities, phard.goals
Out[46]:
In [47]:
showpath(breadth_first_search, PourProblem(initial=(0, 0), capacities=(7, 9), goals={8}))
In [48]:
showpath(uniform_cost_search, phard)
In [49]:
class GridProblem(Problem):
    """A grid of size (W, H), with states numbered 0 to W*H-1, row by row."""
    def actions(self, state): return ['N', 'S', 'E', 'W']
    def result(self, state, action):
        """The state that results from executing this action in this state."""
        (W, H) = self.size
        if action == 'N' and state >= W:            return state - W
        if action == 'S' and state + W < W * H:     return state + W
        if action == 'E' and (state + 1) % W != 0:  return state + 1
        if action == 'W' and state % W != 0:        return state - 1
        return state
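A quick check of the move arithmetic under this numbering (added for illustration, not from the original notebook): from corner state 0, 'S' and 'E' move to adjacent squares, while 'W' and 'N' would fall off the edge and so leave the state unchanged:

g = GridProblem(initial=0, goals={44}, size=(10, 10))
(g.result(0, 'S'), g.result(0, 'E'), g.result(0, 'W'), g.result(0, 'N'))   # => (10, 1, 0, 0)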
In [50]:
compare_searchers(GridProblem(initial=0, goals={44}, size=(10, 10)))
In [51]:
def test_frontier():
    #### Breadth-first search with FIFO Q
    f = FrontierQ(Node(1), LIFO=False)
    assert 1 in f and len(f) == 1
    f.add(Node(2))
    f.add(Node(3))
    assert 1 in f and 2 in f and 3 in f and len(f) == 3
    assert f.pop().state == 1
    assert 1 not in f and 2 in f and 3 in f and len(f) == 2
    assert f
    assert f.pop().state == 2
    assert f.pop().state == 3
    assert not f

    #### Depth-first search with LIFO Q
    f = FrontierQ(Node('a'), LIFO=True)
    for s in 'bcdef': f.add(Node(s))
    assert len(f) == 6 and 'a' in f and 'c' in f and 'f' in f
    for s in 'fedcba': assert f.pop().state == s
    assert not f

    #### Best-first search with Priority Q
    f = FrontierPQ(Node(''), lambda node: len(node.state))
    assert '' in f and len(f) == 1 and f
    for s in ['book', 'boo', 'bookie', 'bookies', 'cook', 'look', 'b']:
        assert s not in f
        f.add(Node(s))
        assert s in f
    assert f.pop().state == ''
    assert f.pop().state == 'b'
    assert f.pop().state == 'boo'
    assert {f.pop().state for _ in '123'} == {'book', 'cook', 'look'}
    assert f.pop().state == 'bookie'

    #### Romania: Two paths to Bucharest; cheapest one found first
    S = Node('S')
    SF = Node('F', S, 'S->F', 99)
    SFB = Node('B', SF, 'F->B', 211)
    SR = Node('R', S, 'S->R', 80)
    SRP = Node('P', SR, 'R->P', 97)
    SRPB = Node('B', SRP, 'P->B', 101)
    f = FrontierPQ(S)
    f.add(SF); f.add(SR); f.add(SRP); f.add(SRPB); f.add(SFB)
    def cs(n): return (n.path_cost, n.state)  # cs: cost and state
    assert cs(f.pop()) == (0, 'S')
    assert cs(f.pop()) == (80, 'R')
    assert cs(f.pop()) == (99, 'F')
    assert cs(f.pop()) == (177, 'P')
    assert cs(f.pop()) == (278, 'B')
    return 'test_frontier ok'

test_frontier()
Out[51]:
'test_frontier ok'
In [52]:
%matplotlib inline
import matplotlib.pyplot as plt
p = plt.plot([i**2 for i in range(10)])
plt.savefig('destination_path.eps', format='eps', dpi=1200)
In [53]:
import itertools
import random
# http://stackoverflow.com/questions/10194482/custom-matplotlib-plot-chess-board-like-table-with-colored-cells
from matplotlib.table import Table
def main():
    grid_table(8, 8)
    plt.axis('scaled')
    plt.show()

def grid_table(nrows, ncols):
    fig, ax = plt.subplots()
    ax.set_axis_off()
    colors = ['white', 'lightgrey', 'dimgrey']
    tb = Table(ax, bbox=[0, 0, 2, 2])
    for i, j in itertools.product(range(ncols), range(nrows)):
        tb.add_cell(i, j, 2./ncols, 2./nrows, text='{:0.2f}'.format(0.1234),
                    loc='center', facecolor=random.choice(colors), edgecolor='grey')
    ax.add_table(tb)
    #ax.plot([0, .3], [.2, .2])
    #ax.add_line(plt.Line2D([0.3, 0.5], [0.7, 0.7], linewidth=2, color='blue'))
    return fig

main()
In [55]:
import collections
class defaultkeydict(collections.defaultdict):
    """Like defaultdict, but the default_factory is a function of the key.
    >>> d = defaultkeydict(abs); d[-42]
    42
    """
    def __missing__(self, key):
        self[key] = self.default_factory(key)
        return self[key]
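One possible use in this notebook (a sketch, not from the original): computing word-ladder neighbors lazily, only for the words a search actually reaches, instead of precomputing word_neighbors for every word in WORDS:

lazy_word_neighbors = defaultkeydict(neighboring_words)
breadth_first('green', 'grass', lazy_word_neighbors)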
In [ ]: