Setting up halo exchange


In [1]:
import numpy as np
import itertools
from test_life import hash_grid
from life import life_step

In [2]:
shape = (4,4)

In [3]:
A = np.random.randint(0,2,shape)
B = np.random.randint(0,2,shape)

print A
print B


[[0 0 1 0]
 [1 1 0 1]
 [1 1 0 0]
 [0 1 0 1]]
[[1 1 1 0]
 [0 0 1 0]
 [1 0 0 1]
 [0 0 0 1]]

Think of A and B as part of an overlapping horizontal decomposition of C, where the only "true" values are in the interior cells. The first thing we'll do is determine what the interior of C looks like


In [4]:
C = np.vstack((A[1:3,1:3],B[1:3,1:3]))
print C


[[1 0]
 [1 0]
 [0 1]
 [0 0]]

Now let's communicate from A to B and B to A so that they have correct neighbor information


In [5]:
A[-1,1:-1] = B[1,1:-1]
B[0,1:-1] = A[2,1:-1]

print A
print B


[[0 0 1 0]
 [1 1 0 1]
 [1 1 0 0]
 [0 0 1 1]]
[[1 1 0 0]
 [0 0 1 0]
 [1 0 0 1]
 [0 0 0 1]]

Note that we only communicated an "interior" slice, as if we were using the diagonal trick. If we want to communicate an exterior slice, this is easy.


In [6]:
A[-1,:] = B[1,:]
B[0,:] = A[-2,:]

print A
print B


[[0 0 1 0]
 [1 1 0 1]
 [1 1 0 0]
 [0 0 1 0]]
[[1 1 0 0]
 [0 0 1 0]
 [1 0 0 1]
 [0 0 0 1]]

Okay, now we have the general idea, let's check it over the interior of a set of subgrids.


In [7]:
shape = (8,8)
A = np.random.randint(0,2,shape)
B = A.copy()
ng = (2, 2)
gridl = [s/n for s,n in zip(shape,ng)]
gs = [[l * i for i in range(ngi)] for ngi,l in zip(ng,gridl)]
gs = list(itertools.product(*gs))
slices = [[(slice(i,i+l)) for i, l in zip(gsi,gridl)] for gsi in gs]
sliceA = [slice(0,s) for s in shape]
grids = [A[i] for i in slices]

hashes = [hash_grid(g, s) for g, s in zip(grids, slices)]
print sum(hashes)
print hash_grid(A, sliceA)


261
261

Augment the four slices.


In [8]:
lgs = [[l*i-1 if i > 0 else 0 for i in range(ngi)] for ngi,l in zip(ng, gridl)]
lge = [[l*(i+1)+1 if i+1 < ngi else l*(i+1) for i in range(ngi)] for ngi,l in zip(ng, gridl)]
lgs = list(itertools.product(*lgs))
lge = list(itertools.product(*lge))
print lgs
print lge
#slices = [[slice(i,i+l) for i, l in zip(gsi, gridl)] for gsi in gs]
lslices = [[slice(i,j) for i, j in zip(gsi, gei)] for gsi, gei in zip(lgs, lge)]

lgrids = [A[i] for i in lslices]


[(0, 0), (0, 3), (3, 0), (3, 3)]
[(5, 5), (5, 8), (8, 5), (8, 8)]

For the next test, we'll perform ten time steps on an augmented set of grids, using communication, then compute the hash on the individual grids and the full grid.


In [9]:
for g in lgrids:
    print g


[[1 1 1 1 1]
 [1 1 0 1 0]
 [1 1 0 1 0]
 [1 0 1 0 1]
 [1 0 1 0 1]]
[[1 1 0 0 0]
 [1 0 0 1 0]
 [1 0 0 0 0]
 [0 1 0 0 0]
 [0 1 0 1 0]]
[[1 0 1 0 1]
 [1 0 1 0 1]
 [0 1 0 1 1]
 [0 0 0 1 0]
 [0 1 0 1 0]]
[[0 1 0 0 0]
 [0 1 0 1 0]
 [1 1 1 1 1]
 [1 0 0 1 0]
 [1 0 0 1 1]]

