MPI - point-to-point operations

We will use mpi4py, driving the MPI processes from the notebook through ipyparallel.


In [1]:
import numpy as np

In [2]:
import ipyparallel as ipp
c = ipp.Client(profile='mpi')
print(c.ids)
view = c[:]
view.activate()


[0, 1, 2, 3, 4, 5]
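The engines listed above are assumed to have been started beforehand as an MPI-launched ipyparallel cluster, e.g. with ipcluster start -n 6 --profile=mpi (assuming the mpi profile is configured to launch its engines with MPI).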

A serial warm-up: time the largest real part of the eigenvalues of a random matrix:

import numpy as np
%time np.max(np.real(np.linalg.eigvals(np.random.randn(400,400))))

A task: find the largest entry in a random matrix:


In [8]:
%time np.max(np.random.randn(5000,5000))


CPU times: user 1.06 s, sys: 84 ms, total: 1.14 s
Wall time: 1.14 s
Out[8]:
5.1695004271475078

In [3]:
%%px --block
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# each engine searches only a slice of the columns (a quarter of them here,
# independent of the number of engines) and times its own part
t = MPI.Wtime()
print("result =", np.max(np.random.randn(5000, 5000//4)))
t = MPI.Wtime() - t
print(rank, ":: execution time:", t)


[stdout:0] 
result = 4.92475096345
5 :: execution time: 0.32053208351135254
[stdout:1] 
result = 5.19015297189
0 :: execution time: 0.320300817489624
[stdout:2] 
result = 4.96516606574
1 :: execution time: 0.2753469944000244
[stdout:3] 
result = 5.31471805457
2 :: execution time: 0.27109289169311523
[stdout:4] 
result = 4.89196635269
3 :: execution time: 0.31802892684936523
[stdout:5] 
result = 5.0994157818
4 :: execution time: 0.31693005561828613

Send and receive

  • We use rank to differentiate code paths between processes.
  • Note that mpi4py serializes (pickles) arbitrary Python objects before sending them.

Important!

  • In MPI for Python, Send(), Recv() and Sendrecv() communicate memory buffers (such as numpy arrays) with little overhead.
  • The lowercase variants send(), recv() and sendrecv() communicate generic Python objects (a small Sendrecv() sketch follows this list).
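Sendrecv() is only mentioned above and not used later in this notebook, so here is a minimal sketch of a ring exchange with it (an illustration written for this text, assuming the same %%px setup as the other cells):

%%px --block
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

right = (rank + 1) % size   # neighbour we send to
left  = (rank - 1) % size   # neighbour we receive from

sendbuf = np.full(3, float(rank))
recvbuf = np.empty(3)

# Sendrecv posts the send and the matching receive together, which avoids
# the deadlock that two blocking Sends facing each other could cause.
comm.Sendrecv(sendbuf, dest=right, recvbuf=recvbuf, source=left)
print(rank, "received from", left, ":", recvbuf)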

In [11]:
%%px --block

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

data = None

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1)

elif rank == 1:
    data = comm.recv(source=0)

print("OK, rank= ",rank,"dane: ",data)


[stdout:0] OK, rank=  3 data:  None
[stdout:1] OK, rank=  0 data:  {'a': 7, 'b': 3.14}
[stdout:2] OK, rank=  1 data:  {'a': 7, 'b': 3.14}
[stdout:3] OK, rank=  2 data:  None
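
With send and recv available, the earlier task (the largest entry of a random matrix) can be finished using point-to-point messages only: every engine computes a partial maximum and passes it to rank 0. A minimal sketch (not executed in the original notebook; the per-engine column split is an assumption):

%%px --block
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# each engine searches its own share of the columns
local_max = np.max(np.random.randn(5000, 5000 // size))

if rank == 0:
    global_max = local_max
    # collect the partial results one by one with blocking receives
    for src in range(1, size):
        global_max = max(global_max, comm.recv(source=src))
    print("global maximum:", global_max)
else:
    comm.send(local_max, dest=0)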

Sending and receiving numpy arrays

  • we can send the whole array

In [6]:
%%px --block
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))

if rank == 0:
    a[:] = 2
    comm.send(a, dest=1)       # the whole array is pickled and sent
elif rank == 1:
    a = comm.recv(source=0)    # the received copy replaces the local array

print("OK,", rank, np.sum(a))


[stdout:0] OK, 3 0.0
[stdout:1] OK, 2 0.0
[stdout:2] OK, 1 8.0
[stdout:3] OK, 0 8.0
  • we can send a slice!

In [6]:
%%px --block
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))

if rank == 0:
    a[:] = 2
    comm.send(a[0,:], dest=1)     # a row slice, pickled like any other object
elif rank == 1:
    a[0,:] = comm.recv(source=0)

print("OK,", rank, np.sum(a))


[stdout:0] OK, 0 8.0
[stdout:1] OK, 5 0.0
[stdout:2] OK, 2 0.0
[stdout:3] OK, 3 0.0
[stdout:4] OK, 4 0.0
[stdout:5] OK, 1 4.0
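
Note that the ipyparallel engine numbers (the [stdout:n] labels) need not coincide with the MPI ranks, as the outputs above show; below, view['rank'] is used to locate the data belonging to a given rank.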

In [7]:
view['rank']


Out[7]:
[0, 5, 2, 3, 4, 1]

In [11]:
view['a'][5]


Out[11]:
array([[ 2.,  2.],
       [ 0.,  0.]])

In [15]:
np.argsort(view['rank'])


Out[15]:
array([0, 5, 2, 3, 4, 1])

In [16]:
view['a'][np.argsort(view['rank'])[1]]


Out[16]:
array([[ 2.,  2.],
       [ 0.,  0.]])

In [9]:
print(view['a'][view['rank'][0]])
print(view['a'][view['rank'][1]])


[[ 2.  2.]
 [ 2.  2.]]
[[ 2.  2.]
 [ 0.  0.]]

In [10]:
%%px --block

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))
if rank == 0:
    a[:] = 2
    comm.send(a[:,0], dest=1)     # a column slice; pickling does not care about contiguity
elif rank == 1:
    a[:,0] = comm.recv(source=0)

print("OK,", rank, np.sum(a))


[stdout:0] OK, 3 0.0
[stdout:1] OK, 2 0.0
[stdout:2] OK, 1 4.0
[stdout:3] OK, 0 8.0

In [11]:
print(view['a'][view['rank'][0]])
print(view['a'][view['rank'][1]])


[[ 2.  2.]
 [ 2.  2.]]
[[ 2.  0.]
 [ 2.  0.]]

Communicating memory buffers: Send and Recv


In [12]:
%%px --block

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))
if rank == 0:
    a[:] = 2
    comm.Send(a[0,:], dest=1)     # a row of a C-ordered array is a contiguous buffer
elif rank == 1:
    comm.Recv(a[0,:], source=0)   # received directly into the existing array

print("OK,", rank, np.sum(a))


[stdout:0] OK, 3 0.0
[stdout:1] OK, 0 8.0
[stdout:2] OK, 1 4.0
[stdout:3] OK, 2 0.0

In [13]:
%%px --block

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))
if rank == 0:
    a[:] = 2
    comm.Send(a[:,0], dest=1)
elif rank == 1:
    comm.Recv(a[:,0], source=0)

print ("OK,",rank,np.sum(a))


[1:execute]: 
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-8-f7e06c0094a9> in <module>()
      9 if rank == 0:
     10     a[:] = 2
---> 11     comm.Send(a[:,0], dest=1)
     12 elif rank == 1:
     13     comm.Recv(a[:,0], source=0)
MPI/Comm.pyx in mpi4py.MPI.Comm.Send (src/mpi4py.MPI.c:92870)()
MPI/msgbuffer.pxi in mpi4py.MPI.message_p2p_send (src/mpi4py.MPI.c:33688)()
MPI/msgbuffer.pxi in mpi4py.MPI._p_msg_p2p.for_send (src/mpi4py.MPI.c:33550)()
MPI/msgbuffer.pxi in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:30349)()
MPI/msgbuffer.pxi in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:29359)()
MPI/asbuffer.pxi in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:9310)()
MPI/asbuffer.pxi in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:8070)()
ValueError: ndarray is not contiguous

[2:execute]: 
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-8-f7e06c0094a9> in <module>()
     11     comm.Send(a[:,0], dest=1)
     12 elif rank == 1:
---> 13     comm.Recv(a[:,0], source=0)
     14 
     15 print ("OK,",rank,np.sum(a))
MPI/Comm.pyx in mpi4py.MPI.Comm.Recv (src/mpi4py.MPI.c:93084)()
MPI/msgbuffer.pxi in mpi4py.MPI.message_p2p_recv (src/mpi4py.MPI.c:33765)()
MPI/msgbuffer.pxi in mpi4py.MPI._p_msg_p2p.for_recv (src/mpi4py.MPI.c:33610)()
MPI/msgbuffer.pxi in mpi4py.MPI.message_simple (src/mpi4py.MPI.c:30349)()
MPI/msgbuffer.pxi in mpi4py.MPI.message_basic (src/mpi4py.MPI.c:29359)()
MPI/asbuffer.pxi in mpi4py.MPI.getbuffer (src/mpi4py.MPI.c:9310)()
MPI/asbuffer.pxi in mpi4py.MPI.PyObject_GetBufferEx (src/mpi4py.MPI.c:8070)()
ValueError: ndarray is not contiguous

Contiguous memory buffers

In a C-ordered array a row occupies consecutive memory, but a column does not (its elements are separated by a stride), so a column slice cannot be exposed as a simple buffer. The flags attribute shows this:


In [14]:
a = np.zeros((2,2))
a.flags


Out[14]:
  C_CONTIGUOUS : True
  F_CONTIGUOUS : False
  OWNDATA : True
  WRITEABLE : True
  ALIGNED : True
  UPDATEIFCOPY : False

In [15]:
a[:,0].flags


Out[15]:
  C_CONTIGUOUS : False
  F_CONTIGUOUS : False
  OWNDATA : False
  WRITEABLE : True
  ALIGNED : True
  UPDATEIFCOPY : False

In [16]:
a[0,:].flags


Out[16]:
  C_CONTIGUOUS : True
  F_CONTIGUOUS : True
  OWNDATA : False
  WRITEABLE : True
  ALIGNED : True
  UPDATEIFCOPY : False

In [17]:
%%px --block

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))
if rank == 0:
    a[:] = 2
    buf = a[:,0].copy()          # copy the column into a contiguous buffer
    comm.Send(buf, dest=1)
elif rank == 1:
    buf = np.empty(2)            # pre-allocate a matching receive buffer
    comm.Recv(buf, source=0)
    a[:,0] = buf
    print("OK,", np.sum(a))


[stdout:2] OK, 4.0
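
An alternative sketch (an illustration written for this text, not run in the original notebook) lets numpy decide whether a copy is needed via np.ascontiguousarray:

%%px --block
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))
if rank == 0:
    a[:] = 2
    # ascontiguousarray copies only when the argument is not already contiguous
    comm.Send(np.ascontiguousarray(a[:,0]), dest=1)
elif rank == 1:
    buf = np.empty(2)
    comm.Recv(buf, source=0)
    a[:,0] = buf
    print("OK,", np.sum(a))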

A new session, this time with a four-engine cluster; %%px can also be restricted to a subset of the engines:

In [1]:
import ipyparallel as ipp
c = ipp.Client(profile='mpi')
print(c.ids)
view = c[:]
view.activate()


[0, 1, 2, 3]

In [5]:
%%px --block --target :3
print("OK")


[stdout:0] OK
[stdout:1] OK
[stdout:2] OK

In [6]:
%%px --block
print("OK")


[stdout:0] OK
[stdout:1] OK
[stdout:2] OK
[stdout:3] OK

In [9]:
%%px --block
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

a = np.zeros((2,2))

if rank == 0:
    import os
    print(os.getcwd())


[stdout:3] /home/users/marcin.kostur/ProgramowanieRownolegle/MPI
