Pleasantly Parallel

Linh B. Ngo

  • Embarrassingly parallel/naturally parallel/pleasantly parallel
  • “A computation that can obviously be divided into a number of completely independent parts, each of which can be executed by a separate process.”
  • Little or no communication among the processes.
  • Each process can perform its tasks without any interaction with the other processes (a minimal sketch follows below).
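
A minimal sketch of the pattern (illustration only, using Python's multiprocessing rather than the MPI code developed below): each worker process handles its own piece of the data, and the only interaction is collecting the partial results at the end.

    # Hypothetical example: sum the squares of 0..999 in 4 independent chunks
    from multiprocessing import Pool

    def work(chunk):                    # each task touches only its own data
        return sum(x * x for x in chunk)

    if __name__ == "__main__":
        chunks = [range(i, i + 250) for i in range(0, 1000, 250)]
        with Pool(4) as pool:           # 4 worker processes
            partials = pool.map(work, chunks)   # no worker-to-worker communication
        print(sum(partials))            # combining the results is the only shared step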

Example: Trapezoid Calculation


In [1]:
N = 8; a = 0; b = 2; h = (b - a)/N;

In [2]:
# With 4 processors (cores)
size = 4; rank = 1
local_N = N // size   # number of trapezoids per process
local_a = a + rank * h * local_N
local_b = local_a + h * local_N
print(local_a, local_b)


0.5 1.0
  • Which workload goes to which process?
    if (rank == i) {
      do great things
    }
  • Start with a small number of processes
  • Work out the workload assignment by hand for each process count
  • Generalize the assignment for process i based on the sample calculations (see the sketch below)
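
A small sketch that generalizes the hand calculation above to every rank at once (same toy values as the earlier cells; the MPI programs below apply this formula with rank and size supplied by MPI):

    N = 8; a = 0; b = 2; h = (b - a) / N
    size = 4                        # pretend we run 4 processes
    local_N = N // size             # trapezoids per process
    for rank in range(size):        # what each rank computes for itself
        local_a = a + rank * h * local_N
        local_b = local_a + h * local_N
        print(rank, local_a, local_b)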

In [3]:
%%writefile codes/openmpi/static.c
/*
 * MPI implementation of the trapezoid approach to integral calculation following a static
 * workload distribution and standard send()/recv() calls. 
 * We assume that the number of trapezoids is divisible by the number of MPI processes. 
 */

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

double Trap(double a, double b, int n);
double f(double x);

int main(int argc, char * argv[] ) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double local_a, local_b; /* left and right endpoints on each MPI process */
  int local_n;  /* number of trapezoids assigned to each individual MPI process */
  double result;       /* final integration result */
  double local_result; /* partial integration result at each process */
  int p;        /* counter */
  MPI_Status status;

  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  MPI_Comm_size(MPI_COMM_WORLD,&size);

  a = atof(argv[1]);
  b = atof(argv[2]);
  n = atoi(argv[3]);

  // calculate work interval for each process
  h = (b - a) / n;
  local_n = n / size;
  local_a = a + rank * local_n * h;
  local_b = local_a + local_n * h;
  local_result = Trap(local_a,local_b,local_n);

  // sending the results back to the master
  if (rank == 0){
    result = local_result;
    for (p = 1; p < size; p++){
      MPI_Recv(&local_result,1,MPI_DOUBLE,p,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
      result += local_result;
    }
  } 
  else{
    MPI_Send(&local_result,1,MPI_DOUBLE,0,0,MPI_COMM_WORLD);  
  }

  // displaying output at the master node
  if (rank == 0){
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);  
  }
  MPI_Finalize();
}

double Trap(double a, double b, int n) {
  double len, area;
  double x;
  int i;
  len = (b - a) / n;
  area = 0.5 * (f(a) + f(b));
  x = a + len;
  for (i=1; i<n; i++) {
    area = area + f(x);
    x = x + len;
  }
  area = area * len;
  return area;
}

double f(double x) {
  return ( x*x );
}


Overwriting codes/openmpi/static.c

In [4]:
!mpicc codes/openmpi/static.c -o ~/static
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static 0 100 1000


Calculating the integral of f(x) from 0.000000 to 100.000000
The integral is 333333.500000

Does each process receive the same amount of work?


In [5]:
%%writefile codes/openmpi/static_wtiming.c
/*
 * MPI implementation of the trapezoid approach to integral calculation following a static
 * workload distribution and standard send()/recv() calls. 
 * We assume that the number of trapezoids is divisible by the number of MPI processes. 
 */

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

double Trap(double a, double b, int n);
double f(double x);

int main(int argc, char * argv[] ) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double local_a, local_b; /* left and right endpoints on each MPI process */
  int local_n;  /* number of trapezoids assigned to each individual MPI process */
  double result;       /* final integration result */
  double local_result; /* partial integration result at each process */
  double start, stop, tpar, tcomm;  /* timing variables */
  int p;        /* counter */
  MPI_Status status;

  MPI_Init(&argc,&argv);
  MPI_Comm_rank(MPI_COMM_WORLD,&rank);
  MPI_Comm_size(MPI_COMM_WORLD,&size);

  a = atof(argv[1]);
  b = atof(argv[2]);
  n = atoi(argv[3]);

  // calculate work interval for each process
  start = MPI_Wtime();
  h = (b - a) / n;
  local_n = n / size;
  local_a = a + rank * local_n * h;
  local_b = local_a + local_n * h;
  local_result = Trap(local_a,local_b,local_n);
  stop = MPI_Wtime();
  tpar = stop - start;

  printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);

  // sending the results back to the master
  start = MPI_Wtime();
  if (rank == 0){
    result = local_result;
    for (p = 1; p < size; p++){
      MPI_Recv(&local_result,1,MPI_DOUBLE,p,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
      result += local_result;
    }
  } 
  else{
    MPI_Send(&local_result,1,MPI_DOUBLE,0,0,MPI_COMM_WORLD);  
  }
  stop  = MPI_Wtime();
  tcomm = stop - start;

  // displaying output at the master node
  if (rank == 0){
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);  
    printf("Communication time: %.5fs\n",tcomm);
  }
  MPI_Finalize();
}

double Trap(double a, double b, int n) {
  double len, area;
  double x;
  int i;
  len = (b - a) / n;
  area = 0.5 * (f(a) + f(b));
  x = a + len;
  for (i=1; i<n; i++) {
    area = area + f(x);
    x = x + len;
  }
  area = area * len;
  return area;
}

double f(double x) {
  return ( x*x );
}


Overwriting codes/openmpi/static_wtiming.c

In [6]:
!mpicc codes/openmpi/static_wtiming.c -o ~/static_wtiming
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static_wtiming 0 100 1000


Process 1 uses 0.000001s to calculate partial result 4557.312500
Process 6 uses 0.000001s to calculate partial result 82682.312500
Process 0 uses 0.000001s to calculate partial result 651.062500
Process 5 uses 0.000001s to calculate partial result 59244.812500
Process 4 uses 0.000001s to calculate partial result 39713.562500
Process 7 uses 0.000001s to calculate partial result 110026.062500
Process 2 uses 0.000001s to calculate partial result 12369.812500
Process 3 uses 0.000001s to calculate partial result 24088.562500
Calculating the integral of f(x) from 0.000000 to 100.000000
The integral is 333333.500000
Communication time: 0.00032s

In [7]:
%%writefile codes/openmpi/cyclic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

float f(float x);

int main(int argc, char** argv) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double local_a, local_b; /* left and right endpoints on each MPI process */
  int local_n;  /* number of trapezoids assigned to each individual MPI process */
  double result;       /* final integration result */
  double local_result; /* partial integration result at each process */
  double start, stop, tpar, tcomm;  /* timing variables */
  int i;        /* counter */
  MPI_Status  status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  a = atof(argv[1]);               
  b = atof(argv[2]);              
  n = atoi(argv[3]);               
 
  // calculate work interval for each process
  start = MPI_Wtime(); 
  h = (b-a)/n;    /* height is the same for all processes */
  local_n = n/size;  /*  so is the number of trapezoids */

  /* Each process' interval starts at: */
  local_a = a + rank * h;
  local_b = local_a + h;
  local_result = 0;

  for (i = 0; i < n/size; i++){
    local_result = local_result + h * (f(local_a) +  f(local_b)) / 2;
    local_a += h * size;
    local_b = local_a + h;
  }
  stop = MPI_Wtime();
  tpar = stop - start;

  printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);
  
  // sending the results back to the master using reduce  
  start = MPI_Wtime();
  MPI_Reduce(&local_result, &result, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
  stop = MPI_Wtime();
  tcomm = stop - start;

  /* Print the result */
  if (rank == 0){
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);  
    printf("Communication time: %.5fs\n",tcomm);
  }
  MPI_Finalize();
}

float f(float x) {
    return ( x*x );
}


Overwriting codes/openmpi/cyclic.c

In [8]:
!mpicc codes/openmpi/cyclic.c -o ~/cyclic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/cyclic 0 100 1000


Process 2 uses 0.000003s to calculate partial result 41478.812410
Process 4 uses 0.000003s to calculate partial result 41728.562518
Process 7 uses 0.000003s to calculate partial result 42105.062441
Process 1 uses 0.000003s to calculate partial result 41354.312473
Process 3 uses 0.000003s to calculate partial result 41603.562379
Process 0 uses 0.000003s to calculate partial result 41230.062246
Process 6 uses 0.000003s to calculate partial result 41979.312502
Process 5 uses 0.000003s to calculate partial result 41853.812428
Calculating the integral of f(x) from 0.000000 to 100.000000
The integral is 333333.499397
Communication time: 0.00050s

In [9]:
%%writefile codes/openmpi/dynamic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

#define SEND 1
#define STOP 0

float f(float x);

int main(int argc, char** argv) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double param[3]; /* array containing end points and height for each individual trapezoid
                      for communication purpose */
  double local_result = 0.0;  /* area of each individual trapezoid */
  double partial_result = 0.0; /* amount of area calculated by each process */
  double result = 0.0;     /* Total integral            */
  int source;    /* Process sending the partial integral  */
  int dest = 0;  /* All messages go to 0      */
  int tag = 0;
  double start, stop, tpar, tcomm;
  int i,count, partial_count;
  MPI_Status  status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  start = MPI_Wtime();

  /* initial job distribution is handled only by process 0 */
  if (rank == 0){
    a = atof(argv[1]);               
    b = atof(argv[2]);               
    n = atoi(argv[3]);              
    h = (b-a)/n;    
    count = 0;
    /* send out the first round of work assignment, incrementing count as needed */
    for (i = 1; i < size; i++){
      param[0] = a + count * h;
      param[1] = param[0] + h;
      param[2] = h;
      MPI_Send(param,3,MPI_DOUBLE,i,SEND,MPI_COMM_WORLD);
      count = count + 1;
    }
  }
  else {
    MPI_Recv(param,3,MPI_DOUBLE,0,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
  }
    
  tpar = 0.0;
  tcomm = 0.0;
  partial_count = 0; 
  /* Each process that is not process 0 works on its portion, send the partial result back to 0, 
   * and wait for new workload unless the TAG of the message is 0 
   */
  if (rank != 0){
    do {
      start = MPI_Wtime();
      local_result = param[2] * (f(param[1]) +  f(param[0])) / 2;
      partial_result += local_result;
      stop = MPI_Wtime(); 
      tpar += stop - start;
      partial_count += 1;
      start = MPI_Wtime();
      MPI_Send(&local_result,1,MPI_DOUBLE,0,SEND,MPI_COMM_WORLD);      
      MPI_Recv(param,3,MPI_DOUBLE,0,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
      stop = MPI_Wtime();
      tcomm += stop - start;
    } while(status.MPI_TAG != 0);
    printf("Process %d uses %lfs to calculate partial result %lf of %d portions and %lfs for communication \n", rank, tpar, partial_result, partial_count, tcomm);
  }
  

  /* Process 0 receives results and sends out work while there is still work left to be sent
   * (count < n) */
  if (rank == 0) {
    do {
      MPI_Recv(&local_result,1,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
      result = result + local_result;    
      param[0] = a + count * h;
      param[1] = param[0] + h;
      param[2] = h;
      MPI_Send(param,3,MPI_DOUBLE,status.MPI_SOURCE,SEND,MPI_COMM_WORLD); 
      count = count + 1; 
    }   
    while (count < n);  

    /* Make sure that we receive everything */
    for (i = 0; i < (size - 1); i++){
      MPI_Recv(&local_result,1,MPI_DOUBLE,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&status);
      result = result + local_result;  
    } 
  }

  /* All the work has been sent; tell the workers to stop */
  if (rank == 0){
    for (i = 1; i < size; i++){
      MPI_Send(param,3,MPI_DOUBLE,i,STOP,MPI_COMM_WORLD);
    }
  }

  /* Print the result */
  if (rank == 0) {
    printf("With n = %d trapezoids, our estimate\n", n);
    printf("of the integral from %f to %f = %f\n", a, b, result);
  }

  /* Shut down MPI */
  MPI_Finalize();
} /*  main  */

float f(float x) {
    return ( x*x );
}


Overwriting codes/openmpi/dynamic.c

In [10]:
!mpicc codes/openmpi/dynamic.c -o ~/dynamic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/dynamic 0 100 1000


With n = 1000 trapezoids, our estimate
of the integral from 0.000000 to 100.000000 = 333333.499397
Process 3 uses 0.000011s to calculate partial result 48070.134273 of 145 portions and 0.005426s for communication 
Process 4 uses 0.000011s to calculate partial result 47221.898243 of 144 portions and 0.005598s for communication 
Process 6 uses 0.000013s to calculate partial result 46641.744142 of 145 portions and 0.005714s for communication 
Process 7 uses 0.000010s to calculate partial result 48331.990086 of 132 portions and 0.004679s for communication 
Process 2 uses 0.000012s to calculate partial result 50906.612141 of 153 portions and 0.005935s for communication 
Process 5 uses 0.000012s to calculate partial result 45724.445379 of 143 portions and 0.005760s for communication 
Process 1 uses 0.000012s to calculate partial result 46436.675133 of 138 portions and 0.006154s for communication 

In [11]:
%%writefile codes/openmpi/pi_mc.c

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <math.h>
#include <time.h>

int main(int argc, char* argv[]) {

  int nPointsInCircle  = 0;
  int i = 0;
  int nPointsTotal     = 0;
  int nPointsPerRegion = 0;
  int pointsReceived   = 0;
  double piEstimate;
  double x_start, y_start;
  double x_rand, y_rand, rand_radius; 
  int rank, size, squareWidth;
  MPI_Status status;

  nPointsTotal = atoi(argv[1]);

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Seed RNG and make calculations for constants
  nPointsPerRegion = nPointsTotal / size;
  srand( (unsigned)time(NULL) + rank ); // seed differently per node
  squareWidth = (int) sqrt(size);   // assumes the number of processes is a perfect square (1, 4, 9, ...)

  // Place and record points in the circle
  x_start = (double)(rank % squareWidth) / squareWidth;
  y_start = (double)((rank / squareWidth)) / squareWidth;

  //printf("Rank %d out of %d has starting x %f and starting y %f on a square of size %d \n", 
  //       rank, size, x_start, y_start, squareWidth);
    
  for (i = 0; i < nPointsPerRegion; i++) {
    x_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + x_start;
    y_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + y_start;
    rand_radius = (x_rand - 0.5) * (x_rand - 0.5) + (y_rand - 0.5) * (y_rand - 0.5);
    if (rand_radius <= 0.25) {
      nPointsInCircle += 1;
    }
  }
  
  MPI_Reduce(&nPointsInCircle, &pointsReceived, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
  if (rank == 0) {
    piEstimate = (double)(pointsReceived * 4) / nPointsTotal;
    printf("%f\n", piEstimate);
  } 

  MPI_Finalize();
  return 0;
}


Overwriting codes/openmpi/pi_mc.c

In [12]:
!mpicc codes/openmpi/pi_mc.c -o ~/pi_mc -lm
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/pi_mc 100000


Rank 0 out of 4 has starting x 0.000000 and starting y 0.000000 on a square of size 2 
Rank 3 out of 4 has starting x 0.500000 and starting y 0.500000 on a square of size 2 
Rank 1 out of 4 has starting x 0.500000 and starting y 0.000000 on a square of size 2 
Rank 2 out of 4 has starting x 0.000000 and starting y 0.500000 on a square of size 2 
3.135760