First, IPython engines must be started, for example with the following command to launch 2 engines (one per core):
ipcluster start -n 2
In [1]:
from IPython.parallel import Client
The Client allows us to start jobs on the engines.
In [2]:
rc = Client()
We can obtain the engine identifiers through the client.
In [3]:
rc.ids
Out[3]:
ERRATUM: the original code did not contain %px before the import os statement. This magic command is necessary for the import to occur on all engines.
In [4]:
%px import os
The %px magic command allows us to execute commands in parallel on every engine.
In [5]:
%px print(os.getpid())
With %pxconfig, we can specify the engine identifiers on which the commands should be executed (here, the second engine).
In [6]:
%pxconfig --targets 1
In [7]:
%px print(os.getpid())
Another possibility is to use the %%px cell magic to run an entire cell on all engines. The --targets option can also accept a slice object (here, all engines except the last one).
In [8]:
%%px --targets :-1
print(os.getpid())
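The :-1 target follows standard Python slicing semantics. As a plain-Python illustration (with hypothetical engine identifiers standing in for rc.ids), :-1 selects every engine except the last:

```python
# Hypothetical engine identifiers, standing in for rc.ids.
engine_ids = [0, 1, 2, 3]
# --targets :-1 corresponds to this slice: all engines but the last.
print(engine_ids[:-1])  # [0, 1, 2]
```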
By default, the parallel calls are synchronous (blocking) but we can ask IPython to make asynchronous calls.
In [9]:
%%px --noblock
import time
time.sleep(1)
os.getpid()
Out[9]:
With asynchronous (non-blocking) calls, the results can be retrieved from the engines with %pxresult, which blocks until the results are available.
In [10]:
%pxresult
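The blocking/non-blocking pattern behind %px --noblock and %pxresult mirrors the pool API of Python's standard library. As a rough analogy that requires no engines (an illustration only, not the IPython.parallel API), the same pattern can be sketched with multiprocessing's ThreadPool:

```python
from multiprocessing.pool import ThreadPool

def square(x):
    return x * x

pool = ThreadPool(2)

# Blocking call: returns the results directly,
# like a %px call without --noblock.
results = pool.map(square, range(4))
print(results)  # [0, 1, 4, 9]

# Non-blocking call: returns immediately with an AsyncResult,
# like %px --noblock; get() then blocks until the results are
# available, playing the role of %pxresult.
ar = pool.map_async(square, range(4))
async_results = ar.get()
print(async_results)  # [0, 1, 4, 9]

pool.close()
pool.join()
```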
Another option for running tasks on the engines is to use map. First, we need to retrieve a view on the engines, which represents a particular subset of the running engines.
In [11]:
v = rc[:]
We import a module on each engine.
In [12]:
with v.sync_imports():
    import time
We define a simple function.
In [13]:
def f(x):
    time.sleep(1)
    return x * x
Now, we call map_sync, a synchronous, parallel version of Python's built-in map function. We execute f on all integers between 0 and 9, in parallel across all engines.
In [14]:
v.map_sync(f, range(10))
Out[14]:
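For reference, the expected result is the list of the squares of 0 through 9, which we can verify locally without any engines:

```python
# The same computation, done serially on the local interpreter.
expected = [x * x for x in range(10)]
print(expected)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```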
We check how much time the serial version takes. We wrap the call in list because, on Python 3, map is lazy and would otherwise compute nothing.
In [15]:
%timeit -n 1 -r 1 list(map(f, range(10)))
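One caveat when timing the built-in map on Python 3: it returns a lazy iterator, so nothing is computed until the iterator is consumed, and timing the bare map call would measure almost nothing. A quick local illustration:

```python
calls = []

def record_square(x):
    calls.append(x)  # record that the function actually ran
    return x * x

lazy = map(record_square, range(3))
print(calls)  # [] -- the map call alone computed nothing

results = list(lazy)  # consuming the iterator triggers the calls
print(calls)    # [0, 1, 2]
print(results)  # [0, 1, 4]
```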
And we compare with the time taken by the parallel version.
In [16]:
r = v.map(f, range(10))
In [17]:
r.ready(), r.elapsed
Out[17]:
We wait and get the results.
In [18]:
r.get()
Out[18]:
In [19]:
r.elapsed, r.serial_time
Out[19]:
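Here, elapsed is the wall-clock time of the whole parallel run, while serial_time is the total compute time summed over all tasks; their ratio estimates the parallel speedup. A sketch with hypothetical values (ten 1-second tasks on two engines, standing in for the notebook's actual output):

```python
# Hypothetical timings, standing in for r.elapsed and r.serial_time.
elapsed = 5.0       # wall-clock seconds for the parallel run
serial_time = 10.0  # total compute seconds summed over all tasks
speedup = serial_time / elapsed
print(speedup)  # 2.0 -- close to the number of engines
```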