In [1]:
from cfg import read_grammar_rules
from cfg import WCFG
from rule import Rule
from symbol import is_terminal, is_nonterminal, make_symbol
# for convenience we will use `defaultdict` from the package collections
# which allows us to define a default constructor for values
from collections import defaultdict
In [39]:
# Load the arithmetic grammar. The `with` block guarantees the file handle is
# closed instead of being left to the garbage collector.
# NOTE(review): assumes WCFG consumes the rule stream inside its constructor
# (i.e. before the file is closed) -- confirm in `cfg.py`.
with open('examples/arithmetic', 'r') as grammar_file:
    G1 = WCFG(read_grammar_rules(grammar_file))
In [40]:
# parenthesised form prints the same text on Python 2 and is valid on Python 3
print(G1)
In [41]:
class Item(object):  # this class is also available in `item.py`
    """A dotted rule used in CKY/Earley.

    An item pairs a rule with the sequence of positions its dot has visited:
    the first position is where the item starts and one more position is
    appended each time the dot moves over a symbol of the rule's RHS.
    Items never change after construction (`advance` returns a new item) and
    they are hashable, so they can be stored in sets and used as dict keys.
    """

    def __init__(self, rule, dots):
        """
        :param rule: the underlying rule (must expose `lhs` and `rhs`)
        :param dots: a non-empty sequence of positions visited by the dot
        """
        assert len(dots) > 0, 'I do not accept an empty list of dots'
        self.rule_ = rule
        # stored as a tuple so that the item is immutable and hashable
        self.dots_ = tuple(dots)

    def __eq__(self, other):
        # Comparing with something that is not an item (e.g. `item == None`)
        # must not crash with an AttributeError: returning NotImplemented lets
        # Python fall back to its default comparison (i.e. "not equal").
        if not (hasattr(other, 'rule_') and hasattr(other, 'dots_')):
            return NotImplemented
        return self.rule_ == other.rule_ and self.dots_ == other.dots_

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.rule_, self.dots_))

    def __repr__(self):
        return '{0} ||| {1}'.format(self.rule_, self.dots_)

    def __str__(self):
        return '{0} ||| {1}'.format(self.rule_, self.dots_)

    @property
    def lhs(self):
        """the LHS symbol of the underlying rule"""
        return self.rule_.lhs

    @property
    def rule(self):
        """the underlying rule"""
        return self.rule_

    @property
    def dot(self):
        """the current position of the dot (the last position visited)"""
        return self.dots_[-1]

    @property
    def start(self):
        """the position where the item starts (the first position visited)"""
        return self.dots_[0]

    @property
    def next(self):
        """return the symbol to the right of the dot (or None, if the item is complete)"""
        if self.is_complete():
            return None
        # with k dots recorded, k - 1 RHS symbols have been covered so far
        return self.rule_.rhs[len(self.dots_) - 1]

    def state(self, i):
        """return the i-th position visited by the dot"""
        return self.dots_[i]

    def advance(self, dot):
        """return a new item with an extended sequence of dots"""
        return Item(self.rule_, self.dots_ + (dot,))

    def is_complete(self):
        """complete items are those whose dot reached the end of the RHS sequence"""
        return len(self.rule_.rhs) + 1 == len(self.dots_)
