Pleasantly Parallel

Linh B. Ngo

  • Embarrassingly parallel/naturally parallel/pleasantly parallel
  • “A computation that can obviously be divided into a number of completely different parts, each of which can be executed by a separate process.”
  • No communication or very little communication among the processes.
  • Each process can do its tasks without any interaction with the other processes.

Example: Trapezoid Calculation

In [1]:
N = 8; a = 0; b = 2; h = (b - a)/N;

In [2]:
# With 4 processors (cores)
size = 4; rank = 1
local_N = N / size
local_a = a + rank * h * local_N
local_b = local_a + h * local_N
print (local_a, local_b)

0.5 1.0
  • Which workload goes to which process?
    if (rank == i) {
      do great things
  • Start with small number of processes
  • Calculation workload assignment manually for each count of processes
  • Generalize assignment for process i based on sample calculations

In [3]:
%%writefile codes/openmpi/static.c
 * MPI implementation of the trapezoid approach to integral calculation following a static
 * workload distribution and standard send()/recv() calls. 
 * We assume that the number of trapezoids is divisible by the number of MPI process. 

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

double Trap(double a, double b, int n);
double f(double x);

int main(int argc, char * argv[] ) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double local_a, local_b; /* left and right endpoints on each MPI process */
  int local_n;  /* number of trapezoids assigned to each individual MPI process */
  double result;       /* final integration result */
  double local_result; /* partial integration result at each process */
  int p;        /* counter */
  MPI_Status status;


  a = atof(argv[1]);
  b = atof(argv[2]);
  n = atoi(argv[3]);

  // calculate work interval for each process
  h = (b - a) / n;
  local_n = n / size;
  local_a = a + rank * local_n * h;
  local_b = local_a + local_n * h;
  local_result = Trap(local_a,local_b,local_n);

  // sending the results back to the master
  if (rank == 0){
    result = local_result;
    for (p = 1; p < size; p++){
      result += local_result;

  // displaying output at the master node
  if (rank == 0){
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);  

double Trap(double a, double b, int n) {
  double len, area;
  double x;
  int i;
  len = (b - a) / n;
  area = 0.5 * (f(a) + f(b));
  x = a + len;
  for (i=1; i<n; i++) {
    area = area + f(x);
    x = x + len;
  area = area * len;
  return area;

double f(double x) {
  return ( x*x );

Overwriting codes/openmpi/static.c

In [4]:
!mpicc codes/openmpi/static.c -o ~/static
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static 0 100 1000

Calculating the integral of f(x) from 0.000000 to 100.000000
The integral is 333333.500000

Does each process receive the same amount of work?

In [5]:
%%writefile codes/openmpi/static_wtiming.c
 * MPI implementation of the trapezoid approach to integral calculation following a static
 * workload distribution and standard send()/recv() calls. 
 * We assume that the number of trapezoids is divisible by the number of MPI process. 

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

double Trap(double a, double b, int n);
double f(double x);

int main(int argc, char * argv[] ) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double local_a, local_b; /* left and right endpoints on each MPI process */
  int local_n;  /* number of trapezoids assigned to each individual MPI process */
  double result;       /* final integration result */
  double local_result; /* partial integration result at each process */
  double start, stop, tpar, tcomm;  /* timing variables */
  int p;        /* counter */
  MPI_Status status;


  a = atof(argv[1]);
  b = atof(argv[2]);
  n = atoi(argv[3]);

  // calculate work interval for each process
  start = MPI_Wtime();
  h = (b - a) / n;
  local_n = n / size;
  local_a = a + rank * local_n * h;
  local_b = local_a + local_n * h;
  local_result = Trap(local_a,local_b,local_n);
  stop = MPI_Wtime();
  tpar = stop - start;

  printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);

  // sending the results back to the master
  start = MPI_Wtime();
  if (rank == 0){
    result = local_result;
    for (p = 1; p < size; p++){
      result += local_result;
  stop  = MPI_Wtime();
  tcomm = stop - start;

  // displaying output at the master node
  if (rank == 0){
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);  
    printf("Communication time: %.5fs\n",tcomm);

double Trap(double a, double b, int n) {
  double len, area;
  double x;
  int i;
  len = (b - a) / n;
  area = 0.5 * (f(a) + f(b));
  x = a + len;
  for (i=1; i<n; i++) {
    area = area + f(x);
    x = x + len;
  area = area * len;
  return area;

double f(double x) {
  return ( x*x );

Overwriting codes/openmpi/static_wtiming.c

In [6]:
!mpicc codes/openmpi/static_wtiming.c -o ~/static_wtiming
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static_wtiming 0 100 1000

Process 1 uses 0.000001s to calculate partial result 4557.312500
Process 6 uses 0.000001s to calculate partial result 82682.312500
Process 0 uses 0.000001s to calculate partial result 651.062500
Process 5 uses 0.000001s to calculate partial result 59244.812500
Process 4 uses 0.000001s to calculate partial result 39713.562500
Process 7 uses 0.000001s to calculate partial result 110026.062500
Process 2 uses 0.000001s to calculate partial result 12369.812500
Process 3 uses 0.000001s to calculate partial result 24088.562500
Calculating the integral of f(x) from 0.000000 to 100.000000
The integral is 333333.500000
Communication time: 0.00032s

In [7]:
%%writefile codes/openmpi/cyclic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

float f(float x);

main(int argc, char** argv) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double local_a, local_b; /* left and right endpoints on each MPI process */
  int local_n;  /* number of trapezoids assigned to each individual MPI process */
  double result;       /* final integration result */
  double local_result; /* partial integration result at each process */
  double start, stop, tpar, tcomm;  /* timing variables */
  int i;        /* counter */
  MPI_Status  status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  a = atof(argv[1]);               
  b = atof(argv[2]);              
  n = atoi(argv[3]);               
  // calculate work interval for each process
  start = MPI_Wtime(); 
  h = (b-a)/n;    /* height is the same for all processes */
  local_n = n/size;  /*  so is the number of trapezoids */

  /* Each process' interval starts at: */
  local_a = a + rank * h;
  local_b = local_a + h;
  local_result = 0;

  for (i = 0; i < n/size; i++){
    local_result = local_result + h * (f(local_a) +  f(local_b)) / 2;
    local_a += h * size;
    local_b = local_a + h;
  stop = MPI_Wtime();
  tpar = stop - start;

  printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);
  // sending the results back to the master using reduce  
  start = MPI_Wtime();
  MPI_Reduce(&local_result, &result, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
  stop = MPI_Wtime();
  tcomm = stop - start;

  /* Print the result */
  if (rank == 0){
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);  
    printf("Communication time: %.5fs\n",tcomm);

float f(float x) {
    return ( x*x );

Overwriting codes/openmpi/cyclic.c

In [8]:
!mpicc codes/openmpi/cyclic.c -o ~/cyclic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/cyclic 0 100 1000

Process 2 uses 0.000003s to calculate partial result 41478.812410
Process 4 uses 0.000003s to calculate partial result 41728.562518
Process 7 uses 0.000003s to calculate partial result 42105.062441
Process 1 uses 0.000003s to calculate partial result 41354.312473
Process 3 uses 0.000003s to calculate partial result 41603.562379
Process 0 uses 0.000003s to calculate partial result 41230.062246
Process 6 uses 0.000003s to calculate partial result 41979.312502
Process 5 uses 0.000003s to calculate partial result 41853.812428
Calculating the integral of f(x) from 0.000000 to 100.000000
The integral is 333333.499397
Communication time: 0.00050s

In [9]:
%%writefile codes/openmpi/dynamic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

#define SEND 1
#define STOP 0

float f(float x);

main(int argc, char** argv) {
  int rank;     /* rank of each MPI process */
  int size;     /* total number of MPI processes */
  double a, b;  /* default left and right endpoints of the interval */
  int n;        /* total number of trapezoids */
  double h;        /* height of the trapezoids */
  double param[3]; /* array containing end points and height for each individual trapezoid
                      for communication purpose */
  double local_result = 0.0;  /* area of each individual trapezoid */
  double partial_result = 0.0; /* amount of area calculated by each process */
  double result = 0.0;     /* Total integral            */
  int source;    /* Process sending the partial integral  */
  int dest = 0;  /* All messages go to 0      */
  int tag = 0;
  double start, stop, tpar, tcomm;
  int i,count, partial_count;
  MPI_Status  status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
    start = MPI_Wtime();

  /* initial job distribution is handled only by process 0 */
  if (rank == 0){
    a = atof(argv[1]);               
    b = atof(argv[2]);               
    n = atoi(argv[3]);              
    h = (b-a)/n;    
    count = 0;
    /* send out the first round of work assignment, incrementing count as needed */
    for (i = 1; i < size; i++){
      param[0] = a + count * h;
      param[1] = param[0] + h;
      param[2] = h;
      count = count + 1;
  else {
  tpar = 0.0;
  tcomm = 0.0;
  partial_count = 0; 
  /* Each process that is not process 0 works on its portion, send the partial result back to 0, 
   * and wait for new workload unless the TAG of the message is 0 
  if (rank != 0){
    do {
      start = MPI_Wtime();
      local_result = param[2] * (f(param[1]) +  f(param[0])) / 2;
      partial_result += local_result;
      stop = MPI_Wtime(); 
      tpar += stop - start;
      partial_count += 1;
      start = MPI_Wtime();
      stop = MPI_Wtime();
      tcomm += stop - start;
    } while(status.MPI_TAG != 0);
    printf("Process %d uses %lfs to calculate partial result %lf of %d portions and %lfs for communication \n", rank, tpar, partial_result, partial_count, tcomm);

  /* Process 0 receives results and sends out work while there is still work left to be sent
   * (count < n) */
  if (rank == 0) {
    do {
      result = result + local_result;    
      param[0] = a + count * h;
      param[1] = param[0] + h;
      param[2] = h;
      count = count + 1; 
    while (count < n);  

    /* Make sure that we receive everything */
    for (i = 0; i < (size - 1); i++){
      result = result + local_result;  

  /* All the work has been sent, */
  if (rank == 0){
    for (i = 1; i < size; i++){

    /* Print the result */
    if (rank == 0) {
        printf("With n = %d trapezoids, our estimate\n",
        printf("of the integral from %f to %f = %f\n",
            a, b, result);

    /* Shut down MPI */
} /*  main  */

float f(float x) {
    return ( x*x );

Overwriting codes/openmpi/dynamic.c

In [10]:
!mpicc codes/openmpi/dynamic.c -o ~/dynamic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/dynamic 0 100 1000

With n = 1000 trapezoids, our estimate
of the integral from 0.000000 to 100.000000 = 333333.499397
Process 3 uses 0.000011s to calculate partial result 48070.134273 of 145 portions and 0.005426s for communication 
Process 4 uses 0.000011s to calculate partial result 47221.898243 of 144 portions and 0.005598s for communication 
Process 6 uses 0.000013s to calculate partial result 46641.744142 of 145 portions and 0.005714s for communication 
Process 7 uses 0.000010s to calculate partial result 48331.990086 of 132 portions and 0.004679s for communication 
Process 2 uses 0.000012s to calculate partial result 50906.612141 of 153 portions and 0.005935s for communication 
Process 5 uses 0.000012s to calculate partial result 45724.445379 of 143 portions and 0.005760s for communication 
Process 1 uses 0.000012s to calculate partial result 46436.675133 of 138 portions and 0.006154s for communication 

In [11]:
%%writefile codes/openmpi/pi_mc.c

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <math.h>

int main(int argc, char* argv[]) {

  int nPointsInCircle  = 0;
  int i = 0;
  int nPointsTotal     = 0;
  int nPointsPerRegion = 0;
  int pointsReceived   = 0;
  double piEstimate;
  double x_start, y_start;
  double x_rand, y_rand, rand_radius; 
  int rank, size, squareWidth;
  MPI_Status status;

  nPointsTotal = atoi(argv[1]);

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Seed RNG and make calculations for constants
  nPointsPerRegion = nPointsTotal / size;
  srand( (unsigned)time(NULL) + rank ); // seed differently per node
  squareWidth = (int) sqrt(size);

  // Place and record points in the circle
  x_start = (double)(rank % squareWidth) / squareWidth;
  y_start = (double)((rank / squareWidth)) / squareWidth;

  //printf("Rank %d out of %d has starting x %f and starting y %f on a square of size %d \n", 
  //       rank, size, x_start, y_start, squareWidth);
  for (i = 0; i < nPointsPerRegion; i++) {
    x_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + x_start;
    y_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + y_start;
    rand_radius = (x_rand - 0.5) * (x_rand - 0.5) + (y_rand - 0.5) * (y_rand - 0.5);
    if (rand_radius <= 0.25) {
      nPointsInCircle += 1;
  MPI_Reduce(&nPointsInCircle, &pointsReceived, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
  if (rank == 0) {
    piEstimate = (double)(pointsReceived * 4) / nPointsTotal;
    printf("%f\n", piEstimate);

  return 0;

Overwriting codes/openmpi/pi_mc.c

In [12]:
!mpicc -lm codes/openmpi/pi_mc.c -o ~/pi_mc
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/pi_mc 100000

codes/openmpi/pi_mc.c: In function ‘main’:
codes/openmpi/pi_mc.c:36:51: error: expected ‘;’ before ‘)’ token
          rank, size, x_start, y_start, squareWidth);
codes/openmpi/pi_mc.c:36:51: error: expected statement before ‘)’ token
Rank 0 out of 4 has starting x 0.000000 and starting y 0.000000 on a square of size 2 
Rank 3 out of 4 has starting x 0.500000 and starting y 0.500000 on a square of size 2 
Rank 1 out of 4 has starting x 0.500000 and starting y 0.000000 on a square of size 2 
Rank 2 out of 4 has starting x 0.000000 and starting y 0.500000 on a square of size 2 