In [10]:
def communicate(grid, left, right, up, down):
    """ Communicates grid data.  
    
    Performs a simulated inner 'put' from grid into two left-right neighbors, then a full 'put' from grid into up-down neighbors"""

    if left != None:
        left[1:-1,-1] = grid[1:-1,1]

    if right != None:
        right[1:-1,0] = grid[1:-1,-2]

    if up != None:
        up[-1,:] = grid[1,:]

    if down != None:
        down[0,:] = grid[-2,:]

In [11]:
g00,g01,g10,g11 = [l.copy() for l in lgrids]

print A


[[1 1 1 1 1 0 0 0]
 [1 1 0 1 0 0 1 0]
 [1 1 0 1 0 0 0 0]
 [1 0 1 0 1 0 0 0]
 [1 0 1 0 1 0 1 0]
 [0 1 0 1 1 1 1 1]
 [0 0 0 1 0 0 1 0]
 [0 1 0 1 0 0 1 1]]

In [12]:
print g00


[[1 1 1 1 1]
 [1 1 0 1 0]
 [1 1 0 1 0]
 [1 0 1 0 1]
 [1 0 1 0 1]]

In [13]:
print g01


[[1 1 0 0 0]
 [1 0 0 1 0]
 [1 0 0 0 0]
 [0 1 0 0 0]
 [0 1 0 1 0]]

In [14]:
print g10


[[1 0 1 0 1]
 [1 0 1 0 1]
 [0 1 0 1 1]
 [0 0 0 1 0]
 [0 1 0 1 0]]

In [15]:
print g11


[[0 1 0 0 0]
 [0 1 0 1 0]
 [1 1 1 1 1]
 [1 0 0 1 0]
 [1 0 0 1 1]]

In [16]:
n_00 = (None, g01, None, g10)
n_01 = (g00, None, None, g11)
n_10 = (None, g11, g00, None)
n_11 = (g10, None, g01, None)

In [17]:
g00[4,1:3] = 0
g00[1:3,4] = 0

In [18]:
communicate(g00, *n_00)
communicate(g01, *n_01)
communicate(g10, *n_10)
communicate(g11, *n_11)

In [19]:
print B


[[1 1 1 1 1 0 0 0]
 [1 1 0 1 0 0 1 0]
 [1 1 0 1 0 0 0 0]
 [1 0 1 0 1 0 0 0]
 [1 0 1 0 1 0 1 0]
 [0 1 0 1 1 1 1 1]
 [0 0 0 1 0 0 1 0]
 [0 1 0 1 0 0 1 1]]

In [20]:
print g00


[[1 1 1 1 1]
 [1 1 0 1 0]
 [1 1 0 1 0]
 [1 0 1 0 1]
 [1 0 1 0 1]]

In [21]:
print g01


[[1 1 0 0 0]
 [1 0 0 1 0]
 [1 0 0 0 0]
 [0 1 0 0 0]
 [0 1 0 1 0]]

In [118]:
def build_grids(A, ng):
    """Create a decomposition of the grids on A"""
    
    shape = A.shape
    gridl = [s/n for s,n in zip(shape,ng)]
    gs = [[l * i for i in range(ngi)] for ngi,l in zip(ng,gridl)]
    gs = list(itertools.product(*gs))
    slices = [[(slice(i,i+l)) for i, l in zip(gsi,gridl)] for gsi in gs]
    sliceA = [slice(0,s) for s in shape]
    return sliceA, slices, [A[i] for i in slices]
    
def build_local_grids(A, ng):
    """Create a copy of the local grids on A"""
    
    shape = A.shape
    gridl = [s/n for s,n in zip(shape,ng)]

    lgs = [[l*i-1 if i > 0 else 0 for i in range(ngi)] for ngi,l in zip(ng, gridl)]
    lge = [[l*(i+1)+1 if i+1 < ngi else l*(i+1) for i in range(ngi)] for ngi,l in zip(ng, gridl)]
    lgs = list(itertools.product(*lgs))
    lge = list(itertools.product(*lge))
    lslices = [[slice(i,j) for i, j in zip(gsi, gei)] for gsi, gei in zip(lgs, lge)]
    return [A[i].copy() for i in lslices]
    
