In [ ]:
import numpy as np
import matplotlib.pyplot as plt
from classification import nearest_neighbor_prediction
%matplotlib inline
concurrent.futures
`concurrent.futures` is a high-level interface for asynchronous, parallel calculations. It is a newer, simpler interface built on top of the `multiprocessing` and `threading` modules.
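This is a minimal sketch of what "asynchronous" means here. `submit` schedules a call and immediately returns a `Future` object, and `result()` blocks until the value is ready. The function `slow_square` is a hypothetical stand-in for real work. The sketch uses threads so that it runs anywhere; `ProcessPoolExecutor` exposes the same methods.

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_square(x):
    """Hypothetical stand-in for an expensive computation."""
    time.sleep(0.1)
    return x * x

with ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns immediately with a Future object
    future = executor.submit(slow_square, 7)
    print(future.done())  # typically False: the call is still running
    # result() blocks until the computation has finished
    value = future.result()

print(value)  # 49
```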
As a reminder, here is a comparison between `mpi4py` and `concurrent.futures`:
| `mpi4py` | `concurrent.futures` |
|---|---|
| Can run on several nodes | Runs only on one node |
| Only handles processes | Can handle processes and threads |
| No interactivity | Some interactivity (e.g. integrates in the Jupyter notebook) |
| Allows elaborate communications between processes | No built-in communication between processes or threads (only inputs and return values) |
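Because threads and processes share one interface, the executor class can be treated as a parameter. The helper `parallel_map` below is a hypothetical sketch and not part of the library. It is run with `ThreadPoolExecutor`. Passing `ProcessPoolExecutor` instead also works, provided the function is defined at module level so it can be pickled. In a standalone script, the call should sit under `if __name__ == "__main__":`. The Python documentation also notes that the `__main__` module must be importable by worker processes, so `ProcessPoolExecutor` may fail with functions defined interactively on platforms that spawn new processes (e.g. Windows, macOS).

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

def parallel_map(executor_class, func, items, n_workers=4):
    """Hypothetical helper: apply func to every item with the given executor class.

    executor_class may be ThreadPoolExecutor or ProcessPoolExecutor,
    since both expose the same map() method.
    """
    with executor_class(max_workers=n_workers) as executor:
        return list(executor.map(func, items))

results = parallel_map(ThreadPoolExecutor, square, range(5))
print(results)  # [0, 1, 4, 9, 16]
# To use processes instead, import ProcessPoolExecutor and pass it here.
```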
In this notebook, the most visible feature is that `concurrent.futures` integrates well into interactive Python environments (e.g. the Jupyter notebook, IPython). In particular, there is no need to run the code as a separate script, or to save the results to a file and then read them back into the interactive session: the results are directly accessible there.
Python 3: part of the standard library since Python 3.2, so there is nothing to install
Python 2: install the backport with `conda install futures`
In [ ]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
In the example below, we determine which integers in a list are even.
The statement `with ProcessPoolExecutor(max_workers=4) as e` creates a pool of up to 4 worker processes. When the `with` block exits, it waits for all submitted calculations to finish and then shuts the processes down.
The method `e.map` distributes the integers of the list among the processes and applies `is_even` to each of them.
The returned object `result` is an iterator that yields one result per element of `list_integers`, in the same order.
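The `with` statement is shorthand for creating the executor and calling `shutdown(wait=True)` on exit. The sketch below shows the explicit form. It uses threads so that it runs outside the notebook, and it assumes the same `is_even` and `list_integers` as the next cell. It also shows that `map` keeps the input order.

```python
from concurrent.futures import ThreadPoolExecutor

def is_even(i):
    """Determine if the integer i is even."""
    return i % 2 == 0

list_integers = [4, 3, 6, 7, 9, 10, 124, 325]

executor = ThreadPoolExecutor(max_workers=4)
try:
    # map() submits one task per element; the iterator yields the results
    # in the order of list_integers, not in order of completion
    result_list = list(executor.map(is_even, list_integers))
finally:
    # This is what the `with` statement does automatically on exit
    executor.shutdown(wait=True)

print(result_list)
# [True, False, True, False, False, True, True, False]
```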
In [ ]:
# Define function
def is_even( i ):
    """Determine if the integer i is even"""
    even = (i % 2 == 0)
    return even

list_integers = [ 4, 3, 6, 7, 9, 10, 124, 325 ]

# Spawn 4 processes; they are shut down when the `with` block exits
with ProcessPoolExecutor(max_workers=4) as e:
    result = e.map( is_even, list_integers )

# The results come back in the same order as list_integers
for answer in result:
    print(answer)
Because `concurrent.futures` integrates with the Jupyter notebook, the object `result` is directly accessible in the notebook. We did not have to save it to a file and load it back into the notebook. Nor did we need to explicitly gather the results on one process, as we would with `mpi4py`.
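`map` returns results in input order. When the order does not matter, `submit` combined with `as_completed` lets the notebook use each result as soon as its task finishes, again without any explicit gather step. The sketch below uses threads, and `delayed_identity` is a hypothetical task whose duration shrinks as its input grows. Completion order depends on timing, so it is only *typically* reversed.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def delayed_identity(x):
    """Hypothetical task: larger x finishes sooner."""
    time.sleep(0.05 * (3 - x))
    return x

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(delayed_identity, x) for x in range(3)]
    completion_order = []
    # as_completed yields each Future as soon as its task has finished
    for future in as_completed(futures):
        completion_order.append(future.result())

print(completion_order)  # typically [2, 1, 0]
```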
In [ ]:
# Load the data
train_images = np.load('./data/train_images.npy')
train_labels = np.load('./data/train_labels.npy')
test_images = np.load('./data/test_images.npy')

# Define function to have a look at the data
def show_random_digit( images, labels=None ):
    """Show a random image out of `images`,
    with the corresponding label if available"""
    i = np.random.randint(len(images))
    # Reshape the selected image to 28x28 pixels for display
    image = images[i].reshape((28, 28))
    plt.imshow( image, cmap='Greys' )
    if labels is not None:
        plt.title('Label: %d' % labels[i])
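In addition, `map` is given a single iterable here, so it calls the function with one argument (one chunk of test images at a time). We therefore define a wrapper `predict` that takes only the test images and uses the training data loaded above.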
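Besides a hand-written one-argument wrapper, `functools.partial` can fix the extra arguments. The sketch below is self-contained. `nearest_neighbor_toy` is a hypothetical stand-in for `nearest_neighbor_prediction`, whose implementation is not shown in this notebook: it is a plain 1-nearest-neighbor classifier with Euclidean distance, applied to tiny made-up arrays. The `toy_` names avoid overwriting the notebook's data. A `partial` of a module-level function can be pickled, so the same pattern should also work with `ProcessPoolExecutor`.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
import numpy as np

def nearest_neighbor_toy(test_images, train_images, train_labels):
    """Hypothetical stand-in: label of the closest training image (Euclidean)."""
    # Squared distances between every test and training image, shape (n_test, n_train)
    d2 = ((test_images[:, None, :] - train_images[None, :, :]) ** 2).sum(axis=2)
    return train_labels[np.argmin(d2, axis=1)]

# Tiny made-up data set with 2-pixel "images"
toy_train_images = np.array([[0.0, 0.0], [10.0, 10.0]])
toy_train_labels = np.array([0, 1])
toy_test_images = np.array([[1.0, 0.5], [9.0, 9.5], [0.2, 0.1], [8.0, 11.0]])

# predict_toy(chunk) calls nearest_neighbor_toy(chunk, train_images=..., train_labels=...)
predict_toy = partial(nearest_neighbor_toy,
                      train_images=toy_train_images,
                      train_labels=toy_train_labels)

with ThreadPoolExecutor(max_workers=2) as executor:
    chunks = np.array_split(toy_test_images, 2)
    toy_predicted = np.concatenate(list(executor.map(predict_toy, chunks)))

print(toy_predicted)  # [0 1 0 1]
```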
In [ ]:
def predict( test_images ):
    """Predict labels for `test_images`, using the training set loaded above"""
    return nearest_neighbor_prediction( test_images, train_images, train_labels )
In [ ]:
%%time
test_labels_serial = predict( test_images )
In [ ]:
# Choose the number of processes and split the data
N_processes = 4
split_arrays = np.array_split( test_images, N_processes )
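Unlike `np.split`, `np.array_split` accepts a number of sections that does not divide the array length evenly. The first chunks then receive one extra element. `map` returns the per-chunk results in chunk order, so concatenating them restores the original order. The array below is made-up example data. For the 1D label arrays in this notebook, `np.hstack` and `np.concatenate` give the same result.

```python
import numpy as np

# Made-up example data: 10 "images" of 3 pixels each
data = np.arange(30).reshape(10, 3)

chunks = np.array_split(data, 4)  # splits along axis 0; chunk sizes may differ
print([len(c) for c in chunks])   # [3, 3, 2, 2]

# Concatenating the chunks in order gives back the original array
merged = np.concatenate(chunks)
print(np.array_equal(merged, data))  # True
```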
In [ ]:
%%time
with ProcessPoolExecutor(max_workers=N_processes) as e:
    result = e.map( predict, split_arrays )
    # Merge the result from each process into a single array
    # (np.hstack expects a sequence such as a list, not a generator)
    test_labels_proc = np.hstack( list(result) )
In [ ]:
show_random_digit( test_images, test_labels_proc )
In [ ]:
# Choose the number of threads and split the data
N_threads = 4
split_arrays = np.array_split( test_images, N_threads )
In [ ]:
%%time
with ThreadPoolExecutor(max_workers=N_threads) as e:
    result = e.map( predict, split_arrays )
    # Merge the result from each thread into a single array
    # (np.hstack expects a sequence such as a list, not a generator)
    test_labels_threads = np.hstack( list(result) )
In [ ]:
show_random_digit( test_images, test_labels_threads )