MPI and cluster computing

What is large-scale and cluster computing?

  • MPI: the message passing interface (mpi4py and ...)
  • GPU: graphics processing-based parallelism (pyopencl and pycuda)
  • Cloud computing (cloud, mrjob, and Apache's mesos/spark/hadoop/zookeeper/...)

We'll focus on mpi4py, as it's probably the most stable and actively maintained of the Python MPI modules, and it generally provides the most in terms of classic scalability to institutional-class resources.

Getting started with mpi4py

  • Typically not an easy_install -- thanks, MPI (see the install sketch below)
  • Getting started: say "hello"
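
mpi4py builds against whatever MPI implementation is already on the system, so the MPI library and a C compiler have to be in place first. A minimal install sketch, assuming Open MPI or MPICH is already available (package names and channels are site-specific):

In [ ]:
# assumes an MPI implementation (Open MPI/MPICH) and mpicc are already on the PATH
!pip install mpi4py
# or, in a conda environment (conda-forge ships its own MPI build):
# !conda install -c conda-forge mpi4py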

In [2]:
%%file hellompi.py
"""
Parallel Hello World
"""

from mpi4py import MPI
import sys

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n" 
    % (rank, size, name))


Writing hellompi.py
  • Executes with mpiexec

In [4]:
!mpiexec -n 4 python2.7 hellompi.py


Hello, World! I am process 2 of 4 on hilbert.local.
Hello, World! I am process 1 of 4 on hilbert.local.
Hello, World! I am process 0 of 4 on hilbert.local.
Hello, World! I am process 3 of 4 on hilbert.local.
  • Coding for multiple "personalities" (ranks, actually) -- see the sketch below
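
Every rank executes the same script; the "personalities" come from branching on the rank. A minimal sketch of that idiom (the filename is just for illustration):

In [ ]:
%%file branching.py
'''the same file runs on every rank; behavior branches on the rank'''
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    print 'rank 0: coordinating'   # e.g. hand out work, collect results
else:
    print 'rank %i: working' % rank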

Point-to-point communication


In [24]:
%%file mpipt2pt.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = range(10)
    more = range(0,20,2)
    print 'rank %i sends data:' % rank, data
    comm.send(data, dest=1, tag=1337)
    print 'rank %i sends data:' % rank, more
    comm.send(more, dest=2, tag=1456)
elif rank == 1:
    data = comm.recv(source=0, tag=1337)
    print 'rank %i got data:' % rank, data
elif rank == 2:
    more = comm.recv(source=0, tag=1456)
    print 'rank %i got data:' % rank, more


Overwriting mpipt2pt.py

In [25]:
!mpiexec -n 4 python2.7 mpipt2pt.py


rank 0 sends data: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
rank 0 sends data: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
rank 2 got data: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
rank 1 got data: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [45]:
%%file mpipt2pt2.py
'''nonblocking communication

note: uppercase methods (Isend/Irecv here, Bcast below) operate on buffer-like
objects such as numpy arrays; lowercase methods (send/recv, scatter) pickle
general Python objects
'''
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

pair = {0:1, 1:0} # rank 0 sends to 1 and vice versa
sendbuf = np.zeros(5) + rank
recvbuf = np.empty_like(sendbuf)

print 'rank %i sends data:' % rank, sendbuf
sreq = comm.Isend(sendbuf, dest=pair[rank], tag=1337)
rreq = comm.Irecv(recvbuf, source=pair[rank], tag=1337)

# rreq.Wait(); sreq.Wait()
MPI.Request.Waitall([rreq, sreq])
if rank == 1: 
    time.sleep(0.001) # delay slightly for better printing
print 'rank %i got data:' % rank, recvbuf


Overwriting mpipt2pt2.py

In [46]:
!mpiexec -n 2 python2.7 mpipt2pt2.py


rank 1 sends data: [ 1.  1.  1.  1.  1.]
rank 0 sends data: [ 0.  0.  0.  0.  0.]
rank 0 got data: [ 1.  1.  1.  1.  1.]
rank 1 got data: [ 0.  0.  0.  0.  0.]

Collective communication


In [52]:
%%file mpiscattered.py
'''mpi scatter
'''
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

if rank == 0:
    data = np.arange(10)
    print 'rank %i has data' % rank, data
    data_split_list = np.array_split(data, size)
else:
    data_split_list = None
data_split = comm.scatter(data_split_list, root=0)

# some delays for printing purposes
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data_split


Overwriting mpiscattered.py

In [53]:
!mpiexec -n 3 python2.7 mpiscattered.py


rank 0 has data [0 1 2 3 4 5 6 7 8 9]
rank 0 got data [0 1 2 3]
rank 1 got data [4 5 6]
rank 2 got data [7 8 9]

In [70]:
%%file mpibroadcasted.py
'''mpi broadcast
'''
from mpi4py import MPI
import numpy as np
import time

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

N = 10
# use a float buffer on every rank so the Bcast send/recv types match
data = np.arange(N, dtype=float) if rank == 0 else np.zeros(N)
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i has data' % rank, data

comm.Bcast(data, root=0)
if rank == 1:
    time.sleep(0.001)
elif rank == 2:
    time.sleep(0.002)
print 'rank %i got data' % rank, data


Overwriting mpibroadcasted.py

In [71]:
!mpiexec -n 3 python2.7 mpibroadcasted.py


rank 1 has data [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
rank 2 has data [ 0.  0.  0.  0.  0.  0.  0.  0.  0.  0.]
rank 0 has data [ 0.  1.  2.  3.  4.  5.  6.  7.  8.  9.]
rank 0 got data [ 0.  1.  2.  3.  4.  5.  6.  7.  8.  9.]
rank 1 got data [ 0.  1.  2.  3.  4.  5.  6.  7.  8.  9.]
rank 2 got data [ 0.  1.  2.  3.  4.  5.  6.  7.  8.  9.]

Not covered: shared memory and shared objects

Better serialization


In [27]:
from mpi4py import MPI

try:
    import dill
    MPI._p_pickle.dumps = dill.dumps
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass
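
With dill registered as the pickler, the lowercase (pickle-based) communication calls can move things the stock pickler chokes on -- lambdas, closures, interactively defined functions. A hedged sketch of what that buys you (assumes dill is installed; the filename is illustrative; run with mpiexec -n 2 as above):

In [ ]:
%%file mpidilled.py
'''ship a lambda between ranks -- relies on the dill pickler swap shown above'''
from mpi4py import MPI

try:
    import dill
    MPI._p_pickle.dumps = dill.dumps
    MPI._p_pickle.loads = dill.loads
except (ImportError, AttributeError):
    pass  # without dill, the lambda below will fail to serialize

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    func = lambda x: x**2              # not serializable by the stock pickler
    comm.send(func, dest=1, tag=7)
elif rank == 1:
    func = comm.recv(source=0, tag=7)
    print 'rank 1 applies the received lambda:', func(5)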

Working with cluster schedulers: the job file


In [47]:
%%file jobscript.sh
#!/bin/sh
#PBS -l nodes=1:ppn=4
#PBS -l walltime=00:03:00
cd ${PBS_O_WORKDIR} || exit 2
mpiexec -np 4 python hellompi.py


Writing jobscript.sh
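
The script is then handed to the scheduler rather than run directly; on a PBS/Torque-style cluster that is done with qsub (queue names and limits are site-specific):

In [ ]:
!qsub jobscript.sh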

Beyond mpi4py

  • The task Pool: pyina and emcee.utils (an emcee.utils sketch follows the pyina example below)

In [8]:
%%file pyinapool.py

def test_pool(obj):
    from pyina.launchers import Mpi
    x = range(6)
    p = Mpi(8)
    
    # worker pool strategy + dill
    p.scatter = False
    print p.map(obj, x)
    
    # worker pool strategy + dill.source 
    p.source = True
    print p.map(obj, x)
    
    # scatter-gather strategy + dill.source
    p.scatter = True
    print p.map(obj, x)
    
    # scatter-gather strategy + dill
    p.source = False
    print p.map(obj, x)


if __name__ == '__main__':

    from math import sin
    f = lambda x:x+1
    def g(x):
        return x+2

    for func in [g, f, abs, sin]:
        test_pool(func)


Writing pyinapool.py

In [9]:
!python2.7 pyinapool.py


[2, 3, 4, 5, 6, 7]
[2, 3, 4, 5, 6, 7]
[2, 3, 4, 5, 6, 7]
[2, 3, 4, 5, 6, 7]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6]
[1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5]
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385]
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385]
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385]
[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385]
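
For emcee.utils, the MPIPool class gives a master/worker pool with a familiar map interface. A rough sketch against the emcee 2.x API (the task function and filename are illustrative); launch it with mpiexec like the other scripts:

In [ ]:
%%file emceepool.py
'''a master/worker task pool via emcee.utils.MPIPool (emcee 2.x)'''
import sys
from emcee.utils import MPIPool

def task(x):
    return x*x

pool = MPIPool()            # wraps MPI.COMM_WORLD
if not pool.is_master():
    pool.wait()             # workers sit here serving tasks until the pool closes
    sys.exit(0)

print pool.map(task, range(6))
pool.close()
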
  • Loooooong import times: MPI_import
  • Interactive MPI: pyina and IPython.parallel

In [ ]:
$ ipcluster mpiexec -n 16 --mpi=mpi4py

In [ ]:
from IPython.kernel import client
mec = client.MultiEngineClient()
mec.activate()

%px from mpi4py import MPI
%px print(MPI.Get_processor_name())
  • Working with schedulers directly: pyina and ipython-cluster-helper (a cluster-helper sketch follows the pyina examples below)

In [ ]:
from pyina.launchers import SerialMapper
from pyina.schedulers import Torque
from pyina.mpi import _save, _debug

def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())

print "Submit a non-parallel job to torque in the 'productionQ' queue..."
print "Using 5 items over 1 nodes and the default mapping strategy"

torque = Torque(queue='productionQ', timelimit='20:00:00', workdir='.')
pool = SerialMapper(scheduler=torque) 
res = pool.map(host, range(5))

print pool
print '\n'.join(res)

In [ ]:
from pyina.launchers import Mpi
from pyina.schedulers import Torque
from pyina.mpi import _save, _debug

def host(id):
    import socket
    return "Rank: %d -- %s" % (id, socket.gethostname())

print "Submit an mpi job to torque in the 'productionQ' queue..."
print "Using 15 items over 5 nodes and the scatter-gather strategy"

torque = Torque('5:ppn=2', queue='productionQ', timelimit='20:00:00', workdir='.')
pool = Mpi(scheduler=torque, scatter=True)
res = pool.map(host, range(15))

print pool
print '\n'.join(res)

print "hello from master"
  • Issue: conforming to the multiprocessing interface: ...

We are in the tall weeds here...

The other end of the spectrum is high-performance parallelism, as opposed to large-scale parallelism.