In [42]:
# a unary rule [S] -> [X]; the third argument is the rule's weight
# (presumably a probability or log-probability -- confirm in `rule.py`)
r = Rule('[S]', ['[X]'], 0.0)
# the dot has only visited position 0, so [X] is still to be recognised
i1 = Item(r, [0])
# move the dot over [X] up to position 1 (this yields a new item)
i2 = i1.advance(1)
In [43]:
# parenthesised form prints the same text on Python 2 and is valid on Python 3
print(i1)
print(i2)
In [44]:
i1 != i2
Out[44]:
In [45]:
i1.is_complete()
Out[45]:
In [46]:
i2.is_complete()
Out[46]:
In [47]:
i1.next
Out[47]:
In [48]:
i2.next
We need an agenda of items. Some items are active, others are passive.
The active agenda is nothing but a stack or queue, whereas the passive agenda is simply a set (all items that have already been processed). However, we can further organise the passive items for easy/quick access within inference rules.
In [49]:
class Agenda(object):  # this class is also available in `agenda.py`
    """Active and passive items of an agenda-driven parser.

    Active items wait in a stack to be processed; passive items (those handed
    to `make_passive`) are indexed so that inference rules can quickly find
    the items they may combine with.
    """

    def __init__(self):
        # we are organising active items in a stack (last in first out)
        self._active = []
        # an item should never queue twice, thus we will manage a set of items which we have already seen
        self._seen = set()
        # we organise incomplete items by the symbols they wait for at a certain position
        # that is, if the key is a pair (Y, i)
        # the value is a set of items of the form
        # [X -> alpha * Y beta, [...i]]
        self._incomplete = defaultdict(set)
        # we organise complete items by their LHS symbol spanning from a certain position
        # if the key is a pair (X, i)
        # then the value is a set of items of the form
        # [X -> gamma *, [i ... j]]
        self._complete = defaultdict(set)

    def __len__(self):
        """return the number of active items"""
        return len(self._active)

    def push(self, item):
        """push an item into the queue of active items

        :returns: True if the item was queued, False if it had been seen before
        """
        if item in self._seen:  # if an item has been seen before, we simply ignore it
            return False
        self._active.append(item)
        self._seen.add(item)
        return True

    def pop(self):
        """pop an active item"""
        assert len(self._active) > 0, 'I have no items left.'
        return self._active.pop()

    def make_passive(self, item):
        """store an item in the passive index that matches its status"""
        if item.is_complete():  # complete items offer a way to rewrite a certain LHS from a certain position
            self._complete[(item.lhs, item.start)].add(item)
        else:  # incomplete items are waiting for the completion of the symbol to the right of the dot
            self._incomplete[(item.next, item.dot)].add(item)

    def waiting(self, symbol, dot):
        """return the passive incomplete items waiting for `symbol` at position `dot`"""
        # `get` (rather than indexing) avoids creating empty entries in the defaultdict
        return self._incomplete.get((symbol, dot), set())

    def complete(self, lhs, start):
        """return the passive complete items rewriting `lhs` from position `start`"""
        return self._complete.get((lhs, start), set())

    def itercomplete(self):
        """an iterator over complete items in arbitrary order"""
        # `values()` exists on both Python 2 and Python 3,
        # whereas `itervalues()` is only available on Python 2
        for items in self._complete.values():
            for item in items:
                yield item
Let's see how it works
In [50]:
A = Agenda()
In [51]:
r1 = Rule('[S]', ['[S]', '[X]'], 1.0)
r1
Out[51]:
we can push items into the agenda
In [52]:
A.push(Item(r1, [0])) # S -> S X, [0] (earley axiom)
Out[52]:
and the agenda will make sure there are no duplicates
In [53]:
A.push(Item(r1, [0]))
Out[53]:
In [54]:
len(A)
Out[54]:
In [55]:
i1 = Item(r1, [0])
i1
Out[55]:
In [56]:
A.make_passive(i1)
In [57]:
A._incomplete
Out[57]:
In [58]:
A.push(Item(Rule('[S]', ['[X]'], 1.0), [0]))
Out[58]:
In [59]:
A.make_passive(Item(Rule('[S]', ['[X]'], 1.0), [0]))
In [60]:
A._incomplete
Out[60]:
In [61]:
A.push(Item(Rule('[S]', ['[X]'], 1.0), [0, 1]))
Out[61]:
In [62]:
A.make_passive(Item(Rule('[S]', ['[X]'], 1.0), [0, 1]))
In [63]:
A._complete
Out[63]:
In [64]:
def axioms(cfg, sentence):
    """
    Create one initial item per (rule, position) pair: every rule of the
    grammar gets an item whose dot sits at each position 0 .. len(sentence) - 1.

    :params cfg: a context-free grammar (an instance of WCFG)
    :params sentence: the input sentence (as a list or tuple)
    :returns: a list of items
    """
    return [Item(rule, [start])
            for rule in cfg
            for start in range(len(sentence))]
In [67]:
sentence = 'a * a'.split()
axioms(G1, sentence)
Out[67]:
For every rule X -> w_i alpha such that w_i is the input word at position i, we have an item of the kind:
    [X -> * w_i alpha, [i]]
which gets represented as
    Item(X -> w_i alpha, [i])
Don't forget that these optimised axioms require an additional inference rule (a sort of delayed axiom that resembles a combination of prediction and completion).
In [86]:
def axioms2(cfg, sentence):
    """
    Optimised axioms: only rules whose RHS starts with the word found at
    position i get an item at position i.

    :params cfg: a context-free grammar (an instance of WCFG)
    :params sentence: the input sentence (as a list or tuple)
    :returns: a list of items
    """
    # index the rules by the first symbol of their RHS
    by_rhs0 = defaultdict(list)
    for rule in cfg:
        # a rule with an empty RHS has no first symbol (indexing it would raise
        # an IndexError) and it can never start with an input word
        if len(rule.rhs) > 0:
            by_rhs0[rule.rhs[0]].append(rule)
    items = []
    for i, word in enumerate(sentence):
        # `get` avoids creating empty entries for words no rule starts with
        for rule in by_rhs0.get(word, []):
            items.append(Item(rule, [i]))
    return items
