Get the presentation
On Github IO: http://dstuebe.github.io/generators/
On NbViewer: http://nbviewer.ipython.org/github/dstuebe/generators/blob/gh-pages/presentation.ipynb
March 2, 2015
Copyright(C) 2015, David Stuebe
A kind of function that can return an intermediate result ("the next
value") to its caller, but maintaining the function's local state so
that the function can be resumed again right where it left off.
PEP 255 introduced the generator object and the yield statement in version 2.2 of Python.
In [138]:
def fib():
    a, b = 0, 1
    while True:
        yield b
        a, b = b, a+b
In [139]:
function_result = fib()
print function_result
The result of calling the generator function is a generator object. The generator object can be used as an iterator.
In [140]:
function_result.__iter__() is function_result
Out[140]:
In [141]:
zip(xrange(10),function_result)
Out[141]:
In [142]:
(10, function_result.next())
Out[142]:
In [117]:
def noisy_generator():
    print ' Initializing'
    print ' first yield'
    yield "A"
    print ' generator running...'
    print ' second yield'
    yield "B"
    print ' generator running...'
    print ' now what?'
In [118]:
g = noisy_generator()
print 'Calling next...'
print 'Result 1: %s' % g.next()
print 'Result 2: %s' % g.next()
print 'Result 3: %s' % g.next()
In [181]:
help(g)
In [182]:
help(g.throw)
Looks fun - let's try it...
In [183]:
def accepting_generator():
    print 'Initializing'
    try:
        yield 'Hello'
        print 'generator running...'
    except StandardError as e:
        print "Error Message: %s" % e
    yield 'World'

g = accepting_generator()
g.next()
Out[183]:
In [184]:
g.throw(StandardError, 'Foo bar baz')
Out[184]:
In [185]:
g.next()
In [447]:
help(g.close)
In [448]:
def closeable():
    try:
        yield 1
        yield 2
    except GeneratorExit:
        print 'closing'

g = closeable()
g.next()
Out[448]:
In [449]:
g.close()
In [450]:
g.next()
In [451]:
help(g.send)
In [452]:
g = fib()
g.send('foo')
So next() is really just send(None).
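A minimal sketch of that equivalence (written with the builtin next() so it runs the same on Python 2.6+ and Python 3): advancing a fresh generator with next() and with send(None) produces identical sequences.

```python
def fib():
    a, b = 0, 1
    while True:
        yield b
        a, b = b, a + b

g1, g2 = fib(), fib()
# next(g) and g.send(None) both resume the generator and
# return the next yielded value
values_via_next = [next(g1) for _ in range(5)]
values_via_send = [g2.send(None) for _ in range(5)]
print(values_via_next)  # [1, 1, 2, 3, 5]
print(values_via_send)  # [1, 1, 2, 3, 5]
```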
In [453]:
g = fib()
g.send(None)
Out[453]:
In [454]:
g.send(None)
Out[454]:
What is send('foo')?
In [455]:
g.send('foo')
Out[455]:
Where did it go? fib never assigns the result of its yield expression, so the sent value is simply discarded.
Coroutines are a natural way of expressing many algorithms, such as
simulations, games, asynchronous I/O, and other forms of event-
driven programming or co-operative multitasking. Python's generator
functions are almost coroutines -- but not quite -- in that they
allow pausing execution to produce a value, but do not provide for
values or exceptions to be passed in when execution resumes. They
also do not allow execution to be paused within the "try" portion of
try/finally blocks, and therefore make it difficult for an aborted
coroutine to clean up after itself.
PEP 342 added close, send and throw to the generator in version 2.5 of Python and made yield an expression rather than a statement.
In [143]:
def adder(val):
    x = 0
    while True:
        x = yield val+x

g = adder(5)
g.send(None)
Out[143]:
In [144]:
g.send(2)
Out[144]:
In [145]:
g.send(6)
Out[145]:
In [146]:
def noisy_coroutine():
    print ' Initializing'
    print ' first yield'
    received = yield "A"
    print ' generator running after receiving: %s' % received
    print ' second yield'
    received = yield "B"
    print ' generator running after receiving: %s' % received
    print ' now what?'
In [147]:
g = noisy_coroutine()
print 'Calling send...'
g.send(None)
Out[147]:
In [148]:
g.send(1)
Out[148]:
In [149]:
g.send(2)
In [150]:
def foo():
    x = yield
    y = yield
    z = yield
    yield x, y, z

g = foo()
g.next()
g.send(1)
g.send(2)
print g.send(3)
In [81]:
def consumer(func):
    def wrapper(*args, **kw):
        gen = func(*args, **kw)
        gen.next()
        return gen
    wrapper.__name__ = func.__name__
    wrapper.__doc__ = func.__doc__
    return wrapper
In [82]:
g = consumer(adder)(4)
print g.send(11)
print g.send(2)
In [83]:
@consumer
def subtractor(val):
    '''A generator that subtracts numbers from a value'''
    x = 0
    while True:
        x = yield x - val

g = subtractor(8)
g.send(15)
Out[83]:
In [84]:
help(subtractor)
In [418]:
# From http://www.dabeaz.com/coroutines/pipeline.py
def grep(pattern, lines):
    for line in lines:
        if pattern in line:
            yield line

import time
def follow(thefile):
    thefile.seek(0,2)  # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)  # Sleep briefly
            continue
        yield line

# Set up a processing pipe : tail -f | grep python
with open("access-log") as logfile:
    loglines = follow(logfile)
    pylines = grep("python", loglines)
    # Pull results out of the processing pipeline
    for line in pylines:
        print line,
In [423]:
import time
def follow(thefile, target):
    thefile.seek(0,2)  # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)  # Sleep briefly
            continue
        target.send(line)

@consumer
def grep(pattern, target):
    print "Looking for %s" % pattern
    while True:
        line = (yield)
        if pattern in line:
            target.send(line)

@consumer
def printer():
    while True:
        line = (yield)
        print line,

with open("access-log") as logfile:
    follow(logfile, grep('python', printer()))
In [5]:
# An object
class GrepHandler(object):
    def __init__(self, pattern, target):
        self.pattern = pattern
        self.target = target
    def send(self, line):
        if self.pattern in line:
            self.target.send(line)

# A coroutine
@consumer
def grep(pattern, target):
    while True:
        line = (yield)
        if pattern in line:
            target.send(line)

# A null-sink to send data
@consumer
def null():
    while True:
        item = (yield)

# A benchmark
line = 'python is nice'
p1 = grep('python', null())         # Coroutine
p2 = GrepHandler('python', null())  # Object

from timeit import timeit
print "coroutine:", timeit("p1.send(line)", "from __main__ import line, p1")
print "object:", timeit("p2.send(line)", "from __main__ import line, p2")
In [114]:
%timeit p1.send(line)
Beware - the IPython magic %timeit does not play nicely with generator send calls!
The idea for this benchmark comes from the Stack Overflow question a-faster-nested-tuple-to-list-and-back:
"I'm trying to perform tuple to list and list to tuple conversions on nested sequences of unknown depth and shape. The calls are being made hundreds of thousands of times, which is why I'm trying to squeeze out as much speed as possible."
First, let's define a function to make some test data...
In [153]:
def make_test(m, n):
    return [[range(m), make_test(m, n-1)] for i in range(n)]
make_test(2,3)
Out[153]:
In [154]:
def list2tuple(a):
    return tuple((list2tuple(x) if isinstance(x, list) else x for x in a))

def tuple2list(a):
    return list((tuple2list(x) if isinstance(x, tuple) else x for x in a))

list2tuple(make_test(2,3))
Out[154]:
In [155]:
make_test(2,3) == tuple2list(list2tuple(make_test(2,3)))
Out[155]:
In [156]:
@consumer
def colist2tuple():
    """Converts lists to tuples"""
    result = []
    while True:
        lst, l2t = yield result
        result = tuple((l2t.send((x, l2t)) if isinstance(x, list) else x for x in lst))

l2t = colist2tuple()
l2t.send((make_test(2,3), l2t))
Generators are not recursive...
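A minimal sketch of exactly what fails (the coroutine name is hypothetical, and the builtin next() is used so the sketch also runs on Python 3): a generator that is currently executing cannot be re-entered, so sending into it from inside its own body raises ValueError.

```python
def reenter():
    # Demo coroutine: it receives a reference to itself and tries to
    # send into the very generator that is running it.
    while True:
        me = yield
        me.send(me)  # re-entering a running generator -> ValueError

g = reenter()
next(g)  # prime the coroutine
try:
    g.send(g)
except ValueError as e:
    print(e)  # "generator already executing"
```

This is why the pool-of-generators workaround below hands each level of recursion to a different coroutine instance.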
In [157]:
@consumer
def colist2tuple():
    """Converts lists to tuples"""
    result = []
    while True:
        (lst, pool) = yield result
        result = tuple((pool[0].send((x, pool[1:])) if isinstance(x, list) else x for x in lst))

@consumer
def cotuple2list():
    """Converts tuples to lists"""
    result = None
    while True:
        (tup, pool) = (yield result)
        result = list((pool[0].send((x, pool[1:])) if isinstance(x, tuple) else x for x in tup))

class GenPool:
    def __init__(self, gen_func, depth):
        self.pool = [gen_func() for i in xrange(depth)]
    def send(self, iterable):
        return self.pool[0].send((iterable, self.pool[1:]))

l2t_pool = GenPool(colist2tuple, 5)
t2l_pool = GenPool(cotuple2list, 5)
make_test(3,2) == t2l_pool.send(l2t_pool.send(make_test(3,2)))
Out[157]:
In [158]:
def inplace_list2tuple(a):
    for (i, x) in enumerate(a):
        if isinstance(x, list):
            a[i] = inplace_list2tuple(x)
    return tuple(a)

def inplace_tuple2list(a):
    a = list(a)  # can't really modify a tuple in place...
    for (i, x) in enumerate(a):
        if isinstance(x, tuple):
            a[i] = inplace_tuple2list(x)
    return a

make_test(2,3) == inplace_tuple2list(inplace_list2tuple(make_test(2,3)))
Out[158]:
In [166]:
@consumer
def inplace_colist2tuple():
    """Converts lists to tuples"""
    result = None
    while True:
        (lst, co_pool) = (yield result)
        for (i, x) in enumerate(lst):
            if isinstance(x, list):
                lst[i] = co_pool[0].send((x, co_pool[1:]))
        result = tuple(lst)

@consumer
def inplace_cotuple2list():
    """Converts tuples to lists"""
    result = None
    while True:
        (tup, co_pool) = (yield result)
        result = list(tup)
        for (i, x) in enumerate(result):
            if isinstance(x, tuple):
                result[i] = co_pool[0].send((x, co_pool[1:]))

l2t_pool = GenPool(inplace_colist2tuple, 5)
t2l_pool = GenPool(inplace_cotuple2list, 5)
make_test(3,2) == t2l_pool.send(l2t_pool.send(make_test(3,2)))
Out[166]:
In [168]:
import timeit
breadth,depth = 25,8
number,repeat = 2,3
l2t_pool = GenPool(colist2tuple,breadth)
t2l_pool = GenPool(cotuple2list,breadth)
inplace_l2t_pool = GenPool(inplace_colist2tuple,breadth)
inplace_t2l_pool = GenPool(inplace_cotuple2list,breadth)
In [40]:
# Compare round trip operations
print "Generator %s" % ["%0.5f" % (v/number) for v in timeit.repeat('t2l_pool.send(l2t_pool.send(test_data))', setup='from __main__ import t2l_pool, l2t_pool, make_test, depth, breadth; test_data = make_test(breadth, depth)', number=number, repeat=repeat)]
print "Recursive %s" % ["%0.5f" % (v/number) for v in timeit.repeat('tuple2list(list2tuple(test_data))', setup='from __main__ import tuple2list, list2tuple, make_test, depth, breadth; test_data = make_test(breadth, depth)', number=number, repeat=repeat)]
In [41]:
# Compare round trip operations using inplace for list to tuple
print "Generator %s" % ["%0.5f" % (v/number) for v in timeit.repeat('inplace_t2l_pool.send(inplace_l2t_pool.send(test_data))', setup='from __main__ import inplace_l2t_pool, inplace_t2l_pool, make_test, depth, breadth; test_data = make_test(breadth, depth)', number=number, repeat=repeat)]
print "Recursive %s" % ["%0.5f" % (v/number) for v in timeit.repeat('inplace_tuple2list(inplace_list2tuple(test_data))', setup='from __main__ import inplace_tuple2list, inplace_list2tuple, make_test, depth, breadth; test_data = make_test(breadth, depth)', number=number, repeat=repeat)]
In [42]:
# Compare just list2tuple using inplace
print "Generator %s" % ["%0.5f" % (v/number) for v in timeit.repeat('inplace_l2t_pool.send(test_data)', setup='from __main__ import inplace_l2t_pool, make_test, depth, breadth; test_data = make_test(breadth, depth)', number=number, repeat=repeat)]
print "Recursive %s" % ["%0.5f" % (v/number) for v in timeit.repeat('inplace_list2tuple(test_data)', setup='from __main__ import inplace_list2tuple, make_test, depth, breadth; test_data = make_test(breadth, depth)', number=number, repeat=repeat)]
Generators are not magic - a function call is still a function call, and the GIL still serializes the process.
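A rough sketch of that point (the helper names here are illustrative, not from the presentation): every send() is an ordinary Python call, so a per-item coroutine pipeline pays at least plain function-call overhead for each element, which is why it cannot beat the recursive version by much.

```python
import timeit

def double(x):
    # plain function doing the unit of work
    return 2 * x

def codouble():
    # coroutine doing the same unit of work per send()
    result = None
    while True:
        x = yield result
        result = 2 * x

g = codouble()
next(g)  # prime the coroutine

# Time 100k plain calls vs 100k coroutine sends; both pay
# Python call overhead on every item.
t_call = timeit.timeit(lambda: double(3), number=100000)
t_send = timeit.timeit(lambda: g.send(3), number=100000)
print('plain call: %.4fs  coroutine send: %.4fs' % (t_call, t_send))
```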
In [146]:
test_data = make_test(breadth,depth)
# Coroutine
%timeit t2l_pool.send(l2t_pool.send(test_data))
# Recursive
%timeit tuple2list(list2tuple(test_data))
In [35]:
inplace_l2t_pool = GenPool(inplace_colist2tuple,breadth)
inplace_t2l_pool = GenPool(inplace_cotuple2list,breadth)
test_data = make_test(breadth,depth)
%timeit inplace_t2l_pool.send(inplace_l2t_pool.send(test_data))
test_data = make_test(breadth,depth)
%timeit inplace_tuple2list(inplace_list2tuple(test_data))
In [156]:
# Generator
test_data = make_test(breadth,depth)
%timeit inplace_l2t_pool.send(test_data)
# Recursive
test_data = make_test(breadth,depth)
%timeit inplace_list2tuple(test_data)
In [43]:
import cython
cython_list2tuple, cython_tuple2list = cython.inline(
"""
@cython.profile(True)
def cython_list2tuple(a):
    return tuple([cython_list2tuple(x) if type(x)==list else x for x in a])

@cython.profile(True)
def cython_tuple2list(a):
    return [cython_tuple2list(x) if type(x)==tuple else x for x in a]
"""
).values()  # it returns a dict of named functions
make_test(3,2) == cython_tuple2list(cython_list2tuple(make_test(3,2)))
Out[43]:
In [44]:
print "Cython: %s" % ["%0.5f" % (v/number) for v in timeit.repeat('cython_tuple2list(cython_list2tuple(t))', setup='from __main__ import cython_list2tuple, cython_tuple2list, make_test, depth, breadth; t = make_test(breadth, depth)', number=number, repeat=repeat)]
As expected, Cython is blazing fast.
In [163]:
import cProfile
In [164]:
test_data = make_test(breadth, depth)
cProfile.run('tuple2list(list2tuple(test_data));')
In [165]:
test_data = make_test(breadth, depth)
cProfile.run('inplace_tuple2list(inplace_list2tuple(test_data));')
In [49]:
test_data = make_test(breadth, depth)
cProfile.run('cython_tuple2list(cython_list2tuple(test_data));')
In [169]:
test_data = make_test(breadth, depth)
cProfile.run('t2l_pool.send(l2t_pool.send(test_data))')
In [171]:
test_data = make_test(breadth, depth)
cProfile.run('inplace_t2l_pool.send(inplace_l2t_pool.send(test_data))')
In [174]:
import numpy
import itertools
# make some data
n = 1000000
a = numpy.random.randn(n)
b = numpy.random.randn(n)
number,repeat = 2,3
In [175]:
print "Numpy Dot Product"
print "Value %s" % numpy.dot(a,b)
print "Time %s" % ["%0.5f" % (v/number) for v in timeit.repeat('numpy.dot(a,b)', setup='from __main__ import numpy, a, b', number=number, repeat=repeat)]
In [177]:
def naive_loop(it):
    result = numpy.float64(0.0)
    for (a_val, b_val) in it:
        result += a_val*b_val
    return result
print "Naive Dot Product"
print "Value: %s" % naive_loop(numpy.nditer([a, b]))
print "Time: %s" % ["%0.5f" % (v/number) for v in timeit.repeat('naive_loop(it)', setup='from __main__ import numpy, naive_loop, a, b; it = numpy.nditer([a, b])', number=number, repeat=repeat)]
In [176]:
@consumer
def mult(target=None):
    result = None
    while True:
        (a, b) = (yield result)
        result = target.send(a*b)

@consumer
def add():
    result = numpy.float64(0.0)
    while True:
        m = (yield result)
        result += m

dot_product_process = mult(add())

def gen_loop(it):
    dot_product = None
    for a_val, b_val in it:
        dot_product = dot_product_process.send((a_val, b_val))
    return dot_product

print "Coroutine Dot Product"
print "Value: %s" % gen_loop(numpy.nditer([a, b]))
print "Time %s" % ["%0.5f" % (v/number) for v in timeit.repeat('gen_loop(it)', setup='from __main__ import numpy, gen_loop, a, b; it = numpy.nditer([a, b])', number=number, repeat=repeat)]
In [187]:
# Coroutine processes without using return...
@consumer
def mult_allocated_result(target):
while True:
(a, b) = (yield)
result = target.send(a*b)
@consumer
def add_allocated_result(result):
while True:
m = (yield)
result[0] += m
result = numpy.zeros((1,))
dot_product_process_noresult = mult_allocated_result(add_allocated_result(result))
def allocated_result_loop(it):
dot_product = None
for a_val,b_val in it:
dot_product_process_noresult.send((a_val, b_val))
print "Coroutine Dot Product:"
allocated_result_loop(numpy.nditer([a, b]))
print "Value: %s" % result
print "Time %s" % ["%0.5f" % (v/number) for v in timeit.repeat('allocated_result_loop(it)', setup='from __main__ import numpy, allocated_result_loop, a, b; it = numpy.nditer([a, b])', number=number, repeat=repeat)]
In [77]:
n = 100000
a = numpy.random.randn(n)
b = numpy.random.randn(n)
In [78]:
cProfile.run('gen_loop(numpy.nditer([a, b]))')
In [79]:
cProfile.run('naive_loop(numpy.nditer([a, b]))')
Unfortunately, we don't get a lot of insight into why the naive loop is so slow - the time goes to plain Python bytecode and numpy scalar arithmetic inside the loop body, which cProfile does not break out into named functions.
In [95]:
TARGETS = 'targets'

class AnalysisWindowComplete(Exception):
    """Used in coroutine control flow, this is not an error condition."""

@consumer
def myprinter(name):
    while True:
        p = (yield)
        print " PrinterName: %s; says: %s" % (name, p)
In [104]:
@consumer
def average(targets):
    try:
        while True:
            cnt, result = 0, 0.0
            try:
                while True:
                    result += (yield)
                    cnt += 1
            except AnalysisWindowComplete as wc:
                print ' In complete with:', wc
                for target in targets:
                    target.send(result/cnt)
    except (ValueError, IndexError) as e:
        raise
In [105]:
avg_co = average([myprinter('#1'),] )
avg_co.send(5.)
avg_co.send(5.)
avg_co.send(5.)
avg_co.send(4.)
print "Call Complete"
avg_co.throw(AnalysisWindowComplete,'foobar')
In [106]:
def get_targets(co):
    '''Get the targets from the generator's frame locals'''
    try:
        return co.gi_frame.f_locals[TARGETS]
    except KeyError as ke:
        raise KeyError('No target key found?')

def set_targets(co, targets):
    """Set new targets after the generator has started!"""
    t = get_targets(co)
    while len(t) > 0:
        t.pop()
    for target in targets:
        t.append(target)
In [108]:
# Now continue using the same coroutine workflow for next analysis
avg_co.send(6)
avg_co.send(6)
avg_co.send(7)
# change where things go...
set_targets(avg_co,(myprinter('#2'),myprinter('#3')) )
avg_co.throw(AnalysisWindowComplete,'Print twice!')
Generators can be used to build some really powerful data processing flows
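As a closing sketch, the producer -> filter -> sink shape used throughout this talk can be wired up in a few lines (the names here are illustrative, and the priming decorator is a stand-in for the consumer decorator above, written with next() and print() so it also runs under Python 3):

```python
def primed(func):
    # Stand-in for the consumer decorator: advance to the first yield
    def wrapper(*args, **kw):
        gen = func(*args, **kw)
        next(gen)
        return gen
    return wrapper

@primed
def grep(pattern, target):
    # filter stage: forward only matching lines
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@primed
def collect(out):
    # sink stage: accumulate whatever arrives
    while True:
        out.append((yield))

matches = []
pipeline = grep('python', collect(matches))
for line in ['python is nice', 'java is ok', 'monty python']:
    pipeline.send(line)
print(matches)  # ['python is nice', 'monty python']
```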
Pitfalls
With lots more pictures than this talk...
http://www.dabeaz.com/generators-uk/
http://www.dabeaz.com/coroutines/
http://www.dabeaz.com/finalgenerator/
Combining Context Managers, Decorators and Generators in Python 3.0 for control flow of inline thread execution.