Scientific Programming in Python

Topic 8: Basics of Parallelism

Notebook created by Martín Villanueva - martin.villanueva@usm.cl - DI UTFSM - June 2017.

Note: Since this notebook makes use of Ipython Interactive widgets, you have to run the corresponding cell in order to visualize it properly.


In [21]:
import numpy as np
import matplotlib.pyplot as plt
import sys
import memory_profiler
%load_ext memory_profiler


The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler

1.- About Parallelism in Python

Some general concepts:

  1. Concurrency: Concurrency is concerned with managing access to shared state from different threads.
  2. Parallelism: Parallelism is concerned with utilizing multiple processors/cores to improve the performance of a computation.
  3. Process: A process is an instance of a computer program that is being executed. A process may be made up of multiple threads of execution that execute instructions concurrently.
  4. Threads: A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.

Threads vs Processes

First thing you need to know to understand the difference between a process and a thread, is a fact, that processes do not run, threads do.

  • Process: Each process provides the resources needed to execute a program. A process has a virtual address space, executable code, open handles to system objects, a security context, a unique process identifier, environment variables, a priority class, minimum and maximum working set sizes, and at least one thread of execution. Each process is started with a single thread, often called the primary thread, but can create additional threads from any of its threads.

  • Thread: A thread is an entity within a process that can be scheduled for execution. All threads of a process share its virtual address space and system resources. In addition, each thread maintains exception handlers, a scheduling priority, thread local storage, a unique thread identifier, and a set of structures the system will use to save the thread context until it is scheduled. The thread context includes the thread's set of machine registers, the kernel stack, a thread environment block, and a user stack in the address space of the thread's process. Threads can also have their own security context, which can be used for impersonating clients.

Global Interpreter Lock (GIL) in Python

From Python Documentation: In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython's memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

The GIL is controversial because it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations. Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck.

2.- Multithreading with threading and numba


In [2]:
import numba
import threading

We can completely bypass the GIL of Python with this two libraries: Numba and Threading. The first (that we study in a previuos session) allows us to create a just in time compiled function, with a nogil option that completely avoid any call to the CPython API in order to bypass the GIL. The second (Threading) allows us to create and handle Threads in Python.

As example we want to evaluate the function $f(x,y) = \log(\exp(x)\cdot \log(y))$ in a vectorial way, over two huge unidimensional arrays $\mathbf{x},\mathbf{y} \in \mathbb{R}^N$.

This problem can be easily solved with NumPy vectorized operations, but since we are working with huge arrays, we want to split the computation over such arrays into computation of chunks of these arrays, each one performed by a single Thread.


In [3]:
@numba.jit('void(double[:], double[:], int64, int64, double[:])', nopython=True, nogil=True)
def numba_f(x, y, s_index, delta, res):
    """
    x: (1d ndarray)
    y: (1d ndarray)
    s_index: (int) starting index
    delta: (int) size of the chunk to compute
    res: (1d ndarray) which store the results
    """
    for i in range(s_index, min(s_index+delta, len(x))):
        res[i] = np.log(np.exp(x[i]) * np.log(y[i]))

Now we create the huge unidimensional arrays to be passed to this function:


In [4]:
# data size, 100 million items
N = int(100e6)

# data
x = np.random.random(N)
y = np.random.random(N)

# array for results
r = np.zeros(N)

Next we configure the number of threads to use, the chunk size of each thread and the starting index of each thread:


In [9]:
# number of threads
T = 4

# data size for each thread
chunk_size = N // T

# starting index for each thread
s_indexes = [i*chunk_size for i in range(T)]

Finally we create the threads, launch its execution, and wait for each one to finish its computation:


