What is large-scale and cluster computing?
There are several routes to large-scale and cluster computing from Python: MPI (mpi4py, and ...), the GPU (pyopencl and pycuda), and cluster/cloud frameworks (cloud, mrjob, and Apache's mesos/spark/hadoop/zookeeper/...). We'll focus on mpi4py, as it's probably the most stable and active of the Python MPI modules, and it generally provides the most in terms of classic scalability to institutional-class resources.
Getting started with mpi4py
Installation is typically just easy_install mpi4py, provided an MPI implementation is already installed and configured -- thanks, MPI. We'll start with the parallel "hello" world.
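If easy_install isn't available, pip should work just as well; either way this assumes an MPI implementation (e.g. OpenMPI or MPICH) and a compiler are already on the system.
In [ ]:
!pip install mpi4py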
In [2]:
%%file hellompi.py
"""
Parallel Hello World
"""
from mpi4py import MPI
import sys
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()
sys.stdout.write(
"Hello, World! I am process %d of %d on %s.\n"
% (rank, size, name))
Scripts that use MPI are launched with mpiexec, where -n sets the number of processes.
In [4]:
!mpiexec -n 4 python2.7 hellompi.py
Point-to-point communication
In [24]:
%%file mpipt2pt.py
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
    data = range(10)
    more = range(0,20,2)
    # lowercase send/recv pass pickled python objects; tags match messages
    print 'rank %i sends data:' % rank, data
    comm.send(data, dest=1, tag=1337)
    print 'rank %i sends data:' % rank, more
    comm.send(more, dest=2, tag=1456)
elif rank == 1:
    data = comm.recv(source=0, tag=1337)
    print 'rank %i got data:' % rank, data
elif rank == 2:
    more = comm.recv(source=0, tag=1456)
    print 'rank %i got data:' % rank, more
In [25]:
!mpiexec -n 4 python2.7 mpipt2pt.py
In [45]:
%%file mpipt2pt2.py
'''nonblocking communication
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
pair = {0:1, 1:0} # rank 0 sends to 1 and vice versa
sendbuf = np.zeros(5) + rank
recvbuf = np.empty_like(sendbuf)
print 'rank %i sends data:' % rank, sendbuf
sreq = comm.Isend(sendbuf, dest=pair[rank], tag=1337)
rreq = comm.Irecv(recvbuf, source=pair[rank], tag=1337)
# rreq.Wait(); sreq.Wait()
MPI.Request.Waitall([rreq, sreq])
if rank == 1:
    time.sleep(0.001) # delay slightly for better printing
print 'rank %i got data:' % rank, recvbuf
In [46]:
!mpiexec -n 2 python2.7 mpipt2pt2.py
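The uppercase Isend/Irecv above work on array-like buffers; mpi4py also provides lowercase isend/irecv for arbitrary (pickled) Python objects. Here's a minimal sketch, not from the original notebook, assuming a recent-enough mpi4py and a hypothetical file name:
In [ ]:
%%file mpipt2pt3.py
'''sketch: nonblocking communication of pickled python objects'''
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
pair = {0:1, 1:0}  # rank 0 talks to 1 and vice versa
sreq = comm.isend({'from': rank}, dest=pair[rank], tag=7)
rreq = comm.irecv(source=pair[rank], tag=7)
data = rreq.wait()  # lowercase wait() returns the received object
sreq.wait()
print 'rank %i got data:' % rank, data
In [ ]:
!mpiexec -n 2 python2.7 mpipt2pt3.py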
Collective communication
In [52]:
%%file mpiscattered.py
'''mpi scatter
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
if rank == 0:
    data = np.arange(10)
    print 'rank %i has data' % rank, data
    data_split_list = np.array_split(data, size)
else:
    data_split_list = None
data_split = comm.scatter(data_split_list, root=0)
# some delays for printing purposes
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data_split
In [53]:
!mpiexec -n 3 python2.7 mpiscattered.py
In [70]:
%%file mpibroadcasted.py
'''mpi broadcast
'''
from mpi4py import MPI
import numpy as np
import time
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
N = 10.
data = np.arange(N) if rank == 0 else np.zeros(N)
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i has data' % rank, data
comm.Bcast(data, root=0)
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data
In [71]:
!mpiexec -n 3 python2.7 mpibroadcasted.py
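Scatter and broadcast push data out from the root; their counterparts gather and reduce pull results back. A small sketch (not part of the original notebook, using a hypothetical file name) with the lowercase, object-based collectives:
In [ ]:
%%file mpigathered.py
'''sketch: gather and reduce, the collectives that pull data back to root'''
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
square = rank**2
gathered = comm.gather(square, root=0)           # list of every rank's value, on root only
total = comm.reduce(square, op=MPI.SUM, root=0)  # sum across ranks, on root only
if rank == 0:
    print 'rank 0 gathered:', gathered
    print 'rank 0 reduced sum:', total
In [ ]:
!mpiexec -n 3 python2.7 mpigathered.py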
Not covered: shared memory and shared objects
Better serialization
In [27]:
from mpi4py import MPI
try:
    import dill
    # swap in dill for the default pickler (handles lambdas, closures, ...)
    MPI._p_pickle.dumps = dill.dumps
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass
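To see what the swap buys you, here's a small sketch (not from the original notebook, with a hypothetical file name) that ships a lambda from rank 0 to rank 1; the standard pickler would refuse to serialize the lambda, while dill handles it. It reuses the _p_pickle hook above, which is specific to older mpi4py releases.
In [ ]:
%%file mpidilled.py
'''sketch: send a lambda between ranks, using dill as the serializer'''
from mpi4py import MPI
try:
    import dill
    MPI._p_pickle.dumps = dill.dumps  # hook name varies across mpi4py versions
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
    comm.send(lambda x: x**2, dest=1, tag=42)  # plain pickle can't serialize a lambda
elif rank == 1:
    f = comm.recv(source=0, tag=42)
    print 'rank 1 applies the received lambda: f(3) =', f(3)
In [ ]:
!mpiexec -n 2 python2.7 mpidilled.py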
Working with cluster schedulers, the JOB file
In [47]:
%%file jobscript.sh
#!/bin/sh
#PBS -l nodes=1:ppn=4
#PBS -l walltime=00:03:00
cd ${PBS_O_WORKDIR} || exit 2
mpiexec -np 4 python hellompi.py
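The job file isn't run directly; it gets handed to the scheduler. On a PBS/Torque system the submission would look roughly like this (queue names and resource flags depend on your site):
In [ ]:
!qsub jobscript.sh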
Beyond mpi4py
Pool: pyina and emcee.utils
In [8]:
%%file pyinapool.py
def test_pool(obj):
    from pyina.launchers import Mpi
    x = range(6)
    p = Mpi(8)
    # worker pool strategy + dill
    p.scatter = False
    print p.map(obj, x)
    # worker pool strategy + dill.source
    p.source = True
    print p.map(obj, x)
    # scatter-gather strategy + dill.source
    p.scatter = True
    print p.map(obj, x)
    # scatter-gather strategy + dill
    p.source = False
    print p.map(obj, x)

if __name__ == '__main__':
    from math import sin
    f = lambda x:x+1
    def g(x):
        return x+2
    for func in [g, f, abs, sin]:
        test_pool(func)
In [9]:
!python2.7 pyinapool.py
For emcee, see: http://dan.iel.fm/emcee/current/user/advanced/
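emcee ships an MPIPool in emcee.utils; the rough pattern (a sketch based on the linked docs, with a hypothetical file name and toy function, not code from this notebook) is that worker ranks block in wait() while the master drives the map:
In [ ]:
%%file emceepool.py
'''sketch of emcee.utils.MPIPool: workers block in wait(), the master maps'''
import sys
from emcee.utils import MPIPool

def busywork(x):
    return x**2

pool = MPIPool()
if not pool.is_master():
    pool.wait()  # worker ranks serve map requests until the pool is closed
    sys.exit(0)
print pool.map(busywork, range(10))  # only the master reaches this line
pool.close()
In [ ]:
!mpiexec -n 4 python2.7 emceepool.py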
MPI_import
pyina and IPython.parallel
In [ ]:
$ ipcluster mpiexec -n 16 --mpi=mpi4py
In [ ]:
from IPython.kernel import client
mec = client.MultiEngineClient()
mec.activate()
%px from mpi4py import MPI
%px print(MPI.Get_processor_name())
pyina and ipython-cluster-helper
In [ ]:
from pyina.launchers import SerialMapper
from pyina.schedulers import Torque
from pyina.mpi import _save, _debug
def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())
print "Submit a non-parallel job to torque in the 'productionQ' queue..."
print "Using 5 items over 1 node and the default mapping strategy"
torque = Torque(queue='productionQ', timelimit='20:00:00', workdir='.')
pool = SerialMapper(scheduler=torque)
res = pool.map(host, range(5))
print pool
print '\n'.join(res)
In [ ]:
from pyina.launchers import Mpi
from pyina.schedulers import Torque
from pyina.mpi import _save, _debug
def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())
print "Submit an mpi job to torque in the 'productionQ' queue..."
print "Using 15 items over 5 nodes and the scatter-gather strategy"
torque = Torque('5:ppn=2', queue='productionQ', timelimit='20:00:00', workdir='.')
pool = Mpi(scheduler=torque, scatter=True)
res = pool.map(host, range(15))
print pool
print '\n'.join(res)
print "hello from master"
The multiprocessing interface: ... We are in the tall weeds here...
The other end of the spectrum is high-performance parallel instead of large-scale parallel.