In [33]:
%load_ext autoreload
%autoreload 2
In [34]:
import ipyparallel
from ipyparallel import Client
import time
import ipy_executor
For this to work you need a set of engines to connect to. You can probably create a local set by just running ipcluster start -n 8; for more sophisticated setups read the ipyparallel docs. There you define "profiles", and you can start and connect to different engine setups by specifying a profile.
In [35]:
c = Client()
c.ids
Out[35]:
Parallel execution! You can turn the delay up to confirm that it's really running in parallel.
In [36]:
def f(x):
    import time
    time.sleep(1)
    return x**2

with ipy_executor.IpyExecutor(c) as ex:
    print(list(ex.map(f,range(3*len(c.ids)))))
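One way to confirm the parallelism without eyeballing the delay is to time the map and compare it with the serial estimate. The sketch below is only an illustration: it uses the standard library's ThreadPoolExecutor as a stand-in for IpyExecutor (so it runs without a cluster), and slow_square and timed_map are hypothetical helper names, not part of ipy_executor.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.2  # seconds each task sleeps; turn it up to make the effect obvious


def slow_square(x):
    """Sleep briefly, then square x (stand-in for the notebook's f)."""
    time.sleep(DELAY)
    return x ** 2


def timed_map(n_workers, n_tasks):
    """Map slow_square over range(n_tasks); return (results, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_workers) as ex:
        results = list(ex.map(slow_square, range(n_tasks)))
    return results, time.perf_counter() - start


n_tasks = 8
results, elapsed = timed_map(n_workers=4, n_tasks=n_tasks)
serial_estimate = n_tasks * DELAY  # time for the same work done one task at a time
print(results)
print(f"elapsed {elapsed:.2f}s, serial estimate {serial_estimate:.2f}s")
```

If the tasks really overlap, the elapsed time should be close to the serial estimate divided by the number of workers.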
This checks that exceptions are propagated correctly: they are sent back from the engines and attached to the Future, to be handled however the Future's creator sees fit. map cancels all outstanding Futures (running Futures cannot be interrupted) and re-raises the exception, so the cell below should take about a second, not a hundred seconds.
In [37]:
def g(x):
    import time
    time.sleep(1)
    if x==3:
        raise ValueError("Oops!")
    return x**2

with ipy_executor.IpyExecutor(c) as ex:
    list(ex.map(g,range(100*len(c.ids))))
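The two ways of seeing a remote failure, inspecting the Future directly or letting map re-raise, can be sketched with the standard library alone. This is an illustration under the assumption that IpyExecutor follows the concurrent.futures interface; ThreadPoolExecutor stands in for it here, and fragile_square is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fragile_square(x):
    """Square x, but fail for one particular input."""
    time.sleep(0.05)
    if x == 3:
        raise ValueError("Oops!")
    return x ** 2


# With submit, the exception stays attached to its Future until someone asks.
with ThreadPoolExecutor(max_workers=2) as ex:
    futures = [ex.submit(fragile_square, i) for i in range(6)]
    # Future.exception() waits for the task and returns the exception, or None.
    errors = {i: fut.exception() for i, fut in enumerate(futures)}

failed = [i for i, err in errors.items() if err is not None]

# With map, the exception is re-raised when iteration reaches the failing item.
collected = []
caught = None
with ThreadPoolExecutor(max_workers=2) as ex:
    try:
        for value in ex.map(fragile_square, range(6)):
            collected.append(value)
    except ValueError as err:
        caught = err

print("failed inputs:", failed)
print("results before the failure:", collected)
print("re-raised by map:", repr(caught))
```

Using submit keeps the results of the tasks that succeeded, while map gives up at the first failing item, which is the behaviour the cell above relies on.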
You need to make sure the objects you care about are available on the engines. A "direct view" lets you push them into the engine namespace.
In [38]:
dview = c[:]

def h(x):
    return h_internal(x)

exponent = 2
def h_internal(x):
    return x**exponent

dview.push(dict(h_internal=h_internal,
                exponent=exponent))
with ipy_executor.IpyExecutor(c) as ex:
    print(list(ex.map(h,range(30))))
Tracebacks are slightly wonky because this is interactive code, but at least you can see the remote stack.
In [39]:
def k(x):
    return k_internal(x)

def k_internal(x):
    if x==7:
        raise ValueError("blarg")
    return x**2

dview.push(dict(k_internal=k_internal))
with ipy_executor.IpyExecutor(c) as ex:
    print(list(ex.map(k,range(30))))
Let's make sure the executor finishes all its jobs even after shutdown is called.
In [40]:
def l(x):
    import time
    time.sleep(0.1)
    return 2*x

ex = ipy_executor.IpyExecutor(c)
fs = [ex.submit(l,i) for i in range(100)]
ex.shutdown(wait=False)
del ex
# Loop variable is named fut so it does not clobber the function f defined earlier.
for fut in fs:
    print(fut.result(), end=' ')
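For comparison, the standard library's ThreadPoolExecutor gives the same guarantee: shutdown(wait=False) returns immediately, already-submitted work still runs to completion, and only new submissions are refused. The sketch below shows that reference behaviour; whether IpyExecutor matches it in every detail is an assumption that the cell above is testing, and double is a hypothetical name.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def double(x):
    """Sleep briefly, then return twice x (stand-in for the notebook's l)."""
    time.sleep(0.01)
    return 2 * x


ex = ThreadPoolExecutor(max_workers=4)
futures = [ex.submit(double, i) for i in range(20)]
ex.shutdown(wait=False)  # returns at once; queued work is not cancelled

# After shutdown the executor refuses new work.
try:
    ex.submit(double, 99)
    rejected = False
except RuntimeError:
    rejected = True

del ex  # the Futures remain usable without a reference to the executor

# Every Future submitted before shutdown still produces its result.
results = [fut.result() for fut in futures]
print(results)
print("new submission rejected:", rejected)
```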