After speaking to a number of academics involved with Monte Carlo simulations, I realised many of them resort to writing bash scripts to launch several Python instances of a simulation, then writing another script to piece the data back together. It turns out this is incredibly simple to do in IPython, and in particular in an IPython Notebook. We take a look at how to run computations in parallel, and give a use case: the creation of Julia fractals.
Most modern CPUs have two or more cores, allowing everyone to take advantage of parallel computing. The IPython approach shown here is probably most useful when you have a computationally intensive program which is not run on a regular basis. Writing a full parallel implementation can be time consuming (depending on the problem and the experience of the programmer); however, as we'll see, writing an implementation in IPython can be quick and painless. Of course, more tailored parallel implementations require more effort and should use an established MPI implementation such as OpenMPI. Thankfully this can also be done within the IPython framework.
To begin, we will set up the engines that we will use for our computation.
The client is the handler for all of the engines (kernels) in a cluster.
In [2]:
from IPython.parallel import Client
%pylab inline
Before we start, we first need to assign a number of engines in our cluster. This can be done through the IPython notebook interface or through the command line. The basic syntax is
$ ipcluster start -n 4
which starts a cluster of 4 engines to compute on. For more help, run $ ipcluster -h
or consult the documentation on setting up engines with specific settings or parameters by creating a profile.
We begin by creating a Client instance.
In [3]:
client = Client()
client.block = True # Computations run synchronously.
print client.ids
In [4]:
dview = client.direct_view()
In [5]:
def f(x):
    return x

dview.apply(f, "Hello World")
Out[5]:
One can also select a subset of the engines directly using list slicing:
In [6]:
even_dview = client[::2]
even_dview.apply(f, "Hello World")
Out[6]:
In [7]:
bview = client.load_balanced_view()
bview.apply(f, "Hello World")
Out[7]:
In [8]:
import os
print os.getpid()
As shown before, we can apply a function (with parameters if needed) to the view, which returns the result for each engine in a list.
In [9]:
dview.apply(os.getpid)
Out[9]:
Alternatively we can apply a function decorator to our function and call it as normal. A function decorator is a function which takes another function as an argument and returns a modified, wrapped version of it.
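For readers less familiar with decorators, here is a quick generic illustration of the pattern (nothing to do with IPython itself; shout and greet are just made-up names for this aside):

def shout(func):
    # wrap func so that whatever it returns is upper-cased
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs).upper()
    return wrapper

@shout
def greet(name):
    return "hello " + name

print greet("world")   # prints HELLO WORLD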
In [10]:
@dview.remote(block=True)
def getpid():
    import os
    return os.getpid()

getpid()
Out[10]:
Another method uses the cell magic %%px or the line magic %px. This executes the cell or line on a direct view of the current client (specifically, on its current list of targets).
In [11]:
%%px
import os
print os.getpid()
Finally, and most fundamentally, one can execute a string of code using the execute function.
In [12]:
res = dview.execute('print os.getpid()')
res.display_outputs()
In [13]:
dview.execute('import numpy')
Out[13]:
In [14]:
%px import numpy
In [15]:
with dview.sync_imports():
import numpy
The latter method uses the context manager sync_imports, which performs the imports both locally and on the remote engines.
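For example, several modules can be pulled in at once this way; a minimal sketch (the choice of modules here is arbitrary):

with dview.sync_imports():
    import os
    import random
    import time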
It is also possible for all remote engines to run a centralised script. This could be used for imports, defining application functions, or actual computation.
In [16]:
%%file script.py
def hello_world():
    print "Hello World"

hello_world()
In [17]:
dview.run("script.py").display_outputs()
Since the computational engines are all running IPython, %magics are supported, as is inline plotting. For example,
In [19]:
%%px
%pylab inline
z = np.random.random(10)
hist(z);
Accessing and setting data can be done in multiple ways (spotting a theme here?). All variables and objects on each engine can be accessed like a dictionary using the direct view. We are not limited to accessing purely what we have assigned; anything in the global namespace on the remote engine is fair game.
In [20]:
dview['a'] = 5
dview['b'] = 10
print dview['a']
print dview['os.getpid']
Values can be set and retrieved using the more conventional push and pull methods respectively.
In [21]:
dview.push(dict(a=1, b=2))
dview.pull(('a','b'))
Out[21]:
Arrays can be distributed evenly over the engines using the scatter method.
In [22]:
a = np.arange(32)
dview.scatter('a', a)
%px print a
This allows us to compute the result for each sub-array before using the gather method to piece them back together.
In [23]:
%px a = a**2
dview.gather('a')
Out[23]:
Direct views are simple to understand and are extremely useful when we know how to split our computation up efficiently. In some cases, however, there may be a list of computational tasks with unknown runtimes, and this is where a load-balanced view becomes useful. The load-balanced view was defined earlier as:
bview = client.load_balanced_view()
Below we have a function which will hopefully illustrate how the view behaves. The function sleeps for a random amount of time before returning the PID of the engine and a random number multiplied by an input factor.
In [24]:
def random_number(factor):
    import time
    import os
    import random
    r = random.random()
    time.sleep(5*r)
    return os.getpid(), factor*r
If we apply a function to the load-balanced view we receive only one result, as the function has only been executed once.
In [25]:
bview.block=True
bview.apply(random_number,1)
Out[25]:
To pass in multiple inputs (effectively creating a queue of parameters), we can use the map function. By setting bview.block = False we allow the results to stream in asynchronously.
In [26]:
bview.block = False
res = bview.map(random_number, range(10))
The map call instantly returns an AsyncResult object, which begins empty and collects the results as they are computed. To check whether the computation is complete we run
In [27]:
res.ready()
Out[27]:
which in this case is False. After waiting a sufficient amount of time, the complete result becomes available:
In [29]:
print res.ready()
print res.result
If we call result before the computation is complete then the system will wait until it is. Note that there is no particular ordering to the engines used: the scheduler picks the next available engine to run the function. The load-balanced view's map is also iterable, which allows results to be returned as and when they become available.
In [30]:
for result in bview.map(random_number, range(10)):
    print result
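If we would rather poll than block, the AsyncResult object also exposes (at least in the IPython versions I have used) a progress counter and a wait method that accepts a timeout; a small sketch:

res = bview.map(random_number, range(10))
res.wait(1)             # block for at most one second
print res.progress      # number of tasks completed so far
print res.elapsed       # seconds since the tasks were submitted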
Now for an interesting application. Since everyone has estimated $\pi$ a thousand times before, we'll avoid that example. Instead we'll look at visualising Julia sets, closely following this well-written guide to NumPy/SciPy Recipes for Image Processing.
We consider a rational function $f: \mathbb{C} \to \mathbb{C}$; in this example we take $f(z) = z^2 + c$ where $c \in \mathbb{C}$. Written in Python, this function is:
In [31]:
def f(z, c=-.065+.66j):
    return z**2 + c
Note that Python uses the letter j rather than i for the complex unit.
We can use this function as an iterative map on the complex plane, i.e. we have $z_t = f(z_{t-1}) = \dots = f^t(z_0)$. There are two possibilities: either $|z_t|$ diverges as $t \to \infty$, or $|z_t|$ remains bounded.
Points where $f^t(z)$ diverges form the escape set of $f$, and points which do not diverge form the prisoner set of $f$. The Julia set of $f$ is then defined to be the boundary of the prisoner set, i.e.,
$$J(f) = \partial \left\{ z \in \mathbb{C} \,\middle\vert\, \lim_{t \to \infty} |f^t(z)| < \infty \right\}$$
The interesting fact about this dynamical system is that it behaves chaotically for almost all $c$; that is to say, two nearby points $z_0$ and $z_0 + \epsilon$ will likely yield very different trajectories in the large-$t$ limit.
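As a quick single-point check of whether a given starting point escapes or stays bounded, we can count iterations until the modulus exceeds a threshold radius of 2 (the same cut-off used in the julia function below); escape_time is just a helper written for this aside, and the two nearby starting points are arbitrary:

def escape_time(z, tmax=256):
    # count iterations until |z| leaves the threshold radius of 2
    for t in range(tmax):
        if abs(z) > 2.:
            return t
        z = f(z)
    return tmax

print escape_time(0.3 + 0.4j), escape_time(0.3 + 0.4j + 1e-6)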
To begin with we define the section of the complex plane we want to explore (zmin and zmax) as well as our resolution (m and n). The resolution defines how many initial conditions we are going to sample, which in this case is $mn = 1024^2$.
In [32]:
m=1024
n=1024
zmin = -1.3 - 1j * 1.3
zmax = 1.3 + 1j * 1.3
xs = np.linspace(zmin.real, zmax.real, n)
ys = np.linspace(zmin.imag, zmax.imag, m)
X, Y = np.meshgrid(xs, ys)
Z = X + 1j * Y
Using the meshgrid function we can easily create an array of complex numbers.
In [33]:
print Z[:2,:2]
In [34]:
def julia(f, Z, tmax=256):
    J = np.ones(Z.shape) * tmax
    for t in xrange(tmax):
        mask = np.abs(Z) <= 2.
        Z[mask] = f(Z[mask])
        J[~mask] -= 1
    return J
The above function carries out this procedure in a very succinct fashion. First, we initialise an array J with every entry set to our maximum iteration number. We then check which values of our complex plane array still have a modulus of at most 2; this creates a boolean index array which we can use to select only the elements of our array Z which satisfy the condition.
Iterating the process tmax times, we arrive at our array J, which records the escape time for each initial condition. (We could also run the count in the other direction for the same result.) The use of boolean arrays means that once a trajectory escapes the threshold radius we never apply the map to that point again - saving valuable computational time!
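As a toy illustration of the boolean-mask trick on a small array (unrelated to the fractal itself):

x = np.arange(6)
mask = x <= 2
x[mask] = x[mask]**2    # only the masked entries are updated
print x                 # [0 1 4 3 4 5]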
Below we first create a copy of Z, since our julia function edits the array in place and we will want to use the original again later.
In [78]:
ZZ = np.copy(Z)
J = julia(f, ZZ)
In [79]:
figure(figsize=[10,10])
imshow(J)
axis('off');
In parallel we first feed the functions to each engine.
In [80]:
%%px
import numpy as np

def f(z, c=-.065+.66j):
    return z**2 + c

def julia_parallel(f, Z, tmax=256):
    J = np.ones(Z.shape) * tmax
    for t in xrange(tmax):
        mask = np.abs(Z) <= 2.
        Z[mask] = f(Z[mask])
        J[~mask] -= 1
    return J
We can then scatter/distribute the array amongst the different engines. Each engine runs the same number of loops on the same sized array, so in theory the load should be evenly distributed. In reality the sizes of the masks may vary, so one would expect some deviation in the workloads.
In [81]:
dview.scatter('Z', Z)
%px ZZ = np.copy(Z)
%px J = julia_parallel(f,ZZ)
PJ = dview.gather('J')
Piecing together all the parts we arrive at the same fractal as before.
In [82]:
figure(figsize=[10,10])
imshow(PJ)
axis('off');
So did we speed it up?
In [83]:
Z = X + 1j * Y
In [84]:
%%time
J = julia(f, Z)
In [85]:
Z = X + 1j * Y
In [86]:
%%time
dview.scatter('Z', Z)
%px ZZ = np.copy(Z)
%px J = julia_parallel(f,ZZ)
PJ = dview.gather('J')
So we see that the computational time has indeed been reduced by roughly a factor of the number of engines we chose at the start, plus some overhead. For this small example the overhead is relatively large; however, if we wanted to double or triple the resolution then this overhead would become insignificant.
Of course no speed test is complete without checking that the answers agree,
In [87]:
np.allclose(J,PJ)
Out[87]:
which thankfully they do!
My first introduction to parallel computing was a bit of a trial by fire: creating a single script to run on multiple cores, being careful with master/slave logic, and making sure the right result came out of the other end. By contrast, the IPython parallel tools are intuitive and easy to use, and it is quick to get something workable. For areas such as finance or agent-based modelling, which rely on lots of Monte Carlo simulation, this is a godsend for early exploration as well as for churning out larger results.
For larger computations one would naturally move out of the notebook environment for a speed-up. It is also possible to use MPI with IPython for applications which require a large number of engines or need a substantial number (or volume) of messages to be passed between engines.
References:
Fractals, Christian Bauckhage
Parallel IPython Notebook 1, Jake Vanderplas
Parallel IPython Notebook 2, Liang Bo Wang
Using MPI with IPython, IPython Documentation