In [ ]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

Introduction to MPI and mpi4py

MPI stands for Message Passing Interface. It is a library that makes it possible to:

  • spawn several processes
  • address them individually
  • have them communicate with each other

MPI can be used in many languages (C, C++, Fortran), and is extensively used in High-Performance Computing.
mpi4py is the Python interface to MPI.

Installation

conda install -c conda-forge mpi4py

(The standard Anaconda channel for mpi4py is broken. It is necessary to use the conda-forge channel instead.)
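To check that the installation works, you can spawn two processes and have each of them print its rank (each process should print a different number):

In [ ]:
! mpirun -np 2 python -c "from mpi4py.MPI import COMM_WORLD; print(COMM_WORLD.rank)"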

Example

Let us try to get a feeling for how mpi4py works by looking at the example below:


In [ ]:
%%file example.py

from mpi4py.MPI import COMM_WORLD as communicator
import random

# Draw one random integer between 0 and 100
i = random.randint(0, 100)
print( 'Rank %d drew a random integer: %d' % (communicator.rank, i) )

# Gather the results
integer_list = communicator.gather( i, root=0 )
if communicator.rank == 0:
    print('\nRank 0 gathered the results:')
    print(integer_list)

In [ ]:
! mpirun -np 3 python example.py

What happened?

  • "mpirun -np 3" spawns 3 processes.

  • All processes execute the same code. (In this case, they all execute the same Python script: example.py.)

  • Each process gets a unique identification number (communicator.rank).

  • Based on this identifier (e.g. with if statements), the different processes can be addressed individually and perform different work.

  • MPI provides functions (like communicator.gather) that allow processes to communicate data (even between different nodes).

NB: There are many other communication functions, e.g.:

  • one-to-one communication (send, recv, isend, irecv)
  • all-to-one communication (gather, reduce)
  • one-to-all communication (scatter, broadcast)
  • all-to-all communication (allgather, allreduce)

See the mpi4py documentation for more information.
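For instance, one-to-one communication can be used to send a Python object from one rank to another. Below is a minimal sketch (the file name send_recv_example.py is only for illustration): rank 0 sends a dictionary to rank 1, which receives and prints it.

In [ ]:
%%file send_recv_example.py

from mpi4py.MPI import COMM_WORLD as communicator

if communicator.rank == 0:
    # Rank 0 sends a Python object (here, a dictionary) to rank 1
    data = {'message': 'hello', 'value': 42}
    communicator.send( data, dest=1 )
elif communicator.rank == 1:
    # Rank 1 receives the object sent by rank 0
    data = communicator.recv( source=0 )
    print( 'Rank 1 received:', data )

In [ ]:
! mpirun -np 2 python send_recv_example.py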


Digit classification with mpi4py

Let us now apply mpi4py to our problem: digit classification.

As mentioned earlier, the parallelization is conceptually trivial in this case: the processes should split the test data among themselves, and each process should perform the prediction only on its share of the data.

On two processes

Let us start with only two processes. In the script below, the data test_images is split into a smaller array small_test_images, which is different for each process.


In [ ]:
%%file parallel_script.py

from classification import nearest_neighbor_prediction
import numpy as np
from mpi4py.MPI import COMM_WORLD as communicator

# Load data
train_images = np.load('./data/train_images.npy')
train_labels = np.load('./data/train_labels.npy')
test_images = np.load('./data/test_images.npy')

# Use only the data that this rank needs
N_test = len(test_images)
if communicator.rank == 0:
    i_start = 0
    i_end = N_test // 2   # Use integer division: slice indices must be integers
elif communicator.rank == 1:
    i_start = N_test // 2
    i_end = N_test    
small_test_images = test_images[i_start:i_end]

# Predict the results
small_test_labels = nearest_neighbor_prediction(small_test_images, train_images, train_labels)

# Assignment: gather the labels on one process and have it write them to a file
# Hint: you can use np.hstack to merge a list of arrays into a single array, 
# and np.save to save an array to a file.

In [ ]:
%%time
! mpirun -np 2 python parallel_script.py

The code executes faster than the serial example, because each process has a smaller amount of work, and the two processes execute this work in parallel.

However, at the end of the script, each process only holds its own label array small_test_labels. These arrays still need to be concatenated and written to a single file.

Assignment: based on the previous script (example.py), use mpi4py's communication functions to gather the labels on one rank, and have this rank write the data to a single file data/test_labels_parallel.npy.
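If you want to compare with a possible solution, here is one sketch (to be appended at the end of parallel_script.py); keep in mind that communicator.gather returns a list of arrays on the root rank and None on all other ranks:

# Gather the per-rank label arrays on rank 0
label_arrays = communicator.gather( small_test_labels, root=0 )
if communicator.rank == 0:
    # Merge the list of arrays into a single array and save it
    np.save( 'data/test_labels_parallel.npy', np.hstack( label_arrays ) )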

On more processes

The above code works for two processes, but does not generalize easily to an arbitrary number of processes.

In order to split the initial array test_images into an arbitrary number of arrays (one per process), we can use the function np.array_split, which splits an array and returns a list of smaller arrays.

Note: Below, the number 784 corresponds to 28x28, i.e. the number of pixels for each image.


In [ ]:
# Load and split the set of test images
test_images = np.load('data/test_images.npy')
split_arrays_list = np.array_split( test_images, 4 )

# Print the corresponding shape
print( 'Shape of the original array:' )
print( test_images.shape )
print('Shape of the split arrays:')
for array in split_arrays_list:
    print( array.shape )

Assignment: in the code below, use the function np.array_split to split test_images among an arbitrary number of processes, and have each process pick its own small array.

Note: Within the script, communicator.size gives the number of processes that have been spawned by mpirun.


In [ ]:
%%file parallel_script.py

from classification import nearest_neighbor_prediction
import numpy as np
from mpi4py.MPI import COMM_WORLD as communicator

# Load data
train_images = np.load('./data/train_images.npy')
train_labels = np.load('./data/train_labels.npy')
test_images = np.load('./data/test_images.npy')

# Assignment: use the function np.array_split to split the data `test_images` among the processes.
# Have each process select its own small array.
small_test_images = #.....

# Predict the results and gather them on rank 0
small_test_labels = nearest_neighbor_prediction(small_test_images, train_images, train_labels)

# Assignment: gather the labels on one process and have it write them to a file
# Hint: you can use np.hstack to merge a list of arrays into a single array, 
# and np.save to save an array to a file.

In [ ]:
%%time
! mpirun -np 4 python parallel_script.py
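In case you are stuck, one possible way to fill in the missing line of the script is sketched below: np.array_split divides test_images into communicator.size pieces, and each rank picks the piece with its own index.

small_test_images = np.array_split( test_images, communicator.size )[ communicator.rank ]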

Check the results

Finally, we can check that the results are valid.


In [ ]:
# Load the data from the file
test_images = np.load('data/test_images.npy')
test_labels_parallel = np.load('data/test_labels_parallel.npy')

# Define function to have a look at the data
def show_random_digit( images, labels=None ):
    """Show a random image out of `images`,
    with the corresponding label if available"""
    i = np.random.randint(len(images))
    image = images[i].reshape((28, 28))
    plt.imshow( image, cmap='Greys' )
    if labels is not None:
        plt.title('Label: %d' %labels[i])

In [ ]:
show_random_digit( test_images, test_labels_parallel )

Next tutorial

Let us now see how to perform the same tasks with concurrent.futures.