What is large-scale and cluster computing?
MPI (mpi4py and ...)
GPU (pyopencl and pycuda)
cloud (cloud, mrjob, and Apache's mesos/spark/hadoop/zookeeper/...)
We'll focus on mpi4py, as it's probably the most stable and active of the Python MPI modules, and it generally provides the most in terms of classic scalability to institutional-class resources.
Getting started with mpi4py
Getting mpi4py is usually as simple as an easy_install -- thanks to MPI being a mature, well-supported standard. Let's start with a parallel "hello" example.
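For example (a sketch; it assumes an MPI implementation such as Open MPI or MPICH is already installed on the system):
In [ ]:
!easy_install mpi4py   # or: pip install mpi4py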
In [2]:
%%file hellompi.py
"""
Parallel Hello World
"""
from mpi4py import MPI
import sys
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()
sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n"
    % (rank, size, name))
To run it, we use mpiexec to launch several copies of the script:
In [4]:
!mpiexec -n 4 python2.7 hellompi.py
Point-to-point communication
In [24]:
%%file mpipt2pt.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
    data = range(10)
    more = range(0,20,2)
    print 'rank %i sends data:' % rank, data
    comm.send(data, dest=1, tag=1337)
    print 'rank %i sends data:' % rank, more
    comm.send(more, dest=2, tag=1456)
elif rank == 1:
    data = comm.recv(source=0, tag=1337)
    print 'rank %i got data:' % rank, data
elif rank == 2:
    more = comm.recv(source=0, tag=1456)
    print 'rank %i got data:' % rank, more
In [25]:
!mpiexec -n 4 python2.7 mpipt2pt.py
In [45]:
%%file mpipt2pt2.py
'''nonblocking communication
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
pair = {0:1, 1:0} # rank 0 sends to 1 and vice versa
sendbuf = np.zeros(5) + rank
recvbuf = np.empty_like(sendbuf)
print 'rank %i sends data:' % rank, sendbuf
sreq = comm.Isend(sendbuf, dest=pair[rank], tag=1337)
rreq = comm.Irecv(recvbuf, source=pair[rank], tag=1337)
# rreq.Wait(); sreq.Wait()
MPI.Request.Waitall([rreq, sreq])
if rank == 1:
    time.sleep(0.001) # delay slightly for better printing
print 'rank %i got data:' % rank, recvbuf
In [46]:
!mpiexec -n 2 python2.7 mpipt2pt2.py
Collective communication
In [52]:
%%file mpiscattered.py
'''mpi scatter
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
    data = np.arange(10)
    print 'rank %i has data' % rank, data
    data_split_list = np.array_split(data, size)
else:
    data_split_list = None
data_split = comm.scatter(data_split_list, root=0)
# some delays for printing purposes
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data_split
In [53]:
!mpiexec -n 3 python2.7 mpiscattered.py
In [70]:
%%file mpibroadcasted.py
'''mpi broadcast
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
N = 10.
data = np.arange(N) if rank == 0 else np.zeros(N)
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i has data' % rank, data
comm.Bcast(data, root=0)
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data
In [71]:
!mpiexec -n 3 python2.7 mpibroadcasted.py
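Scatter and broadcast have natural counterparts for pulling data back together. The sketch below is our addition (the file name mpigathered.py and the data are illustrative, not from the original); it uses the lowercase gather and reduce interface:
In [ ]:
%%file mpigathered.py
'''mpi gather and reduce
'''
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
# each rank contributes its own small chunk of data
local = np.arange(3) + 3*rank
print 'rank %i has data' % rank, local
# gather: the root receives a list holding every rank's chunk
chunks = comm.gather(local, root=0)
# reduce: the root receives the elementwise sum across ranks
total = comm.reduce(local, op=MPI.SUM, root=0)
if rank == 0:
    print 'rank 0 gathered:', chunks
    print 'rank 0 reduced sum:', total
In [ ]:
!mpiexec -n 3 python2.7 mpigathered.py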
Not covered: shared memory and shared objects
Better serialization
In [27]:
from mpi4py import MPI
try:
    import dill
    MPI._p_pickle.dumps = dill.dumps
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass
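As a quick sketch of why this matters (our example; the file name mpidilled.py is illustrative, and the _p_pickle attribute assumes the same mpi4py version used above): with dill registered as the pickler, objects that the standard pickle module rejects, such as a lambda, can be shipped with the lowercase send/recv interface.
In [ ]:
%%file mpidilled.py
'''ship a lambda between ranks using the dill-patched pickler
'''
from mpi4py import MPI
try:
    import dill
    MPI._p_pickle.dumps = dill.dumps
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    # the standard pickle module cannot serialize a lambda
    comm.send(lambda x: x**2, dest=1, tag=7)
elif rank == 1:
    f = comm.recv(source=0, tag=7)
    print 'rank 1 applies the received lambda:', f(3)
In [ ]:
!mpiexec -n 2 python2.7 mpidilled.py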
Working with cluster schedulers: the JOB file
In [47]:
%%file jobscript.sh
#!/bin/sh
#PBS -l nodes=1:ppn=4
#PBS -l walltime=00:03:00
cd ${PBS_O_WORKDIR} || exit 2
mpiexec -np 4 python hellompi.py
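The job file is then handed to the scheduler. On a PBS/Torque system that is typically done with qsub (a sketch; queue names and resource defaults depend on the cluster):
In [ ]:
!qsub jobscript.sh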
Beyond mpi4py
Pool: pyina and emcee.utils
In [8]:
%%file pyinapool.py
def test_pool(obj):
    from pyina.launchers import Mpi
    x = range(6)
    p = Mpi(8)
    # worker pool strategy + dill
    p.scatter = False
    print p.map(obj, x)
    # worker pool strategy + dill.source
    p.source = True
    print p.map(obj, x)
    # scatter-gather strategy + dill.source
    p.scatter = True
    print p.map(obj, x)
    # scatter-gather strategy + dill
    p.source = False
    print p.map(obj, x)

if __name__ == '__main__':
    from math import sin
    f = lambda x:x+1
    def g(x):
        return x+2
    for func in [g, f, abs, sin]:
        test_pool(func)
In [9]:
!python2.7 pyinapool.py
For emcee, see: http://dan.iel.fm/emcee/current/user/advanced/
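The pattern there is a master/worker pool. A minimal sketch, assuming emcee 2.x where MPIPool lives in emcee.utils (newer releases get the same pool from the schwimmbad package); the file name is illustrative:
In [ ]:
%%file emceepool.py
import sys
from emcee.utils import MPIPool

def square(x):
    return x**2

pool = MPIPool()
if not pool.is_master():
    # worker ranks block here and execute tasks sent by the master
    pool.wait()
    sys.exit(0)
# the master drives the computation through a familiar map interface
print pool.map(square, range(10))
pool.close()
In [ ]:
!mpiexec -n 4 python2.7 emceepool.py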
MPI_import: pyina and IPython.parallel
In [ ]:
$ ipcluster mpiexec -n 16 --mpi=mpi4py
In [ ]:
from IPython.kernel import client
mec = client.MultiEngineClient()
mec.activate()
%px from mpi4py import MPI
%px print(MPI.Get_processor_name())
pyina and ipython-cluster-helper
In [ ]:
from pyina.launchers import SerialMapper
from pyina.schedulers import Torque
from pyina.mpi import _save, _debug
def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())
print "Submit a non-parallel job to torque in the 'productionQ' queue..."
print "Using 5 items over 1 nodes and the default mapping strategy"
torque = Torque(queue='productionQ', timelimit='20:00:00', workdir='.')
pool = SerialMapper(scheduler=torque)
res = pool.map(host, range(5))
print pool
print '\n'.join(res)
In [ ]:
from pyina.launchers import Mpi
from pyina.schedulers import Torque
from pyina.mpi import _save, _debug
def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())
print "Submit an mpi job to torque in the 'productionQ' queue..."
print "Using 15 items over 5 nodes and the scatter-gather strategy"
torque = Torque('5:ppn=2', queue='productionQ', timelimit='20:00:00', workdir='.')
pool = Mpi(scheduler=torque, scatter=True)
res = pool.map(host, range(15))
print pool
print '\n'.join(res)
print "hello from master"
multiprocessing interface: ...
We are in the tall weeds here. The other end of the spectrum is high-performance parallel computing instead of large-scale parallel computing.