Notebook created by Martín Villanueva - martin.villanueva@usm.cl
- DI UTFSM - June 2017.
Note: Since this notebook makes use of Ipython Interactive widgets, you have to run the corresponding cell in order to visualize it properly.
In [21]:
import numpy as np
import matplotlib.pyplot as plt
import sys
import memory_profiler
%load_ext memory_profiler
Some general concepts:
First thing you need to know to understand the difference between a process and a thread, is a fact, that processes do not run, threads do.
Process: Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.
Thread: A thread is an entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread's set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread's process. Threads can also have their own security context, which can be used for impersonating clients.
From Python Documentation: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
The GIL is controversial because it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations. Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.
In [2]:
import numba
import threading
We can completely bypass the GIL of Python with this two libraries: Numba
and Threading
. The first (that we study in a previuos session) allows us to create a just in time compiled function, with a nogil
option that completely avoid any call to the CPython API in order to bypass the GIL. The second (Threading
) allows us to create and handle Threads in Python.
As example we want to evaluate the function $f(x,y) = \log(\exp(x)\cdot \log(y))$ in a vectorial way, over two huge unidimensional arrays $\mathbf{x},\mathbf{y} \in \mathbb{R}^N$.
This problem can be easily solved with NumPy vectorized operations, but since we are working with huge arrays, we want to split the computation over such arrays into computation of chunks of these arrays, each one performed by a single Thread.
In [3]:
@numba.jit('void(double[:], double[:], int64, int64, double[:])', nopython=True, nogil=True)
def numba_f(x, y, s_index, delta, res):
"""
x: (1d ndarray)
y: (1d ndarray)
s_index: (int) starting index
delta: (int) size of the chunk to compute
res: (1d ndarray) which store the results
"""
for i in range(s_index, min(s_index+delta, len(x))):
res[i] = np.log(np.exp(x[i]) * np.log(y[i]))
Now we create the huge unidimensional arrays to be passed to this function:
In [4]:
# data size, 100 million items
N = int(100e6)
# data
x = np.random.random(N)
y = np.random.random(N)
# array for results
r = np.zeros(N)
Next we configure the number of threads to use, the chunk size of each thread and the starting index of each thread:
In [9]:
# number of threads
T = 4
# data size for each thread
chunk_size = N // T
# starting index for each thread
s_indexes = [i*chunk_size for i in range(T)]
Finally we create the threads, launch its execution, and wait for each one to finish its computation:
In [6]:
# threads creation
threads = []
for s_index in s_indexes:
threads.append( threading.Thread( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
# threads start execution
for thread in threads:
thread.start()
# we wait for each thread to finish its execution
for thread in threads:
thread.join()
# all threads have finished here
Also we can run a 1-threaded version as follows:
In [8]:
numba_f(x, y, 0, N, r)
Now we can benchmark the execution of both version with timeit
magic:
In [10]:
%%timeit
threads = []
for s_index in s_indexes:
threads.append( threading.Thread( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for thread in threads:
thread.start()
for thread in threads:
thread.join()
In [29]:
%%memit
threads = []
for s_index in s_indexes:
threads.append( threading.Thread( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for thread in threads:
thread.start()
for thread in threads:
thread.join()
In [11]:
%%timeit
numba_f(x, y, 0, N, r)
timeit
calculation in order to make a fair comparison. x2
in the threaded version. Why the improvement isn't x4
if we are using $4$ threads? (Consider this machine which has $2$ physicall cores with hyperthreading).Alternatively if we are not worried about memory consumption due to the replication of resources caused by the creation of process, we can use the multiprocessing
library instead. It has the same exact sintax as the Threading
library with minor changes, but processes are created instead of threads.
In [22]:
import multiprocessing
We will use the same example used above:
In [31]:
# creation of processes
jobs = []
for s_index in s_indexes:
jobs.append( multiprocessing.Process( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
# start execution of processes
for job in jobs:
job.start()
# we wait for each process to finish its execution
for job in jobs:
job.join()
# all processes have finished here
And also perform a time benchmark with timeit
:
In [32]:
%%timeit
jobs = []
for s_index in s_indexes:
jobs.append( multiprocessing.Process( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for job in jobs:
job.start()
for job in jobs:
job.join()
As you can see the performance is not as good as with the Threading
library, but anyway there is an improvement to the naive/sequential version.
In [33]:
%%memit
jobs = []
for s_index in s_indexes:
jobs.append( multiprocessing.Process( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for job in jobs:
job.start()
for job in jobs:
job.join()
IPython.Parallel
We will see how to run multiple tasks in parallel on a multicore computer. IPython implements highly-powerful and user-friendly facilities for interactive parallel computing in the Notebook.
We first need to install ipyparallel
(also called IPython.parallel
) and configure it:
conda install ipyparallel
Then you have to add the line c.NotebookApp.server_extensions.append('ipyparallel. nbextension')
to your configuration file: ~/.jupyter/jupyter_notebook_config.py
. If this file doesn't exist you must create it with:
jupyter notebook --generate-config
Finally, to enable the IPython Clusters tab in Jupyter Notebook run:
ipcluster nbextension enable
To use IPython.parallel
, we need to launch a few engines. There are two ways:
ipcluster start -n 4
in the terminal.In general, you can launch as many engines as the number of CPUs you have on your machine.
For more info: Ipython Github Repository
In [2]:
from ipyparallel import Client
There are several steps to distribute code across multiple cores:
Engines are Python processes that execute code on different computing units. Once the engines have been launched, we create a Client instance that will give us access to these engines:
In [36]:
rc = Client()
There are two ways to access the engines:
In [37]:
rc.ids
Out[37]:
There are several ways to run code in parallel on the engines. First, we can use the %px
magic command.
The code passed to the %px
magic command is executed on all engines. Here, we display the OS process identifier (also called PID) of every engine. Every engine is an independent Python process.
In [38]:
%px import os, time
In [39]:
%px print(os.getpid())
We can also specify the exact list of engines to run code on. The --targets option accepts a list of engine identifiers.
Note that we used the cell magic %%px
this time instead of the line magic. The cell magic allows us to execute several lines of code on the engines.
In [44]:
%%px --targets 0:3
print(os.getpid())
By default, the %px
magic executes commands in blocking mode; the cell only returns when the commands have completed on all engines. It is possible to run non-blocking commands with the --noblock
or -a
option. In this case, the cell returns immediately, and the task's status and results can be polled asynchronously from IPython's interactive session:
In [71]:
%%px -a
import time
time.sleep(5)
Out[71]:
In [72]:
print(_.elapsed, _.ready())
The %pxresult
blocks until the task finishes:
In [73]:
%pxresult
print(_.elapsed, _.ready())
We can also create a direct view on some or all of the engines. IPython
provides convenient functions for common use cases, such as a parallel map
function:
In [78]:
view = rc[:]
print(view)
res = view.map(lambda x: x*x, range(10))
print(res.get())
The direct view has a few useful methods like parallel versions of map() and apply().
The load-balanced interface gives us high-level parallel computing routines that are dynamically executed on the engines.
Here, we will demonstrate how to use it with a very known example: Estimate $\pi$ in parallel using a Monte-Carlo method:
Let's first create a balanced view:
In [98]:
v = rc.load_balanced_view()
The first function samples and counts the number of (random) points in the quarter disc. The second function below returns an estimation of $\pi$ based on the number of points in the quarter disc, and the total number of points:
In [99]:
def sample(n):
import numpy as np
# Random coordinates.
x, y = np.random.rand(2, n)
# Square distances to the origin.
r_square = x ** 2 + y ** 2
# Number of points in the quarter disc.
return (r_square <= 1).sum()
def pi(n_in, n):
return 4. * float(n_in) / n
Here is an example:
In [13]:
n = 100000000
pi(sample(n), n)
Out[13]:
Let's evaluate the time taken by this function on a single core:
In [53]:
%timeit pi(sample(n),n)
We will now run this simulation in parallel. First, we divide this task into 100 smaller subtasks where the number of points is divided by 100
In [79]:
args = [n // 100] * 100
print(args)
We use a parallel map()
function to run these tasks in parallel. Our sample()
function is called 100 times, taking n // 100
as its argument every time. We will combine the 100 results later.
In [80]:
ar = v.map(sample, args)
There's also a synchronous version of map()
called map_sync()
which blocks until the tasks have completed, and directly returns the results.
This function doesn't return the results. Instead, it launches the 100 tasks in parallel and returns an AsyncResult
object. The AsyncResult
object can be used to interactively poll the tasks status and eventually retrieve the results. This object has a metadata
attribute: a list of dictionaries for all engines with useful information.
In [86]:
print(len(ar.metadata))
ar.metadata[0]
Out[86]:
In [59]:
ar.ready(), ar.progress
Out[59]:
This tells us that the tasks are still running at this point, and that ar.progress
tasks have completed so far.
Once all tasks have completed, we can get some information about the elapsed time:
In [60]:
ar.elapsed, ar.serial_time
Out[60]:
The first number represents the actual elapsed time for the entire job, while the second number represents the cumulative time spent on all engines.
Finally, we combine all results with the ar.result()
method. This is the list of all results returned by the 100 tasks. We use the pi() function to get the final estimation:
In [63]:
print(ar.result())
In [64]:
pi(np.sum(ar.result()), n)
Out[64]:
In [117]:
import time
from IPython.display import display
from ipywidgets import IntProgress, HTML, VBox
In [126]:
def progress_bar(ar):
# We create a progress bar.
progress = IntProgress()
progress.bar_style = 'info'
# The maximum value is the number of tasks.
progress.max = len(ar.msg_ids)
# We display the widget in the output area.
label = HTML()
box = VBox(children=[label, progress])
display(box)
# Repeat every second:
while not ar.ready():
# Update the widget's value with the
# number of tasks that have finished
# so far.
progress.value = ar.progress
label.value = "{0} / {1}".format(ar.progress,len(ar.msg_ids))
time.sleep(0.1)
label.value = "{0} / {1}".format(ar.progress,len(ar.msg_ids))
progress.value = progress.max
In [127]:
ar = v.map(sample, args)
progress_bar(ar)