Parallel Computing with IPython

Today we'll look at performing parallel computations with IPython. Much of this material draws from resources available online.

Parallel computing is generally very difficult, and IPython provides primitives for a number of scenarios, including:

  • Single program, multiple data (SPMD) parallelism
  • Multiple program, multiple data (MPMD) parallelism
  • Task farming
  • Data parallel computation
  • Coordination of distributed processes
  • Combinations of these approaches
  • Custom user defined approaches

In addition, IPython parallel includes tools for executing parallel jobs interactively (this is the "I" in "IPython").

First... Why IPython Parallel?

One common complaint levied against IPython is that it's too bloated. Why does a project which started as an enhanced Python interpreter include parallel tools? You can hear the answer from IPython's creators in this SciPy 2012 keynote, in which Fernando Perez describes IPython as

"A tool for managing the entire lifecycle of a scientific idea"

This life-cycle includes the following:

  • Individual exploratory work
  • Collaborative development
  • Production work (HPC, cloud, parallel computing)
  • Publication
  • Education

Within this context, you can see how the IPython interpreter, parallel tools, and the notebook all fit the vision.

Additionally, parallel tools are a good fit because the message-passing architecture required for the IPython notebook is also a natural platform for parallel computing.

IPython Parallel Architecture

The IPython architecture consists of four components:

  • The IPython engine
  • The IPython hub
  • The IPython schedulers
  • The cluster client

These components live in the IPython.parallel package and are installed with IPython.

IPython engine

The IPython engine is a Python kernel which takes commands over a network connection. Typically one engine runs per core.

An important property of an IPython engine is that it blocks while user code is being executed; asynchronous behavior is provided by the schedulers, which hide this blocking from the user.

IPython controller

The IPython controller provides an interface for working with a set of engines.

At a general level, the controller is a collection of processes to which IPython engines and clients can connect. It is composed of a Hub and a collection of Schedulers, which may run in separate processes or threads.

The controller provides a single point of contact for users who wish to utilize the engines in the cluster. There are a variety of ways of working with a controller, but all of them are implemented via the View.apply method, after constructing View objects to represent different collections of engines. The two primary models for interacting with engines are:

  • A Direct interface, where engines are addressed explicitly.
  • A LoadBalanced interface, where the Scheduler is trusted with assigning work to appropriate engines.

Advanced users can readily extend the View models to enable other styles of parallelism.
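
As a preview (both views are demonstrated in detail below), constructing the two view types looks something like this. This is only a sketch; it assumes a cluster is already running, which we cover in the next section:


In [ ]:
from IPython import parallel
clients = parallel.Client()

dview = clients[:]                    # DirectView: addresses these engines explicitly
lview = clients.load_balanced_view()  # LoadBalancedView: the scheduler picks engines

def hello():
    return 'hello'

dview.apply_sync(hello)   # runs on every engine in the view
lview.apply_sync(hello)   # runs on a single engine of the scheduler's choosing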

The Hub

The center of an IPython cluster is the Hub. The Hub can be viewed as an über-logger: it keeps track of engine connections, schedulers, and clients, and persists all task requests and results in a database for later use.

Schedulers

All actions that can be performed on the engine go through a Scheduler. While the engines themselves block when user code is run, the schedulers hide that from the user to provide a fully asynchronous interface to a set of engines. Each Scheduler is a small GIL-less function in C provided by pyzmq (the Python load-balanced scheduler being an exception).

Getting Started

The first thing required for using IPython Parallel is to start the cluster. By default, this will start a local cluster on your own computer, utilizing your CPU's multiple cores if available.

There are two ways to do this.

From the command-line

On the command line, you can start an IPython cluster with the ipcluster command. For example,

[~]$ ipcluster start -n 4

will start a cluster with 4 engines (probably matching the number of cores available on a modern laptop). To start one engine per available core, leave out the -n option:

[~]$ ipcluster start

This spins up a Python kernel (engine) on each core, along with a hub that connects to them.

From the notebook interface

Within the IPython notebook, you can use the Clusters tab of the dashboard, and press Start with the desired number of cores, under the desired profile (more on profiles later). This will automatically run the correct commands to start your IPython cluster.
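
Profiles will be discussed more later, but for reference, creating a parallel-enabled profile from the command line and starting a cluster under it looks something like this (the profile name here is arbitrary):

[~]$ ipython profile create --parallel --profile=myprofile
[~]$ ipcluster start -n 4 --profile=myprofile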

Accessing the Clients

To make sure everything is working, let's run a simple example. We'll use the IPython.parallel submodule to create a Client, and print the ids of the engines it is connected to:


In [1]:
from IPython import parallel
clients = parallel.Client()
clients.block = True  # use synchronous computations
print clients.ids


[0, 1, 2, 3]

Now we can use this object to run some code. Let's define a simple function that we will execute on the cores:


In [2]:
def mul(a, b):
    return a * b

mul(5, 6)


Out[2]:
30

Now let's execute mul on the first engine:


In [3]:
clients[0].apply(mul, 5, 6)


Out[3]:
30

Notice that the function becomes the first argument of apply, and any additional arguments or keyword arguments are passed after it.
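
Keyword arguments are forwarded the same way; for example, this call (a trivial variation on the one above) gives the same result:


In [ ]:
clients[0].apply(mul, 5, b=6)  # keyword arguments are passed through to mul; returns 30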

We can similarly execute mul in parallel on all the engines at once:


In [4]:
clients[:].apply(mul, 5, 6)


Out[4]:
[30, 30, 30, 30]

But say we want to pass different arguments to each instance of mul. In normal Python, we could do this with the map() function:


In [5]:
map(mul, [5, 6, 7, 8], [8, 9, 10, 11])


Out[5]:
[40, 54, 70, 88]

The client views also have a map function which performs the execution in parallel. Let's use a load-balanced view so that the IPython scheduler takes care of farming out the tasks for us:


In [6]:
view = clients.load_balanced_view()
view.map(mul, [5, 6, 7, 8], [8, 9, 10, 11])


Out[6]:
[40, 54, 70, 88]

This is the basic interface to parallel computation in IPython!
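
Note that, like apply, views also provide map_sync and map_async variants which force blocking or non-blocking execution regardless of the current block setting. A quick sketch:


In [ ]:
view.map_sync(mul, [5, 6, 7, 8], [8, 9, 10, 11])         # always blocks
res = view.map_async(mul, [5, 6, 7, 8], [8, 9, 10, 11])  # returns immediately
res.get()  # -> [40, 54, 70, 88]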

Basic Concepts

The Client

The client is a lightweight handle on all the engines of a cluster:


In [7]:
clients = parallel.Client(profile='default')
print len(clients)


4

Views

Views provide the fundamental ways of accessing the engines. There are two types of views:

Direct View

A direct view allows direct execution of a command on all the engines:


In [8]:
clients.block = True
dview = clients.direct_view()
dview.block = True
print dview


<DirectView all>

In [9]:
dview.apply(sum, [1, 2, 3])


Out[9]:
[6, 6, 6, 6]

You can also obtain a Direct view by slicing the Clients object:


In [10]:
clients[::2]


Out[10]:
<DirectView [0, 2]>

In [11]:
clients[::2].apply(sum, [1, 2, 3])


Out[11]:
[6, 6]

Load Balanced View

A load balanced view allows execution of a command on any one engine. Which engine is used is up to the scheduler:


In [12]:
lview = clients.load_balanced_view()
print lview


<LoadBalancedView None>

In [13]:
lview.apply(sum, [1, 2, 3])


Out[13]:
6

Introspection: looking at the Process ID

One way to introspect what's going on is to get the process ID of the engines. The PID gives a unique label for any running process on your computer. If you have a cluster running across multiple computers, you can also print the hostname to see which host each engine is running on:


In [14]:
import os
import socket
print os.getpid()
print socket.gethostname()


9146
jakesmac

In [15]:
dview.apply(os.getpid)


Out[15]:
[9142, 9143, 9144, 9145]

In [16]:
for i in range(5):
    print lview.apply(os.getpid)


9145
9144
9142
9143
9145

Notice that the load-balanced view assigns tasks to engines in a way that appears random (it's not actually random, but is determined by the scheduler's assignment scheme, which is opaque to the user).
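
If you're curious, the assignment scheme can be chosen when the controller starts; for example (scheme names per the IPython.parallel documentation; 'leastload' is the default):

[~]$ ipcontroller --TaskScheduler.scheme_name='lru'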

Working with Direct Views

The direct view is an interface in which each engine is directly exposed to the user; it is a very flexible wrapper of the client. Let's look at a few ways of managing remote execution and remote data with direct views:

  • Blocking vs. non-blocking execution
  • Ways to manage remote execution
  • Ways to manage & access remote data

In [17]:
clients = parallel.Client()
dview = clients[:]

DirectView: blocking vs. non-blocking

In a DirectView interface, we can either use Blocking (synchronous) execution, in which all results must finish computing before any results are recorded, or non-blocking (asynchronous) execution, where we receive results as they finish.

Here's an example of the two:


In [18]:
def get_pid_slowly():
    # imports in the function, otherwise these imports
    # don't happen on the engines
    import os
    import time
    import random
    
    # sleep up to 10 seconds
    time.sleep(10 * random.random())
    return os.getpid()

In [19]:
dview.block = True
dview.apply(get_pid_slowly)


Out[19]:
[9142, 9143, 9144, 9145]

Notice that all four results come back at the same time, after all of the engines have finished computing.

Let's now see what happens if we use non-blocking execution:


In [20]:
dview.block = False
dview.apply(get_pid_slowly)


Out[20]:
<AsyncResult: get_pid_slowly>

What is returned is an AsyncResult object.


In [21]:
res = dview.apply(get_pid_slowly)

In [22]:
res.ready()


Out[22]:
False

We can ask whether the result is ready using the ready() method. Once it's ready, we can retrieve the result with the get() method:


In [25]:
res.ready()


Out[25]:
True

In [26]:
res.get()


Out[26]:
[9142, 9143, 9144, 9145]
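
AsyncResult objects provide a few other conveniences; for instance, wait() blocks until the result is ready, and get() accepts a timeout. A small sketch:


In [ ]:
res = dview.apply(get_pid_slowly)
res.wait()            # block until all engines have finished
res.get(timeout=15)   # alternatively: wait at most 15 seconds, then raise a TimeoutError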

For convenience, you can also use the apply_sync and apply_async methods, which force blocking and non-blocking execution respectively, regardless of the block attribute:


In [27]:
dview.apply_sync(get_pid_slowly)


Out[27]:
[9142, 9143, 9144, 9145]

In [28]:
dview.apply_async(get_pid_slowly)


Out[28]:
<AsyncResult: get_pid_slowly>

DirectView: Remote Imports

You might notice that you get errors if you've not imported certain modules on the engines. For example:


In [36]:
import numpy
def normalize_vector(v):
    return numpy.asarray(v) / numpy.linalg.norm(v)

In [37]:
x = numpy.random.random(5)
normalize_vector(x)


Out[37]:
array([ 0.23527065,  0.60162073,  0.56038515,  0.32331687,  0.40513565])

In [38]:
dview.block = True
dview.apply(normalize_vector, x)


[0:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)<string> in <module>()
<ipython-input-36-bb245818ec0d> in normalize_vector(v)
NameError: global name 'numpy' is not defined

[1:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)<string> in <module>()
<ipython-input-36-bb245818ec0d> in normalize_vector(v)
NameError: global name 'numpy' is not defined

[2:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)<string> in <module>()
<ipython-input-36-bb245818ec0d> in normalize_vector(v)
NameError: global name 'numpy' is not defined

[3:apply]: 
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)<string> in <module>()
<ipython-input-36-bb245818ec0d> in normalize_vector(v)
NameError: global name 'numpy' is not defined

Though we've imported numpy in our local process, we haven't imported it on any of the engines, so the code won't work there.

There are a few ways to execute imports on the engines. We can do this directly with the execute method:


In [39]:
dview.execute('import numpy')


Out[39]:
<AsyncResult: finished>

Alternatively, we can use the sync_imports context manager to make this happen:


In [40]:
with dview.sync_imports():
    import numpy


importing numpy on engine(s)

Or, we can use the px (parallel execute) magic function to execute the contents of the cell on every engine:


In [41]:
%px import numpy

After doing any of these, we're able to run our function and see the results:


In [42]:
dview.apply(normalize_vector, x)


Out[42]:
[array([ 0.23527065,  0.60162073,  0.56038515,  0.32331687,  0.40513565]),
 array([ 0.23527065,  0.60162073,  0.56038515,  0.32331687,  0.40513565]),
 array([ 0.23527065,  0.60162073,  0.56038515,  0.32331687,  0.40513565]),
 array([ 0.23527065,  0.60162073,  0.56038515,  0.32331687,  0.40513565])]

DirectView: running scripts remotely

We can use a direct view to run a given script on each engine. For example, let's look at the following script:


In [43]:
%%file myscript.py
import numpy

a = 5

def square(x):
    return x * x


Overwriting myscript.py

Now we can run this script on each of our engines:


In [44]:
dview.run("myscript.py")


Out[44]:
<AsyncResult: finished>

To look at the data, let's use the dictionary interface:


In [45]:
print dview['a']


[5, 5, 5, 5]

In [46]:
print dview['square']


[<function square at 0x1034a5e60>, <function square at 0x1034a5ed8>, <function square at 0x1034bb050>, <function square at 0x1034bb0c8>]

We can also use the dictionary interface to assign data:


In [47]:
dview['b'] = 4
print dview['b']


[4, 4, 4, 4]

Now the variables a and b are defined globally on all our engines.
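
The dictionary interface is shorthand for the push and pull methods, which can also move several variables at once. A minimal sketch:


In [ ]:
dview.push(dict(a=5, b=4))   # equivalent to dview['a'] = 5; dview['b'] = 4
dview.pull('a')              # equivalent to dview['a']
dview.pull(('a', 'b'))       # fetch multiple variables in one call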

DirectView: execute() method

Code can also be executed directly on the engines by passing strings of code to the execute() method. First, let's look at the definitions of a and b on our engines:


In [48]:
print dview['a']
print dview['b']


[5, 5, 5, 5]
[4, 4, 4, 4]

Now we'll use execute() to add a and b and save the result to c:


In [49]:
dview.execute('c=a+b')


Out[49]:
<AsyncResult: finished>

In [50]:
dview['c']


Out[50]:
[9, 9, 9, 9]

We can also execute code on any subset of the engines:


In [51]:
clients[:2].execute('c=a*b')


Out[51]:
<AsyncResult: execute>

In [52]:
dview['c']


Out[52]:
[20, 20, 9, 9]

This is the basic way to push and pull data between the client and the engines.

Parallel Magics

We saw this above, but one nice way to interact with the engines is through the %px magic function. This works only within IPython, but is a nice shortcut for simple parallel execution. Basically, anything after the %px on a line is executed on every engine:


In [53]:
%px c = numpy.random.random(3)
%px print c


[stdout:0] [ 0.50721809  0.17834299  0.03162916]
[stdout:1] [ 0.08939572  0.08664146  0.27040773]
[stdout:2] [ 0.00414367  0.43595796  0.17353835]
[stdout:3] [ 0.6570126   0.06122939  0.16020218]

If you want to execute multiple lines at once, you can use the %%px cell magic to do this:


In [54]:
%%px
c = numpy.random.random(5)
c *= a
print c


[stdout:0] [ 0.15017173  4.73927557  0.47193208  3.18212826  0.18068899]
[stdout:1] [ 3.76819739  1.02773891  0.2910802   2.24805272  3.1696827 ]
[stdout:2] [ 3.29312298  3.98483227  1.36147721  3.74585018  0.16350398]
[stdout:3] [ 3.0226832   1.80901736  4.92593167  3.89666551  4.50883695]

Again, we can access the variables directly using the dictionary interface:


In [55]:
dview['c']


Out[55]:
[array([ 0.15017173,  4.73927557,  0.47193208,  3.18212826,  0.18068899]),
 array([ 3.76819739,  1.02773891,  0.2910802 ,  2.24805272,  3.1696827 ]),
 array([ 3.29312298,  3.98483227,  1.36147721,  3.74585018,  0.16350398]),
 array([ 3.0226832 ,  1.80901736,  4.92593167,  3.89666551,  4.50883695])]
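
The parallel magics accept options as well; for example, %%px can target a subset of the engines (a quick sketch; run %%px? for the full list of options):


In [ ]:
%%px --targets ::2
# execute this cell only on every second engine
print c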

DirectView: Scatter and Gather

One very useful way to push and pull data is with the scatter() and gather() methods. scatter() takes an array and partitions it among the engines:


In [56]:
import numpy as np  # needed locally for np.arange below

dview.scatter('a', np.arange(24))
dview['a']


Out[56]:
[array([0, 1, 2, 3, 4, 5]),
 array([ 6,  7,  8,  9, 10, 11]),
 array([12, 13, 14, 15, 16, 17]),
 array([18, 19, 20, 21, 22, 23])]

Notice that the contents of a are partitioned and scattered among the instances. To bring them back, we can use gather():


In [57]:
a = dview.gather('a')
a


Out[57]:
array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16,
       17, 18, 19, 20, 21, 22, 23])

So, for example, you might do an operation something like this:


In [58]:
x = np.random.random(400)
dview.scatter('x', x)
dview.execute('y = numpy.sqrt(x)')
y = dview.gather('y')

print np.allclose(y, np.sqrt(x))


True

This is a somewhat trivial computation, but if you have a larger computation that can be done on partitions of the data, this pattern lets you speed up your calculation according to the number of cores available.
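
To check whether a computation actually benefits, you can time the serial and scattered versions side by side. A rough sketch (the speed-up you see will depend on your machine; for a cheap operation like sqrt, the communication overhead may well dominate):


In [ ]:
import time

x = np.random.random(10 ** 7)

t0 = time.time()
y_serial = np.sqrt(x)
print "serial:   %.3f sec" % (time.time() - t0)

t0 = time.time()
dview.scatter('x', x)
dview.execute('y = numpy.sqrt(x)')
y_parallel = dview.gather('y')
print "parallel: %.3f sec" % (time.time() - t0)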

Example: distributed matrix multiplication

Let's write a simple distributed matrix multiplication function.

First, let's look at how scatter works with matrices:


In [59]:
A = np.random.random((8, 3))
dview.scatter('A', A)
print A
print
for chunk in dview['A']:
    print chunk


[[ 0.38270801  0.93567854  0.00724878]
 [ 0.1043931   0.88228169  0.89202187]
 [ 0.41265125  0.37364223  0.18220115]
 [ 0.73935114  0.60215577  0.66854311]
 [ 0.52294005  0.52325057  0.58609731]
 [ 0.92111696  0.9419077   0.44157558]
 [ 0.35245913  0.9811961   0.47649591]
 [ 0.01952694  0.05868491  0.57032717]]

[[ 0.38270801  0.93567854  0.00724878]
 [ 0.1043931   0.88228169  0.89202187]]
[[ 0.41265125  0.37364223  0.18220115]
 [ 0.73935114  0.60215577  0.66854311]]
[[ 0.52294005  0.52325057  0.58609731]
 [ 0.92111696  0.9419077   0.44157558]]
[[ 0.35245913  0.9811961   0.47649591]
 [ 0.01952694  0.05868491  0.57032717]]

So scattering a matrix breaks it up into blocks... we can perform a matrix multiplication on each of these blocks, and gather the individual results to get the final result!


In [60]:
def parallel_dot(dview, A, B):
    dview.scatter('A', A)
    dview['B'] = B
    dview.execute('C = numpy.dot(A, B)')
    return dview.gather('C')

A = np.random.random((10, 3))
B = np.random.random((3, 5))

np.allclose(parallel_dot(dview, A, B),
            np.dot(A, B))


Out[60]:
True

Working with Load-balanced Views

Direct views are a nice low-level interface to the engines, but for many tasks it is more convenient to use load-balanced views.

A load-balanced view is a fault-tolerant way to distribute tasks among the engines. While there is no direct access to individual engines, it provides a simple and powerful interface.


In [61]:
lview = clients.load_balanced_view()
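
Fault tolerance here means that a failed task can be resubmitted to another engine; the number of automatic resubmissions is controlled by a flag on the view (a sketch; retries defaults to 0):


In [ ]:
lview.set_flags(retries=2)   # resubmit a failing task up to two more times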

In [62]:
def sleep_and_return(factor=10):
    import time
    import random
    r = factor * random.random()
    time.sleep(r)
    return r

In [63]:
lview.block = True
lview.apply(sleep_and_return, 1)


Out[63]:
0.383964253758315

Using apply on a load-balanced view executes the function just once, on whichever engine the scheduler chooses.

We can use the map function to execute the function over multiple inputs. We'll set block=False so that the results arrive asynchronously:


In [67]:
lview.block = False
res = lview.map(sleep_and_return, range(10))

In [68]:
res.ready()


Out[68]:
False

The progress attribute tells us how many of the jobs have been completed up to this point:


In [69]:
import time
while not res.ready():
    time.sleep(1)
    print res.progress


6
7
7
7
8
10

Once all the results are finished, we can use the result attribute to get at them:


In [70]:
res.result


Out[70]:
[0.0,
 0.27259788387004413,
 1.6492597432569556,
 0.16940516700627017,
 2.423163349769624,
 1.3291874648520896,
 0.676810918286002,
 5.4501484144834675,
 4.003957788350984,
 4.828199829606783]

The load-balanced view also includes an iterator which will return results as they come in:


In [71]:
for result in lview.map(sleep_and_return, range(10)):
    print result


0.0
0.45237777481
1.09647522681
1.61237816742
2.05581658346
4.25450122799
0.0663860312067
5.71882736737
2.49521158545
7.23768335018

Exercise: Monte Carlo $\pi$

Here you'll do a quick exercise where you estimate $\pi$ using a rudimentary Monte Carlo method. Imagine you have a 2x2 square dart board and you randomly throw darts at it.


In [72]:
%pylab inline
x, y = 2 * np.random.random((2, 500)) - 1

ax = plt.axes(aspect='equal', xticks=[], yticks=[], frameon=False)
ax.scatter(x, y, c=(x ** 2 + y ** 2 < 1), cmap=plt.cm.binary)
ax.add_patch(mpl.patches.Circle((0, 0), 1, zorder=0, fc='#FFAAAA'))


Populating the interactive namespace from numpy and matplotlib
Out[72]:
<matplotlib.patches.Circle at 0x1063308d0>

The probability that those darts will fall within 1 unit of the center is

$$ p = \frac{\rm Area~of~circle}{\rm Area~of~square} = \frac{\pi\cdot 1^2}{2^2} = \frac{\pi}{4} $$

So if you throw $N$ darts and $N_{in}$ of them land within the circle, you can estimate $\pi$ as $$ \pi \approx 4\frac{N_{in}}{N} $$

Here we'll write a function to estimate $\pi$ using a random number generator:


In [73]:
def estimate_pi(N):
    N = int(N)  # allow N to be passed as a float like 1E6
    r = np.random.random((N, 2))
    inside = np.sum(r ** 2, 1) <= 1
    return 4 * inside.sum() * 1. / N

In [74]:
estimate_pi(1E6)


Out[74]:
3.1420720000000002

Write a function which uses a Load-balanced View to parallelize this estimation of pi:


In [ ]:
def estimate_pi(N, lview, N_per_trial=1E6):
    # your code here:
    # distribute the N trials across the load-balanced view,
    # using lview.map() and return the average of the results
    pass

Note that this is certainly not the best way to estimate $\pi$, but it shows how you can use IPython parallel to do distributed Monte Carlo simulations, which can be very useful in practice.

Homework

For the homework today, we'll look at two Astronomy applications. You may choose one (or both, if you want more practice) to turn in. In both of them, we'll return to a problem that we've seen before, but extend our computation using IPython parallel.

As usual, you can turn in your results via GitHub: use either an IPython notebook or a stand-alone .py file.

1. Cross-validation of photometric redshifts

In the Scikit-learn homework a few weeks ago, we looked at different regression schemes for computing photometric redshifts (see Exercise 2). Return to this, and determine the minimum rms error you can achieve over combinations of max_depth and n_estimators in the random forest regressor.

Here's how we'll download and prepare the data:


In [ ]:
from astroML.datasets import fetch_sdss_specgals
import numpy as np
data = fetch_sdss_specgals()

# put magnitudes in a matrix
X = np.vstack([data['modelMag_%s' % f] for f in 'ugriz']).T
y = data['z']

# down-sample the data for speed
X = X[::10]
y = y[::10]

Here's an example of using cross-validation to compute the r2 score for a given set of model parameters:


In [ ]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.cross_validation import cross_val_score

# define the model with the given parameters
model = RandomForestRegressor(n_estimators=10, max_depth=3)

# compute the scores via cross-validation
scores = cross_val_score(model, X, y,
                         scoring='r2', cv=3)

# print the mean of the cross-validation scores
score = np.mean(scores)
print score

Write a script that uses a parallel computation to explore the best values of max_depth and n_estimators, along the lines of the sketch below. What's the maximal score you can obtain?
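
One possible pattern (just a skeleton, with an arbitrary parameter grid; the details are up to you) is to push the data to the engines, wrap the scoring in a function, and map that function over the parameter combinations:


In [ ]:
def compute_score(params):
    # imports must happen on the engine
    import numpy as np
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.cross_validation import cross_val_score
    max_depth, n_estimators = params
    model = RandomForestRegressor(n_estimators=n_estimators,
                                  max_depth=max_depth)
    # X and y are resolved from the engine's global namespace
    return np.mean(cross_val_score(model, X, y, scoring='r2', cv=3))

# push the data so the engines can see X and y as globals
dview = clients[:]
dview['X'] = X
dview['y'] = y

# map the scoring function over a grid of parameter combinations
lview = clients.load_balanced_view()
params = [(d, n) for d in [3, 5, 7, 9] for n in [10, 50, 100]]
scores = lview.map(compute_score, params, block=True)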

2. Computing Periods of Stars

In the lecture on optimization, we looked at computing the periods of LINEAR stars using the Lomb-Scargle algorithm.

Write a script which uses IPython parallel to compute the periods of ~1000 stars from the LINEAR dataset. Plot the log of the period you found vs the g-r color of the star. You can compare your results to this paper, though the Lomb-Scargle period finder might lead to slightly different results.

Here's how to get the data:


In [ ]:
from astroML.datasets import fetch_LINEAR_sample
lcs = fetch_LINEAR_sample()

id_ = lcs.ids[0]
time, flux, dflux = lcs[id_].T

And here's how to compute the lomb-scargle periodogram:


In [ ]:
from astroML.time_series import lomb_scargle
periods = np.logspace(-2, 0, 10000)
periodogram = lomb_scargle(time, flux, dflux, omega=2 * np.pi / periods, generalized=True)

Now we find the period with the maximum periodogram power:


In [ ]:
idx = np.argsort(periodogram)
print periods[idx[-1]]

And we get the $g-r$ color:


In [ ]:
g_r = lcs.get_target_parameter(id_, 'gr')
print g_r

You'll have to automate the task of computing the best period, and use a LoadBalancedView to distribute this across as many cores as you have available.