In [33]:
%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload

In [34]:
import ipyparallel
from ipyparallel import Client

import time

import ipy_executor

For this to work you need a collection of engines to connect to. You can probably create a local collection by just running ipcluster start -n 8; for more sophisticated setups, read the ipyparallel docs. You can set up "profiles" and then start and connect to different engine setups by specifying a profile.


In [35]:
c = Client()
c.ids


Out[35]:
[0, 1, 2, 3, 4, 5, 6, 7]

Parallel execution! With 8 engines and 24 one-second tasks, this should take about three seconds rather than 24. You can turn the delay up to confirm that it's really running in parallel.


In [36]:
def f(x):
    import time
    time.sleep(1)
    return x**2

with ipy_executor.IpyExecutor(c) as ex:
    print(list(ex.map(f,range(3*len(c.ids)))))


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529]
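One way to confirm the parallelism is to time the map call instead of eyeballing it. The sketch below uses the standard library's ThreadPoolExecutor as a stand-in, on the assumption that IpyExecutor follows the same concurrent.futures.Executor interface; the names slow_square and timed_map are hypothetical helpers, not part of ipy_executor.

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.2  # seconds each task sleeps


def slow_square(x):
    time.sleep(DELAY)
    return x**2


def timed_map(executor, func, items):
    """Run executor.map and return (results, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    results = list(executor.map(func, items))
    return results, time.perf_counter() - start


n_workers = 4
with ThreadPoolExecutor(max_workers=n_workers) as ex:
    results, elapsed = timed_map(ex, slow_square, range(3 * n_workers))

# Total sleep time if the tasks ran one after another.
serial_estimate = DELAY * 3 * n_workers
print(f"{elapsed:.2f} s elapsed, versus about {serial_estimate:.1f} s serially")
```

If the same helper is handed an IpyExecutor, the elapsed time should scale with the number of tasks divided by the number of engines, not with the number of tasks.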

Check that exceptions are correctly propagated: that is, sent back from the engines and attached to the Future, to be handled however the Future's creator thinks appropriate. map just cancels all outstanding Futures (running Futures cannot be interrupted) and re-raises the exception. So the cell below should take about a second, not a hundred seconds.


In [37]:
def g(x):
    import time
    time.sleep(1)
    if x==3:
        raise ValueError("Oops!")
    return x**2

with ipy_executor.IpyExecutor(c) as ex:
    list(ex.map(g,range(100*len(c.ids))))


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<string> in <module>()
<ipython-input-37-1a7b99969cca> in g(x)
ValueError: Oops!
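When map's cancel-and-re-raise behaviour is not what you want, the exception can be read off each Future individually. This is a minimal sketch using the standard library's ThreadPoolExecutor as a stand-in; it assumes the Futures returned by IpyExecutor.submit offer the same exception() and result() methods as concurrent.futures.Future.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def g(x):
    time.sleep(0.05)
    if x == 3:
        raise ValueError("Oops!")
    return x**2


with ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(g, x) for x in range(6)]

# Leaving the with block waits for every Future to finish.
outcomes = []
for fut in futures:
    err = fut.exception()  # None if the call succeeded
    outcomes.append(err if err is not None else fut.result())

print(outcomes)
```

Here one failing task does not discard the results of the others; the caller decides what to do with each error.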

You need to make sure the objects you care about are available on the engines. A "direct view" lets you push them into the engine namespace.


In [38]:
dview = c[:]

def h(x):
    return h_internal(x)

exponent = 2
def h_internal(x):
    return x**exponent
dview.push(dict(h_internal=h_internal,
                exponent=exponent))

with ipy_executor.IpyExecutor(c) as ex:
    print(list(ex.map(h,range(30))))


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841]
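The need for the push can be illustrated without a cluster. The sketch below assumes that an engine receives the code of h but resolves the global names it uses in the engine's own namespace; it imitates that by rebuilding h against a nearly empty dictionary. The names engine_ns and remote_h are hypothetical, and this is an illustration of the name lookup, not of how ipyparallel serializes functions.

```python
import builtins
import types


def h(x):
    return h_internal(x)  # looked up in h's globals at call time


def h_internal(x):
    return x**2


# Rebuild h against an almost empty namespace, standing in for an engine
# that received the code of h but not the names it refers to.
engine_ns = {"__builtins__": builtins}
remote_h = types.FunctionType(h.__code__, engine_ns, "h")

try:
    remote_h(3)
    missing = None
except NameError as err:
    missing = str(err)

# "Pushing" the helper into the namespace makes the lookup succeed.
engine_ns["h_internal"] = h_internal
pushed_result = remote_h(3)
print(missing)
print(pushed_result)
```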

Tracebacks are slightly wonky since this is interactive code, but at least you can see the remote stack.


In [39]:
def k(x):
    return k_internal(x)

def k_internal(x):
    if x==7:
        raise ValueError("blarg")
    return x**2
dview.push(dict(k_internal=k_internal))

with ipy_executor.IpyExecutor(c) as ex:
    print(list(ex.map(k,range(30))))


---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<string> in <module>()
<ipython-input-39-ba9d7e977960> in k(x)
<ipython-input-39-ba9d7e977960> in k_internal(x)
ValueError: blarg

Let's make sure the executor finishes all its jobs even after shutdown is called.


In [40]:
def l(x):
    import time
    time.sleep(0.1)
    return 2*x
ex = ipy_executor.IpyExecutor(c)
fs = [ex.submit(l,i) for i in range(100)]
ex.shutdown(wait=False)
del ex
# Use a fresh loop variable so the function f defined earlier is not shadowed.
for fut in fs:
    print(fut.result(), end=' ')


0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 100 102 104 106 108 110 112 114 116 118 120 122 124 126 128 130 132 134 136 138 140 142 144 146 148 150 152 154 156 158 160 162 164 166 168 170 172 174 176 178 180 182 184 186 188 190 192 194 196 198 
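For comparison, the standard library's ThreadPoolExecutor behaves the same way: shutdown(wait=False) returns immediately, and work that was already submitted still runs to completion. A minimal sketch, assuming IpyExecutor is meant to mirror these concurrent.futures semantics; double is a hypothetical stand-in for l above.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def double(x):
    time.sleep(0.01)
    return 2 * x


ex = ThreadPoolExecutor(max_workers=4)
futures = [ex.submit(double, i) for i in range(20)]
ex.shutdown(wait=False)  # returns immediately; queued work still runs
del ex

# Each result() call blocks until that Future has finished.
results = [fut.result() for fut in futures]
print(results)
```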
