Partitioning simply divides the problem into parts; the parts are then computed and the results combined.
It is the basis of all parallel programming, in one form or another.
Pleasantly (embarrassingly) parallel computations use partitioning without any interaction between the parts.
Most partitioning formulations require the results of the parts to be combined to obtain the desired result.
Partitioning can be applied to the program's data. This is called data partitioning or domain decomposition.
Partitioning can also be applied to the functions of a program. This is called functional decomposition.
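As a minimal sketch of data partitioning (illustrative only; N, the replicated array, and the use of MPI_Reduce are choices made here, not part of the original notes, and the process count is assumed to divide N evenly), each rank sums its own slice of an array and the partial sums are combined at the root:
In [ ]:
/* Data partitioning sketch: each rank sums its own slice of an array
   and the partial sums are combined at rank 0 with MPI_Reduce.
   (Illustrative only; assumes size divides N evenly.) */
#include <stdio.h>
#include "mpi.h"
#define N 64

int main(int argc, char *argv[]) {
    int rank;                 /* rank of each MPI process */
    int size;                 /* total number of MPI processes */
    int i;
    int data[N];
    int local_sum = 0, global_sum = 0;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* every rank builds the same array; only its own slice is summed */
    for (i = 0; i < N; i++) data[i] = i;

    /* partition: this rank handles the slice [rank*N/size, (rank+1)*N/size) */
    for (i = rank * (N / size); i < (rank + 1) * (N / size); i++)
        local_sum += data[i];

    /* combine: reduce the partial sums onto rank 0 */
    MPI_Reduce(&local_sum, &global_sum, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0) printf("Total sum = %d\n", global_sum);

    MPI_Finalize();
    return 0;
}
It can be compiled and launched the same way as the other examples in this notebook (mpicc, then mpirun).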
Divide and conquer is characterized by dividing a problem into sub-problems of the same form as the larger problem. Further division into still smaller sub-problems is usually done by recursion.
Recursive divide and conquer is amenable to parallelization because separate processes can be used for the divided parts. The data is also usually naturally localized.
In [3]:
%%writefile codes/openmpi/divide.c
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include "mpi.h"
int main(int argc, char * argv[] ) {
int rank; /* rank of each MPI process */
int size; /* total number of MPI processes */
int i; /* counter */
int distance; /* distance between sender and receiver */
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
/* Am I sender or receiver? */
/* Who am I sending/receiving to/from */
distance = 1;
i = 1;
while (distance <= size / 2){
if (rank < distance) {
printf ("At time step %d, sender %d sends to %d\n", i, rank, rank + distance);
}
if ((rank >= distance) && (rank < distance * 2)){
printf ("At time step %d, receiver %d receives from %d\n", i, rank, rank - distance);
}
printf ("Process %d has distance value %d and time step %d\n", rank, distance, i);
distance = distance * 2;
i += 1;
}
MPI_Finalize();
return 0;
}
In [4]:
!mpicc codes/openmpi/divide.c -lm -o ~/divide
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/divide
In [5]:
%%writefile codes/openmpi/conquer.c
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include "mpi.h"
int main(int argc, char * argv[] ) {
int rank; /* rank of each MPI process */
int size; /* total number of MPI processes */
int i; /* counter */
int distance; /* distance between sender and receiver */
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
/* Am I sender or receiver? */
/* Who am I sending/receiving to/from */
distance = (int)(size / 2);
i = 1;
while (distance >= 1){
if ((rank >= distance) && (rank < distance * 2)){
printf ("At time step %d, sender %d sends to %d\n", i, rank, rank - distance);
}
if (rank < distance) {
printf ("At time step %d, receiver %d receives from %d\n", i, rank, rank + distance);
}
distance = distance / 2;
i += 1;
}
MPI_Finalize();
return 0;
}
In [6]:
!mpicc codes/openmpi/conquer.c -lm -o ~/conquer
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/conquer
Many sorting algorithms can be parallelized by partitioning, using divide and conquer.
A simple approach is parallel bucket sort.
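For reference, bucket sort works as follows sequentially. The sketch below is illustrative only (N, the number of buckets B, and the assumption that values are uniformly spread over [0, N) are choices made here, not part of the original notes):
In [ ]:
/* Sequential bucket sort sketch: values in [0, N) are dropped into B
   equally wide buckets, each bucket is sorted with a simple exchange
   sort, and the buckets are concatenated in order. */
#include <stdio.h>
#include <stdlib.h>
#define N 64
#define B 8                      /* number of buckets */

int main(void) {
    int a[N], bucket[B][N], count[B] = {0};
    int i, j, k, b, tmp, pos = 0;

    for (i = 0; i < N; i++) a[i] = rand() % N;

    /* scatter values into buckets: bucket b covers [b*N/B, (b+1)*N/B) */
    for (i = 0; i < N; i++) {
        b = a[i] / (N / B);
        bucket[b][count[b]++] = a[i];
    }

    /* sort each bucket, then copy the buckets back in order */
    for (b = 0; b < B; b++) {
        for (j = 0; j < count[b]; j++)
            for (k = j + 1; k < count[b]; k++)
                if (bucket[b][j] > bucket[b][k]) {
                    tmp = bucket[b][j];
                    bucket[b][j] = bucket[b][k];
                    bucket[b][k] = tmp;
                }
        for (j = 0; j < count[b]; j++) a[pos++] = bucket[b][j];
    }

    for (i = 0; i < N; i++) printf("%d ", a[i]);
    printf("\n");
    return 0;
}
The parallel versions below follow the same idea, with each MPI process owning one bucket (one interval of values).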
In [ ]:
int MPI_Scatter(
void *sendbuf,
int sendcount,
MPI_Datatype sendtype,
void *recvbuf,
int recvcnt,
MPI_Datatype recvtype,
int root,
MPI_Comm comm);
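A minimal sketch of MPI_Scatter usage may help before the variable-count version (illustrative only; N and the squared values are arbitrary choices, and the process count is assumed to divide N): the root sends an equal, contiguous share of its buffer to every rank.
In [ ]:
/* Minimal MPI_Scatter sketch: rank 0 scatters N/size ints to each rank.
   Run with a process count that divides N, e.g. -np 4. */
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#define N 8

int main(int argc, char *argv[]) {
    int rank, size, i;
    int sendbuf[N];
    int *recvbuf;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0)
        for (i = 0; i < N; i++) sendbuf[i] = i * i;   /* data only matters at the root */

    recvbuf = malloc((N / size) * sizeof(int));
    /* every rank receives an equal, contiguous share of sendbuf */
    MPI_Scatter(sendbuf, N / size, MPI_INT, recvbuf, N / size, MPI_INT, 0, MPI_COMM_WORLD);

    printf("%d: ", rank);
    for (i = 0; i < N / size; i++) printf("%d ", recvbuf[i]);
    printf("\n");

    free(recvbuf);
    MPI_Finalize();
    return 0;
}
MPI_Scatterv, shown next, generalizes this by letting each rank receive a different count.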
In [ ]:
int MPI_Scatterv(
void *sendbuf,
int *sendcnts,
int *displs,
MPI_Datatype sendtype,
void *recvbuf,
int recvcnt,
MPI_Datatype recvtype,
int root,
MPI_Comm comm
);
In [7]:
%%writefile codes/openmpi/scatterv.c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
int main(int argc, char *argv[])
{
int rank, size;
int i;
int sendcounts[4] = {1,2,3,4}; /* each process will receive its rank plus 1 numbers from the sendbuf array */
int displs[4] = {0,0,0,0}; /* array describing the displacements where each segment begins and is initialized to all 0s */
int sendbuf[10] = {2,13,4,3,5,1,0,12,10,8}; /* the buffer to be sent */
int *recvbuf; /* array at each process to receive data. To be initialized based on process rank */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
/* initializes recvbuf to contain exactly rank plus 1 numbers */
recvbuf = malloc(sizeof(int)* (rank + 1));
// calculate displacements
for (i = 1; i < 4; i++) {
displs[i] = displs[i-1] + sendcounts[i-1];
}
// divide the data among processes as described by sendcounts and displs
MPI_Scatterv(sendbuf, sendcounts, displs, MPI_INT, recvbuf, (rank + 1), MPI_INT, 0, MPI_COMM_WORLD);
// print what each process received
printf("%d: ", rank);
for (i = 0; i < sendcounts[rank]; i++) {
printf("%d ", recvbuf[i]);
}
printf("\n");
free(recvbuf);
MPI_Finalize();
return 0;
}
In [8]:
!mpicc codes/openmpi/scatterv.c -o ~/scatterv
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/scatterv
In [ ]:
int MPI_Gather(
void *sendbuf,
int sendcount,
MPI_Datatype sendtype,
void *recvbuf,
int recvcnt,
MPI_Datatype recvtype,
int root,
MPI_Comm comm);
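A minimal MPI_Gather sketch (illustrative only, not part of the original notes): every rank contributes one integer (its rank) and the root collects them in rank order.
In [ ]:
/* Minimal MPI_Gather sketch: each rank sends one int (its rank) and
   rank 0 collects them in rank order. */
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char *argv[]) {
    int rank, size, i;
    int *recvbuf = NULL;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0)
        recvbuf = malloc(size * sizeof(int));   /* only the root needs the receive buffer */

    /* each rank contributes a single MPI_INT; the root receives size of them */
    MPI_Gather(&rank, 1, MPI_INT, recvbuf, 1, MPI_INT, 0, MPI_COMM_WORLD);

    if (rank == 0) {
        for (i = 0; i < size; i++) printf("%d ", recvbuf[i]);
        printf("\n");
        free(recvbuf);
    }

    MPI_Finalize();
    return 0;
}
MPI_Gatherv, shown next, lets each rank contribute a different count.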
In [ ]:
int MPI_Gatherv(
void *sendbuf,
int sendcnt,
MPI_Datatype sendtype,
void *recvbuf,
int *recvcnts,
int *displs,
MPI_Datatype recvtype,
int root,
MPI_Comm comm
);
In [13]:
%%writefile codes/openmpi/gatherv.c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
int main(int argc, char *argv[])
{
int rank, size;
int i;
int recvcounts[4] = {1,2,3,4}; /* process 0 will receive rank plus 1 numbers from each process */
int displs[4] = {0,0,0,0}; /* array describing the displacements where each segment begins and is initialized to all 0s */
int *sendbuf; /* the buffer to be sent. will be initialized individually at each process */
int *recvbuf; /* array to receive data; will only be initialized at process 0 */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
/* initializes recvbuf to receive 10 numbers */
if (rank == 0){
recvbuf = malloc(sizeof(int) * (10));
for (i = 0; i < 10; i ++)
recvbuf[i] = -1;
}
/* initializes sendbuf with rank plus 1 copies of this process's rank */
sendbuf = malloc(sizeof(int) * (rank + 1));
for (i = 0; i < (rank + 1); i++){
sendbuf[i] = rank;
}
// calculate displacements
for (i = 1; i < 4; i++) {
displs[i] = displs[i-1] + recvcounts[i-1];
}
// gather the data from all processes as described by recvcounts and displs
MPI_Gatherv(sendbuf, rank + 1, MPI_INT, recvbuf, recvcounts, displs, MPI_INT, 0, MPI_COMM_WORLD);
// print what process has at the end
if (rank == 0){
for (i = 0; i < 10; i++) {
printf("%d ", recvbuf[i]);
}
printf("\n");
free(recvbuf);
}
MPI_Finalize();
return 0;
}
In [14]:
!mpicc codes/openmpi/gatherv.c -o ~/gatherv
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/gatherv
Parallel Bucket Sort version 1
In [1]:
%%writefile codes/openmpi/bucket1.c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#define N 64
int main(int argc, char* argv[]){
int rawNum[N];
int sortNum[N];
int* local_bucket;
int rank,size;
int* proc_count;
int* disp;
MPI_Status status;
int i,j,counter;
int local_min,local_max;
int tmp;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
if (rank == 0){
/* Initialize a random array with N integers whose values range between 0 and N-1 */
for (i = 0; i < N; i++){
rawNum[i] = rand() % N;
}
}
/* Broadcast contents of rawNum from 0 to all other processes */
MPI_Bcast(rawNum, N, MPI_INT, 0, MPI_COMM_WORLD);
/* Each process only works with numbers within their assigned interval */
counter = 0;
local_min = rank * (N/size);
local_max = (rank + 1) * (N/size);
for (i = 0; i < N; i++){
if ((rawNum[i] >= local_min) && (rawNum[i] < local_max)){
counter += 1;
}
}
printf("For rank %d, max is %d, min is %d, and there are %d elements in rawNum that falls within max and min \n",
rank,local_max,local_min,counter);
/* Each process creates its own bucket containing values that fall within its interval */
local_bucket = malloc(counter * sizeof(int));
counter = 0;
for (i = 0; i < N; i++){
if ((rawNum[i] >= local_min) && (rawNum[i] < local_max)){
local_bucket[counter] = rawNum[i];
counter += 1;
}
}
/* Sort the local bucket with a simple exchange sort */
for (i = 0; i < counter; i++){
for (j = i+1; j < counter; j++){
if (local_bucket[i] > local_bucket[j]){
tmp = local_bucket[i];
local_bucket[i] = local_bucket[j];
local_bucket[j] = tmp;
}
}
}
for (i = 0; i < counter; i++){
printf("%d %d \n",rank,local_bucket[i]);
}
/* set up root process */
if (rank == 0){
proc_count = malloc(size * sizeof(int));
disp = malloc(size * sizeof(int));
}
/* populate proc_count */
MPI_Gather(&counter,1,MPI_INT,proc_count,1,MPI_INT,0,MPI_COMM_WORLD);
if (rank == 0){
disp[0] = 0;
for (i = 0; i < size-1; i++){
disp[i+1] = disp[i] + proc_count[i];
}
}
// receive final result
MPI_Gatherv(local_bucket,counter,MPI_INT,sortNum,proc_count,disp,MPI_INT,0,MPI_COMM_WORLD);
if (rank == 0){
printf("Before sort: \n");
for (i = 0; i < N; i++) printf("%d ",rawNum[i]);
printf("\nAfter sort: \n");
for (i = 0; i < N; i++) printf("%d ",sortNum[i]);
}
MPI_Finalize();
return 0;
}
In [2]:
!mpicc codes/openmpi/bucket1.c -o ~/bucket1
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/bucket1
In [ ]:
int MPI_Alltoall(
void *sendbuf,
int sendcount,
MPI_Datatype sendtype,
void *recvbuf,
int recvcount,
MPI_Datatype recvtype,
MPI_Comm comm
);
In [5]:
%%writefile codes/openmpi/alltoall.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
int main(int argc,char *argv[]){
int rank, size;
int *scounts,*rcounts;
int i;
MPI_Init(&argc,&argv);
MPI_Comm_size( MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
scounts=(int*)malloc(sizeof(int)*size);
rcounts=(int*)malloc(sizeof(int)*size);
for(i=0; i < size; i++){
scounts[i]= rank * i + i;
}
printf("myid = %d scounts = ",rank);
for(i=0;i<size;i++)
printf("%d ",scounts[i]);
printf("\n");
/* send the data */
MPI_Alltoall(scounts,1,MPI_INT,rcounts,1,MPI_INT,MPI_COMM_WORLD);
printf("myid = %d rcounts = ",rank);
for(i=0;i<size;i++)
printf("%d ",rcounts[i]);
printf("\n");
MPI_Finalize();
}
In [6]:
!mpicc codes/openmpi/alltoall.c -o ~/alltoall
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/alltoall
In [ ]:
int MPI_Alltoallv(
void *sendbuf,
int *sendcnts,
int *sdispls,
MPI_Datatype sendtype,
void *recvbuf,
int *recvcnts,
int *rdispls,
MPI_Datatype recvtype,
MPI_Comm comm
);
In [7]:
%%writefile codes/openmpi/alltoallv.c
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
int main(int argc,char *argv[]){
int size, rank;
int *sray,*rray;
int *sdisp,*scounts,*rdisp,*rcounts;
int ssize,rsize,i,k,j;
float z;
MPI_Init(&argc,&argv);
MPI_Comm_size( MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
scounts=(int*)malloc(sizeof(int)*size);
rcounts=(int*)malloc(sizeof(int)*size);
sdisp=(int*)malloc(sizeof(int)*size);
rdisp=(int*)malloc(sizeof(int)*size);
/* find out how much data to send */
srand((unsigned int) rank);
for(i=0;i<size;i++){
z = (float) rand()/RAND_MAX;
scounts[i]=(int)(5.0 * z) + 1;
}
printf("rank= %d scounts= %d %d %d %d\n",rank,scounts[0],scounts[1],scounts[2],scounts[3]);
for(i=0;i<size;i++)
printf("%d ",scounts[i]);
printf("\n");
/* tell the other processors how much data is coming */
MPI_Alltoall(scounts,1,MPI_INT,rcounts,1,MPI_INT,MPI_COMM_WORLD);
/* calculate displacements and the size of the arrays */
sdisp[0]=0;
for(i=1;i<size;i++){
sdisp[i]=scounts[i-1]+sdisp[i-1];
}
rdisp[0]=0;
for(i=1;i<size;i++){
rdisp[i]=rcounts[i-1]+rdisp[i-1];
}
ssize=0;
rsize=0;
for(i=0;i<size;i++){
ssize=ssize+scounts[i];
rsize=rsize+rcounts[i];
}
/* allocate send and rec arrays */
sray=(int*)malloc(sizeof(int)*ssize);
rray=(int*)malloc(sizeof(int)*rsize);
for(i=0;i<ssize;i++)
sray[i]=rank;
/* send/rec different amounts of data to/from each processor */
MPI_Alltoallv( sray,scounts,sdisp,MPI_INT,rray,rcounts,rdisp,MPI_INT,MPI_COMM_WORLD);
printf("rank= %d rray=",rank);
for(i=0;i<rsize;i++)
printf("%d ",rray[i]);
printf("\n");
MPI_Finalize();
}
In [8]:
!mpicc codes/openmpi/alltoallv.c -o ~/alltoallv
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/alltoallv
In [9]:
%%writefile codes/openmpi/bucket2.c
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#define N 32
int main(int argc, char* argv[]){
int rank,size;
MPI_Status status;
int rawNum[N];
int sortNum[N];
int* local_array;
int i,j;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&size);
// initialize the unsorted array at process 0
if (rank == 0){
for (i = 0; i < N; i++){
rawNum[i] = rand() % N;
}
}
// prepare the local container, then distribute equal portions of the
// unsorted array to all the processes from process 0
local_array = malloc((N/size) * sizeof(int));
MPI_Scatter(rawNum,(N/size),MPI_INT,local_array,(N/size),MPI_INT,0,MPI_COMM_WORLD);
// initialize the local bucket matrix
int local_bucket[size][N/size];
for (i = 0; i < size; i++){
for (j = 0; j < N/size; j++){
local_bucket[i][j] = RAND_MAX;
}
}
int counter = 0;
int local_min,local_max;
// populate the bucket matrix
for (i = 0; i < size; i++){
counter = 0;
for (j = 0; j < N/size; j++){
local_min = i * N/size;
local_max = (i + 1) * N / size;
if ((local_array[j] >= local_min)&&(local_array[j] < local_max)){
local_bucket[i][counter] = local_array[j];
counter += 1;
}
}
}
// sort the bucket matrix.
int tmp = 0;
for (i = 0; i < size; i++){
for (j = 0; j < N/size; j++){
for (counter = j; counter < N/size; counter++){
if (local_bucket[i][j] > local_bucket[i][counter]){
tmp = local_bucket[i][j];
local_bucket[i][j] = local_bucket[i][counter];
local_bucket[i][counter] = tmp;
}
}
}
}
// placing the number from the buckets back into the main array
counter = 0;
int array_counter[size];
for (i = 0; i < size; i++){
array_counter[i] = N/size; /* default: bucket row i is completely full */
for (j = 0; j < N/size; j++){
if (local_bucket[i][j] != RAND_MAX){
local_array[counter] = local_bucket[i][j];
counter += 1;
}
else {
array_counter[i] = j;
printf("Rank %d counter %d \n",rank,array_counter[i]);
break;
}
}
}
/*
for (i = 0; i < N/size; i++){
printf("rank %d sorted num %d \n",rank,local_array[i]);
}
*/
if (rank == 0) printf("-----------------\n");
MPI_Barrier(MPI_COMM_WORLD);
// preparation for bucket gathering
int recvbuf[size];
int rdisp[size];
int sdisp[size];
sdisp[0] = 0;
for (i = 0; i < size - 1; i++){
sdisp[i+1] = sdisp[i] + array_counter[i];
printf("%d send displace %d \n",rank,sdisp[i+1]);
}
MPI_Alltoall(array_counter,1,MPI_INT,recvbuf,1,MPI_INT,MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
int sum = 0;
for (i = 0; i < size; i++){
sum += recvbuf[i];
printf("rank %d recvbuf %d \n",rank,recvbuf[i]);
}
printf("rank %d total recv buf %d \n", rank,sum);
MPI_Barrier(MPI_COMM_WORLD);
rdisp[0] = 0;
for (i = 0; i < size - 1; i++){
rdisp[i+1] = rdisp[i] + recvbuf[i];
printf("%d recv displace %d \n",rank,rdisp[i+1]);
}
int local_array_alltoall[sum];
// initialize local_array_alltoall for testing purpose
for (i = 0; i < sum; i++) local_array_alltoall[i] = -1;
MPI_Alltoallv(local_array,array_counter,sdisp,MPI_INT,local_array_alltoall,recvbuf,rdisp,MPI_INT,MPI_COMM_WORLD);
for (i = 0; i < sum; i++){
printf("rank %d semi-sorted num %d \n",rank,local_array_alltoall[i]);
}
// local sort on big bucket one more time
for (i = 0; i < sum; i++){
for (j = i; j < sum; j++){
if (local_array_alltoall[i] > local_array_alltoall[j]){
tmp = local_array_alltoall[i];
local_array_alltoall[i] = local_array_alltoall[j];
local_array_alltoall[j] = tmp;
}
}
}
// preparation for the final gathering
int proc_count[size];
int disp[size];
MPI_Gather(&sum,1,MPI_INT,proc_count,1,MPI_INT,0,MPI_COMM_WORLD);
if (rank == 0){
disp[0] = 0;
for (i = 0; i < size-1; i++){
disp[i+1] = disp[i] + proc_count[i];
}
}
MPI_Gatherv(local_array_alltoall,sum,MPI_INT,sortNum,proc_count,disp,MPI_INT,0,MPI_COMM_WORLD);
if (rank == 0){
printf("Before sort: \n");
for (i = 0; i < N; i++) printf("%d ",rawNum[i]);
printf("\nAfter sort: \n");
for (i = 0; i < N; i++) printf("%d ",sortNum[i]);
}
MPI_Finalize();
return 0;
}
In [10]:
!mpicc codes/openmpi/bucket2.c -o ~/bucket2
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/bucket2
Fundamental setting for most, if not all, computational simulation problems:
Start with the whole region, in which one square contains all the bodies (or particles).