What is large-scale and cluster computing?
There are several routes to parallelism at this scale: MPI (mpi4py and ...), GPU computing (pyopencl and pycuda), and cloud or cluster frameworks (cloud, mrjob, and Apache's mesos/spark/hadoop/zookeeper/...). We'll focus on mpi4py, as it's probably the most stable and active of the python MPI modules, and generally provides the most in terms of classic scalability to institutional-class resources.
Getting started with mpi4py
mpi4py can be installed with easy_install or pip -- thanks, MPI. The parallel "hello" world below is the place to start.
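If you need to install mpi4py first, a typical command (a sketch, assuming a system MPI implementation such as openmpi or mpich is already available to build against) is:
In [ ]:
!pip install mpi4py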
In [2]:
%%file hellompi.py
"""
Parallel Hello World
"""
from mpi4py import MPI
import sys
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()
sys.stdout.write(
"Hello, World! I am process %d of %d on %s.\n"
% (rank, size, name))
The script is run under mpiexec, which launches the requested number of MPI processes:
In [4]:
!mpiexec -n 4 python2.7 hellompi.py
Point to point communication
In [24]:
%%file mpipt2pt.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
data = range(10)
more = range(0,20,2)
print 'rank %i sends data:' % rank, data
comm.send(data, dest=1, tag=1337)
print 'rank %i sends data:' % rank, more
comm.send(more, dest=2 ,tag=1456)
elif rank == 1:
data = comm.recv(source=0, tag=1337)
print 'rank %i got data:' % rank, data
elif rank == 2:
more = comm.recv(source=0, tag=1456)
print 'rank %i got data:' % rank, more
In [25]:
!mpiexec -n 4 python2.7 mpipt2pt.py
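The lowercase send/recv above handle arbitrary (picklable) Python objects. mpi4py also provides uppercase Send/Recv, which communicate buffer-like objects such as numpy arrays directly and skip the pickle step. A minimal sketch (the filename mpipt2pt_buf.py is just for illustration):
In [ ]:
%%file mpipt2pt_buf.py
'''blocking point to point with numpy buffers (uppercase API)
'''
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    data = np.arange(5, dtype='d')
    print 'rank %i sends data:' % rank, data
    comm.Send(data, dest=1, tag=99)
elif rank == 1:
    data = np.empty(5, dtype='d')
    comm.Recv(data, source=0, tag=99)
    print 'rank %i got data:' % rank, data
In [ ]:
!mpiexec -n 2 python2.7 mpipt2pt_buf.py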
In [45]:
%%file mpipt2pt2.py
'''nonblocking communication
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
pair = {0:1, 1:0} # rank 0 sends to 1 and vice versa
sendbuf = np.zeros(5) + rank
recvbuf = np.empty_like(sendbuf)
print 'rank %i sends data:' % rank, sendbuf
sreq = comm.Isend(sendbuf, dest=pair[rank], tag=1337)
rreq = comm.Irecv(recvbuf, source=pair[rank], tag=1337)
# rreq.Wait(); sreq.Wait()
MPI.Request.Waitall([rreq, sreq])
if rank == 1:
time.sleep(0.001) # delay slightly for better printing
print 'rank %i got data:' % rank, recvbuf
In [46]:
!mpiexec -n 2 python2.7 mpipt2pt2.py
Collective communication
In [52]:
%%file mpiscattered.py
'''mpi scatter
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
data = np.arange(10)
print 'rank %i has data' % rank, data
data_split_list = np.array_split(data, size)
else:
data_split_list = None
data_split = comm.scatter(data_split_list, root=0)
# some delays for printing purposes
if rank == 1:
time.sleep(0.001)
elif rank == 2:
time.sleep(0.002)
print 'rank %i got data' % rank, data_split
In [53]:
!mpiexec -n 3 python2.7 mpiscattered.py
In [70]:
%%file mpibroadcasted.py
'''mpi broadcast
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
N = 10.  # a float, so both branches below produce float64 buffers that match for Bcast
data = np.arange(N) if rank == 0 else np.zeros(N)
if rank == 1:
time.sleep(0.001)
elif rank == 2:
time.sleep(0.002)
print 'rank %i has data' % rank, data
comm.Bcast(data, root=0)
if rank == 1:
time.sleep(0.001)
elif rank == 2:
time.sleep(0.002)
print 'rank %i got data' % rank, data
In [71]:
!mpiexec -n 3 python2.7 mpibroadcasted.py
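Gather is the inverse of scatter: every rank contributes a piece and the root collects them into a list. A sketch along the same lines as the examples above (the filename mpigathered.py is just for illustration):
In [ ]:
%%file mpigathered.py
'''mpi gather
'''
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
data = [rank] * (rank + 1)  # each rank contributes something different
print 'rank %i has data' % rank, data
gathered = comm.gather(data, root=0)
if rank == 0:
    print 'rank %i got data' % rank, gathered
In [ ]:
!mpiexec -n 3 python2.7 mpigathered.py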
Not covered: shared memory and shared objects
Better serialization
In [27]:
from mpi4py import MPI
try:
import dill
MPI._p_pickle.dumps = dill.dumps
MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass  # dill not installed, or an mpi4py version without the _p_pickle hook
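With dill doing the serialization, objects that the standard pickle refuses (lambdas, interactively defined functions, closures, ...) can be shipped with the lowercase send/recv. A hedged sketch of the payoff (the _p_pickle hook is an internal of older mpi4py releases, so it may need adjusting for your version; the filename mpidilled.py is just for illustration):
In [ ]:
%%file mpidilled.py
'''send a lambda between ranks, using dill for serialization
'''
from mpi4py import MPI
import dill
try:
    MPI._p_pickle.dumps = dill.dumps  # internal hook of older mpi4py releases
    MPI._p_pickle.loads = dill.loads
except AttributeError:
    pass  # the attribute name differs across mpi4py versions
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    comm.send(lambda x: x**2, dest=1, tag=7)  # plain pickle would choke on a lambda
elif rank == 1:
    f = comm.recv(source=0, tag=7)
    print 'rank %i applies the received function to 3:' % rank, f(3)
In [ ]:
!mpiexec -n 2 python2.7 mpidilled.py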
Working with cluster schedulers: the JOB file
In [47]:
%%file jobscript.sh
#!/bin/sh
#PBS -l nodes=1:ppn=4
#PBS -l walltime=00:03:00
cd ${PBS_O_WORKDIR} || exit 2
mpiexec -np 4 python hellompi.py
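The job file is then handed to the scheduler with its own submission command (assuming a PBS/Torque-style scheduler here, to match the #PBS directives above; SLURM and others use different commands and header syntax):
In [ ]:
!qsub jobscript.sh
!qstat    # check the job's status in the queue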
Beyond mpi4py
Pool: pyina and emcee.utils
In [8]:
%%file pyinapool.py
def test_pool(obj):
from pyina.launchers import Mpi
x = range(6)
p = Mpi(8)
# worker pool strategy + dill
p.scatter = False
print p.map(obj, x)
# worker pool strategy + dill.source
p.source = True
print p.map(obj, x)
# scatter-gather strategy + dill.source
p.scatter = True
print p.map(obj, x)
# scatter-gather strategy + dill
p.source = False
print p.map(obj, x)
if __name__ == '__main__':
from math import sin
f = lambda x:x+1
def g(x):
return x+2
for func in [g, f, abs, sin]:
test_pool(func)
In [9]:
!python2.7 pyinapool.py
For emcee, see: http://dan.iel.fm/emcee/current/user/advanced/
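The pattern documented at that link is an MPIPool: worker ranks block in wait() serving map requests, while only the master runs the body of the script. A sketch, assuming the emcee 2.x layout where MPIPool lives in emcee.utils (newer emcee releases moved the pool to the schwimmbad package):
In [ ]:
%%file emceepool.py
'''an MPI worker pool in the emcee style
'''
from emcee.utils import MPIPool
import sys

def busy_work(x):
    return x**2

pool = MPIPool()
if not pool.is_master():
    pool.wait()   # workers sit here, serving map requests from the master
    sys.exit(0)
print pool.map(busy_work, range(10))  # only the master reaches this point
pool.close()
In [ ]:
!mpiexec -n 4 python2.7 emceepool.py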
Also not covered: MPI_import, pyina and IPython.parallel, pyina and ipython-cluster-helper, and the multiprocessing interface: ... We are in the tall weeds here...
The other end of the spectrum is high-performance parallel instead of large-scale parallel.