What is Parallel I/O?
Multiple processes of a parallel program accessing data (reading or writing) from a common file
Why Parallel I/O?
The non-parallel alternatives do not scale:
- All processes send data to rank 0, and 0 writes it to the file: rank 0 becomes a serial bottleneck and must have enough memory to stage everyone's data.
- Each process writes to a separate file: the output is scattered across as many files as there are processes, which must then be tracked and stitched back together.
Why is MPI a good setting for Parallel I/O?
Writing to a file is much like sending a message, and reading is much like receiving one; MPI already supplies the machinery a parallel I/O system needs: communicators to define which processes share a file, derived datatypes to describe noncontiguous layouts, and collective operations.
Opening a File
int MPI_File_open(MPI_Comm comm, const char *filename, int amode, MPI_Info info, MPI_File *fh)
The modes MPI_MODE_RDONLY, MPI_MODE_RDWR, MPI_MODE_WRONLY, and MPI_MODE_CREATE have identical semantics to their POSIX counterparts. It is erroneous to specify MPI_MODE_CREATE in conjunction with MPI_MODE_RDONLY. Errors related to the access mode are raised in the class MPI_ERR_AMODE.
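A minimal sketch of a collective open for writing (the file name is illustrative); checking the return code works because the default error handler for files is MPI_ERRORS_RETURN:

MPI_File fh;
int err = MPI_File_open(MPI_COMM_WORLD, "data.out",
                        MPI_MODE_CREATE | MPI_MODE_WRONLY,
                        MPI_INFO_NULL, &fh);
if (err != MPI_SUCCESS) {
    int eclass;
    MPI_Error_class(err, &eclass);   /* e.g. MPI_ERR_AMODE for a bad mode combination */
    MPI_Abort(MPI_COMM_WORLD, err);
}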
Set File View
int MPI_File_set_view(MPI_File fh, MPI_Offset disp, MPI_Datatype etype, MPI_Datatype filetype, const char *datarep, MPI_Info info)
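Here disp is an absolute byte displacement from the start of the file at which the view begins; etype is the elementary datatype, the unit in which all offsets and file pointers are counted; filetype describes which portions of the file (built from etypes) are visible to the calling process; and datarep names the data representation ("native", "internal", or "external32"). A minimal sketch that starts each rank's view at its own contiguous block (COUNT and fh are illustrative):

#define COUNT 4
MPI_Offset disp = (MPI_Offset)rank * COUNT * sizeof(int);  /* byte offset of this rank's block */
MPI_File_set_view(fh, disp, MPI_INT, MPI_INT, "native", MPI_INFO_NULL);
/* offsets and the individual file pointer are now counted in ints, starting at this block */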
Reading a File
int MPI_File_read(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status)
Writing to a File
int MPI_File_write(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_Status *status)
Seeking inside a File
int MPI_File_seek(MPI_File fh, MPI_Offset offset, int whence)
MPI_File_seek updates the individual file pointer according to whence, which can take the following values:
- MPI_SEEK_SET: the pointer is set to offset
- MPI_SEEK_CUR: the pointer is set to the current pointer position plus offset
- MPI_SEEK_END: the pointer is set to the end of file plus offset
The offset can be negative, which allows seeking backwards. It is erroneous to seek to a negative position in the file. The end of the file is defined to be the location of the next elementary data item immediately after the last accessed data item, even if that location is a hole.
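A short sketch of the three modes, assuming fh is open with a view whose etype is MPI_INT (the offset counts etypes, not bytes):

MPI_File_seek(fh, 10, MPI_SEEK_SET);   /* pointer = etype 10 of the view */
MPI_File_seek(fh, -2, MPI_SEEK_CUR);   /* step back two etypes, to etype 8 */
MPI_File_seek(fh,  0, MPI_SEEK_END);   /* jump to end of file, e.g. before appending */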
Closing a File
MPI_File_close(MPI_File *fh)
In [1]:
%%writefile codes/openmpi/mpiio_seqwrite.c
#include "mpi.h"
#include <stdio.h>
#define array_size 4
static char filename[] = "output.dat";
int main(int argc, char **argv)
{
int rank, size;
MPI_File outfile;
MPI_Status status;
MPI_Datatype arraytype;
int nbytes, myarray[array_size], mode, i;
/* initialize MPI */
MPI_Init( &argc, &argv );
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Type_contiguous(array_size, MPI_INT, &arraytype);
MPI_Type_commit(&arraytype);
/* initialize array */
for (i=0; i < array_size; i++) {
myarray[i] = rank;
}
/* open file */
mode = MPI_MODE_CREATE | MPI_MODE_WRONLY;
MPI_File_open(MPI_COMM_WORLD, filename, mode, MPI_INFO_NULL, &outfile);
/* set file view */
MPI_File_set_view(outfile, (MPI_Offset)rank * array_size * sizeof(int), MPI_INT, arraytype, "native", MPI_INFO_NULL);
/* write buffer to file*/
MPI_File_write(outfile, &myarray[0], array_size, MPI_INT, &status);
/* print out number of bytes written */
MPI_Get_elements(&status, MPI_CHAR, &nbytes);
printf("TASK %d wrote %d bytes\n", rank, nbytes);
/* free the datatype and close the file */
MPI_Type_free(&arraytype);
MPI_File_close( &outfile );
MPI_Barrier(MPI_COMM_WORLD);
/* finalize MPI */
MPI_Finalize();
return 0;
}
In [2]:
!mpicc codes/openmpi/mpiio_seqwrite.c -o ~/mpiio_seqwrite
!rm -f output.dat
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/mpiio_seqwrite
!hexdump -C output.dat
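With four ranks the dump should show sixteen 4-byte integers in four contiguous blocks, one block per rank: 0 0 0 0 1 1 1 1 2 2 2 2 3 3 3 3.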
In [5]:
%%writefile codes/openmpi/mpiio_seqread.c
#include "mpi.h"
#include <stdio.h>
#define array_size 4
static char filename[] = "output.dat";
int main(int argc, char **argv)
{
int rank, size;
MPI_File infile;
MPI_Status status;
int myarray[array_size], mode, i;
/* initialize MPI */
MPI_Init( &argc, &argv );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
MPI_Comm_size( MPI_COMM_WORLD, &size );
/* open file */
mode = MPI_MODE_RDONLY;
MPI_File_open( MPI_COMM_WORLD, filename, mode, MPI_INFO_NULL, &infile );
/* set file view */
MPI_File_set_view( infile, (MPI_Offset)rank*array_size*sizeof(int), MPI_INT, MPI_INT, "native", MPI_INFO_NULL );
/* read file */
MPI_File_read( infile, &myarray[0], array_size, MPI_INT, &status );
/* close file */
MPI_File_close( &infile );
/* print out results */
for (i=0; i < array_size; i++)
printf("%2d%c", myarray[i], i%4==3 ? '\n' : ' ');
/* finalize MPI */
MPI_Finalize();
return 0;
}
In [6]:
!mpicc codes/openmpi/mpiio_seqread.c -o ~/mpiio_seqread
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/mpiio_seqread
In [7]:
%%writefile codes/openmpi/mpiio_cirwrite.c
#include "mpi.h"
#include <stdio.h>
#define array_size 4
static char filename[] = "output.dat";
int main(int argc, char **argv)
{
int rank, size;
MPI_File outfile;
MPI_Status status;
MPI_Datatype arraytype;
int nbytes, myarray[array_size], mode, i;
/* initialize MPI */
MPI_Init( &argc, &argv );
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
/* cyclic layout: array_size single ints, strided by the number of ranks */
MPI_Type_vector(array_size, 1, size, MPI_INT, &arraytype);
MPI_Type_commit(&arraytype);
/* initialize array */
for (i=0; i < array_size; i++) {
myarray[i] = rank;
}
/* open file */
mode = MPI_MODE_CREATE | MPI_MODE_WRONLY;
MPI_File_open(MPI_COMM_WORLD, filename, mode, MPI_INFO_NULL, &outfile);
/* set file view */
MPI_File_set_view(outfile, (MPI_Offset)rank * sizeof(int), MPI_INT, arraytype, "native", MPI_INFO_NULL);
/* write buffer to file*/
MPI_File_write(outfile, &myarray[0], array_size, MPI_INT, &status);
/* print out number of bytes written */
MPI_Get_elements(&status, MPI_CHAR, &nbytes);
printf("TASK %d wrote %d bytes\n", rank, nbytes);
/* free the datatype and close the file */
MPI_Type_free(&arraytype);
MPI_File_close( &outfile );
/* finalize MPI */
MPI_Finalize();
return 0;
}
In [8]:
!mpicc codes/openmpi/mpiio_cirwrite.c -o ~/mpiio_cirwrite
!rm -f output.dat
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/mpiio_cirwrite
!hexdump -C output.dat
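This time the rank values should come out interleaved one integer at a time: 0 1 2 3 0 1 2 3 0 1 2 3 0 1 2 3.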
In [9]:
%%writefile codes/openmpi/mpiio_cirread.c
#include "mpi.h"
#include <stdio.h>
#define array_size 4
static char filename[] = "output.dat";
int main(int argc, char **argv)
{
int rank, size;
MPI_File infile;
MPI_Status status;
MPI_Datatype arraytype;
int myarray[array_size], mode, i;
/* initialize MPI */
MPI_Init( &argc, &argv );
MPI_Comm_rank( MPI_COMM_WORLD, &rank );
MPI_Comm_size( MPI_COMM_WORLD, &size );
/* must match the cyclic filetype used by the writer */
MPI_Type_vector(array_size, 1, size, MPI_INT, &arraytype);
MPI_Type_commit(&arraytype);
/* open file */
mode = MPI_MODE_RDONLY;
MPI_File_open( MPI_COMM_WORLD, filename, mode, MPI_INFO_NULL, &infile );
/* set file view */
MPI_File_set_view( infile, (MPI_Offset)rank*sizeof(int), MPI_INT, arraytype, "native", MPI_INFO_NULL );
/* read file */
MPI_File_read( infile, &myarray[0], array_size, MPI_INT, &status );
/* free the datatype and close the file */
MPI_Type_free(&arraytype);
MPI_File_close( &infile );
/* print out results */
for (i=0; i < array_size; i++)
printf("%2d%c", myarray[i], i%4==3 ? '\n' : ' ');
/* finalize MPI */
MPI_Finalize();
return 0;
}
In [10]:
!mpicc codes/openmpi/mpiio_cirread.c -o ~/mpiio_cirread
!mpirun -np 4 --map-by core:OVERSUBSCRIBE ~/mpiio_cirread
In [6]:
%%writefile codes/openmpi/mpiio_multiples.c
/*
* (C) 2001 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpi.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#define SIZE (6553600)
/* Each process writes to separate files and reads them back.
The file name is taken as a command-line argument, and the process rank
is appended to it. */
int main(int argc, char **argv)
{
int *buf, i, rank, nints, len;
char *filename, *tmp;
int errs = 0, toterrs;
MPI_File fh;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
/* Process 0 takes the file name as a command-line argument and
broadcasts it to other processes */
if (!rank) {
i = 1;
while ((i < argc) && strcmp("-fname", *argv)) {
i++;
argv++;
}
if (i >= argc) {
fprintf(stderr, "\n*# Usage: mpiio_multiples -fname filename\n\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
argv++;
len = strlen(*argv);
filename = (char *) malloc(len+10);
strcpy(filename, *argv);
MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(filename, len+10, MPI_CHAR, 0, MPI_COMM_WORLD);
} else {
MPI_Bcast(&len, 1, MPI_INT, 0, MPI_COMM_WORLD);
filename = (char *) malloc(len+10);
MPI_Bcast(filename, len+10, MPI_CHAR, 0, MPI_COMM_WORLD);
}
buf = (int *) malloc(SIZE);
nints = SIZE/sizeof(int);
for (i=0; i<nints; i++){
buf[i] = rank*100000 + i;
}
/* each process opens a separate file called filename.'myrank' */
tmp = (char *) malloc(len+10);
strcpy(tmp, filename);
sprintf(filename, "%s.%d", tmp, rank);
MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_CREATE | MPI_MODE_RDWR, MPI_INFO_NULL, &fh);
MPI_File_write(fh, buf, nints, MPI_INT, &status);
MPI_File_close(&fh);
/* reopen the file and read the data back */
for (i=0; i<nints; i++){
buf[i] = 0;
}
MPI_File_open(MPI_COMM_SELF, filename, MPI_MODE_CREATE | MPI_MODE_RDWR, MPI_INFO_NULL, &fh);
MPI_File_read(fh, buf, nints, MPI_INT, &status);
MPI_File_close(&fh);
/* check if the data read is correct */
for (i=0; i<nints; i++) {
if (buf[i] != (rank*100000 + i)) {
errs++;
fprintf(stderr, "Process %d: error, read %d, should be %d\n", rank, buf[i], rank*100000+i);
}
}
MPI_Allreduce( &errs, &toterrs, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD );
if (rank == 0) {
if( toterrs > 0) {
fprintf( stderr, "Found %d errors\n", toterrs );
}
else {
fprintf( stdout, " No Errors\n" );
}
}
free(buf);
free(filename);
free(tmp);
MPI_Finalize();
return 0;
}
In [7]:
!mpicc codes/openmpi/mpiio_multiples.c -o ~/mpiio_multiples
!mpirun -np 8 --map-by core:OVERSUBSCRIBE ~/mpiio_multiples -fname testmpiio
A "Bridges regular memory" allocation allows you to use Bridges' RSM (128GB) nodes. Partitions available to "Bridges regular memory" allocations are
A "Bridges GPU" allocation allows you to use Bridges' GPU nodes. Partitions available to "Bridges GPU" allocations are:
In [ ]:
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#define array_size 512000000
static char filename[] = "output.dat";
int main(int argc, char **argv)
{
int rank, size, local_size;
MPI_Offset offset;
MPI_File outfile;
MPI_Status status;
int nbytes, mode, i;
int *myarray;
/* initialize MPI */
MPI_Init( &argc, &argv );
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
/* initialize array */
local_size = array_size / size;
myarray = malloc(local_size * sizeof(int));
for (i=0; i < local_size; i++) {
myarray[i] = rank;
}
/* open file */
mode = MPI_MODE_CREATE | MPI_MODE_WRONLY;
MPI_File_open(MPI_COMM_WORLD, filename, mode, MPI_INFO_NULL, &outfile);
offset = (MPI_Offset)rank * local_size * sizeof(int);
/* write buffer to file*/
MPI_File_write_at(outfile, offset, myarray, local_size, MPI_INT, &status);
/* print out number of bytes written */
MPI_Get_elements(&status, MPI_CHAR, &nbytes);
printf("TASK %d wrote %d bytes\n", rank, nbytes);
/* close file */
MPI_File_close( &outfile );
free(myarray);
MPI_Barrier(MPI_COMM_WORLD);
/* finalize MPI */
MPI_Finalize();
return 0;
}
In [3]:
%%writefile codes/mpi4py/mpiio_bigwrite.py
#!/usr/bin/env python
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
amode = MPI.MODE_WRONLY|MPI.MODE_CREATE
fh = MPI.File.Open(comm, "/scratch1/lngo/datafile.contig", amode)
local_count = 1600000000 // size
buffer = np.empty(local_count, dtype=np.int64)
buffer[:] = rank
offset = rank * buffer.nbytes
fh.Write_at_all(offset, buffer)
fh.Close()
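Write_at_all is the collective counterpart of Write_at: every rank in the communicator must call it, which lets the MPI library merge the per-rank requests into fewer, larger file-system operations.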
!chmod 755 codes/mpi4py/mpiio_bigwrite.py
!module load gcc/5.3.0 openmpi/1.10.3;time mpirun -np 8 --mca mpi_cuda_support 0 codes/mpi4py/mpiio_bigwrite.py
!ls -lh /scratch1/lngo/datafile.contig; rm /scratch1/lngo/datafile.contig
!module load gcc/5.3.0 openmpi/1.10.3;time mpirun -np 16 --mca mpi_cuda_support 0 codes/mpi4py/mpiio_bigwrite.py
!ls -lh /scratch1/lngo/datafile.contig; rm /scratch1/lngo/datafile.contig
!module load gcc/5.3.0 openmpi/1.10.3;time mpirun -np 32 --mca mpi_cuda_support 0 codes/mpi4py/mpiio_bigwrite.py
!ls -lh /scratch1/lngo/datafile.contig; rm /scratch1/lngo/datafile.contig
8 processes:
real 1m36.914s
user 0m4.480s
sys 0m25.080s
-rw-r--r-- 1 lngo bigdata 12G Oct 23 11:35 /scratch1/lngo/datafile.contig
16 processes:
real 0m13.298s
user 0m15.483s
sys 0m28.579s
-rw-r--r-- 1 lngo bigdata 12G Oct 23 11:40 /scratch1/lngo/datafile.contig
32 processes:
real 0m9.252s
user 0m21.329s
sys 0m14.430s
-rw-r--r-- 1 lngo bigdata 12G Oct 23 11:40 /scratch1/lngo/datafile.contig
In [5]:
%%writefile codes/mpi4py/mpiio_bigwrite_2.py
#!/usr/bin/env python
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
amode = MPI.MODE_WRONLY|MPI.MODE_CREATE
fh = MPI.File.Open(comm, "/home/lngo/datafile.contig", amode)
local_count = 1600000000 // size
buffer = np.empty(local_count, dtype=np.int64)
buffer[:] = rank
offset = rank * buffer.nbytes
fh.Write_at_all(offset, buffer)
fh.Close()
!chmod 755 codes/mpi4py/mpiio_bigwrite_2.py
!module load gcc/5.3.0 openmpi/1.10.3;time mpirun -np 8 --mca mpi_cuda_support 0 codes/mpi4py/mpiio_bigwrite_2.py
!ls -lh /home/lngo/datafile.contig; rm /home/lngo/datafile.contig
!module load gcc/5.3.0 openmpi/1.10.3;time mpirun -np 16 --mca mpi_cuda_support 0 codes/mpi4py/mpiio_bigwrite_2.py
!ls -lh /home/lngo/datafile.contig; rm /home/lngo/datafile.contig
!module load gcc/5.3.0 openmpi/1.10.3;time mpirun -np 32 --mca mpi_cuda_support 0 codes/mpi4py/mpiio_bigwrite_2.py
!ls -lh /home/lngo/datafile.contig; rm /home/lngo/datafile.contig
8 processes:
real 0m27.454s
user 0m2.941s
sys 0m31.769s
-rw-r--r-- 1 lngo bigdata 12G Oct 23 11:41 /home/lngo/datafile.contig
16 processes:
real 0m29.080s
user 0m58.651s
sys 0m24.621s
-rw-r--r-- 1 lngo bigdata 12G Oct 23 11:41 /home/lngo/datafile.contig
32 processes:
real 0m28.408s
user 1m45.810s
sys 0m17.878s
-rw-r--r-- 1 lngo bigdata 12G Oct 23 11:42 /home/lngo/datafile.contig
Note the contrast: on the parallel scratch file system the wall-clock time drops from 1m37s at 8 ranks to about 9s at 32, while on the home file system it stays flat at roughly 28s no matter how many ranks write, so the extra parallelism buys nothing there.