In [ ]:
import numpy as np
import matplotlib.pyplot as plt
from classification import nearest_neighbor_prediction
%matplotlib inline

Introduction to concurrent.futures

concurrent.futures is a high-level interface for asynchronous, parallel calculations.
It is a newer, higher-level alternative to the multiprocessing and threading modules, on top of which it is built.

As a reminder, here is a comparison between mpi4py and concurrent.futures:

  mpi4py                                             | concurrent.futures
  ---------------------------------------------------|--------------------------------------------------------------
  Can run on several nodes                           | Runs only on one node
  Only handles processes                             | Can handle processes and threads
  No interactivity                                   | Some interactivity (e.g. integrates into the Jupyter notebook)
  Allows elaborate communications between processes  | No communication between processes or threads

In this notebook, the most visible feature is that concurrent.futures integrates well into interactive Python environments (e.g. the Jupyter notebook, IPython). In particular, there is no need to execute the code in a separate script, nor to save the data to a file and read it back into the interactive environment: the results are directly accessible in the interactive session.

Installation

  • Python 3: already installed by default

  • Python 2: conda install futures

Import statement


In [ ]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

Example

In the following example, we determine which of the integers in a list are even.

  • The statement with ProcessPoolExecutor(max_workers=4) as e spawns 4 worker processes and shuts them down once they have finished the calculation.

  • The function map distributes the integers of the list among the processes.

  • The returned object result is an iterator; its elements correspond, in order, to the elements of list_integers.


In [ ]:
# Define function
def is_even( i ):
    """Determine if the integer i is even"""
    even = (i%2==0)
    return( even )

list_integers = [ 4, 3, 6, 7, 9, 10, 124, 325 ]

# Spawn 4 processes
with ProcessPoolExecutor(max_workers=4) as e:
    result = e.map( is_even, list_integers )
    
for answer in result:
    print(answer)

Note that, because concurrent.futures integrates into the Jupyter notebook, the object result is directly accessible in the notebook: we did not have to save it to a file and load it back, nor did we need to gather the results on one process.
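
Since e.map returns a single-pass iterator, it is exhausted once it has been looped over, as above. When the results are needed more than once, a convenient variation is to collect them into a list right away (the name result_list below is just illustrative):

In [ ]:
# Collect the results into a regular Python list,
# so that they can be reused later in the notebook
with ProcessPoolExecutor(max_workers=4) as e:
    result_list = list( e.map( is_even, list_integers ) )

print( result_list )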


Digit classification

Let us again load the data, and define a function to look at it.


In [ ]:
# Load the data
train_images = np.load('./data/train_images.npy')
train_labels = np.load('./data/train_labels.npy')
test_images = np.load('./data/test_images.npy')

# Define function to have a look at the data
def show_random_digit( images, labels=None ):
    """"Show a random image out of `images`, 
    with the corresponding label if available"""
    i = np.random.randint(len(images))
    image = images[i].reshape((28, 28))
    plt.imshow( image, cmap='Greys' )
    if labels is not None:
        plt.title('Label: %d' %labels[i])
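
For instance, we can have a quick look at a random image from the training set, together with its known label:

In [ ]:
# Sanity check: display a random training image with its label
show_random_digit( train_images, train_labels )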

In addition, we need to define a function that takes only one argument, in order to pass it to the function map.


In [ ]:
def predict( images ):
    """Predict the labels of `images`, using the training images and labels"""
    return( nearest_neighbor_prediction( images, train_images, train_labels ) )
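
Equivalently, such a single-argument function can be built with functools.partial. This is only a sketch: it assumes that nearest_neighbor_prediction accepts keyword arguments named train_images and train_labels, which is not guaranteed by the positional call above.

In [ ]:
from functools import partial

# Alternative sketch: fix the training data once and for all, so that the
# resulting callable takes a single argument (the test images).
# Caveat: the keyword names below are assumed to match the actual signature.
predict_partial = partial( nearest_neighbor_prediction,
                           train_images=train_images, train_labels=train_labels )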

Serial execution

Let us again run the serial code, but now directly from the Jupyter notebook (i.e. not from a script that we then run from the terminal using the ! character).


In [ ]:
%%time 
test_labels_serial = predict( test_images )
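
Because the computation ran directly in the notebook, the predicted labels are immediately available in the session; for instance, we can look at one prediction:

In [ ]:
# Display a random test image with its predicted label
show_random_digit( test_images, test_labels_serial )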

With processes


In [ ]:
# Choose the number of processes and split the data
N_processes = 4
split_arrays = np.array_split( test_images, N_processes )

In [ ]:
%%time
with ProcessPoolExecutor(max_workers=N_processes) as e:
    result = e.map( predict, split_arrays )

# Merge the result from each process into a single array
# (np.hstack needs a sequence, so the iterator is converted to a list)
test_labels_proc = np.hstack( list(result) )

In [ ]:
show_random_digit( test_images, test_labels_proc )
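
As a quick consistency check (assuming the serial cell above was executed), the labels obtained with processes should be identical to the serial ones:

In [ ]:
# Check that the process-based prediction agrees with the serial one
print( np.array_equal( test_labels_serial, test_labels_proc ) )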

With threads


In [ ]:
# Choose the number of threads and split the data
N_threads = 4
split_arrays = np.array_split( test_images, N_threads )

In [ ]:
%%time
with ThreadPoolExecutor(max_workers=N_threads) as e:
    result = e.map( predict, split_arrays )
    
# Merge the result from each thread into a single array
# (np.hstack needs a sequence, so the iterator is converted to a list)
test_labels_threads = np.hstack( list(result) )

In [ ]:
show_random_digit( test_images, test_labels_threads )
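
Again, as a quick consistency check, the thread-based prediction should agree with the process-based one:

In [ ]:
# Check that the thread-based prediction agrees with the process-based one
print( np.array_equal( test_labels_proc, test_labels_threads ) )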