In [1]:
import numpy as np
We will use only engines running on local cores, which requires no setup; see the docs for detailed instructions on how to set up a remote cluster, including Amazon EC2 clusters.
You can start a cluster on the IPython Clusters tab in the main Jupyter browser window, or from the command line with
ipcluster start -n <desired number of engines>
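For example, assuming you want four local engines, the command would be
ipcluster start -n 4
and ipcluster stop shuts the cluster down again when you are finished.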
The main advantage of developing parallel applications with ipyparallel is that it can be done interactively within Jupyter.
In [2]:
from ipyparallel import Client
The client connects to the cluster of "remote" engines that perform the actual computation. These engines may be on the same machine or on a cluster.
In [3]:
rc = Client()
In [4]:
rc.ids
Out[4]:
A view provides access to a subset of the engines available to the client. Jobs are submitted to the engines via the view. A direct view allows the user to explicitly send work to specific engines. The load-balanced view is like the Pool object in multiprocessing, and manages the scheduling and distribution of jobs for you.
Direct view
In [5]:
dv = rc[:]
Add 10 sets of 3 numbers in parallel using all engines.
In [6]:
dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[6]:
Add 10 sets of 3 numbers in parallel using only alternate engines.
In [7]:
rc[::2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[7]:
Add 10 sets of 3 numbers using a specific engine.
In [8]:
rc[2].map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
Out[8]:
Load balanced view
Use this when you have many jobs that take different amounts of time to complete; a sketch with deliberately uneven tasks follows the example below.
In [9]:
lv = rc.load_balanced_view()
In [10]:
lv.map_sync(lambda x: sum(x), np.random.random((10, 100000)))
Out[10]:
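The example above gives every task the same amount of work. A minimal sketch of the situation the load-balanced view is meant for, with made-up tasks of deliberately uneven duration, might look like this:
def uneven_task(t):
    import time    # imported inside the function so it is available on the engine
    time.sleep(t)  # stands in for work of varying duration
    return t

lv.map_sync(uneven_task, [0.5, 0.1, 0.1, 0.5, 0.1, 0.1])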
In contrast to map, apply is just a simple function call run on all remote engines, and has the usual function signature apply(f, *args, **kwargs). It is the primitive on which other, more useful functions (such as map) are built.
In [11]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, 3, 4)
Out[11]:
In [12]:
rc[1:3].apply_sync(lambda x, y: x**2 + y**2, x=3, y=4)
Out[12]:
We have used the map_sync and apply_sync methods. The sync suffix indicates that we want to run a synchronous job. Synchronous jobs block until all the computation is done and return the result.
In [13]:
res = dv.map_sync(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [14]:
res
Out[14]:
In contrast, asynchronous jobs return immediately so that you can do other work, but give you an AsyncMapResult object, similar to the Future object in the concurrent.futures package. You can query its status, cancel running jobs, and retrieve results once they have been computed; a sketch of the status and cancellation methods follows the cells below.
In [15]:
res = dv.map_async(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [16]:
res
Out[16]:
In [17]:
res.done()
Out[17]:
In [18]:
res.get()
Out[18]:
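Beyond done() and get(), the result object also supports status queries and cancellation; a minimal sketch (the calls shown are part of ipyparallel's AsyncResult interface):
res = dv.map_async(lambda x, y, z: x + y + z, range(10), range(10), range(10))
res.ready()           # False until all tasks have finished
res.progress          # number of tasks completed so far
res.wait(timeout=1)   # block for at most one second
res.get()             # block until done, then return the results
# res.abort()         # would try to cancel tasks that have not started yet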
There is also a map method that uses asynchronous mode by default, but you can change this by setting the block attribute or function argument.
In [19]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [20]:
res.get()
Out[20]:
Change blocking mode for just one job.
In [21]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10), block=True)
In [22]:
res
Out[22]:
Change blocking mode for this view so that all jobs are synchronous.
In [23]:
dv.block = True
In [24]:
res = dv.map(lambda x, y, z: x + y + z, range(10), range(10), range(10))
In [25]:
res
Out[25]:
The @remote decorator results in functions that will execute simultaneously on all engines in a view. For example, you can use this decorator if you always want to run $n$ independent parallel MCMC chains; a sketch of this use case follows the basic example below.
In [26]:
@dv.remote(block = True)
def f1(n):
    import numpy as np
    return np.random.rand(n)
In [27]:
f1(4)
Out[27]:
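As a sketch of the MCMC use case mentioned above (not taken from the original notebook: run_chain is a made-up, deliberately minimal random-walk Metropolis sampler targeting a standard normal), each engine runs one independent chain:
@dv.remote(block = True)
def run_chain(n_samples=1000):
    import numpy as np
    samples = np.empty(n_samples)
    x = 0.0
    for i in range(n_samples):
        proposal = x + np.random.normal(scale=0.5)
        # Metropolis acceptance step for a standard normal target
        if np.log(np.random.rand()) < 0.5*(x**2 - proposal**2):
            x = proposal
        samples[i] = x
    return samples

chains = run_chain(1000)  # one chain per engine in the view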
The @parallel decorator breaks up elementwise operations and distributes them.
In [28]:
@dv.parallel(block = True)
def f2(x):
    return x
In [29]:
f2(range(15))
Out[29]:
In [30]:
@dv.parallel(block = True)
def f3(x):
    return sum(x)
In [31]:
f3(range(15))
Out[31]:
In [32]:
@dv.parallel(block = True)
def f4(x, y):
    return x + y
In [33]:
f4(np.arange(10), np.arange(10))
Out[33]:
In [34]:
def mandel1(x, y, max_iters=80):
    c = complex(x, y)
    z = 0.0j
    for i in range(max_iters):
        z = z*z + c
        if z.real*z.real + z.imag*z.imag >= 4:
            return i
    return max_iters
In [35]:
@dv.parallel(block = True)
def mandel2(x, y, max_iters=80):
    c = complex(x, y)
    z = 0.0j
    for i in range(max_iters):
        z = z*z + c
        if z.real*z.real + z.imag*z.imag >= 4:
            return i
    return max_iters
In [36]:
x = np.arange(-2, 1, 0.01)
y = np.arange(-1, 1, 0.01)
X, Y = np.meshgrid(x, y)
In [37]:
%%time
im1 = np.reshape(list(map(mandel1, X.ravel(), Y.ravel())), (len(y), len(x)))
In [38]:
%%time
im2 = np.reshape(mandel2.map(X.ravel(), Y.ravel()), (len(y), len(x)))
In [39]:
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(12, 4))
axes[0].grid(False)
axes[0].imshow(im1, cmap='jet')
axes[1].grid(False)
axes[1].imshow(im2, cmap='jet')
pass
Modules imported locally are NOT available in the remote engines.
In [ ]:
import time
import datetime
In [ ]:
def g1(x):
    time.sleep(0.1)
    now = datetime.datetime.now()
    return (now, x)
This fails with an exception because the time and datetime modules are not imported in the remote engines.
dv.map_sync(g1, range(10))
The simplest fix is to import the required module(s) within the function.
In [ ]:
def g2(x):
    import time, datetime
    time.sleep(0.1)
    now = datetime.datetime.now()
    return (now, x)
In [ ]:
dv.map_sync(g2, range(5))
Alternatively, you can import a module both locally and in the remote engines at the same time with the sync_imports context manager.
In [ ]:
with dv.sync_imports():
    import time
    import datetime
Now the g1 function will work.
In [ ]:
dv.map_sync(g1, range(5))
Finally, there is also a require decorator that can be used. This forces the remote engine to import all the packages given.
In [40]:
from ipyparallel import require
In [41]:
@require('scipy.stats')
def g3(x):
    return scipy.stats.norm(0,1).pdf(x)
In [42]:
dv.map(g3, np.arange(-3, 4))
Out[42]:
We can send data to remote engines with push and retrieve it with pull, or use the dictionary interface. For example, you can use this to distribute a large lookup table to all engines once, instead of passing it repeatedly as a function argument; a sketch of this pattern follows the examples below.
In [43]:
dv.push(dict(a=3, b=2))
Out[43]:
In [44]:
def f(x):
    global a, b
    return a*x + b
In [45]:
dv.map_sync(f, range(5))
Out[45]:
In [46]:
dv.pull(('a', 'b'))
Out[46]:
In [47]:
dv['c'] = 5
In [48]:
dv['a']
Out[48]:
In [49]:
dv['c']
Out[49]:
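A minimal sketch of the lookup-table pattern mentioned above (lookup and use_lookup are made-up names, and a small dict stands in for the large table):
lookup = {i: i**2 for i in range(100)}  # stands in for a large table
dv.push(dict(lookup=lookup))            # sent to every engine once

def use_lookup(key):
    return lookup[key]                  # resolved in each engine's namespace

dv.map_sync(use_lookup, range(10))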
Using numba.jit is straightforward.
In [50]:
with dv.sync_imports():
    import numba
In [51]:
@numba.jit
def f_numba(x):
    return np.sum(x)
In [52]:
dv.map(f_numba, np.random.random((6, 4)))
Out[52]:
We need to do some extra work to make sure the shared library compiled with Cython is available to the remote engines:
- compile a named shared module with the -n flag
- use np.ndarray[dtype, ndim] in place of memory views
- copy the shared library from the Cython magic's cache directory (~/.ipython/cython) to the site-packages directory
In [53]:
%load_ext cython
In [54]:
%%cython -n cylib
import cython
import numpy as np
cimport numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def f(np.ndarray[np.float64_t, ndim=1] x):
    x.setflags(write=True)
    cdef int i
    cdef int n = x.shape[0]
    cdef double s = 0
    for i in range(n):
        s += x[i]
    return s
In [55]:
import os
import glob
import site
import shutil
src = glob.glob(os.path.join(os.path.expanduser('~/'), '.ipython', 'cython', 'cylib*so'))[0]
dst = site.getsitepackages()[0]
shutil.copy(src, dst)
Out[55]:
In [56]:
with dv.sync_imports():
    import cylib
In [57]:
dv.map(cylib.f, np.random.random((6, 4)))
Out[57]:
The %px magic sends a command to all targeted engines.
In [58]:
%px import numpy as np
%px a = np.random.random(4)
%px a.sum()
The scatter method partitions and distributes data to all engines. The gather method does the reverse. Together with %px, we can simulate parallel list comprehensions.
In [59]:
dv.scatter('a', np.random.randint(0, 10, 10))
%px print(a)
In [60]:
dv.gather('a')
Out[60]:
In [61]:
dv.scatter('xs', range(24))
%px y = [x**2 for x in xs]
np.array(dv.gather('y'))
Out[61]:
In [62]:
%%px --target [1,3]
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);
In [63]:
%%px --target [1,3] --noblock
%matplotlib inline
import seaborn as sns
x = np.random.normal(np.random.randint(-10, 10), 1, 100)
sns.kdeplot(x);
Out[63]:
In [64]:
%pxresult