def setup_4(A, local_grids, ng):    
    l_00, l_01, l_10, l_11 = [grid for grid in local_grids]
    
    n_00 = (None, l_01, None, l_10)
    n_01 = (l_00, None, None, l_11)
    n_10 = (None, l_11, l_00, None)
    n_11 = (l_10, None, l_01, None)  
    
    shape = A.shape
    gridl = [s/n for s,n in zip(shape,ng)]

    m0 = gridl[0]
    m1 = gridl[1]
    
    g_00 = l_00[:m0,:m1]
    l_00[m0,:] = 0
    l_00[1:m0-1,m1] = 0
    
    g_01 = l_01[:m0,1:]
    l_01[m0,:] = 0
    l_01[1:m0-1,0] = 0
    
    g_10 = l_10[1:,:m1]
    l_10[0,:] = 0
    l_10[1:m0-1,m1] = 0
        
    g_11 = l_11[1:,1:]
    l_11[0,:] = 0
    l_11[1:m1-1,0] = 0

    def comm_all():
        communicate(l_00, *n_00[:2] + (None, None))
        communicate(l_01, *n_01[:2] + (None, None))
        communicate(l_10, *n_10[:2] + (None, None))
        communicate(l_11, *n_11[:2] + (None, None))
        communicate(l_00, *(None, None) + n_00[2:])
        communicate(l_01, *(None, None) + n_01[2:])
        communicate(l_10, *(None, None) + n_10[2:])
        communicate(l_11, *(None, None) + n_11[2:])
    
    grids = (g_00, g_01, g_10, g_11)
    
    return grids, comm_all

In [119]:
def test_communicate():
    """Verify communications between processes"""

    shape = (8,8)
    A = np.random.randint(0,2,shape)
    
    ng = (2, 2)
    sliceA, slices, grids = build_grids(A, ng)
    local_grids = build_local_grids(A, ng)
    
    # check that subgrids match before we start
    hashes = [hash_grid(g, s) for g, s in zip(grids, slices)]
    assert(sum(hashes) == hash_grid(A, sliceA))
        
    grids, comm_all = setup_4(A, local_grids, ng)
    
    comm_all()
    
    c_hashes = [hash_grid(g, s) for g, s in zip(grids, slices)]
    
    assert(sum(hashes) == hash_grid(A, sliceA))
    assert(sum(c_hashes) == hash_grid(A, sliceA))

In [120]:
test_communicate()

In [123]:
def test_communicating_steps():
    """Verify communications in running code between processes"""

    shape = (4,4)
    A = np.random.randint(0,2,shape)
    sliceA = [slice(0,s) for s in shape]
    slices = [[(slice(i,i+l)) for i, l in zip(gsi,gridl)] for gsi in gs]

    B = A.copy()
    B_swap = B.copy()
    
    ng = (2, 2)
    sliceA, slices, grids = build_grids(A, ng)
    local_grids = build_local_grids(A, ng)

    swap_local_grids = [local_grid.copy() for local_grid in local_grids]
    
    # check that subgrids match before we start
    hashes = [hash_grid(g, s) for g, s in zip(grids, slices)]
    assert(sum(hashes) == hash_grid(A, sliceA))
        
    grids, comm_all = setup_4(A, local_grids, ng)
    swap_grids, swap_comm_all = setup_4(A, swap_local_grids, ng)
    
    iters = 10

    for i in range(iters):
        comm_all()
        
        for grid, swap_local_grid in zip(local_grids, swap_local_grids):
            life_step(grid, swap_local_grid)
            
        (local_grids, comm_all, grids, swap_local_grids, swap_comm_all, swap_grids) = \
            swap_local_grids, swap_comm_all, swap_grids, local_grids, comm_all, grids
    
        life_step(B, B_swap)
        B_swap, B = B, B_swap

        hashes = [hash_grid(g, s) for g, s in zip(grids, slices)]

        if sum(hashes) != hash_grid(B, sliceA):       
            for grid in swap_local_grids:
                print grid
            print '-'*40
            print B_swap

            print '*'*40
            for grid in local_grids:
                print grid
            print '-'
            print B

    hashes = [hash_grid(g, s) for g, s in zip(grids, slices)]
             
    assert(sum(hashes) == hash_grid(B, sliceA))

In [124]:
test_communicating_steps()

In [105]:



> <ipython-input-102-cad44b2cebdc>(58)comm_all()
     57     def comm_all():
