In [1]:
    
# Problem setup: integrate over [a, b] with N trapezoids of width h.
N = 8
a = 0
b = 2
h = (b - a) / N
    
In [2]:
    
# With 4 processors (cores): compute the sub-interval that rank 1 would own
# under a static block distribution (uses N, a, h from the cell above).
size = 4; rank = 1
local_N = N // size  # integer count of trapezoids per rank (was float `/`)
local_a = a + rank * h * local_N
local_b = local_a + h * local_N
print (local_a, local_b)
    
    
Pseudocode — every process runs the same program, so rank-specific work is
guarded by a test on the process rank:

if (rank == i) {
  do great things
}
In [3]:
    
%%writefile codes/openmpi/static.c
/*
 * MPI implementation of the trapezoid approach to integral calculation following a static
 * workload distribution and standard send()/recv() calls. 
 * We assume that the number of trapezoids is divisible by the number of MPI process. 
 */
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
double Trap(double a, double b, int n);
double f(double x);
int main(int argc, char * argv[] ) {
  int rank;                /* rank of each MPI process */
  int size;                /* total number of MPI processes */
  double a, b;             /* left and right endpoints of the integration interval */
  int n;                   /* total number of trapezoids */
  double h;                /* width of each trapezoid */
  double local_a, local_b; /* sub-interval endpoints on each MPI process */
  int local_n;             /* number of trapezoids handled by this process */
  double result;           /* final integration result (valid on rank 0 only) */
  double local_result;     /* partial integration result on each process */
  int p;                   /* loop counter over worker ranks */
  MPI_Status status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  /* Validate arguments before touching argv; previously a missing
   * argument crashed the program with a segfault. */
  if (argc < 4) {
    if (rank == 0)
      fprintf(stderr, "usage: %s <a> <b> <n>\n", argv[0]);
    MPI_Finalize();
    return 1;
  }
  a = atof(argv[1]);
  b = atof(argv[2]);
  n = atoi(argv[3]);

  /* Static block distribution: each rank integrates a contiguous slice of
   * n/size trapezoids (assumes size divides n, per the header comment). */
  h = (b - a) / n;
  local_n = n / size;
  local_a = a + rank * local_n * h;
  local_b = local_a + local_n * h;
  local_result = Trap(local_a, local_b, local_n);

  /* Rank 0 accumulates the partial sums from every other rank, in rank
   * order; all other ranks send their single partial result to rank 0. */
  if (rank == 0) {
    result = local_result;
    for (p = 1; p < size; p++) {
      MPI_Recv(&local_result, 1, MPI_DOUBLE, p, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      result += local_result;
    }
  }
  else {
    MPI_Send(&local_result, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
  }

  /* Only the master prints the final answer. */
  if (rank == 0) {
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);
  }
  MPI_Finalize();
  return 0;
}
/*
 * Composite trapezoid rule: approximate the integral of f over [a, b]
 * using n equal-width subintervals. Interior points are weighted 1,
 * endpoints 1/2, and the sum is scaled by the subinterval width.
 */
double Trap(double a, double b, int n) {
  double width = (b - a) / n;          /* width of each subinterval */
  double sum = 0.5 * (f(a) + f(b));    /* endpoint contributions */
  double xi = a + width;               /* first interior grid point */
  int k;
  for (k = 1; k < n; k++) {
    sum = sum + f(xi);
    xi = xi + width;                   /* advance incrementally, as before */
  }
  return sum * width;
}
/* Integrand: f(x) = x^2. */
double f(double x) {
  double y = x * x;
  return y;
}
    
    
In [4]:
    
# Compile static.c with the MPI compiler wrapper and run it on 8 ranks
# (oversubscribed onto the available cores), integrating over [0, 100]
# with 1000 trapezoids.
!mpicc codes/openmpi/static.c -o ~/static
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static 0 100 1000
    
    
In [5]:
    
%%writefile codes/openmpi/static_wtiming.c
/*
 * MPI implementation of the trapezoid approach to integral calculation following a static
 * workload distribution and standard send()/recv() calls. 
 * We assume that the number of trapezoids is divisible by the number of MPI process. 
 */
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
double Trap(double a, double b, int n);
double f(double x);
int main(int argc, char * argv[] ) {
  int rank;                /* rank of each MPI process */
  int size;                /* total number of MPI processes */
  double a, b;             /* left and right endpoints of the integration interval */
  int n;                   /* total number of trapezoids */
  double h;                /* width of each trapezoid */
  double local_a, local_b; /* sub-interval endpoints on each MPI process */
  int local_n;             /* number of trapezoids handled by this process */
  double result;           /* final integration result (valid on rank 0 only) */
  double local_result;     /* partial integration result on each process */
  double start, stop, tpar, tcomm;  /* timing variables */
  int p;                   /* loop counter over worker ranks */
  MPI_Status status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  /* Validate arguments up front; previously a missing argument crashed
   * the program with a segfault. */
  if (argc < 4) {
    if (rank == 0)
      fprintf(stderr, "usage: %s <a> <b> <n>\n", argv[0]);
    MPI_Finalize();
    return 1;
  }
  a = atof(argv[1]);
  b = atof(argv[2]);
  n = atoi(argv[3]);

  /* Time the local computation phase. */
  start = MPI_Wtime();
  h = (b - a) / n;
  local_n = n / size;   /* static block distribution; assumes size divides n */
  local_a = a + rank * local_n * h;
  local_b = local_a + local_n * h;
  local_result = Trap(local_a, local_b, local_n);
  stop = MPI_Wtime();
  tpar = stop - start;
  printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);

  /* Time the communication phase: rank 0 gathers partial sums in rank
   * order; on rank 0 this also includes the waiting time. */
  start = MPI_Wtime();
  if (rank == 0) {
    result = local_result;
    for (p = 1; p < size; p++) {
      MPI_Recv(&local_result, 1, MPI_DOUBLE, p, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      result += local_result;
    }
  }
  else {
    MPI_Send(&local_result, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
  }
  stop  = MPI_Wtime();
  tcomm = stop - start;

  /* Only the master prints the final answer and the communication time. */
  if (rank == 0) {
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);
    printf("Communication time: %.5fs\n", tcomm);
  }
  MPI_Finalize();
  return 0;
}
/*
 * Trapezoid rule over [a, b] with n subintervals: half-weight endpoints,
 * full weight for every interior point, scaled by the subinterval width.
 */
double Trap(double a, double b, int n) {
  int idx;
  double step = (b - a) / n;       /* subinterval width */
  double acc  = 0.5 * (f(a) + f(b));
  double pt   = a + step;          /* first interior point */
  for (idx = 1; idx < n; idx++) {
    acc = acc + f(pt);
    pt  = pt + step;               /* same incremental update as before */
  }
  return acc * step;
}
/* Integrand being integrated: f(x) = x * x. */
double f(double x) {
  return x * x;
}
    
    
In [6]:
    
# Compile and run the timed static-distribution version with the same
# parameters as before, so the per-phase timings can be compared.
!mpicc codes/openmpi/static_wtiming.c -o ~/static_wtiming
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static_wtiming 0 100 1000
    
    
In [7]:
    
%%writefile codes/openmpi/cyclic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
float f(float x);
/*
 * Cyclic workload distribution: rank r handles trapezoids r, r+size,
 * r+2*size, ... and the partial sums are combined with MPI_Reduce.
 */
int main(int argc, char** argv) {  /* implicit-int main is invalid in C99 */
  int rank;                /* rank of each MPI process */
  int size;                /* total number of MPI processes */
  double a, b;             /* left and right endpoints of the interval */
  int n;                   /* total number of trapezoids */
  double h;                /* width of each trapezoid */
  double local_a, local_b; /* endpoints of the current trapezoid */
  int local_n;             /* number of trapezoids handled by this process */
  double result;           /* final integration result (rank 0 only) */
  double local_result;     /* partial integration result on each process */
  double start, stop, tpar, tcomm;  /* timing variables */
  int i;                   /* loop counter */

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  /* Fail cleanly when any of the three arguments is missing. */
  if (argc < 4) {
    if (rank == 0)
      fprintf(stderr, "usage: %s <a> <b> <n>\n", argv[0]);
    MPI_Finalize();
    return 1;
  }
  a = atof(argv[1]);
  b = atof(argv[2]);
  n = atoi(argv[3]);

  /* Time the local computation. */
  start = MPI_Wtime();
  h = (b - a) / n;      /* trapezoid width is the same on every rank */
  local_n = n / size;   /* so is the trapezoid count (assumes size divides n) */
  /* Rank r starts at the r-th trapezoid and strides by size trapezoids. */
  local_a = a + rank * h;
  local_b = local_a + h;
  local_result = 0;
  for (i = 0; i < local_n; i++) {
    local_result = local_result + h * (f(local_a) + f(local_b)) / 2;
    local_a += h * size;       /* jump over the other ranks' trapezoids */
    local_b = local_a + h;
  }
  stop = MPI_Wtime();
  tpar = stop - start;
  printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);

  /* Combine the partial results on rank 0 with a single reduction. */
  start = MPI_Wtime();
  MPI_Reduce(&local_result, &result, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
  stop = MPI_Wtime();
  tcomm = stop - start;

  /* Print the result on the master only. */
  if (rank == 0) {
    printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
    printf("The integral is %lf\n", result);
    printf("Communication time: %.5fs\n", tcomm);
  }
  MPI_Finalize();
  return 0;
}
/* Integrand: f(x) = x^2 (single precision, matching the prototype above). */
float f(float x) {
    float square = x * x;
    return square;
}
    
    
In [8]:
    
# Compile and run the cyclic-distribution version with the same parameters
# as the static versions above for comparison.
!mpicc codes/openmpi/cyclic.c -o ~/cyclic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/cyclic 0 100 1000
    
    
In [9]:
    
%%writefile codes/openmpi/dynamic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#define SEND 1
#define STOP 0
float f(float x);
/*
 * Dynamic (master/worker) distribution: rank 0 hands out one trapezoid at a
 * time; each worker returns its result and immediately receives the next
 * assignment until a STOP-tagged message arrives.
 * NOTE(review): assumes n >= size - 1 (enough work for the first round) and
 * size >= 2 (a lone master would deadlock waiting for worker results).
 */
int main(int argc, char** argv) {  /* implicit-int main is invalid in C99 */
  int rank;        /* rank of each MPI process */
  int size;        /* total number of MPI processes */
  double a, b;     /* left and right endpoints of the interval (rank 0 only) */
  int n;           /* total number of trapezoids (rank 0 only) */
  double h;        /* width of each trapezoid (rank 0 only) */
  double param[3]; /* one work unit: {left endpoint, right endpoint, width} */
  double local_result = 0.0;   /* area of the trapezoid just computed */
  double partial_result = 0.0; /* running total computed by this worker */
  double result = 0.0;         /* total integral (rank 0 only) */
  double start, stop, tpar, tcomm; /* timing accumulators */
  int i, count, partial_count;
  MPI_Status  status;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  /* Fail cleanly when arguments are missing (only rank 0 reads them, but a
   * crash on rank 0 would leave the workers blocked forever). */
  if (argc < 4) {
    if (rank == 0)
      fprintf(stderr, "usage: %s <a> <b> <n>\n", argv[0]);
    MPI_Finalize();
    return 1;
  }

  /* Initial job distribution is handled only by process 0. */
  if (rank == 0) {
    a = atof(argv[1]);
    b = atof(argv[2]);
    n = atoi(argv[3]);
    h = (b - a) / n;
    count = 0;
    /* First round: one trapezoid per worker, tagged SEND. */
    for (i = 1; i < size; i++) {
      param[0] = a + count * h;
      param[1] = param[0] + h;
      param[2] = h;
      MPI_Send(param, 3, MPI_DOUBLE, i, SEND, MPI_COMM_WORLD);
      count = count + 1;
    }
  }
  else {
    /* Workers block here for their first assignment. */
    MPI_Recv(param, 3, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
  }

  tpar = 0.0;
  tcomm = 0.0;
  partial_count = 0;
  /* Each process that is not process 0 works on its portion, sends the
   * partial result back to 0, and waits for new workload; a STOP-tagged
   * message ends the loop without computing its (junk) payload. */
  if (rank != 0) {
    do {
      start = MPI_Wtime();
      local_result = param[2] * (f(param[1]) +  f(param[0])) / 2;
      partial_result += local_result;
      stop = MPI_Wtime();
      tpar += stop - start;
      partial_count += 1;
      start = MPI_Wtime();
      MPI_Send(&local_result, 1, MPI_DOUBLE, 0, SEND, MPI_COMM_WORLD);
      MPI_Recv(param, 3, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      stop = MPI_Wtime();
      tcomm += stop - start;              /* includes wait-for-work time */
    } while (status.MPI_TAG != STOP);     /* STOP == 0, as before */
    printf("Process %d uses %lfs to calculate partial result %lf of %d portions and %lfs for communication \n", rank, tpar, partial_result, partial_count, tcomm);
  }

  /* Process 0 receives results and sends out new work to whichever worker
   * just reported, while there is still work left to be sent (count < n). */
  if (rank == 0) {
    do {
      MPI_Recv(&local_result, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      result = result + local_result;
      param[0] = a + count * h;
      param[1] = param[0] + h;
      param[2] = h;
      MPI_Send(param, 3, MPI_DOUBLE, status.MPI_SOURCE, SEND, MPI_COMM_WORLD);
      count = count + 1;
    }
    while (count < n);
    /* Drain the final in-flight result from each worker. */
    for (i = 0; i < (size - 1); i++) {
      MPI_Recv(&local_result, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      result = result + local_result;
    }
    /* All work has been handed out: tell every worker to stop. */
    for (i = 1; i < size; i++) {
      MPI_Send(param, 3, MPI_DOUBLE, i, STOP, MPI_COMM_WORLD);
    }
    /* Print the result. */
    printf("With n = %d trapezoids, our estimate\n",
        n);
    printf("of the integral from %f to %f = %f\n",
        a, b, result);
  }
  /* Shut down MPI. */
  MPI_Finalize();
  return 0;
} /*  main  */
/* The function whose trapezoid areas the workers compute: f(x) = x^2. */
float f(float x) {
    return (float)(x * x);
}
    
    
In [10]:
    
# Compile and run the dynamic (master/worker) version; rank 0 acts as the
# master, so only 7 of the 8 ranks perform integration work.
!mpicc codes/openmpi/dynamic.c -o ~/dynamic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/dynamic 0 100 1000
    
    
In [11]:
    
%%writefile codes/openmpi/pi_mc.c
/*
 * Monte Carlo estimation of pi with MPI: the unit square is split into a
 * sqrt(size) x sqrt(size) grid of sub-squares, one per rank, and each rank
 * samples nPointsTotal/size uniform points inside its own sub-square.
 * NOTE(review): this layout assumes the number of ranks is a perfect
 * square — with e.g. 8 ranks several ranks sample the same region.
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>   /* time(); was missing, causing an implicit declaration */
#include <math.h>
#include <mpi.h>
int main(int argc, char* argv[]) {
  int nPointsInCircle  = 0;  /* samples inside the circle on this rank */
  int i = 0;
  int nPointsTotal     = 0;  /* total number of samples across all ranks */
  int nPointsPerRegion = 0;  /* samples drawn by each rank */
  int pointsReceived   = 0;  /* global hit count, valid on rank 0 only */
  double piEstimate;
  double x_start, y_start;   /* lower-left corner of this rank's sub-square */
  double x_rand, y_rand, rand_radius;
  int rank, size, squareWidth;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  /* Parse arguments after MPI_Init and fail cleanly when missing
   * (previously argv[1] was dereferenced unconditionally). */
  if (argc < 2) {
    if (rank == 0)
      fprintf(stderr, "usage: %s <total_points>\n", argv[0]);
    MPI_Finalize();
    return 1;
  }
  nPointsTotal = atoi(argv[1]);

  /* Seed RNG differently per rank and compute the region layout. */
  nPointsPerRegion = nPointsTotal / size;
  srand( (unsigned)time(NULL) + rank );
  squareWidth = (int) sqrt(size);     /* grid is squareWidth x squareWidth */

  /* Lower-left corner of this rank's sub-square in the unit square. */
  x_start = (double)(rank % squareWidth) / squareWidth;
  y_start = (double)((rank / squareWidth)) / squareWidth;

  for (i = 0; i < nPointsPerRegion; i++) {
    /* Uniform sample inside this rank's sub-square of side 1/squareWidth. */
    x_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + x_start;
    y_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + y_start;
    /* Squared distance from the circle centre (0.5, 0.5); radius is 0.5,
     * so the point is inside when the squared distance is <= 0.25. */
    rand_radius = (x_rand - 0.5) * (x_rand - 0.5) + (y_rand - 0.5) * (y_rand - 0.5);
    if (rand_radius <= 0.25) {
      nPointsInCircle += 1;
    }
  }

  /* Sum the per-rank hit counts onto rank 0. */
  MPI_Reduce(&nPointsInCircle, &pointsReceived, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
  if (rank == 0) {
    /* pi ~= 4 * hits / total; multiply in double to avoid the int
     * overflow that `pointsReceived * 4` risked for large counts. */
    piEstimate = 4.0 * pointsReceived / nPointsTotal;
    printf("%f\n", piEstimate);
  }
  MPI_Finalize();
  return 0;
}
    
    
In [12]:
    
!mpicc -lm codes/openmpi/pi_mc.c -o ~/pi_mc
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/pi_mc 100000