In [87]:
axioms2(G1, sentence)
Out[87]:
In [70]:
def scan(item, sentence):
    """
    Try to move the dot of `item` over the input word at the dot's position.

    :params item: an item whose next symbol is a terminal
    :params sentence: the input sentence (as a list or tuple)
    :returns: the advanced item if the word at the dot matches the item's
        next symbol, None otherwise (including when the dot is past the end)
    """
    expected = item.next
    assert is_terminal(expected), 'Only terminal symbols can be scanned, got %s' % expected
    position = item.dot
    matches = position < len(sentence) and sentence[position] == expected
    return item.advance(position + 1) if matches else None
In [71]:
# scan every optimised axiom and keep only the items that could be advanced
S = [new
     for new in (scan(item, sentence) for item in axioms2(G1, sentence))
     if new is not None]
S
Out[71]:
In [ ]:
Here we let an active item interact with passive items:
* either the active item is complete, in which case we try to advance the incomplete passive items that are waiting for it;
* or the active item is incomplete, in which case we try to advance the item itself by looking back at complete passive items.
Either way, this is the inference rule:
[X -> alpha * Y beta, [i ... k]] [Y -> gamma *, [k ... j]]
----------------------------------------------------------
[X -> alpha Y * beta, [i ... j]]
In [82]:
def complete(item, agenda):
    """
    Combine an active item with the passive items stored in the agenda.

    :params item: the active item
    :params agenda: the agenda holding the passive items
    :returns: a list with the items obtained by advancing a dot
    """
    if item.is_complete():
        # the passive incomplete items waiting for item.lhs from item.start
        # can move their dots to the end of the active item
        return [waiting.advance(item.dot)
                for waiting in agenda.waiting(item.lhs, item.start)]
    # the active item is incomplete: gather the distinct positions where
    # a completion of item.next spanning from item.dot ends ...
    ends = set(done.dot for done in agenda.complete(item.next, item.dot))
    # ... and advance the active item once per position
    return [item.advance(end) for end in ends]
In [88]:
def make_forest(complete_items):
    """
    Turn complete items into a grammar whose symbols are annotated with spans.

    Each item yields one rule: its LHS is annotated with the item's start and
    dot positions, and the i-th RHS symbol is annotated with the i-th and
    (i+1)-th positions visited by the dot. The rule keeps the `prob` of the
    item's underlying rule.

    :params complete_items: an iterable of complete items
    :returns: the forest (an instance of WCFG)
    """
    forest = WCFG()
    for item in complete_items:
        source = item.rule
        head = make_symbol(item.lhs, item.start, item.dot)
        body = [make_symbol(sym, item.state(k), item.state(k + 1))
                for k, sym in enumerate(source.rhs)]
        forest.add(Rule(head, body, source.prob))
    return forest
In [79]:
def cky(cfg, sentence):
    """
    Agenda-driven parsing of `sentence` with `cfg`.

    :params cfg: a context-free grammar (an instance of WCFG)
    :params sentence: the input sentence (as a list or tuple)
    :returns: the forest built from all the complete items that were found
    """
    agenda = Agenda()
    for axiom in axioms(cfg, sentence):
        agenda.push(axiom)
    # process active items until none is left
    while len(agenda) > 0:
        item = agenda.pop()
        if not item.is_complete() and not is_nonterminal(item.next):
            # the symbol after the dot is a terminal: try to scan the input
            scanned = scan(item, sentence)
            consequences = [] if scanned is None else [scanned]
        else:
            # complete items and items waiting for a nonterminal
            # interact with the passive items
            consequences = complete(item, agenda)
        for new in consequences:
            agenda.push(new)
        # the item has been processed: it may now serve as a passive item
        agenda.make_passive(item)
    return make_forest(agenda.itercomplete())
In [80]:
forest = cky(G1, sentence)
In [81]:
# parenthesised form prints the same text on Python 2 and is valid on Python 3
print(forest)
In [85]:
goal = make_symbol('[E]', 0, len(sentence))
goal
Out[85]:
In [ ]: