What is large-scale and cluster computing?
There are several routes to large-scale and cluster parallelism in Python:
- MPI (mpi4py and ...)
- GPUs (pyopencl and pycuda)
- the cloud and map-reduce (cloud, mrjob, and Apache's mesos/spark/hadoop/zookeeper/...)

We'll focus on mpi4py, as it's probably the most stable and active of the Python MPI modules, and it generally provides the most in terms of classic scalability to institutional-class resources.
Getting started with mpi4py
mpi4py is easy_install-able -- thanks to the mpi4py folks. Let's start with an MPI "hello world".
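As a rough sketch of the install (this assumes a working MPI implementation, e.g. OpenMPI or MPICH, is already on the system):

!easy_install mpi4py    # or: !pip install mpi4py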
In [2]:
%%file hellompi.py
"""
Parallel Hello World
"""
from mpi4py import MPI
import sys
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()
sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n"
    % (rank, size, name))
MPI programs are launched with mpiexec:
In [4]:
!mpiexec -n 4 python2.7 hellompi.py
Point to point communication
In [24]:
%%file mpipt2pt.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
    data = range(10)
    more = range(0,20,2)
    print 'rank %i sends data:' % rank, data
    comm.send(data, dest=1, tag=1337)
    print 'rank %i sends data:' % rank, more
    comm.send(more, dest=2, tag=1456)
elif rank == 1:
    data = comm.recv(source=0, tag=1337)
    print 'rank %i got data:' % rank, data
elif rank == 2:
    more = comm.recv(source=0, tag=1456)
    print 'rank %i got data:' % rank, more
In [25]:
!mpiexec -n 4 python2.7 mpipt2pt.py
In [45]:
%%file mpipt2pt2.py
'''nonblocking communication
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
pair = {0:1, 1:0} # rank 0 sends to 1 and vice versa
sendbuf = np.zeros(5) + rank
recvbuf = np.empty_like(sendbuf)
print 'rank %i sends data:' % rank, sendbuf
sreq = comm.Isend(sendbuf, dest=pair[rank], tag=1337)
rreq = comm.Irecv(recvbuf, source=pair[rank], tag=1337)
# rreq.Wait(); sreq.Wait()
MPI.Request.Waitall([rreq, sreq])
if rank == 1:
    time.sleep(0.001) # delay slightly for better printing
print 'rank %i got data:' % rank, recvbuf
In [46]:
!mpiexec -n 2 python2.7 mpipt2pt2.py
Collective communication
In [52]:
%%file mpiscattered.py
'''mpi scatter
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
    data = np.arange(10)
    print 'rank %i has data' % rank, data
    data_split_list = np.array_split(data, size)
else:
    data_split_list = None
data_split = comm.scatter(data_split_list, root=0)

# some delays for printing purposes
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data_split
In [53]:
!mpiexec -n 3 python2.7 mpiscattered.py
In [70]:
%%file mpibroadcasted.py
'''mpi broadcast
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
N = 10.
data = np.arange(N) if rank == 0 else np.zeros(N)
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i has data' % rank, data

comm.Bcast(data, root=0)

if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data
In [71]:
!mpiexec -n 3 python2.7 mpibroadcasted.py
Not covered: shared memory and shared objects
Better serialization
In [27]:
from mpi4py import MPI
try:
    import dill
    MPI._p_pickle.dumps = dill.dumps
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass
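As a quick (hypothetical) illustration of what the dill patch buys you: with dill in charge of (de)serialization, objects the standard pickler rejects -- such as a lambda -- can be passed between ranks with the lowercase send/recv API. The file name mpidilled.py and the tag below are arbitrary, and the example assumes dill and the pickle patch above.

%%file mpidilled.py
'''send a lambda between ranks (assumes dill and the pickle patch above)
'''
from mpi4py import MPI
try:
    import dill
    MPI._p_pickle.dumps = dill.dumps
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    square = lambda x: x**2  # the default pickler refuses lambdas
    comm.send(square, dest=1, tag=42)
elif rank == 1:
    square = comm.recv(source=0, tag=42)
    print 'rank 1 applies the received function: square(7) =', square(7)

Run it the same way as the earlier examples, e.g. with mpiexec -n 2 python2.7 mpidilled.py.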
Working with cluster schedulers, the JOB file
In [47]:
%%file jobscript.sh
#!/bin/sh
#PBS -l nodes=1:ppn=4
#PBS -l walltime=00:03:00
cd ${PBS_O_WORKDIR} || exit 2
mpiexec -np 4 python hellompi.py
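The job file above is handed to the PBS/Torque scheduler rather than run directly; a typical (site-dependent) sketch of submitting and monitoring it:

!qsub jobscript.sh
!qstat -u $USER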
Beyond mpi4py
Pool: pyina and emcee.utils
In [8]:
%%file pyinapool.py
def test_pool(obj):
    from pyina.launchers import Mpi
    x = range(6)
    p = Mpi(8)
    # worker pool strategy + dill
    p.scatter = False
    print p.map(obj, x)
    # worker pool strategy + dill.source
    p.source = True
    print p.map(obj, x)
    # scatter-gather strategy + dill.source
    p.scatter = True
    print p.map(obj, x)
    # scatter-gather strategy + dill
    p.source = False
    print p.map(obj, x)

if __name__ == '__main__':
    from math import sin
    f = lambda x: x+1
    def g(x):
        return x+2

    for func in [g, f, abs, sin]:
        test_pool(func)
In [9]:
!python2.7 pyinapool.py
For emcee, see: http://dan.iel.fm/emcee/current/user/advanced/
Also worth knowing about:
- MPI_import
- pyina and IPython.parallel
- pyina and ipython-cluster-helper
- multiprocessing interface: ...

We are in the tall weeds here...
The other end of the spectrum is high-performance parallelism instead of large-scale parallelism.