back to Index
coroutine tests
In [1]:
# Cross-notebook include shim
# Opens nbinclude.ipynb, parses it as a JSON notebook, and runs the source of
# the first cell of its first worksheet in the current IPython session.
with open("nbinclude.ipynb") as nbinclude_f: # don't rename nbinclude_f
    import IPython.nbformat.current
    # NOTE(review): presumably the included cell refers to nbinclude_f by
    # name (hence the warning above) -- confirm against nbinclude.ipynb.
    get_ipython().run_cell(IPython.nbformat.current.read(nbinclude_f, 'json').worksheets[0].cells[0].input)
In [4]:
def grep(pattern):
    """Coroutine that echoes every received line containing ``pattern``.

    The generator must be primed with ``next()`` before use: priming runs
    the body up to the first ``yield`` and prints the "Looking for ..."
    banner. Each later ``send(line)`` delivers one line to test.

    Args:
        pattern: Value looked for in each line with the ``in`` operator.
    """
    # Function-scope import: no import of sys is visible before this
    # definition in the file.
    import sys

    # Single-argument print() parses identically on Python 2 and Python 3.
    print("Looking for %s" % pattern)
    while True:
        line = (yield)
        if pattern in line:
            # Write the line without appending a newline, as the original
            # trailing-comma print statement did.
            sys.stdout.write("%s" % (line,))
In [5]:
# Calling grep() only creates the generator object; nothing is printed until
# the generator is advanced for the first time.
g = grep("python")
In [6]:
# Prime the coroutine: run it up to its first yield so it can accept send().
# The builtin next() works on both Python 2.6+ and Python 3.
next(g)
In [8]:
# The text does not contain "python", so nothing is printed.
g.send("Yeah, but no, but yeah, but no")
In [9]:
# Again no "python" in the text: no output.
g.send("A series of tubes")
In [10]:
# The text contains "python", so the coroutine echoes it.
g.send("python generators rock!")
In [4]:
import sys
In [5]:
def counter():
    """Generator that prints the next integer (1, 2, 3, ...) on each step.

    Every ``next()`` call increments the count, prints it, flushes stdout
    and then suspends at ``yield``. The yielded value is always ``None``;
    the generator never terminates on its own.
    """
    i = 0
    while True:
        i += 1
        # Single-argument print() behaves the same on Python 2 and 3.
        print(i)
        sys.stdout.flush()
        yield
In [6]:
# Create a counter generator; its body does not start running until next().
c=counter()
In [7]:
# Advance to the first yield (prints 1). The builtin next() works on both
# Python 2.6+ and Python 3.
next(c)
In [35]:
# A second generator object; it keeps its own count, separate from c.
c2=counter()
In [36]:
# Rebinds c2 to a brand-new generator, so its count starts from 1 again.
c2=counter()
In [37]:
# Advance to the first yield (prints 1). The builtin next() works on both
# Python 2.6+ and Python 3.
next(c2)
In [3]:
import coroutine
In [1]:
# NOTE(review): `coroutine` must be a callable decorator at this point. The
# file's `import coroutine` binds a module object, which cannot be applied
# with `@` -- confirm whether `from coroutine import coroutine` was intended.
@coroutine
def printer():
    """Receive one value via ``send()`` and print it.

    The body returns right after printing, so the generator is exhausted by
    the same ``send()`` call that delivers the value; for a plain generator
    that call raises ``StopIteration`` in the caller.
    """
    tmp = (yield)
    # Single-argument print() behaves the same on Python 2 and 3.
    print(tmp)
def sender(coru):
    """Send the string "hello" to ``coru``, then print a marker line.

    Args:
        coru: Object exposing ``send()``. Any exception raised by
            ``coru.send`` propagates, in which case the marker line is
            not printed.
    """
    coru.send("hello")
    # Single-argument print() behaves the same on Python 2 and 3.
    print("I'm sender")
In [8]:
import time
In [9]:
def countersleep(delay=1):
    """Generator that prints a counter, pauses, then yields.

    Each ``next()`` call increments the count, prints it followed by
    ``leaving...``, flushes stdout, sleeps for ``delay`` seconds, prints
    ``out!`` and then suspends at ``yield`` (yielding ``None``).

    Args:
        delay: Seconds to sleep on every step. Defaults to 1, the value
            that was previously hard-coded.
    """
    i = 0
    while True:
        i += 1
        # Single-argument print() behaves the same on Python 2 and 3.
        print(i)
        print('leaving...')
        # Flush before sleeping so the lines above are emitted before the
        # pause rather than after it.
        sys.stdout.flush()
        time.sleep(delay)
        print('out!')
        yield
In [17]:
# Create the generator; nothing is printed and no sleep happens until next().
cs=countersleep()
In [19]:
# First step: prints 1 and 'leaving...', sleeps, then prints 'out!'.
next(cs)
In [20]:
# Next step: resumes after the yield and repeats the cycle with the count
# increased by one.
next(cs)
In [ ]: