In [1]:
N = 8; a = 0; b = 2; h = (b - a)/N;
In [2]:
# With 4 processors (cores)
size = 4; rank = 1
local_N = N / size
local_a = a + rank * h * local_N
local_b = local_a + h * local_N
print (local_a, local_b)
# pseudocode: every MPI rank runs the same program and uses its rank to
# pick its own share of the work, e.g.
#   if rank == i:
#       do great things on [local_a, local_b]
In [3]:
%%writefile codes/openmpi/static.c
/*
* MPI implementation of the trapezoid approach to integral calculation following a static
* workload distribution and standard send()/recv() calls.
* We assume that the number of trapezoids is divisible by the number of MPI processes.
*/
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
double Trap(double a, double b, int n);
double f(double x);
/*
 * main: statically partitions [a, b] into `size` equal chunks, has each
 * rank integrate its own chunk with Trap(), and gathers the partial
 * results on rank 0 with point-to-point send/recv.
 *
 * Usage: static <a> <b> <n>
 * NOTE: n is assumed to be divisible by size; any remainder trapezoids
 * (n % size) are silently dropped.
 */
int main(int argc, char *argv[]) {
    int rank;                 /* rank of this MPI process */
    int size;                 /* total number of MPI processes */
    double a, b;              /* left and right endpoints of the interval */
    int n;                    /* total number of trapezoids */
    double h;                 /* width (base) of each trapezoid */
    double local_a, local_b;  /* sub-interval endpoints on this process */
    int local_n;              /* trapezoids assigned to this process */
    double result;            /* final result (meaningful on rank 0 only) */
    double local_result;      /* partial result of this process */
    int p;                    /* loop counter over worker ranks */
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Fix: the original read argv[1..3] unconditionally, which is
     * undefined behavior when fewer than three arguments are passed. */
    if (argc < 4) {
        if (rank == 0)
            fprintf(stderr, "Usage: %s <a> <b> <n>\n", argv[0]);
        MPI_Finalize();
        return EXIT_FAILURE;
    }
    a = atof(argv[1]);
    b = atof(argv[2]);
    n = atoi(argv[3]);

    /* Each rank integrates a contiguous block of local_n trapezoids. */
    h = (b - a) / n;
    local_n = n / size;
    local_a = a + rank * local_n * h;
    local_b = local_a + local_n * h;
    local_result = Trap(local_a, local_b, local_n);

    /* Workers send their partial result to rank 0, which accumulates. */
    if (rank == 0) {
        result = local_result;
        for (p = 1; p < size; p++) {
            MPI_Recv(&local_result, 1, MPI_DOUBLE, p, MPI_ANY_TAG,
                     MPI_COMM_WORLD, &status);
            result += local_result;
        }
    } else {
        MPI_Send(&local_result, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
    }

    /* Only the master prints the answer. */
    if (rank == 0) {
        printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
        printf("The integral is %lf\n", result);
    }
    MPI_Finalize();
    return 0;
}
/*
 * Trap: composite trapezoid rule for the integral of f over [a, b]
 * using n equal sub-intervals. The two endpoints carry half weight;
 * the n - 1 interior sample points carry full weight.
 */
double Trap(double a, double b, int n) {
    double width = (b - a) / n;        /* width of one sub-interval */
    double sum = 0.5 * (f(a) + f(b));  /* half-weighted endpoints */
    double x = a;
    int k;

    for (k = 1; k < n; k++) {
        x += width;       /* advance to the next interior point */
        sum += f(x);
    }
    return width * sum;
}
/* Integrand: f(x) = x^2. */
double f(double x) {
    double y = x * x;
    return y;
}
In [4]:
!mpicc codes/openmpi/static.c -o ~/static
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static 0 100 1000
In [5]:
%%writefile codes/openmpi/static_wtiming.c
/*
* MPI implementation of the trapezoid approach to integral calculation following a static
* workload distribution and standard send()/recv() calls.
* We assume that the number of trapezoids is divisible by the number of MPI processes.
*/
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
double Trap(double a, double b, int n);
double f(double x);
/*
 * main: same static partitioning as static.c, instrumented with
 * MPI_Wtime() to separate the local computation time from the
 * communication (gather) time.
 *
 * Usage: static_wtiming <a> <b> <n>
 * NOTE: n is assumed to be divisible by size; the remainder (n % size)
 * trapezoids are silently dropped.
 */
int main(int argc, char *argv[]) {
    int rank;                 /* rank of this MPI process */
    int size;                 /* total number of MPI processes */
    double a, b;              /* endpoints of the whole interval */
    int n;                    /* total number of trapezoids */
    double h;                 /* width of each trapezoid */
    double local_a, local_b;  /* sub-interval endpoints on this process */
    int local_n;              /* trapezoids assigned to this process */
    double result;            /* final result (meaningful on rank 0 only) */
    double local_result;      /* partial result of this process */
    double start, stop, tpar, tcomm;  /* timing variables */
    int p;                    /* loop counter over worker ranks */
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Fix: the original dereferenced argv[1..3] without checking argc. */
    if (argc < 4) {
        if (rank == 0)
            fprintf(stderr, "Usage: %s <a> <b> <n>\n", argv[0]);
        MPI_Finalize();
        return EXIT_FAILURE;
    }
    a = atof(argv[1]);
    b = atof(argv[2]);
    n = atoi(argv[3]);

    /* Time the local computation phase. */
    start = MPI_Wtime();
    h = (b - a) / n;
    local_n = n / size;
    local_a = a + rank * local_n * h;
    local_b = local_a + local_n * h;
    local_result = Trap(local_a, local_b, local_n);
    stop = MPI_Wtime();
    tpar = stop - start;
    printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);

    /* Time the gather phase (workers send, rank 0 accumulates). */
    start = MPI_Wtime();
    if (rank == 0) {
        result = local_result;
        for (p = 1; p < size; p++) {
            MPI_Recv(&local_result, 1, MPI_DOUBLE, p, MPI_ANY_TAG,
                     MPI_COMM_WORLD, &status);
            result += local_result;
        }
    } else {
        MPI_Send(&local_result, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
    }
    stop = MPI_Wtime();
    tcomm = stop - start;

    /* Only the master prints the answer and the timing summary. */
    if (rank == 0) {
        printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
        printf("The integral is %lf\n", result);
        printf("Communication time: %.5fs\n", tcomm);
    }
    MPI_Finalize();
    return 0;
}
/*
 * Trap: approximate the integral of f on [a, b] with the composite
 * trapezoid rule over n sub-intervals of equal width.
 */
double Trap(double a, double b, int n) {
    int step;
    double dx, sample, acc;

    dx = (b - a) / n;
    acc = (f(a) + f(b)) / 2.0;  /* endpoints contribute half weight */
    sample = a + dx;            /* first interior sample point */
    for (step = 1; step < n; step++) {
        acc = acc + f(sample);
        sample = sample + dx;
    }
    return acc * dx;
}
/* Integrand under test: the parabola x squared. */
double f(double x) {
    return x * x;
}
In [6]:
!mpicc codes/openmpi/static_wtiming.c -o ~/static_wtiming
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/static_wtiming 0 100 1000
In [7]:
%%writefile codes/openmpi/cyclic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
float f(float x);
/*
 * main: cyclic (round-robin) workload distribution. Rank r handles
 * trapezoids r, r + size, r + 2*size, ... and the partial sums are
 * combined on rank 0 with MPI_Reduce.
 *
 * Usage: cyclic <a> <b> <n>
 * NOTE: n is assumed to be divisible by size; leftover trapezoids are
 * dropped. The integrand f() here works in single precision, so the
 * samples are computed as floats — presumably intentional for the
 * lesson, but it limits accuracy.
 */
int main(int argc, char **argv) {
    int rank;                 /* rank of this MPI process */
    int size;                 /* total number of MPI processes */
    double a, b;              /* endpoints of the whole interval */
    int n;                    /* total number of trapezoids */
    double h;                 /* width of each trapezoid */
    double local_a, local_b;  /* endpoints of the current trapezoid */
    int local_n;              /* number of trapezoids per process */
    double result;            /* final result (rank 0 only) */
    double local_result;      /* partial result of this process */
    double start, stop, tpar, tcomm;  /* timing variables */
    int i;                    /* loop counter */

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Fix: the original declared `main(...)` with an implicit int return
     * type (invalid since C99) and read argv[1..3] without checking argc. */
    if (argc < 4) {
        if (rank == 0)
            fprintf(stderr, "Usage: %s <a> <b> <n>\n", argv[0]);
        MPI_Finalize();
        return EXIT_FAILURE;
    }
    a = atof(argv[1]);
    b = atof(argv[2]);
    n = atoi(argv[3]);

    start = MPI_Wtime();
    h = (b - a) / n;     /* height is the same for all processes */
    local_n = n / size;  /* so is the number of trapezoids */
    /* Each process starts at the trapezoid matching its rank. */
    local_a = a + rank * h;
    local_b = local_a + h;
    local_result = 0;
    for (i = 0; i < local_n; i++) {
        local_result = local_result + h * (f(local_a) + f(local_b)) / 2;
        local_a += h * size;  /* hop over trapezoids owned by other ranks */
        local_b = local_a + h;
    }
    stop = MPI_Wtime();
    tpar = stop - start;
    printf("Process %d uses %lfs to calculate partial result %lf\n", rank, tpar, local_result);

    /* Combine all partial results on rank 0. */
    start = MPI_Wtime();
    MPI_Reduce(&local_result, &result, 1, MPI_DOUBLE, MPI_SUM, 0,
               MPI_COMM_WORLD);
    stop = MPI_Wtime();
    tcomm = stop - start;

    /* Print the result */
    if (rank == 0) {
        printf("Calculating the integral of f(x) from %lf to %lf\n", a, b);
        printf("The integral is %lf\n", result);
        printf("Communication time: %.5fs\n", tcomm);
    }
    MPI_Finalize();
    return 0;
}
/* Integrand: returns x squared (single precision). */
float f(float x) {
    float squared = x * x;
    return squared;
}
In [8]:
!mpicc codes/openmpi/cyclic.c -o ~/cyclic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/cyclic 0 100 1000
In [9]:
%%writefile codes/openmpi/dynamic.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#define SEND 1
#define STOP 0
float f(float x);
/*
 * main: dynamic (master/worker) workload distribution. Rank 0 hands out
 * one trapezoid at a time; each worker integrates it, sends the area
 * back, and then receives either another trapezoid (tag SEND) or a
 * shutdown message (tag STOP).
 *
 * Usage: dynamic <a> <b> <n>
 * NOTE(review): the first distribution round sends one trapezoid to each
 * of the size - 1 workers, so the code assumes n >= size - 1; with fewer
 * trapezoids than workers it would hand out work past b.
 */
int main(int argc, char **argv) {
    int rank;                    /* rank of this MPI process */
    int size;                    /* total number of MPI processes */
    double a, b;                 /* interval endpoints (rank 0 only) */
    int n;                       /* total number of trapezoids (rank 0 only) */
    double h;                    /* trapezoid width (rank 0 only) */
    double param[3];             /* one work unit: {left, right, width} */
    double local_result = 0.0;   /* area of the current trapezoid */
    double partial_result = 0.0; /* total area computed by this worker */
    double result = 0.0;         /* global integral (rank 0 only) */
    double start, stop, tpar, tcomm;  /* timing accumulators */
    int i, count, partial_count;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Initial job distribution is handled only by rank 0, which is also
     * the only rank that parses the command line. */
    if (rank == 0) {
        /* Fix: argv[1..3] were read without checking argc; abort every
         * rank here because the workers are already blocked in MPI_Recv. */
        if (argc < 4) {
            fprintf(stderr, "Usage: %s <a> <b> <n>\n", argv[0]);
            MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
        }
        a = atof(argv[1]);
        b = atof(argv[2]);
        n = atoi(argv[3]);
        h = (b - a) / n;
        count = 0;
        /* First round: one trapezoid to every worker, incrementing count. */
        for (i = 1; i < size; i++) {
            param[0] = a + count * h;
            param[1] = param[0] + h;
            param[2] = h;
            MPI_Send(param, 3, MPI_DOUBLE, i, SEND, MPI_COMM_WORLD);
            count = count + 1;
        }
    } else {
        MPI_Recv(param, 3, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
                 &status);
    }

    tpar = 0.0;
    tcomm = 0.0;
    partial_count = 0;

    /* Workers: integrate the received trapezoid, report the result, and
     * keep going until a message arrives carrying the STOP tag. */
    if (rank != 0) {
        do {
            start = MPI_Wtime();
            local_result = param[2] * (f(param[1]) + f(param[0])) / 2;
            partial_result += local_result;
            stop = MPI_Wtime();
            tpar += stop - start;
            partial_count += 1;

            start = MPI_Wtime();
            MPI_Send(&local_result, 1, MPI_DOUBLE, 0, SEND, MPI_COMM_WORLD);
            MPI_Recv(param, 3, MPI_DOUBLE, 0, MPI_ANY_TAG, MPI_COMM_WORLD,
                     &status);
            stop = MPI_Wtime();
            tcomm += stop - start;
        } while (status.MPI_TAG != STOP);
        printf("Process %d uses %lfs to calculate partial result %lf of %d portions and %lfs for communication \n", rank, tpar, partial_result, partial_count, tcomm);
    }

    /* Rank 0: collect results and keep feeding whichever worker just
     * reported, while work remains (count < n). */
    if (rank == 0) {
        do {
            MPI_Recv(&local_result, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
                     MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            result = result + local_result;
            param[0] = a + count * h;
            param[1] = param[0] + h;
            param[2] = h;
            MPI_Send(param, 3, MPI_DOUBLE, status.MPI_SOURCE, SEND,
                     MPI_COMM_WORLD);
            count = count + 1;
        } while (count < n);
        /* Drain the last in-flight result from every worker. */
        for (i = 0; i < (size - 1); i++) {
            MPI_Recv(&local_result, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
                     MPI_ANY_TAG, MPI_COMM_WORLD, &status);
            result = result + local_result;
        }
        /* All work has been sent: tell every worker to shut down. */
        for (i = 1; i < size; i++) {
            MPI_Send(param, 3, MPI_DOUBLE, i, STOP, MPI_COMM_WORLD);
        }
        /* Print the result */
        printf("With n = %d trapezoids, our estimate\n",
               n);
        printf("of the integral from %f to %f = %f\n",
               a, b, result);
    }

    /* Shut down MPI */
    MPI_Finalize();
    return 0;
} /* main */
/* Integrand: the parabola x^2, evaluated in single precision. */
float f(float x) {
    return x * x;
}
In [10]:
!mpicc codes/openmpi/dynamic.c -o ~/dynamic
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/dynamic 0 100 1000
In [11]:
%%writefile codes/openmpi/pi_mc.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <math.h>
#include <time.h>  /* fix: time() was called without a declaration */

/*
 * Monte Carlo estimate of pi: each rank throws points into its own cell
 * of a sqrt(size) x sqrt(size) grid over the unit square and counts the
 * points falling inside the inscribed circle of radius 0.5; rank 0 sums
 * the counts with MPI_Reduce and prints 4 * inside / total.
 *
 * Usage: pi_mc <total number of points>
 * NOTE: the remainder nPointsTotal % size points are never thrown.
 */
int main(int argc, char* argv[]) {
    int nPointsInCircle = 0;  /* hits inside the circle on this rank */
    int i = 0;
    int nPointsTotal = 0;     /* requested total number of points */
    int nPointsPerRegion = 0; /* points thrown by each rank */
    int pointsReceived = 0;   /* global hit count (rank 0 only) */
    double piEstimate;
    double x_start, y_start;  /* lower-left corner of this rank's cell */
    double x_rand, y_rand, rand_radius;
    int rank, size, squareWidth;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Fix: argv[1] was read without checking argc. */
    if (argc < 2) {
        if (rank == 0)
            fprintf(stderr, "Usage: %s <number of points>\n", argv[0]);
        MPI_Finalize();
        return EXIT_FAILURE;
    }
    nPointsTotal = atoi(argv[1]);

    /* Seed RNG and make calculations for constants */
    nPointsPerRegion = nPointsTotal / size;
    srand((unsigned)time(NULL) + rank);  /* seed differently per rank */
    squareWidth = (int)sqrt((double)size);
    if (squareWidth * squareWidth == size) {
        /* Perfect square: give each rank its own grid cell. */
        x_start = (double)(rank % squareWidth) / squareWidth;
        y_start = (double)((rank / squareWidth)) / squareWidth;
    } else {
        /* Fix: with a non-square size the original placed every rank
         * >= squareWidth^2 outside the unit square (e.g. size = 2 put
         * rank 1 at y in [1, 2)), biasing the estimate. Fall back to
         * each rank sampling the whole square — still a valid estimator. */
        squareWidth = 1;
        x_start = 0.0;
        y_start = 0.0;
    }

    /* Place and record points inside this rank's cell of the square. */
    for (i = 0; i < nPointsPerRegion; i++) {
        x_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + x_start;
        y_rand = (double)rand() / ((double)RAND_MAX * squareWidth) + y_start;
        rand_radius = (x_rand - 0.5) * (x_rand - 0.5)
                    + (y_rand - 0.5) * (y_rand - 0.5);
        if (rand_radius <= 0.25) {  /* inside circle of radius 0.5 */
            nPointsInCircle += 1;
        }
    }

    MPI_Reduce(&nPointsInCircle, &pointsReceived, 1, MPI_INT, MPI_SUM, 0,
               MPI_COMM_WORLD);
    if (rank == 0) {
        /* 4.0 * ... avoids the int overflow risk of pointsReceived * 4. */
        piEstimate = 4.0 * pointsReceived / nPointsTotal;
        printf("%f\n", piEstimate);
    }
    MPI_Finalize();
    return 0;
}
In [12]:
!mpicc -lm codes/openmpi/pi_mc.c -o ~/pi_mc
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/pi_mc 100000