In [6]:
# threads creation
threads = []
for s_index in s_indexes:
    threads.append( threading.Thread( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
# threads start execution
for thread in threads:
    thread.start()
# we wait for each thread to finish its execution
for thread in threads:
    thread.join()
# all threads have finished here

Also we can run a 1-threaded version as follows:


In [8]:
numba_f(x, y, 0, N, r)

Now we can benchmark the execution of both version with timeit magic:


In [10]:
%%timeit 
threads = []
for s_index in s_indexes:
    threads.append( threading.Thread( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()


1 loop, best of 3: 1.09 s per loop

In [29]:
%%memit
threads = []
for s_index in s_indexes:
    threads.append( threading.Thread( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()


peak memory: 2372.86 MiB, increment: 0.05 MiB

In [11]:
%%timeit
numba_f(x, y, 0, N, r)


1 loop, best of 3: 2.36 s per loop
  • The creation of threads was included in the timeit calculation in order to make a fair comparison.
  • You can observe an improvement slightly higher than x2 in the threaded version. Why the improvement isn't x4 if we are using $4$ threads? (Consider this machine which has $2$ physicall cores with hyperthreading).

3.- Multiprocessing

Alternatively if we are not worried about memory consumption due to the replication of resources caused by the creation of process, we can use the multiprocessing library instead. It has the same exact sintax as the Threading library with minor changes, but processes are created instead of threads.


In [22]:
import multiprocessing

We will use the same example used above:


In [31]:
# creation of processes
jobs = []
for s_index in s_indexes:
    jobs.append( multiprocessing.Process( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
# start execution of processes
for job in jobs:
    job.start()
# we wait for each process to finish its execution
for job in jobs:
    job.join()
# all processes have finished here

And also perform a time benchmark with timeit:


In [32]:
%%timeit 
jobs = []
for s_index in s_indexes:
    jobs.append( multiprocessing.Process( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for job in jobs:
    job.start()
for job in jobs:
    job.join()


1 loop, best of 3: 1.8 s per loop

As you can see the performance is not as good as with the Threading library, but anyway there is an improvement to the naive/sequential version.


In [33]:
%%memit
jobs = []
for s_index in s_indexes:
    jobs.append( multiprocessing.Process( target=numba_f, args=(x, y, s_index, chunk_size, r) ) )
for job in jobs:
    job.start()
for job in jobs:
    job.join()


peak memory: 2372.84 MiB, increment: 0.00 MiB

4.- IPython.Parallel

We will see how to run multiple tasks in parallel on a multicore computer. IPython implements highly-powerful and user-friendly facilities for interactive parallel computing in the Notebook.

We first need to install ipyparallel (also called IPython.parallel) and configure it:

conda install ipyparallel

Then you have to add the line c.NotebookApp.server_extensions.append('ipyparallel. nbextension') to your configuration file: ~/.jupyter/jupyter_notebook_config.py. If this file doesn't exist you must create it with:

jupyter notebook --generate-config

Finally, to enable the IPython Clusters tab in Jupyter Notebook run:

ipcluster nbextension enable

To use IPython.parallel, we need to launch a few engines. There are two ways:

  1. The first way to do it is to run ipcluster start -n 4 in the terminal.
  2. Or you can also launch engines from the Notebook dashboard.

In general, you can launch as many engines as the number of CPUs you have on your machine.

For more info: Ipython Github Repository


In [2]:
from ipyparallel import Client

There are several steps to distribute code across multiple cores:

  1. Launching several IPython engines (there is typically one process per core).
  2. Creating a Client that acts as a proxy to these engines.
  3. Using the client to launch tasks on the engines and retrieve the results.

Engines are Python processes that execute code on different computing units. Once the engines have been launched, we create a Client instance that will give us access to these engines:


In [36]:
rc = Client()

There are two ways to access the engines:

  • With the direct interface, we have a direct access to every engine.
  • With the load-balanced interface, we submit jobs to a scheduler which dynamically assigns them to the engines depending on their current load.

Direct Interface

The ids attribute of the client shows us the identifiers of the engines that were automatically detected by IPython:


In [37]:
rc.ids


Out[37]:
[0, 1, 2, 3]

There are several ways to run code in parallel on the engines. First, we can use the %px magic command.

The code passed to the %px magic command is executed on all engines. Here, we display the OS process identifier (also called PID) of every engine. Every engine is an independent Python process.


In [38]:
%px import os, time

In [39]:
%px print(os.getpid())


[stdout:0] 15544
[stdout:1] 15542
[stdout:2] 15543
[stdout:3] 15545

We can also specify the exact list of engines to run code on. The --targets option accepts a list of engine identifiers.

Note that we used the cell magic %%px this time instead of the line magic. The cell magic allows us to execute several lines of code on the engines.


In [44]:
%%px --targets 0:3
print(os.getpid())


[stdout:0] 15544
[stdout:1] 15542
[stdout:2] 15543

By default, the %px magic executes commands in blocking mode; the cell only returns when the commands have completed on all engines. It is possible to run non-blocking commands with the --noblock or -a option. In this case, the cell returns immediately, and the task's status and results can be polled asynchronously from IPython's interactive session:


In [71]:
%%px -a
import time
time.sleep(5)


Out[71]:
<AsyncResult: execute>

In [72]:
print(_.elapsed, _.ready())


0.276527 False

The %pxresult blocks until the task finishes:


In [73]:
%pxresult
print(_.elapsed, _.ready())


5.027085 True

We can also create a direct view on some or all of the engines. IPython provides convenient functions for common use cases, such as a parallel map function:


In [78]:
view = rc[:]
print(view)
res = view.map(lambda x: x*x, range(10))
print(res.get())


<DirectView [0, 1, 2, 3]>
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The direct view has a few useful methods like parallel versions of map() and apply().

Load-balanced Interface

The load-balanced interface gives us high-level parallel computing routines that are dynamically executed on the engines.

Here, we will demonstrate how to use it with a very known example: Estimate $\pi$ in parallel using a Monte-Carlo method:

  • The "Monte Carlo Method" is a method of solving problems using statistics. Given the probability, $P$, that an event will occur in certain conditions, a computer can be used to generate those conditions repeatedly. The number of times the event occurs divided by the number of times the conditions are generated should be approximately equal to $P$.
  • We will sample a large number of points uniformly in a unit square, and estimate the proportion of those which are in a quarter unit circle. We'll then get an estimation of pi since we know that this proportion should be pi/4.

Let's first create a balanced view:


In [98]:
v = rc.load_balanced_view()

The first function samples and counts the number of (random) points in the quarter disc. The second function below returns an estimation of $\pi$ based on the number of points in the quarter disc, and the total number of points:


In [99]:
def sample(n):
    import numpy as np
    # Random coordinates.
    x, y = np.random.rand(2, n)
    # Square distances to the origin.
    r_square = x ** 2 + y ** 2
    # Number of points in the quarter disc.
    return (r_square <= 1).sum()

def pi(n_in, n):
    return 4. * float(n_in) / n

Here is an example:


In [13]:
n = 100000000
pi(sample(n), n)


Out[13]:
3.1414034

Let's evaluate the time taken by this function on a single core:


In [53]:
%timeit pi(sample(n),n)


1 loop, best of 3: 5.78 s per loop

We will now run this simulation in parallel. First, we divide this task into 100 smaller subtasks where the number of points is divided by 100


In [79]:
args = [n // 100] * 100
print(args)


[1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000, 1000000]

We use a parallel map() function to run these tasks in parallel. Our sample() function is called 100 times, taking n // 100 as its argument every time. We will combine the 100 results later.


In [80]:
ar = v.map(sample, args)

There's also a synchronous version of map() called map_sync() which blocks until the tasks have completed, and directly returns the results.

This function doesn't return the results. Instead, it launches the 100 tasks in parallel and returns an AsyncResult object. The AsyncResult object can be used to interactively poll the tasks status and eventually retrieve the results. This object has a metadata attribute: a list of dictionaries for all engines with useful information.


In [86]:
print(len(ar.metadata))
ar.metadata[0]


100
Out[86]:
{'after': [],
 'completed': datetime.datetime(2017, 6, 11, 18, 59, 2, 266188, tzinfo=tzutc()),
 'data': {},
 'engine_id': 1,
 'engine_uuid': '149d4f71-b08ce1e3bfe43739aca00f86',
 'error': None,
 'execute_input': None,
 'execute_result': None,
 'follow': [],
 'msg_id': 'a67a050e-f6e42d198c4047fc88d7c99f',
 'outputs': [],
 'received': datetime.datetime(2017, 6, 11, 18, 59, 2, 293158, tzinfo=tzutc()),
 'started': datetime.datetime(2017, 6, 11, 18, 59, 2, 98913, tzinfo=tzutc()),
 'status': 'ok',
 'stderr': '',
 'stdout': '',
 'submitted': datetime.datetime(2017, 6, 11, 18, 59, 2, 83549, tzinfo=tzutc())}

In [59]:
ar.ready(), ar.progress


Out[59]:
(False, 20)

This tells us that the tasks are still running at this point, and that ar.progress tasks have completed so far.

Once all tasks have completed, we can get some information about the elapsed time:


In [60]:
ar.elapsed, ar.serial_time


Out[60]:
(2.843275, 10.093617999999998)

The first number represents the actual elapsed time for the entire job, while the second number represents the cumulative time spent on all engines.

Finally, we combine all results with the ar.result() method. This is the list of all results returned by the 100 tasks. We use the pi() function to get the final estimation:


In [63]:
print(ar.result())


[785031, 785106, 785003, 785859, 785476, 785609, 785184, 785558, 785772, 785583, 785485, 785425, 785371, 785218, 785493, 785697, 785261, 785241, 785652, 785031, 785342, 785193, 785997, 785416, 785160, 785200, 785384, 785673, 786332, 785177, 786004, 785397, 785148, 784814, 785427, 785295, 785806, 785606, 785754, 784712, 785112, 785345, 786177, 785246, 783837, 785325, 785505, 785406, 785509, 785384, 784566, 785953, 784803, 784742, 785783, 785409, 785071, 785788, 785560, 785246, 784681, 785738, 786119, 785308, 784338, 784824, 785710, 784670, 785148, 785126, 785297, 785695, 785873, 785949, 785487, 785554, 785245, 785568, 785740, 785092, 785879, 785840, 785214, 785393, 785362, 785951, 785251, 785364, 785610, 785204, 785532, 785204, 785662, 785304, 784837, 785424, 784836, 785776, 785610, 784837]

In [64]:
pi(np.sum(ar.result()), n)


Out[64]:
3.14151644

A progress bar with Ipython.widgets

We can create cool Ipython widgets to visualize the progress of the tasks. The idea is to create a loop polling for the tasks' status at every 0.1 second. An IntProgressWidget widget is updated in real-time and shows the progress of the tasks:


In [117]:
import time 
from IPython.display import display
from ipywidgets import IntProgress, HTML, VBox

In [126]:
def progress_bar(ar):
    # We create a progress bar.
    progress = IntProgress()
    progress.bar_style = 'info'
    # The maximum value is the number of tasks.
    progress.max = len(ar.msg_ids)
    # We display the widget in the output area.
    label = HTML()
    box = VBox(children=[label, progress])
    display(box)
    # Repeat every second:
    while not ar.ready():
        # Update the widget's value with the
        # number of tasks that have finished
        # so far.
        progress.value = ar.progress
        label.value = "{0} / {1}".format(ar.progress,len(ar.msg_ids))
        time.sleep(0.1)
    label.value = "{0} / {1}".format(ar.progress,len(ar.msg_ids))
    progress.value = progress.max

In [127]:
ar = v.map(sample, args)
progress_bar(ar)