---> 58         communicate(l_00, *(n_00[:2], None, None))
     59         communicate(l_01, *(n_01[:2], None, None))

ipdb> print n_00[:2]
(None, array([[1, 1, 1, 1, 1],
       [0, 0, 1, 0, 0],
       [0, 0, 1, 1, 1],
       [0, 1, 0, 1, 0],
       [0, 0, 0, 0, 0]]))
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-105-fe3bb68c9ebd> in <module>()
----> 1 get_ipython().magic(u'debug')

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2180         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2181         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2182         return self.run_line_magic(magic_name, magic_arg_s)
   2183 
   2184     #-------------------------------------------------------------------------

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2101                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2102             with self.builtin_trap:
-> 2103                 result = fn(*args,**kwargs)
   2104             return result
   2105 

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in debug(self, parameter_s)

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    189     # but it's overkill for just that one bit of state.
    190     def magic_deco(arg):
--> 191         call = lambda f, *a, **k: f(*a, **k)
    192 
    193         if callable(arg):

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in debug(self, parameter_s)
    307         the %pdb magic for more details.
    308         """
--> 309         self.shell.debugger(force=True)
    310 
    311     @line_magic

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in debugger(self, force)
    942 
    943         with self.readline_no_record:
--> 944             pm()
    945 
    946     #-------------------------------------------------------------------------

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in <lambda>()
    939         else:
    940             # fallback to our internal debugger
--> 941             pm = lambda : self.InteractiveTB.debugger(force=True)
    942 
    943         with self.readline_no_record:

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/ultratb.pyc in debugger(self, force)
   1006                     etb = etb.tb_next
   1007                 self.pdb.botframe = etb.tb_frame
-> 1008                 self.pdb.interaction(self.tb.tb_frame, self.tb)
   1009 
   1010         if hasattr(self,'tb'):

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/core/debugger.pyc in interaction(self, frame, traceback)
    259     def interaction(self, frame, traceback):
    260         self.shell.set_completer_frame(frame)
--> 261         OldPdb.interaction(self, frame, traceback)
    262 
    263     def new_do_up(self, arg):

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pdb.pyc in interaction(self, frame, traceback)
    208         self.setup(frame, traceback)
    209         self.print_stack_entry(self.stack[self.curindex])
--> 210         self.cmdloop()
    211         self.forget()
    212 

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/cmd.pyc in cmdloop(self, intro)
    128                     if self.use_rawinput:
    129                         try:
--> 130                             line = raw_input(self.prompt)
    131                         except EOFError:
    132                             line = 'EOF'

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/kernel/zmq/ipkernel.pyc in <lambda>(prompt)
    353         # raw_input in the user namespace.
    354         if content.get('allow_stdin', False):
--> 355             raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
    356         else:
    357             raw_input = lambda prompt='' : self._no_raw_input()

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/kernel/zmq/ipkernel.pyc in _raw_input(self, prompt, ident, parent)
    765         while True:
    766             try:
--> 767                 ident, reply = self.session.recv(self.stdin_socket, 0)
    768             except Exception:
    769                 self.log.warn("Invalid Message:", exc_info=True)

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/IPython/kernel/zmq/session.pyc in recv(self, socket, mode, content, copy)
    646             socket = socket.socket
    647         try:
--> 648             msg_list = socket.recv_multipart(mode, copy=copy)
    649         except zmq.ZMQError as e:
    650             if e.errno == zmq.EAGAIN:

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/zmq/core/pysocket.pyc in recv_multipart(self, flags, copy, track)
    208 
    209     """
--> 210     parts = [self.recv(flags, copy=copy, track=track)]
    211     # have first part already, only loop while more to receive
    212     while self.getsockopt(zmq.RCVMORE):

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/zmq/core/socket.so in zmq.core.socket.Socket.recv (zmq/core/socket.c:5677)()

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/zmq/core/socket.so in zmq.core.socket.Socket.recv (zmq/core/socket.c:5499)()

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/zmq/core/socket.so in zmq.core.socket._recv_copy (zmq/core/socket.c:1724)()

/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/zmq/core/error.so in zmq.core.error.ZMQError.__init__ (zmq/core/error.c:1081)()

KeyboardInterrupt: 

In [ ]: