Prerequisites

The examples in this notebook are expected to be run with exactly 4 MPI processes; not because they wouldn't work otherwise, but simply because that is what their description assumes. For this, you need to:

  • Install an MPI distribution on your system, such as OpenMPI, MPICH, or Intel MPI (if not already available).
  • Install some optional dependencies, including mpi4py and ipyparallel; from the root Devito directory, run
    pip install -r requirements-optional.txt
  • Create an ipyparallel MPI profile by running our simple setup script. From the root Devito directory, run
    ./scripts/create_ipyparallel_mpi_profile.sh
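
Optionally, to double-check that the MPI installation and mpi4py work together before moving on, one can run a quick sanity check from a terminal (mpiexec may be spelled mpirun depending on the MPI distribution); each of the 4 processes should print its own rank:

mpiexec -n 4 python -c "from mpi4py import MPI; print(MPI.COMM_WORLD.Get_rank())"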

Launch and connect to an ipyparallel cluster

We're finally ready to launch an ipyparallel cluster. Open a new terminal and run the following command:

ipcluster start --profile=mpi -n 4

Once the engines have started successfully, we can connect to the cluster:


In [1]:
import ipyparallel as ipp
c = ipp.Client(profile='mpi')

In this tutorial, to run commands in parallel over the engines, we will use the %px magic, in both its line (%px) and cell (%%px) forms.
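
For instance, a single statement can be run on all engines with the line-magic form (just an illustration; the cell-magic form %%px, used in the cells below, runs a whole cell on every engine):

%px print("Hello from an engine")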


In [2]:
%%px --group-outputs=engine

from mpi4py import MPI
print(f"Hi, I'm rank %d." % MPI.COMM_WORLD.rank)


[stdout:0] Hi, I'm rank 0.
[stdout:1] Hi, I'm rank 1.
[stdout:2] Hi, I'm rank 2.
[stdout:3] Hi, I'm rank 3.

Overview of MPI in Devito

Distributed-memory parallelism via MPI is designed so that users can "think sequentially" as much as possible. The few things required of the user are:

  • Like any other MPI program, run with mpirun -np X python ...
  • Some pre- and/or post-processing may be rank-specific (e.g., we may want to plot on a given MPI rank only). This may be hidden away in future Devito releases, once suitable support APIs are provided.
  • Parallel I/O (if and when necessary) to populate the MPI-distributed datasets given in input to a Devito Operator. If a shared file system is available, there are a few simple alternatives to pick from, such as NumPy's memory-mapped arrays (a sketch follows this list).
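
As a concrete illustration of the last two points, a minimal standalone script might look as follows. This is only a sketch: the script, the file f_init.dat and its contents are hypothetical; it assumes a shared file system, and that assigning a global-shape array through f.data stores only the locally-owned portion on each rank, in line with the indexing semantics described later in this notebook.

# init_from_file.py -- a sketch; run with: mpirun -np 4 python init_from_file.py
import numpy as np
from mpi4py import MPI
from devito import Grid, Function, configuration

configuration['mpi'] = True

grid = Grid(shape=(4, 4))
f = Function(name='f', grid=grid)

# Parallel I/O sketch: every rank memory-maps the same (hypothetical) file on a
# shared file system; the global-style assignment below is expected to store
# only the locally-owned portion on each rank.
init = np.memmap('f_init.dat', dtype=np.float32, mode='r', shape=grid.shape)
f.data[:] = init[:]

# Rank-specific post-processing: e.g., print (or plot) on one rank only.
if MPI.COMM_WORLD.rank == 0:
    print("f initialized on all ranks")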

To enable MPI, users have two options: export the environment variable DEVITO_MPI=1, or set it programmatically:


In [3]:
%%px
from devito import configuration
configuration['mpi'] = True

In [4]:
%%px
# Keep generated code as simple as possible
configuration['language'] = 'C'
# Fix platform so that this notebook can be tested by py.test --nbval
configuration['platform'] = 'knl7210'

An Operator will then generate MPI code, including sends/receives for halo exchanges. Below, we introduce a running example through which we explain how domain decomposition as well as data access (read/write) and distribution work. Performance optimizations are discussed in a later section.

Let's start by creating a TimeFunction.


In [5]:
%%px
from devito import Grid, TimeFunction, Eq, Operator 
grid = Grid(shape=(4, 4))
u = TimeFunction(name="u", grid=grid, space_order=2, time_order=0)

Domain decomposition is performed when creating a Grid. Users may supply their own domain decomposition, but this is not shown in this notebook. Devito exploits the MPI Cartesian topology abstraction to logically split the Grid over the available MPI processes. Since u is defined over a decomposed Grid, its data is distributed too.


In [6]:
%%px --group-outputs=engine
u.data


[output:0]
Out[0:5]: 
Data([[[0., 0.],
       [0., 0.]]], dtype=float32)
[output:1]
Out[1:5]: 
Data([[[0., 0.],
       [0., 0.]]], dtype=float32)
[output:2]
Out[2:5]: 
Data([[[0., 0.],
       [0., 0.]]], dtype=float32)
[output:3]
Out[3:5]: 
Data([[[0., 0.],
       [0., 0.]]], dtype=float32)

Globally, u consists of 4x4 points -- this is what users "see". But locally, as shown above, each rank owns a 2x2 subdomain. The key point is: for the user, the fact that u.data is distributed is completely abstracted away -- the perception is that of indexing into a classic NumPy array, regardless of whether MPI is enabled or not. All sorts of NumPy indexing schemes (basic, slicing, etc.) are supported. For example, we can write into a slice-generated view of our data.


In [7]:
%%px
u.data[0, 1:-1, 1:-1] = 1.

In [8]:
%%px --group-outputs=engine
u.data


[output:0]
Out[0:7]: 
Data([[[0., 0.],
       [0., 1.]]], dtype=float32)
[output:1]
Out[1:7]: 
Data([[[0., 0.],
       [1., 0.]]], dtype=float32)
[output:2]
Out[2:7]: 
Data([[[0., 1.],
       [0., 0.]]], dtype=float32)
[output:3]
Out[3:7]: 
Data([[[1., 0.],
       [0., 0.]]], dtype=float32)

The only limitation, currently, is that a data access cannot require a direct data exchange among two or more processes (e.g., the assignment u.data[0, 0] = u.data[3, 3] will raise an exception unless both entries belong to the same MPI rank).

We can finally write out a trivial Operator to try running something.


In [9]:
%%px
op = Operator(Eq(u.forward, u + 1))
summary = op.apply(time_M=0)


[stderr:0] Operator `Kernel` run in 0.00 s
[stderr:1] Operator `Kernel` run in 0.00 s
[stderr:2] Operator `Kernel` run in 0.00 s
[stderr:3] Operator `Kernel` run in 0.00 s

And we can now check again the (distributed) content of our u.data


In [10]:
%%px --group-outputs=engine
u.data


[output:0]
Out[0:9]: 
Data([[[1., 1.],
       [1., 2.]]], dtype=float32)
[output:1]
Out[1:9]: 
Data([[[1., 1.],
       [2., 1.]]], dtype=float32)
[output:2]
Out[2:9]: 
Data([[[1., 2.],
       [1., 1.]]], dtype=float32)
[output:3]
Out[3:9]: 
Data([[[2., 1.],
       [1., 1.]]], dtype=float32)

Everything as expected. We could also peek at the generated code, because we may be curious to see what sort of MPI calls Devito has generated...


In [11]:
%%px --targets 0
print(op)


#define _POSIX_C_SOURCE 200809L
#include "stdlib.h"
#include "math.h"
#include "sys/time.h"
#include "xmmintrin.h"
#include "pmmintrin.h"
#include "mpi.h"

struct dataobj
{
  void *restrict data;
  int * size;
  int * npsize;
  int * dsize;
  int * hsize;
  int * hofs;
  int * oofs;
} ;

struct profiler
{
  double section0;
} ;


int Kernel(struct dataobj *restrict u_vec, const int time_M, const int time_m, struct profiler * timers, const int x_M, const int x_m, const int y_M, const int y_m)
{
  float (*restrict u)[u_vec->size[1]][u_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[u_vec->size[1]][u_vec->size[2]]) u_vec->data;
  /* Flush denormal numbers to zero in hardware */
  _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON);
  _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON);
  for (int time = time_m, t0 = (time)%(1), t1 = (time)%(1); time <= time_M; time += 1, t0 = (time)%(1), t1 = (time)%(1))
  {
    struct timeval start_section0, end_section0;
    gettimeofday(&start_section0, NULL);
    /* Begin section0 */
    for (int x = x_m; x <= x_M; x += 1)
    {
      #pragma omp simd aligned(u:64)
      for (int y = y_m; y <= y_M; y += 1)
      {
        u[t1][x + 2][y + 2] = u[t0][x + 2][y + 2] + 1;
      }
    }
    /* End section0 */
    gettimeofday(&end_section0, NULL);
    timers->section0 += (double)(end_section0.tv_sec-start_section0.tv_sec)+(double)(end_section0.tv_usec-start_section0.tv_usec)/1000000;
  }
  return 0;
}

Hang on. There's nothing MPI-specific here, apart from the header file #include "mpi.h"! What's going on? Well, it's simple. Devito was smart enough to realize that this trivial Operator doesn't even need any sort of halo exchange -- the Eq implements a pure "map computation" (i.e., fully parallel), so it can just let each MPI process do its job without ever synchronizing through halo exchanges. We might want to try again with a proper stencil Eq.


In [12]:
%%px --targets 0
op = Operator(Eq(u.forward, u.dx + 1))
print(op)


#define _POSIX_C_SOURCE 200809L
#include "stdlib.h"
#include "math.h"
#include "sys/time.h"
#include "xmmintrin.h"
#include "pmmintrin.h"
#include "mpi.h"

struct dataobj
{
  void *restrict data;
  int * size;
  int * npsize;
  int * dsize;
  int * hsize;
  int * hofs;
  int * oofs;
} ;

struct neighborhood
{
  int ll, lc, lr;
  int cl, cc, cr;
  int rl, rc, rr;
} ;

struct profiler
{
  double section0;
} ;

void haloupdate0(struct dataobj *restrict a_vec, MPI_Comm comm, struct neighborhood * nb, int otime);
void sendrecv0(struct dataobj *restrict a_vec, const int buf_x_size, const int buf_y_size, int ogtime, int ogx, int ogy, int ostime, int osx, int osy, int fromrank, int torank, MPI_Comm comm);
void gather0(float *restrict buf_vec, const int buf_x_size, const int buf_y_size, struct dataobj *restrict a_vec, int otime, int ox, int oy);
void scatter0(float *restrict buf_vec, const int buf_x_size, const int buf_y_size, struct dataobj *restrict a_vec, int otime, int ox, int oy);

int Kernel(const float h_x, struct dataobj *restrict u_vec, MPI_Comm comm, struct neighborhood * nb, const int time_M, const int time_m, struct profiler * timers, const int x_M, const int x_m, const int y_M, const int y_m)
{
  float (*restrict u)[u_vec->size[1]][u_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[u_vec->size[1]][u_vec->size[2]]) u_vec->data;
  /* Flush denormal numbers to zero in hardware */
  _MM_SET_DENORMALS_ZERO_MODE(_MM_DENORMALS_ZERO_ON);
  _MM_SET_FLUSH_ZERO_MODE(_MM_FLUSH_ZERO_ON);
  for (int time = time_m, t0 = (time)%(1), t1 = (time)%(1); time <= time_M; time += 1, t0 = (time)%(1), t1 = (time)%(1))
  {
    struct timeval start_section0, end_section0;
    gettimeofday(&start_section0, NULL);
    /* Begin section0 */
    haloupdate0(u_vec,comm,nb,t0);
    for (int x = x_m; x <= x_M; x += 1)
    {
      #pragma omp simd aligned(u:64)
      for (int y = y_m; y <= y_M; y += 1)
      {
        float r0 = 1.0/h_x;
        u[t1][x + 2][y + 2] = 5.0e-1F*(-r0*u[t0][x + 1][y + 2] + r0*u[t0][x + 3][y + 2]) + 1;
      }
    }
    /* End section0 */
    gettimeofday(&end_section0, NULL);
    timers->section0 += (double)(end_section0.tv_sec-start_section0.tv_sec)+(double)(end_section0.tv_usec-start_section0.tv_usec)/1000000;
  }
  return 0;
}

void haloupdate0(struct dataobj *restrict a_vec, MPI_Comm comm, struct neighborhood * nb, int otime)
{
  sendrecv0(a_vec,a_vec->hsize[3],a_vec->npsize[2],otime,a_vec->oofs[2],a_vec->hofs[4],otime,a_vec->hofs[3],a_vec->hofs[4],nb->rc,nb->lc,comm);
  sendrecv0(a_vec,a_vec->hsize[2],a_vec->npsize[2],otime,a_vec->oofs[3],a_vec->hofs[4],otime,a_vec->hofs[2],a_vec->hofs[4],nb->lc,nb->rc,comm);
}

void sendrecv0(struct dataobj *restrict a_vec, const int buf_x_size, const int buf_y_size, int ogtime, int ogx, int ogy, int ostime, int osx, int osy, int fromrank, int torank, MPI_Comm comm)
{
  float (*bufs)[buf_y_size];
  float (*bufg)[buf_y_size];
  posix_memalign((void**)&bufs, 64, sizeof(float[buf_x_size][buf_y_size]));
  posix_memalign((void**)&bufg, 64, sizeof(float[buf_x_size][buf_y_size]));
  MPI_Request rrecv;
  MPI_Request rsend;
  MPI_Irecv((float *)bufs,buf_x_size*buf_y_size,MPI_FLOAT,fromrank,13,comm,&rrecv);
  if (torank != MPI_PROC_NULL)
  {
    gather0((float *)bufg,buf_x_size,buf_y_size,a_vec,ogtime,ogx,ogy);
  }
  MPI_Isend((float *)bufg,buf_x_size*buf_y_size,MPI_FLOAT,torank,13,comm,&rsend);
  MPI_Wait(&rsend,MPI_STATUS_IGNORE);
  MPI_Wait(&rrecv,MPI_STATUS_IGNORE);
  if (fromrank != MPI_PROC_NULL)
  {
    scatter0((float *)bufs,buf_x_size,buf_y_size,a_vec,ostime,osx,osy);
  }
  free(bufs);
  free(bufg);
}

void gather0(float *restrict buf_vec, const int buf_x_size, const int buf_y_size, struct dataobj *restrict a_vec, int otime, int ox, int oy)
{
  float (*restrict buf)[buf_y_size] __attribute__ ((aligned (64))) = (float (*)[buf_y_size]) buf_vec;
  float (*restrict a)[a_vec->size[1]][a_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[a_vec->size[1]][a_vec->size[2]]) a_vec->data;
  for (int x = 0; x <= buf_x_size - 1; x += 1)
  {
    for (int y = 0; y <= buf_y_size - 1; y += 1)
    {
      buf[x][y] = a[otime][x + ox][y + oy];
    }
  }
}

void scatter0(float *restrict buf_vec, const int buf_x_size, const int buf_y_size, struct dataobj *restrict a_vec, int otime, int ox, int oy)
{
  float (*restrict buf)[buf_y_size] __attribute__ ((aligned (64))) = (float (*)[buf_y_size]) buf_vec;
  float (*restrict a)[a_vec->size[1]][a_vec->size[2]] __attribute__ ((aligned (64))) = (float (*)[a_vec->size[1]][a_vec->size[2]]) a_vec->data;
  for (int x = 0; x <= buf_x_size - 1; x += 1)
  {
    for (int y = 0; y <= buf_y_size - 1; y += 1)
    {
      a[otime][x + ox][y + oy] = buf[x][y];
    }
  }
}

Uh-oh -- now the generated code looks more complicated than before, though it still is pretty much human-readable. We can spot the following routines:

  • haloupdate0 performs a blocking halo exchange, relying on three additional functions, gather0, sendrecv0, and scatter0;
  • gather0 copies the (generally non-contiguous) boundary data into a contiguous buffer;
  • sendrecv0 takes the buffered data and sends it to one or more neighboring processes; then it waits until all data from the neighboring processes is received;
  • scatter0 copies the received data into the proper array locations.

This is the simplest halo exchange scheme available in Devito. There are several others, some of which apply aggressive optimizations, as shown later on.
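
To make the pattern more concrete, below is a minimal mpi4py sketch of the same gather/sendrecv/scatter idea for a 1D array with one halo point per side. This is a standalone, hand-written illustration, not Devito-generated code; the file name halo1d.py is just a placeholder, and it would be run outside the notebook with, e.g., mpirun -np 4 python halo1d.py.

# halo1d.py -- a hand-written sketch of the buffered halo exchange pattern.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
left = rank - 1 if rank > 0 else MPI.PROC_NULL
right = rank + 1 if rank < size - 1 else MPI.PROC_NULL

# Local array layout: [left halo | 4 owned points | right halo]
a = np.full(6, float(rank), dtype=np.float32)

# "gather": copy the boundary data into contiguous send buffers
sendl, sendr = a[1:2].copy(), a[4:5].copy()
recvl, recvr = np.empty(1, np.float32), np.empty(1, np.float32)

# "sendrecv": post non-blocking sends/receives, then wait for completion
reqs = [comm.Irecv(recvl, source=left), comm.Irecv(recvr, source=right),
        comm.Isend(sendl, dest=left), comm.Isend(sendr, dest=right)]
MPI.Request.Waitall(reqs)

# "scatter": copy the received data into the halo locations
if left != MPI.PROC_NULL:
    a[0:1] = recvl
if right != MPI.PROC_NULL:
    a[5:6] = recvr

print(rank, a)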

Before looking at other scenarios and performance optimizations, there is one last thing worth discussing -- the data_with_halo view.


In [13]:
%%px --group-outputs=engine
u.data_with_halo


[output:0]
Out[0:12]: 
Data([[[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 1., 1.],
       [0., 0., 1., 2.]]], dtype=float32)
[output:1]
Out[1:10]: 
Data([[[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [1., 1., 0., 0.],
       [2., 1., 0., 0.]]], dtype=float32)
[output:2]
Out[2:10]: 
Data([[[0., 0., 1., 2.],
       [0., 0., 1., 1.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]]], dtype=float32)
[output:3]
Out[3:10]: 
Data([[[2., 1., 0., 0.],
       [1., 1., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]]], dtype=float32)

This is again a global data view. The halo shown here is the "true" halo surrounding the physical domain, not the halo used for the MPI halo exchanges (often referred to as the "ghost region"). It is therefore trivial for a user to initialize the "true" halo region (which is typically read by a stencil Eq when an Operator iterates in proximity of the domain boundary).


In [14]:
%%px
u.data_with_halo[:] = 1.

In [15]:
%%px --group-outputs=engine
u.data_with_halo


[output:0]
Out[0:14]: 
Data([[[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]]], dtype=float32)
[output:1]
Out[1:12]: 
Data([[[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]]], dtype=float32)
[output:2]
Out[2:12]: 
Data([[[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]]], dtype=float32)
[output:3]
Out[3:12]: 
Data([[[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]]], dtype=float32)

MPI and SparseFunction

A SparseFunction represents a sparse set of points which are, in general, not aligned with the Grid points. A sparse point could be anywhere within a grid, and therefore carries its own coordinates. Given a sparse point, Devito looks at its coordinates and, based on the domain decomposition, logically assigns it to a given MPI process; the ownership is purely logical since, in Python-land, before running an Operator, a sparse point physically lives on the MPI rank which created it. Within op.apply, right before jumping to C-land, the sparse points are scattered to their logical owners; upon returning to Python-land, they are gathered back to their original location.

In the following example, we inject the values of four sparse points into the neighboring grid points via linear interpolation.


In [16]:
%%px
from devito import Function, SparseFunction
grid = Grid(shape=(4, 4), extent=(3.0, 3.0))
x, y = grid.dimensions
f = Function(name='f', grid=grid)
coords = [(0.5, 0.5), (1.5, 2.5), (1.5, 1.5), (2.5, 1.5)]
sf = SparseFunction(name='sf', grid=grid, npoint=len(coords), coordinates=coords)

Let:

  • O be a grid point
  • x be a halo point
  • A, B, C, D be the sparse points

We show the global view, that is what the user "sees".

O --- O --- O --- O
|  A  |     |     |
O --- O --- O --- O
|     |  C  |  B  |
O --- O --- O --- O
|     |  D  |     |
O --- O --- O --- O

And now the local view, that is what the MPI ranks own when jumping to C-land.

Rank 0          Rank 1
O --- O --- x   x --- O --- O
|  A  |     |   |     |     |
O --- O --- x   x --- O --- O
|     |  C  |   |  C  |  B  |
x --- x --- x   x --- x --- x

Rank 2           Rank 3
x --- x --- x   x --- x --- x
|     |  C  |   |  C  |  B  |
O --- O --- x   x --- O --- O
|     |  D  |   |  D  |     |
O --- O --- x   x --- O --- O

We observe that sparse points lying along the boundary between two or more MPI ranks are duplicated and thus redundantly computed over multiple processes. However, the contributions from these points to the neighboring halo points are simply discarded, so the final result of the interpolation is as expected. Let's convince ourselves that this is the case. We assign a value of $5$ to each sparse point. Since we are using linear interpolation and all points are placed at the exact center of a grid quadrant, we expect the contribution of each sparse point to a neighboring grid point to be $5 \times 0.25 = 1.25$. Based on the global view above, we eventually expect f to look as follows:

1.25 --- 1.25 --- 0.00 --- 0.00
|         |        |        |
1.25 --- 2.50 --- 2.50 --- 1.25
|         |        |        |
0.00 --- 2.50 --- 3.75 --- 1.25
|         |        |        |
0.00 --- 1.25 --- 1.25 --- 0.00
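
As a quick cross-check of the arithmetic, the expected global field can be reproduced with plain NumPy. This is just a sketch to be run on a single process (hence no %%px); the 0.25 weights are the bilinear weights of a point sitting at the centre of a grid cell.

import numpy as np

# Each sparse point sits at the centre of a grid cell (grid spacing is 1.0,
# since the extent is 3.0 over 4 points), so it contributes 5 * 0.25 = 1.25 to
# each of the four surrounding grid points.
ref = np.zeros((4, 4), dtype=np.float32)
for cx, cy in [(0.5, 0.5), (1.5, 2.5), (1.5, 1.5), (2.5, 1.5)]:
    i, j = int(cx), int(cy)   # indices of the bottom-left point of the cell
    ref[i:i+2, j:j+2] += 5 * 0.25
print(ref)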

Let's check this out.


In [17]:
%%px
sf.data[:] = 5.
op = Operator(sf.inject(field=f, expr=sf))
summary = op.apply()


[stderr:0] Operator `Kernel` run in 0.00 s
[stderr:1] Operator `Kernel` run in 0.00 s
[stderr:2] Operator `Kernel` run in 0.00 s
[stderr:3] Operator `Kernel` run in 0.00 s

In [18]:
%%px --group-outputs=engine
f.data


[output:0]
Out[0:17]: 
Data([[1.25, 1.25],
      [1.25, 2.5 ]], dtype=float32)
[output:1]
Out[1:15]: 
Data([[0.  , 0.  ],
      [2.5 , 1.25]], dtype=float32)
[output:2]
Out[2:15]: 
Data([[0.  , 2.5 ],
      [0.  , 1.25]], dtype=float32)
[output:3]
Out[3:15]: 
Data([[3.75, 1.25],
      [1.25, 0.  ]], dtype=float32)

Performance optimizations

The Devito compiler applies several optimizations before generating code.

  • Redundant halo exchanges are identified and removed. A halo exchange is redundant if a prior halo exchange carries out the same Function update and the data is not “dirty” yet.
  • Computation/communication overlap, with explicit prodding of the asynchronous progress engine to make sure that non-blocking communications proceed in the background during the compute part.
  • Halo exchanges may also be reshuffled to maximize the extent of the computation/communication overlap region.

To run with all these optimizations enabled, instead of DEVITO_MPI=1, users should set DEVITO_MPI=full, or, equivalently


In [19]:
%%px
configuration['mpi'] = 'full'

We can now peek at the generated code to see that things look different.


In [20]:
%%px
op = Operator(Eq(u.forward, u.dx + 1))
# Uncomment below to show code (it's quite verbose)
# print(op)

The body of the time-stepping loop has changed, as it now implements a classic computation/communication overlap scheme:

  • haloupdate0 triggers non-blocking communications;
  • compute0 computes the core domain region, that is, the sub-region that does not require halo data;
  • halowait0 waits for and terminates the non-blocking communications;
  • remainder0, which internally calls compute0, computes the boundary region, which requires the now up-to-date halo data (the overall pattern is sketched below).
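
To illustrate the idea, here is a hand-written mpi4py sketch of the same computation/communication overlap for the 1D toy example used earlier in this section. The routine names in the comments mirror the generated ones, but the code is only an illustration (again to be run outside the notebook, e.g. with mpirun -np 4 python overlap1d.py).

# overlap1d.py -- a sketch of computation/communication overlap with mpi4py.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
left = rank - 1 if rank > 0 else MPI.PROC_NULL
right = rank + 1 if rank < size - 1 else MPI.PROC_NULL

a = np.full(6, float(rank), dtype=np.float32)   # [left halo | 4 owned points | right halo]
b = np.zeros_like(a)
sendl, sendr = a[1:2].copy(), a[4:5].copy()
recvl, recvr = np.empty(1, np.float32), np.empty(1, np.float32)

# "haloupdate0": post the non-blocking communications
reqs = [comm.Irecv(recvl, source=left), comm.Irecv(recvr, source=right),
        comm.Isend(sendl, dest=left), comm.Isend(sendr, dest=right)]

# "compute0" on the core region: owned points 2 and 3 need no halo data
b[2:4] = 0.5*(a[1:3] + a[3:5])

# "halowait0": complete the communications and install the received halo values
MPI.Request.Waitall(reqs)
if left != MPI.PROC_NULL:
    a[0:1] = recvl
if right != MPI.PROC_NULL:
    a[5:6] = recvr

# "remainder0": compute the boundary points, which do read the (updated) halo
b[1] = 0.5*(a[0] + a[2])
b[4] = 0.5*(a[3] + a[5])
print(rank, b[1:5])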