In [ ]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
mpi4py
MPI stands for Message Passing Interface. It is a library that allows multiple processes, possibly running on different machines, to exchange messages. MPI can be used in many languages (C, C++, Fortran), and is extensively used in High-Performance Computing.
mpi4py is the Python interface to MPI. It can be installed with:
conda install -c conda-forge mpi4py
(The standard Anaconda channel for mpi4py is broken. It is necessary to use the conda-forge channel instead.)
Let us try to get a feeling for how mpi4py works by looking at the example below:
In [ ]:
%%file example.py
from mpi4py.MPI import COMM_WORLD as communicator
import random
# Draw one random integer between 0 and 100
i = random.randint(0, 100)
print('Rank %d drew a random integer: %d' % (communicator.rank, i))
# Gather the results
integer_list = communicator.gather( i, root=0 )
if communicator.rank == 0:
    print('\nRank 0 gathered the results:')
    print(integer_list)
In [ ]:
! mpirun -np 3 python example.py
"mpirun -np 3" spawns 3 processes.
All processes execute the same code. (In this case, they all execute the same Python script: example.py.)
Each process gets a unique identification number (communicator.rank).
Based on this identifier, and e.g. on if statements, the different processes can be addressed individually and perform different work.
MPI provides functions (like communicator.gather) that allow processes to communicate data (even between different nodes).
NB: There are many other communication functions, e.g.:
- point-to-point communication: send, recv (and their non-blocking variants isend, irecv)
- collective communication: gather, reduce, scatter, bcast, allgather, allreduce
See the mpi4py documentation for more information.
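As an illustration of the point-to-point functions, here is a minimal send/recv sketch. It is not part of the digit-classification example, and the message contents are made up; the interesting path requires running it with e.g. `mpirun -np 2 python script.py`. The try/except fallback merely lets the snippet also run as a plain single-process script when mpi4py is not installed.

```python
# Minimal point-to-point sketch: rank 0 sends a Python object to rank 1.
try:
    from mpi4py.MPI import COMM_WORLD as communicator
    rank, size = communicator.rank, communicator.size
except ImportError:
    # Fallback so the snippet still runs without mpi4py / mpirun
    communicator, rank, size = None, 0, 1

if size >= 2 and rank == 0:
    # Blocking send of a picklable Python object to rank 1
    communicator.send({'answer': 42}, dest=1)
    result = 'sent'
elif size >= 2 and rank == 1:
    # Blocking receive from rank 0
    message = communicator.recv(source=0)
    result = 'received: %s' % message
else:
    result = 'single process: nothing to communicate'

print('Rank %d: %s' % (rank, result))
```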
mpi4py
Let us now apply mpi4py to our problem: digit classification. As mentioned earlier, the parallelization is conceptually trivial in this case: the processes should split the test data among themselves, and each process should perform the prediction only on its share of the data.
Let us start with only two processes. In the script below, the data test_images is split into a smaller array small_test_images, which is different for each process.
In [ ]:
%%file parallel_script.py
from classification import nearest_neighbor_prediction
import numpy as np
from mpi4py.MPI import COMM_WORLD as communicator
# Load data
train_images = np.load('./data/train_images.npy')
train_labels = np.load('./data/train_labels.npy')
test_images = np.load('./data/test_images.npy')
# Use only the data that this rank needs
N_test = len(test_images)
if communicator.rank == 0:
    i_start = 0
    i_end = N_test//2  # integer division: slice indices must be integers
elif communicator.rank == 1:
    i_start = N_test//2
    i_end = N_test
small_test_images = test_images[i_start:i_end]
# Predict the results
small_test_labels = nearest_neighbor_prediction(small_test_images, train_images, train_labels)
# Assignment: gather the labels on one process and have it write them to a file
# Hint: you can use np.hstack to merge a list of arrays into a single array,
# and np.save to save an array to a file.
In [ ]:
%%time
! mpirun -np 2 python parallel_script.py
The code executes faster than the serial example, because each process has a smaller amount of work, and the two processes execute this work in parallel.
However, at the end of the script, each process holds its own label array small_test_labels. These arrays still need to be concatenated and written to a single file.
Assignment: based on the previous script (example.py), use the functionalities of mpi4py to gather the labels on one rank, and have this rank write the data to a single file data/test_labels_parallel.npy.
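To see what the hint is aiming at, here is a minimal sketch (no MPI involved) of the merging and saving step: a list of per-process label arrays, such as the one communicator.gather would return on the root rank, is merged with np.hstack and written with np.save. The example values are made up, and a temporary file stands in for data/test_labels_parallel.npy so the snippet is self-contained.

```python
import os
import tempfile

import numpy as np

# Simulated result of gathering: one label array per process
# (these values are made up for illustration)
gathered = [np.array([3, 1, 4]), np.array([1, 5])]

# Merge the list of arrays into a single array
all_labels = np.hstack(gathered)
print(all_labels)  # [3 1 4 1 5]

# Save to a temporary file (stand-in for data/test_labels_parallel.npy)
path = os.path.join(tempfile.mkdtemp(), 'test_labels_parallel.npy')
np.save(path, all_labels)
```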
The above code works for two processes, but does not generalize easily to an arbitrary number of processes.
In order to split the initial array test_images into an arbitrary number of arrays (one per process), we can use the function np.array_split, which splits an array and returns a list of smaller arrays.
Note: Below, the number 784 corresponds to 28x28, i.e. the number of pixels for each image.
In [ ]:
# Load and split the set of test images
test_images = np.load('data/test_images.npy')
split_arrays_list = np.array_split( test_images, 4 )
# Print the corresponding shape
print( 'Shape of the original array:' )
print( test_images.shape )
print('Shape of the split arrays:')
for array in split_arrays_list:
print( array.shape )
Assignment: in the code below, use the function np.array_split to split test_images among an arbitrary number of processes, and have each process pick its own small array.
Note: within the script, communicator.size gives the number of processes that have been spawned by mpirun.
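The selection pattern can be tried out without launching MPI, by simulating the rank locally: in the sketch below, the variables size and rank stand in for communicator.size and communicator.rank, and data stands in for test_images.

```python
import numpy as np

# Stand-ins for communicator.size and communicator.rank (made-up values)
size = 4
rank = 2

data = np.arange(10)  # stand-in for test_images

# np.array_split distributes the elements as evenly as possible,
# even when the length is not divisible by `size`
chunks = np.array_split(data, size)

# Each process keeps only the chunk matching its own rank
small_data = chunks[rank]
print(small_data)  # [6 7]
```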
In [ ]:
%%file parallel_script.py
from classification import nearest_neighbor_prediction
import numpy as np
from mpi4py.MPI import COMM_WORLD as communicator
# Load data
train_images = np.load('./data/train_images.npy')
train_labels = np.load('./data/train_labels.npy')
test_images = np.load('./data/test_images.npy')
# Assignment: use the function np.array_split to split the data `test_images` among the processes.
# Have each process select its own small array.
small_test_images = #.....
# Predict the results and gather it on rank 0
small_test_labels = nearest_neighbor_prediction(small_test_images, train_images, train_labels)
# Assignment: gather the labels on one process and have it write them to a file
# Hint: you can use np.hstack to merge a list of arrays into a single array,
# and np.save to save an array to a file.
In [ ]:
%%time
! mpirun -np 4 python parallel_script.py
In [ ]:
# Load the data from the file
test_images = np.load('data/test_images.npy')
test_labels_parallel = np.load('data/test_labels_parallel.npy')
# Define function to have a look at the data
def show_random_digit( images, labels=None ):
    """Show a random image out of `images`,
    with the corresponding label if available"""
    i = np.random.randint(len(images))
    image = images[i].reshape((28, 28))
    plt.imshow( image, cmap='Greys' )
    if labels is not None:
        plt.title('Label: %d' %labels[i])
In [ ]:
show_random_digit( test_images, test_labels_parallel )
Let us now see how to perform the same tasks with concurrent.futures.