This notebook was put together by [Jake Vanderplas](http://www.vanderplas.com) for UW's [Astro 599](http://www.astro.washington.edu/users/vanderplas/Astr599_2014/) course. Source and license info is on [GitHub](https://github.com/jakevdp/2014_fall_ASTR599/).
In [1]:
# Make this work with Python 2
from __future__ import print_function
Today we'll look at performing parallel computations with IPython. Much of this material draws from resources available online.
Parallel computing is generally very difficult, and IPython provides primitives for a number of common scenarios.
In addition, IPython parallel includes tools for executing parallel jobs interactively (this is the "I" in "IPython").
One common complaint levied against IPython is that it's too bloated: why does a project which started as an enhanced Python interpreter include parallel tools? You can hear the answer from IPython's creators in this SciPy 2012 keynote, in which Fernando Perez describes IPython as
"A tool for managing the entire lifecycle of a scientific idea"
This life-cycle runs from the first exploratory computation all the way to the published result. Within that context, you can see how the IPython interpreter, the parallel tools, and the notebook all fit the vision.
Additionally, it is a natural fit because the message-passing architecture that underlies the IPython notebook is also a natural platform for parallel computing.
The IPython engine is a Python kernel which takes commands over a network connection; typically there is one engine per core.
An important property of an IPython engine is that it blocks while user code is being executed; asynchronous execution is made possible by the schedulers, which hide this blocking from the user.
The IPython controller provides an interface for working with a set of engines. At a general level, the controller is a collection of processes to which IPython engines and clients can connect. The controller is composed of a Hub and a collection of Schedulers, which may be in processes or threads.

The controller provides a single point of contact for users who wish to utilize the engines in the cluster. There are a variety of ways of working with a controller, but all of these models are implemented via the View.apply method, after constructing View objects to represent different collections of engines.
The two primary models for interacting with engines are the direct interface, in which engines are addressed explicitly, and the load-balanced interface, in which the scheduler decides which engines do the work. Advanced users can readily extend the View models to enable other styles of parallelism.
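Here is a minimal sketch of the two models (it assumes a cluster is already running; starting one is covered below):

from IPython import parallel

rc = parallel.Client()

dview = rc[:]                     # direct view: explicitly addresses every engine
lview = rc.load_balanced_view()   # load-balanced view: the scheduler picks engines

def hello():
    import os
    return "hello from %d" % os.getpid()

print(dview.apply_sync(hello))    # one result per engine
print(lview.apply_sync(hello))    # a single result, from whichever engine was free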
The center of an IPython cluster is the Hub. The Hub can be viewed as an über-logger: it keeps track of engine connections, schedulers, and clients, and persists all task requests and results in a database for later use.
All actions that can be performed on the engine go through a Scheduler. While the engines themselves block when user code is run, the schedulers hide that from the user to provide a fully asynchronous interface to a set of engines. Each Scheduler is a small GIL-less function in C provided by pyzmq (the Python load-balanced scheduler being an exception).
The first thing required for using IPython Parallel is to start the cluster. By default, this will start a local cluster on your own computer, utilizing your CPU's multiple cores if available.
There are two ways to do this.
On the command-line, you can run the following command to start an IPython cluster:
[~]$ ipcluster start -n 4
will start a cluster with 4 engines (probably matching the number of cores on a modern laptop). To use as many cores as are available, leave out the -n option:
[~]$ ipcluster start
This spins up a Python kernel on each core, along with a hub that connects to them.
Within the IPython notebook, you can use the Clusters tab of the dashboard, and press Start with the desired number of cores, under the desired profile (more on profiles later). This will automatically run the correct commands to start your IPython cluster.
In [2]:
from IPython import parallel
clients = parallel.Client()
clients.block = True # use synchronous computations
print(clients.ids)
Now we can use this object to run some code. Let's define a simple function that we will execute on the cores:
In [3]:
def mul(a, b):
    return a * b

mul(5, 6)
Out[3]:
Now let's execute mul on the first engine:
In [4]:
clients[0].apply(mul, 5, 6)
Out[4]:
Notice that the function becomes the first argument of apply, and any additional arguments or keyword arguments are passed after it.
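For instance, keyword arguments are forwarded in the same way (power is just a hypothetical helper defined for this sketch):

def power(a, b=2):
    return a ** b

print(clients[0].apply(power, 3))        # positional arguments -> 9
print(clients[0].apply(power, 3, b=3))   # keyword argument     -> 27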
We can similarly execute mul on several engines at once; here, the first two:
In [5]:
clients[0:2].apply(mul, 5, 6)
Out[5]:
But say we want to pass different arguments to each instance of mul. In normal Python, we could do this with the map() function:
In [6]:
%timeit list(map(mul, [5, 6, 7, 8], [8, 9, 10, 11]))
The client views also have a map function which performs the execution in parallel. Let's use a load-balanced view so that the IPython scheduler takes care of farming out the tasks for us:
In [7]:
view = clients.load_balanced_view()
%timeit view.map(mul, [5, 6, 7, 8], [8, 9, 10, 11])
This is the basic interface to parallel computation in IPython!
Note that this is pretty slow in this case... that's because the multiplication itself is intrinsically much faster than the overhead of shipping data between the processes. If we have a slower function, then we can win with parallelization:
In [8]:
def slow_mul(a, b):
    import time
    time.sleep(1)  # sleep 1 second
    return a * b

%timeit list(map(slow_mul, [5, 6, 7, 8], [8, 9, 10, 11]))
%timeit view.map(slow_mul, [5, 6, 7, 8], [8, 9, 10, 11])
In [9]:
clients = parallel.Client(profile='default')
print(len(clients))
In [10]:
clients.block = True
dview = clients.direct_view()
dview.block = True
print(dview)
In [11]:
dview.apply(sum, [1, 2, 3])
Out[11]:
You can also obtain a direct view by slicing the clients object:
In [12]:
clients[::2]
Out[12]:
In [13]:
clients[::2].apply(sum, [1, 2, 3])
Out[13]:
In [14]:
clients[:].apply(sum, [4,5,6])
Out[14]:
In [15]:
lview = clients.load_balanced_view()
print(lview)
In [16]:
lview.apply(sum, [1, 2, 3])
Out[16]:
One way to introspect what's going on is to get the process ID of the engines. This gives a unique label for any running process on your computer. If you have a cluster running on multiple computers, you can also print the hostname to see which host each engine is running on:
In [17]:
import os
import socket
print(os.getpid())
print(socket.gethostname())
In [18]:
dview.apply(os.getpid)
Out[18]:
In [19]:
for i in range(10):
    print(lview.apply(os.getpid))
Notice that the load-balanced view chooses engines somewhat unpredictably (it's not actually random, but is based on a scheduling process which is opaque to the user).
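As a quick sketch of this behavior, we can tally which engine handles each of a batch of calls:

from collections import Counter

# run os.getpid twenty times and count how often each engine was chosen
pids = [lview.apply(os.getpid) for i in range(20)]
print(Counter(pids))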
The direct view is an interface in which each engine is directly exposed to the user.
The direct view is a very flexible wrapper of the client. Let's look at a few ways of managing remote execution and remote data with direct views: blocking versus non-blocking execution, handling imports on the engines, pushing and pulling data, and scatter/gather operations.
In [20]:
clients = parallel.Client()
dview = clients[:]
In a DirectView interface, we can use either blocking (synchronous) execution, in which all results must finish computing before any are returned, or non-blocking (asynchronous) execution, in which we receive results as they finish.
Here's an example of the two:
In [21]:
def get_pid_slowly():
    # imports are specified within the function definition;
    # otherwise these packages will not be available on
    # the engines (see below for a better way to approach this)
    import os
    import time
    import random

    # sleep up to 10 seconds
    time.sleep(10 * random.random())
    return os.getpid()
In [22]:
dview.block = True
dview.apply(get_pid_slowly)
Out[22]:
Notice that all the results come back at the same time, after every engine has finished computing.
Let's now see what happens if we use non-blocking execution:
In [23]:
dview.block = False
dview.apply(get_pid_slowly)
Out[23]:
What is returned is an AsyncResult object.
In [24]:
res = dview.apply(get_pid_slowly)
We can ask whether the result is ready using the ready() method. Once it's ready, we can retrieve it with the get() method:
In [25]:
res.ready()
Out[25]:
In [26]:
res.get()
Out[26]:
For convenience, you can also use the apply_sync and apply_async methods:
In [27]:
dview.apply_sync(get_pid_slowly)
Out[27]:
In [28]:
dview.apply_async(get_pid_slowly)
Out[28]:
In [29]:
import numpy

def normalize_vector(v):
    return numpy.asarray(v) / numpy.linalg.norm(v)
In [30]:
x = numpy.random.random(5)
normalize_vector(x)
Out[30]:
In [31]:
dview.block = True
dview.apply(normalize_vector, x)
Though we've imported numpy in our local session, we haven't imported it on any of the engines, so the code won't work.
There are a few ways to execute imports on the engines. We can do this directly with the execute method:
In [32]:
dview.execute('import numpy')
Out[32]:
Alternatively, we can use the sync_imports context manager to make this happen:
In [33]:
with dview.sync_imports():
    import numpy
Or, in the IPython notebook, we can use the px (parallel execute) magic to execute the contents of the cell on every engine:
In [34]:
%px import numpy
After doing any one of these, we're able to run our function and see the results:
In [35]:
dview.apply(normalize_vector, x)
Out[35]:
In [36]:
%%file myscript.py
import numpy

a = 5

def square(x):
    return x * x
In [37]:
!cat myscript.py
Now we can run this script on each of our clients:
In [38]:
dview.run("myscript.py")
Out[38]:
To look at the data, let's use the dictionary interface:
In [39]:
print(dview['a'])
In [40]:
print(dview['square'])
We can also use the dictionary interface to assign data:
In [41]:
dview['b'] = 4
print(dview['b'])
In [42]:
clients[:2]['b'] = 6
dview['b']
Out[42]:
Now the variables a and b are defined globally on all our engines.
In [43]:
print(dview['a'])
print(dview['b'])
Now we'll use execute() to add a and b and save the result to c:
In [44]:
dview.execute('c=a+b')
Out[44]:
In [45]:
dview['c']
Out[45]:
We can also execute code on any subset of the engines:
In [46]:
clients[1:3].execute('c=a*b')
Out[46]:
In [47]:
dview['c']
Out[47]:
Note that c has only changed on the two engines we specified!

This is the basic way to push and pull data between the client and the engines.
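The dictionary syntax is a thin convenience; here is a minimal sketch of the same round trip using the explicit push and pull methods:

dview.push({'d': 10})    # equivalent to dview['d'] = 10
print(dview.pull('d'))   # equivalent to dview['d']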
In [48]:
%px c = numpy.random.random(3)
%px print(c)
If you want to execute multiple lines at once, you can use the %%px cell magic:
In [49]:
%%px
c = numpy.random.random(5)
c *= a
print(c)
Again, we can access the variables directly using the dictionary interface:
In [50]:
dview['c']
Out[50]:
In [51]:
import numpy as np
dview.scatter('a', np.arange(23))
dview['a']
Out[51]:
Notice that the contents of a are partitioned and scattered among the engines. To bring them back, we can use gather():
In [52]:
a = dview.gather('a')
a
Out[52]:
So, for example, you might do an operation something like this:
In [53]:
x = np.random.random(400)
dview.scatter('x', x)
dview.execute('y = numpy.sqrt(x)')
y = dview.gather('y')
print(np.allclose(y, np.sqrt(x)))
This is a somewhat trivial computation, but if you had a larger computation that could be done on partitions of the data, this pattern would allow you to speed up your calculation according to the number of cores available.
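As a sketch of the same scatter/execute/gather pattern on a larger array (the trigonometric identity here is arbitrary; it simply stands in for real per-element work):

x = np.random.random(int(1e6))

dview.scatter('x', x)
dview.execute('y = numpy.sin(x) ** 2 + numpy.cos(x) ** 2')
y = dview.gather('y')

print(np.allclose(y, 1))  # sin^2 + cos^2 = 1 everywhere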
Let's write a simple example of doing distributed linear algebra. Note that this will not necessarily be more performant than an in-memory solution, but it is illustrative of the types of operations we can use to parallelize other problems.
We'll start with a simple example of vector-vector multiplication:
In [54]:
v1 = np.random.random(1001)
v2 = np.random.random(1001)
np.dot(v1, v2)
Out[54]:
In [55]:
dview.scatter('v1', v1)
for chunk in dview['v1']:
    print(chunk.shape)
In [56]:
def parallel_dot(dview, v1, v2):
    dview.scatter('v1', v1)
    dview.scatter('v2', v2)
    dview.execute('v1v2 = numpy.dot(v1, v2)')
    return sum(dview.gather('v1v2'))
In [57]:
result1 = np.dot(v1, v2)
result2 = parallel_dot(dview, v1, v2)
print(result1)
print(result2)
print("results match:", np.allclose(result1, result2))
In [58]:
M1 = np.random.random((500, 1000))
M2 = np.random.random((1000, 50))
result = np.dot(M1, M2)
print(result.shape)
Tips:

- What happens when you use dview.scatter on a matrix? What are the shapes of the resulting chunks? (A quick probe is sketched below.)
- If you scatter one matrix in this manner, what parts of the other matrix must it interact with?
- Once you have an answer, use np.allclose to compare it to the serial result above.
In [59]:
def parallel_dot(dview, A, B):
    dview.scatter('A', A)
    dview['B'] = B
    dview.execute('C = numpy.dot(A, B)')
    return dview.gather('C')

result1 = parallel_dot(dview, M1, M2)
result2 = np.dot(M1, M2)
print(np.allclose(result1, result2))
Direct views are a nice low-level interface to the clients, but for many tasks it is more convenient to use load-balanced views.
A load-balanced view is a fault-tolerant way to distribute tasks among the engines: it gives no direct access to individual engines, but it provides a simple and powerful interface.
In [60]:
lview = clients.load_balanced_view()
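As a sketch of the fault-tolerance hooks (this assumes the load-balanced view's retries flag and the temp_flags context manager, which ask the scheduler to resubmit a failed task to another engine):

def flaky():
    # fail about half the time, to give the scheduler something to retry
    import random
    if random.random() < 0.5:
        raise RuntimeError("simulated failure")
    return "ok"

# resubmit each failed task up to 5 times before giving up
with lview.temp_flags(retries=5):
    print(lview.apply_sync(flaky))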
In [61]:
def sleep_and_return(factor=10):
    import time
    import random

    r = factor * random.random()
    time.sleep(r)
    return r
In [62]:
lview.block = True
lview.apply(sleep_and_return, 1)
Out[62]:
Using apply on a load-balanced view simply executes the function once.

We can use the map function to execute over multiple inputs. We'll use block=False so that the results come back asynchronously. There are several ways to get at the results: the progress attribute, iteration over the results as they arrive, and the result attribute. The progress attribute tells us how many of the jobs have been completed up to this point:
In [63]:
import time

lview.block = False
res = lview.map(sleep_and_return, range(10))

while not res.ready():
    time.sleep(1)
    print(res.progress, flush=True)
In [64]:
res.result
Out[64]:
We can also use Python's iteration syntax to get the results as they come in:
In [65]:
for val in lview.map(sleep_and_return, range(10)):
    print(val, flush=True)
Either way, we can use the result attribute to get at all of the results:
In [66]:
res.result
Out[66]:
In [67]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
x, y = 2 * np.random.random((2, 500)) - 1
ax = plt.axes(aspect='equal', xticks=[], yticks=[], frameon=False)
ax.scatter(x, y, c=(x ** 2 + y ** 2 < 1), cmap=plt.cm.binary)
ax.add_patch(mpl.patches.Circle((0, 0), 1, zorder=0, fc='#FFAAAA'));
Imagine throwing darts uniformly at the square shown above. The probability that a dart falls within 1 unit of the center is
$$ p = \frac{\rm Area~of~circle}{\rm Area~of~square} = \frac{\pi\cdot 1^2}{2^2} = \frac{\pi}{4} $$

So if you throw $N$ darts and $N_{in}$ of them land inside the circle, you can estimate $\pi$ as

$$ \pi \approx 4\frac{N_{in}}{N} $$
Here we'll write a function to estimate $\pi$ using a random number generator:
In [68]:
def estimate_pi(N):
    r = numpy.random.random((int(N), 2))
    inside = numpy.sum(r ** 2, axis=1) <= 1
    return 4. * inside.sum() / N
In [69]:
estimate_pi(1E6)
Out[69]:
Write a function which uses a load-balanced view to parallelize this estimation of pi.

Tips:

- Use N x N_per_trial random draws to estimate pi.
- You can use the above estimate_pi function in your answer.
- Do you want a blocking or a non-blocking function call?
In [70]:
def estimate_pi_parallel(N, lview, N_per_trial=1E6):
    res = lview.map_async(estimate_pi, N * [N_per_trial])
    while not res.ready():
        time.sleep(0.5)
        print(res.progress)
    return numpy.mean(res.result)
In [71]:
estimate_pi_parallel(100, lview)
Out[71]:
For the homework today, we'll look at two astronomy applications. You may choose one (or both, if you want more practice) to turn in. In both of them, we'll return to a problem that we've seen before, but extend our computation using IPython parallel.

As usual, you can turn in your results via GitHub: use either an IPython notebook or a stand-alone .py file.
In the scikit-learn homework a few weeks ago, we looked at different regression schemes for computing photometric redshifts (see Exercise 2). Return to this, and determine the minimum rms error you can achieve with combinations of max_depth and n_estimators in the random forest regressor.
Here's how we'll download and prepare the data:
In [72]:
from astroML.datasets import fetch_sdss_specgals
import numpy as np
data = fetch_sdss_specgals()
# put magnitudes in a matrix
X = np.vstack([data['modelMag_%s' % f] for f in 'ugriz']).T
y = data['z']
# down-sample the data for speed
X = X[::10]
y = y[::10]
Here's an example of using cross-validation to compute the r2 score for a given choice of parameters:
In [73]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.cross_validation import cross_val_score

# define the model with the chosen parameters
model = RandomForestRegressor(n_estimators=10, max_depth=3)

# compute the scores via cross-validation
scores = cross_val_score(model, X, y, scoring='r2', cv=3)

# print the mean of the cross-validation scores
score = np.mean(scores)
print(score)
Write a script that uses a parallel computation to explore the best values of max_depth and n_estimators. What's the maximal score you can obtain? One possible approach is sketched below.
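Here is one possible approach, a sketch rather than a definitive solution (the grid values are arbitrary): treat each (max_depth, n_estimators) pair as an independent task, push the data to the engines once, and map a scoring function over the grid.

from itertools import product

def compute_score(params):
    # imports inside the function so they happen on the engines
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.cross_validation import cross_val_score
    import numpy as np
    max_depth, n_estimators = params
    model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth)
    # X and y are resolved in each engine's namespace (pushed below)
    return np.mean(cross_val_score(model, X, y, scoring='r2', cv=3))

# push the data to every engine once
dview = clients[:]
dview['X'] = X
dview['y'] = y

param_grid = list(product([3, 5, 10, 20], [10, 20, 50]))
scores = lview.map(compute_score, param_grid, block=True)

best = param_grid[np.argmax(scores)]
print("best (max_depth, n_estimators):", best)
print("best score:", max(scores))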
In the lecture on optimization, we looked at computing the periods of LINEAR stars using the Lomb-Scargle algorithm.
Write a script which uses IPython parallel to compute the periods of ~1000 stars from the LINEAR dataset. Plot the log of the period you found vs the g-r color of the star. You can compare your results to this paper, though the Lomb-Scargle period-finder might lead to slightly different results.
Here's how to get the data:
In [74]:
from astroML.datasets import fetch_LINEAR_sample
lcs = fetch_LINEAR_sample()
id_ = lcs.ids[0]
time, flux, dflux = lcs[id_].T
And here's how to compute the lomb-scargle periodogram:
In [75]:
from astroML.time_series import lomb_scargle
periods = np.logspace(-2, 0, 10000)
periodogram = lomb_scargle(time, flux, dflux, omega=2 * np.pi / periods, generalized=True)
Now we find the period with the maximum periodogram power:
In [76]:
idx = np.argsort(periodogram)
print(periods[idx[-1]])
Your task is to compute the best period for each id using a parallel function, and then plot the results against the g-r color, which can be found this way (one possible approach is sketched after the code below):
In [77]:
g_r = [lcs.get_target_parameter(id_, 'gr') for id_ in lcs.ids]
print(g_r[:10])
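Here is one possible approach, again a sketch (it assumes astroML is installed on the engines, and ships each light curve to the engines via map):

def best_period(lc):
    # imports inside the function so they happen on the engines
    import numpy as np
    from astroML.time_series import lomb_scargle
    t, y, dy = lc.T
    periods = np.logspace(-2, 0, 10000)
    P = lomb_scargle(t, y, dy, omega=2 * np.pi / periods, generalized=True)
    return periods[np.argmax(P)]

ids = list(lcs.ids[:1000])
best_periods = lview.map(best_period, [lcs[i] for i in ids], block=True)

# plot log(period) against g-r color
g_r = [lcs.get_target_parameter(i, 'gr') for i in ids]
plt.scatter(g_r, np.log10(best_periods))
plt.xlabel('g - r')
plt.ylabel('log10(period)')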