Today we'll look at performing parallel computations with IPython. Much of this material draws from resources available online, including the official IPython parallel documentation.
Parallel computing is generally very difficult, and IPython provides primitives to support a number of common scenarios.
In addition, IPython parallel includes tools for executing parallel jobs interactively (this is the "I" in "IPython").
One common complaint levied against IPython is that it's too bloated. Why does a project which started as an enhanced Python interpreter include parallel tools? You can hear the answer from IPython's creators in this SciPy 2012 keynote, in which Fernando Perez describes IPython as
"A tool for managing the entire lifecycle of a scientific idea"
This life-cycle runs from individual exploration, through collaborative development and production-scale computation, to publication and education.
Within this context, you can see how the IPython interpreter, parallel tools, and the notebook all fit the vision.
Additionally, it is a natural fit because the underlying message-passing architecture necessary for the IPython notebook is also a natural platform for parallel computing.
The IPython engine is a Python kernel which takes commands over a network connection; typically you start one engine per core.
An important property of an IPython engine is that it blocks while user code is being executed. As we'll see below, the schedulers hide this from the user in order to expose an asynchronous interface.
The IPython controller provides an interface for working with a set of engines.
At a general level, the controller is a collection of processes to which IPython engines and clients can connect. The controller is composed of a Hub and a collection of Schedulers, which may run in processes or threads.
The controller provides a single point of contact for users who wish to utilize the engines in the cluster. There are a number of different ways of working with a controller, but all of them are implemented via the View.apply method, after constructing View objects to represent different collections of engines.
The two primary models for interacting with engines are the direct interface, in which engines are addressed explicitly, and the load-balanced interface, in which the scheduler assigns work to whichever engine is appropriate.
Advanced users can readily extend the View models to enable other styles of parallelism.
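To make the two models concrete, here is a minimal sketch (assuming a cluster has already been started, as described in the next section); both kinds of view are constructed from the same Client, and each is covered in detail below:
from IPython import parallel
import os

clients = parallel.Client()

# Direct view: you address a specific set of engines explicitly
dview = clients[:]                    # all engines; equivalent to clients.direct_view()

# Load-balanced view: the scheduler decides which engine runs each task
lview = clients.load_balanced_view()

print(dview.apply_sync(os.getpid))    # one result per engine
print(lview.apply_sync(os.getpid))    # a single result, from whichever engine was chosen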
The center of an IPython cluster is the Hub. The Hub can be viewed as an über-logger: it keeps track of engine connections, schedulers, and clients, and persists all task requests and results in a database for later use.
All actions that can be performed on the engines go through a Scheduler. While the engines themselves block when user code is run, the schedulers hide that from the user to provide a fully asynchronous interface to a set of engines. Each scheduler is a small, GIL-less function written in C and provided by pyzmq (the pure-Python load-balanced scheduler being the exception).
The first thing required for using IPython Parallel is to start the cluster. By default, this will start a local cluster on your own computer, utilizing your CPU's multiple cores if available.
There are two ways to do this.
On the command-line, you can run the following command to start an IPython cluster:
[~]$ ipcluster start -n 4
will start a cluster with 4 engines (probably a good match for the cores available on a modern laptop). To use as many engines as you have cores, leave out the -n argument:
[~]$ ipcluster start
This spins up a Python kernel on each core, along with a hub that connects to them.
Within the IPython notebook, you can use the Clusters tab of the dashboard, and press Start with the desired number of cores, under the desired profile (more on profiles later). This will automatically run the correct commands to start your IPython cluster.
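You can also combine profiles with the command-line approach: ipcluster accepts a --profile argument, so starting a 4-engine cluster under a particular profile (here the built-in default profile) would look like:
[~]$ ipcluster start -n 4 --profile=default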
In [2]:
from IPython import parallel
clients = parallel.Client()
clients.block = True # use synchronous computations
print(clients.ids)
Now we can use this object to run some code. Let's define a simple function that we will execute on the cores:
In [3]:
def mul(a, b):
return a * b
mul(5, 6)
Out[3]:
Now let's execute mul
on the first engine:
In [4]:
clients[0].apply(mul, 5, 6)
Out[4]:
Notice that the function becomes the first argument of apply
, and any
additional arguments or keyword arguments are passed after it.
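Keyword arguments work the same way. As a quick illustration (the power function here is a made-up example, not part of the original material):
def power(a, exp=2):
    return a ** exp

clients[0].apply(power, 3)           # positional argument only: returns 3 ** 2
clients[0].apply(power, 3, exp=3)    # keyword argument forwarded to the engine: returns 3 ** 3
Because we set clients.block = True above, these calls return their results directly.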
We can similarly execute mul
in parallel on all the engines at once:
In [5]:
clients[:].apply(mul, 5, 6)
Out[5]:
But say we want to pass different arguments to each instance of mul
. In normal Python, we could do this with the map()
function:
In [6]:
%timeit map(mul, [5, 6, 7, 8], [8, 9, 10, 11])
The client views also have a map function which does the execution in parallel. Let's use a load-balanced view so that the IPython scheduler takes care of farming out the tasks for us:
In [7]:
view = clients.load_balanced_view()
%timeit view.map(mul, [5, 6, 7, 8], [8, 9, 10, 11])
This is the basic interface to parallel computation in IPython!
In [8]:
clients = parallel.Client(profile='default')
print(len(clients))
In [9]:
clients.block = True
dview = clients.direct_view()
dview.block = True
print(dview)
In [10]:
dview.apply(sum, [1, 2, 3])
Out[10]:
You can also obtain a direct view by slicing the Client object:
In [11]:
clients[::2]
Out[11]:
In [12]:
clients[::2].apply(sum, [1, 2, 3])
Out[12]:
In [13]:
lview = clients.load_balanced_view()
print(lview)
In [14]:
lview.apply(sum, [1, 2, 3])
Out[14]:
One way to introspect what's going on is to get the process ID (PID) of the engines. This gives a unique label for any running process on your computer. If you have a cluster running on multiple computers, you can also print the hostname to see which host each engine is running on:
In [15]:
import os
import socket
print(os.getpid())
print(socket.gethostname())
In [16]:
dview.apply(os.getpid)
Out[16]:
In [17]:
for i in range(10):
print(lview.apply(os.getpid))
Notice that the load-balanced view chooses engines somewhat randomly (it's not actually random, but is based on a process which is opaque to the user).
The direct view is an interface in which each engine is directly exposed to the user.
The direct view is a very flexible wrapper of the client. Let's look at a few ways of managing remote execution and remote data with direct views. We'll look at the following:
In [18]:
clients = parallel.Client()
dview = clients[:]
In a DirectView interface, we can use either blocking (synchronous) execution, in which the call does not return until every engine has finished, or non-blocking (asynchronous) execution, in which we get a handle back immediately and retrieve the results as they finish.
Here's an example of the two:
In [19]:
def get_pid_slowly():
# imports specified within the function definition,
# otherwise these packages will not be available on
# the engines (see below for a better way to approach this)
import os
import time
import random
# sleep up to 10 seconds
time.sleep(10 * random.random())
return os.getpid()
In [20]:
dview.block = True
dview.apply(get_pid_slowly)
Out[20]:
Notice that all the results come back at the same time, after every engine has finished computing.
Let's now see what happens if we use non-blocking execution:
In [21]:
dview.block = False
dview.apply(get_pid_slowly)
Out[21]:
What is returned is an AsyncResult
object.
In [22]:
res = dview.apply(get_pid_slowly)
In [23]:
res.ready()
Out[23]:
We can ask whether the result is ready using the ready() method. Once it's ready, we can fetch the result with the get() method.
In [24]:
res.ready()
Out[24]:
In [25]:
res.get()
Out[25]:
For convenience, you can also use the apply_sync and apply_async methods:
In [26]:
dview.apply_sync(get_pid_slowly)
Out[26]:
In [27]:
dview.apply_async(get_pid_slowly)
Out[27]:
In [28]:
import numpy
def normalize_vector(v):
return numpy.asarray(v) / numpy.linalg.norm(v)
In [29]:
x = numpy.random.random(5)
normalize_vector(x)
Out[29]:
In [30]:
dview.block = True
dview.apply(normalize_vector, x)
Though we've imported numpy in our local client process, we haven't imported numpy on any of the engines, so the code won't work.
There are a few ways to execute imports on other engines. We can do this directly with the execute
method:
In [31]:
dview.execute('import numpy')
Out[31]:
Alternatively, we can use the sync_imports
context manager to make this happen:
In [32]:
with dview.sync_imports():
import numpy
Or, we can use the px
(parallel execute) magic function to execute the contents of the cell on every engine:
In [33]:
%px import numpy
After doing any one of these, we're able to run our function and see the results:
In [34]:
dview.apply(normalize_vector, x)
Out[34]:
In [35]:
%%file myscript.py
import numpy
a = 5
def square(x):
return x * x
Now we can run this script on each of our engines:
In [36]:
dview.run("myscript.py")
Out[36]:
To look at the data, let's use the dictionary interface:
In [37]:
print(dview['a'])
In [38]:
print(dview['square'])
We can also use the dictionary interface to assign data:
In [39]:
dview['b'] = 4
print(dview['b'])
Now the variables a
and b
are defined globally on all our engines.
In [40]:
print(dview['a'])
print(dview['b'])
Now we'll use execute()
to add a
and b
and save the result to c
:
In [41]:
dview.execute('c=a+b')
Out[41]:
In [42]:
dview['c']
Out[42]:
We can also execute code on any subset of the engines:
In [43]:
clients[:2].execute('c=a*b')
Out[43]:
In [44]:
dview['c']
Out[44]:
Note that c
has only changed on the two engines we specified!
This is the basic way to push and pull data between the client and the engines.
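Equivalently, direct views expose explicit push and pull methods. A minimal sketch (the variable names d and e are made up for illustration):
# push a dictionary of variables to every engine in the view
dview.push({'d': 10, 'e': [1, 2, 3]})

# pull returns one copy of the value from each engine
print(dview.pull('d'))
print(dview.pull(['d', 'e']))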
In [45]:
%px c = numpy.random.random(3)
%px print(c)
If you want to execute multiple lines at once, you can use the %%px
cell magic to do this:
In [46]:
%%px
c = numpy.random.random(5)
c *= a
print(c)
Again, we can access the variables directly using the dictionary interface:
In [47]:
dview['c']
Out[47]:
In [48]:
import numpy as np
dview.scatter('a', np.arange(24))
dview['a']
Out[48]:
Notice that the contents of a
are partitioned and scattered among the engines. To bring them back, we can use gather()
:
In [49]:
a = dview.gather('a')
a
Out[49]:
So, for example, you might do an operation something like this:
In [50]:
x = np.random.random(400)
dview.scatter('x', x)
dview.execute('y = numpy.sqrt(x)')
y = dview.gather('y')
print(np.allclose(y, np.sqrt(x)))
This is a somewhat trivial computation, but if you had a larger computation that could be done on partitions of the data, it would allow you to speed up your calculation roughly in proportion to the number of cores available.
Let's write a simple example of doing distributed linear algebra. Note that this will not necessarily be more performant than an in-memory solution, but it is illustrative of the types of operations we can use to parallelize other problems.
We'll start with a simple example of vector-vector multiplication:
In [51]:
v1 = np.random.random(1001)
v2 = np.random.random(1001)
np.dot(v1, v2)
Out[51]:
In [52]:
dview.scatter('v1', v1)
for chunk in dview['v1']:
    print(chunk.shape)
In [53]:
def parallel_dot(dview, v1, v2):
dview.scatter('v1', v1)
dview.scatter('v2', v2)
dview.execute('v1v2 = numpy.dot(v1, v2)')
return sum(dview.gather('v1v2'))
In [54]:
result1 = np.dot(v1, v2)
result2 = parallel_dot(dview, v1, v2)
print(result1)
print(result2)
print(np.allclose(result1, result2))
In [55]:
M1 = np.random.random((500, 1000))
M2 = np.random.random((1000, 50))
result = np.dot(M1, M2)
print(result.shape)
Tips:
What happens when you use dview.scatter
on a matrix? What are the shapes of the resulting chunks?
If you scatter one matrix in this manner, what parts of the other matrix must it interact with?
Once you have an answer, use np.allclose to compare it to the serial result above (one possible approach is sketched below).
In [55]:
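If you want to check your work, here is one possible approach (a sketch, not the only solution; the name parallel_matrix_dot is ours): scatter the rows of M1, push a full copy of M2 to every engine, compute the partial products, and gather the row blocks back in order.
def parallel_matrix_dot(dview, M1, M2):
    dview.scatter('M1_chunk', M1)      # each engine gets a contiguous block of rows of M1
    dview.push({'M2': M2})             # every engine needs the full M2
    dview.execute('M1M2_chunk = numpy.dot(M1_chunk, M2)')
    return dview.gather('M1M2_chunk')  # gather re-assembles the row blocks in engine order

result_parallel = parallel_matrix_dot(dview, M1, M2)
print(result_parallel.shape)
print(np.allclose(result, result_parallel))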
Direct views are a nice low-level interface to the engines, but for most tasks it can be more helpful to use load-balanced views.
A load-balanced view is a fault-tolerant way to distribute tasks among the engines. While there is no direct access to individual engines, it provides a simple and powerful interface.
In [56]:
lview = clients.load_balanced_view()
In [57]:
def sleep_and_return(factor=10):
import time
import random
r = factor * random.random()
time.sleep(r)
return r
In [58]:
lview.block = True
lview.apply(sleep_and_return, 1)
Out[58]:
Using apply on a load-balanced view simply executes the function once, on whichever engine the scheduler chooses.
We can use the map
function to execute multiple inputs. We'll use block=False
to make sure the results come asynchronously:
In [59]:
lview.block = False
res = lview.map(sleep_and_return, range(10))
In [60]:
res.ready()
Out[60]:
In [61]:
res.result
Out[61]:
The progress
attribute tells us how many of the jobs have been completed up to this point:
In [62]:
import time
while not res.ready():
time.sleep(1)
print(res.progress)
Once all the results are finished, we can use the result
attribute to get at them:
In [63]:
res.result
Out[63]:
The load-balanced view also includes an iterator which will return results as they come in:
In [64]:
for result in lview.map(sleep_and_return, range(10)):
print(result)
In [66]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
x, y = 2 * np.random.random((2, 500)) - 1
ax = plt.axes(aspect='equal', xticks=[], yticks=[], frameon=False)
ax.scatter(x, y, c=(x ** 2 + y ** 2 < 1), cmap=plt.cm.binary)
ax.add_patch(mpl.patches.Circle((0, 0), 1, zorder=0, fc='#FFAAAA'));
Imagine throwing darts uniformly at the square shown above. The probability that a dart falls within 1 unit of the center is
$$ p = \frac{\mathrm{Area~of~circle}}{\mathrm{Area~of~square}} = \frac{\pi\cdot 1^2}{2^2} = \frac{\pi}{4} $$

So if you throw $N$ darts and $N_{in}$ of them land inside the circle, you can estimate $\pi$ as

$$ \pi \approx 4\frac{N_{in}}{N} $$
Here we'll write a function to estimate $\pi$ using a random number generator:
In [67]:
def estimate_pi(N):
r = numpy.random.random((N, 2))
inside = numpy.sum(r ** 2, 1) <= 1
return 4 * inside.sum() * 1. / N
In [68]:
estimate_pi(1E6)
Out[68]:
Write a function which uses a Load-balanced View to parallelize this estimation of pi:
Tips:
Use N x N_per_trial
random draws to estimate pi
You can use the above estimate_pi
function in your answer
Do you want a blocking or a non-blocking function call?
In [69]:
def estimate_pi_parallel(N, lview, N_per_trial=1E6):
# your code here:
# distribute the N trials across the load-balanced view,
# using lview.map() and return the average of the results
pass
In [70]:
estimate_pi_parallel(100, lview)
Note that this is certainly not the best way to estimate $\pi$, but it demonstrates a simple example of using IPython parallel to do distributed Monte Carlo simulations, which can be very useful in practice.
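For reference, here is one possible sketch of estimate_pi_parallel (one solution among many): it distributes N independent trials of N_per_trial draws each via a blocking map, then averages the per-trial estimates.
def estimate_pi_parallel(N, lview, N_per_trial=1000000):
    # each trial calls estimate_pi on an engine chosen by the scheduler;
    # block=True makes map wait for all trials before returning
    results = lview.map(estimate_pi, [int(N_per_trial)] * N, block=True)
    return sum(results) / len(results)

print(estimate_pi_parallel(100, lview))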
The homework assignment involves parallel cross-validation. Cross-validation is a process by which you can evaluate how well a machine learning routine is modeling your data. For a review of cross-validation in scikit-learn, you can read through my PyCon 2014 tutorial on the subject (the full listing of the tutorial is available here).
We'll work with the 20 newsgroups data, a large corpus of text data available for download through the scikit-learn package:
In [71]:
from sklearn.datasets import fetch_20newsgroups
newsgroups = fetch_20newsgroups()
In [72]:
newsgroups.keys()
Out[72]:
In [73]:
len(newsgroups.data)
Out[73]:
In [74]:
newsgroups.target_names
Out[74]:
The scikit-learn documentation has information on how to vectorize this data; for simplicity here we'll use the pre-vectorized version also available in the package (this uses basic TF-IDF vectorization):
In [75]:
from sklearn.datasets import fetch_20newsgroups_vectorized
print(fetch_20newsgroups_vectorized.__doc__)
In [76]:
# remove metadata to prevent over-fitting of these features
data = fetch_20newsgroups_vectorized(remove=('headers', 'footers', 'quotes'))
In [77]:
data.keys()
Out[77]:
In [78]:
data['data']
Out[78]:
Our data is a sparse matrix with 130107 features.
Now that we have this data, here is our question: Given the vectorized data, build a classifier that can predict the category of an unlabeled message.
For an example of several estimators used on this dataset, see the Classification of 20 newsgroups within the scikit-learn documentation.
We'll use the MultinomialNB classifier, a fast Naive Bayes model that works well for text data (the SGDClassifier, a linear classifier related to support vector machines, comes up again in the exercise below).
Here's a simple MultinomialNB model trained on the full dataset:
In [79]:
from sklearn.naive_bayes import MultinomialNB
clf = MultinomialNB(alpha=0.1)
clf.fit(data['data'], data['target'])
Out[79]:
In [80]:
predicted = clf.predict(data.data)
print(clf.score(data['data'], data['target']))
For this model, we have correctly identified ~85% of the labels.
Let's look at the confusion matrix to visualize which categories the classifier is mixing up:
In [81]:
from sklearn.metrics import confusion_matrix
C = confusion_matrix(data.target, predicted)
fig, ax = plt.subplots(figsize=(8, 8))
plt.imshow(C, interpolation='nearest',
cmap=plt.cm.binary)
plt.xticks(range(20), data.target_names, rotation=90)
plt.yticks(range(20), data.target_names)
plt.xlabel('predicted')
plt.ylabel('true');
If you've done any ML validation before, you'll immediately notice a problem here: we used the same data to both train and evaluate the model. With this setup, there is the potential to over-fit the training data. Instead, we should use cross-validation: basically fitting on part of the data, and evaluating on another part.
We can do this by-hand here:
In [82]:
split = int(0.9 * len(data.target))
clf.fit(data.data[:split], data.target[:split])
clf.score(data.data[split:], data.target[split:])
Out[82]:
Using this validation set, we see that the actual score is closer to ~67%. We might get a better estimate by doing this multiple times, leaving out 1/10 of the data each time. Because this is such a common method to use, scikit-learn has a built-in convenience routine to do this:
In [83]:
from sklearn.cross_validation import cross_val_score
cross_val_score(clf, data.data, data.target, cv=10)
Out[83]:
It's clear that the score is closer to ~68% than the ~85% we saw above.
We can use this cross-validation approach to optimize over the hyperparameters in the problem. For example:
In [84]:
alphas = [1E-4, 1E-3, 1E-2, 1E-1]
results = []
for alpha in alphas:
clf = MultinomialNB(alpha)
results.append(np.mean(cross_val_score(clf, data.data, data.target)))
results
Out[84]:
We find that we get a much better fit for $\alpha = 0.001$: a score of around 75%. This is a simple example of a grid search in one dimension.
Create a function which will do a 1-dimensional grid search in parallel. Think about how you will manage the data (i.e. try to send the data to the nodes just once, rather than over and over)
Use this grid search to create and plot a curve showing accuracy vs. alpha (log-spaced alphas are probably best). What is the best alpha for this problem?
Apply this code to the SGDClassifier using penalty="elasticnet"
(you can dig through this demo to find an example of this). This is a much slower classifier, so the parallel computation will be very important!
Advanced: using a non-blocking load-balanced view, randomize the list of inputs and create a function which chooses randomized values of the hyperparameter and periodically reports the current best accuracy. Once the accuracy is suitable, you can choose Kernel->Interrupt
and use the best model. This is a very useful tool to have in practice.
For a deeper dive into this subject, you can take a look at Olivier Grisel's Parallel ML tutorial (see the notebook listing here). This includes information on Running IPython clusters on Amazon EC2.
(Optional) Repeat the previous sentiment analysis exercise, but use parallel cross-validation to refine the hyperparameters. If desired, scale this up to EC2 using StarCluster.
That's all folks!
In [1]:
# Define some styles for the notebook:
from IPython.display import HTML
HTML("""
<style>
div.cell, div.text_cell_render{
max-width:750px;
margin-left:auto;
margin-right:auto;
}
.rendered_html
{
font-size: 140%;
}
.rendered_html li
{
line-height: 1.8;
}
.rendered_html h1, h2, h3, h4 {
text-align:center;
font-family:"Charis SIL", serif;
}
</style>""")
Out[1]: