In [1]:
from IPython import parallel

# Connect to the cluster and build the two views used by the benchmarks below.
c = parallel.Client()

# Direct view over all engines, in blocking mode.
dv = c[:]
dv.block = True

# Load-balanced view on the same client, also in blocking mode.
lv = c.load_balanced_view()
lv.block = True

# Report how many engines the direct view targets, and which ones.
print('%d %s' % (len(dv.targets), dv.targets))

# Run these imports locally and on the engines of the direct view.
with dv.sync_imports():
    from time import time
    import numpy
    import cdms2


4 [0, 1, 2, 3]
importing time from time on engine(s)
importing numpy on engine(s)
importing cdms2 on engine(s)

In [2]:
def in_chunks (files, tot = None, per = 500):
    """Split the records held in a sequence of files into chunks.

    Every file is opened with cdms2.open and the length of the first axis of
    the variable `name` (a global) is used as its record count.  Records are
    consumed in file order and grouped `per` at a time; a chunk may span
    several files.

    files: paths accepted by cdms2.open.
    tot: number of records to cover, or None to cover every record of every
        file.  If the files hold fewer than `tot` records, all of them are
        covered and the result is simply shorter.
    per: number of records per chunk (the last chunk may hold fewer).

    Returns a list of chunks.  Each chunk is a list of
    (file_index, start, count) tuples, where file_index indexes `files` and
    the piece covers records start .. start + count - 1 of that file.
    Zero-length pieces and empty chunks are never emitted, so every chunk has
    at least one piece and every piece has count > 0.

    Raises ValueError if `per` is not positive (chunking could never make
    progress).
    """
    if per <= 0:
        raise ValueError('per must be positive, got %r' % (per,))
    chunks = []
    chunk = []
    in_chunk = 0  # records already placed in the chunk being built
    got = 0       # records placed in all chunks so far
    for i, path in enumerate(files):
        if tot is not None and got >= tot:
            # enough records covered: do not open any further files
            break
        f = cdms2.open(path)
        try:
            n = f[name].shape[0]
        finally:
            # only the record count is needed, so release the file right away
            f.close()
        j = 0
        while j < n and (tot is None or got < tot):
            # take as much as the file, the chunk and the total each allow;
            # all three limits are > 0 here, so the loop always advances
            take = min(n - j, per - in_chunk)
            if tot is not None:
                take = min(take, tot - got)
            chunk.append((i, j, take))
            j += take
            in_chunk += take
            got += take
            if in_chunk == per:
                chunks.append(chunk)
                chunk = []
                in_chunk = 0
    if chunk:
        # trailing, partially filled chunk
        chunks.append(chunk)
    return chunks

In [3]:
def regrid_one (chunks):
    """Regrid one chunk of source records onto the target grid.

    chunks: a (from_chunk, to_chunk, timed) tuple.  from_chunk and to_chunk
        are lists of (file_index, start, count) pieces indexing the globals
        fs_from and fs_to respectively; timed is a flag.

    The target grid is taken from the variable `name` (a global) in the file
    of the first piece of to_chunk.  Each source piece is read and regridded
    onto it; the regridded data is discarded, so this function only measures
    or exercises the regridding itself.

    Returns the elapsed wall-clock time in seconds if timed is true,
    otherwise an empty list.
    """
    from_chunk, to_chunk, timed = chunks
    if timed:
        t0 = time()
    f_to = cdms2.open(fs_to[to_chunk[0][0]])
    try:
        to_grid = f_to[name].getGrid()
        for i, start, n in from_chunk:
            f_from = cdms2.open(fs_from[i])
            try:
                v = f_from[name][start:start + n]
                v.regrid(to_grid)
            finally:
                # close the source file even if reading or regridding fails
                f_from.close()
    finally:
        # the target file stays open while its grid is in use
        f_to.close()
    if timed:
        return time() - t0
    return []


def regrid_one_diffmean (chunks):
    """Regrid one chunk and return the mean difference to the target data.

    chunks: a (from_chunk, to_chunk, timed) tuple.  from_chunk and to_chunk
        are lists of (file_index, start, count) pieces indexing the globals
        fs_from and fs_to respectively; timed is a flag.

    The target records are read as plain arrays.  The source records are
    regridded onto the grid of the variable `name` (a global) from the first
    target file, then subtracted from the target records.  Both sides are
    stacked along the first axis, so they must cover the same number of
    records.

    Returns the elapsed wall-clock time in seconds if timed is true.
    Otherwise returns the difference (target - regridded source) averaged
    over axis 1 twice, which requires the stacked arrays to have at least
    three axes.

    Raises ValueError if to_chunk is empty, since there is then no target
    grid to regrid onto.
    """
    from_chunk, to_chunk, timed = chunks
    if not to_chunk:
        raise ValueError('to_chunk is empty: no target grid to regrid onto')
    if timed:
        t0 = time()
    to_arrs = []
    from_arrs = []
    grid_file = None  # first target file; kept open while its grid is in use
    try:
        for i, start, n in to_chunk:
            f_to = cdms2.open(fs_to[i])
            is_grid_file = grid_file is None
            if is_grid_file:
                grid_file = f_to
            try:
                to_arrs.append(numpy.array(f_to[name][start:start + n]))
                if is_grid_file:
                    to_grid = f_to[name].getGrid()
            finally:
                # every target file except the grid provider is done with now
                if not is_grid_file:
                    f_to.close()
        for i, start, n in from_chunk:
            f_from = cdms2.open(fs_from[i])
            try:
                v = f_from[name][start:start + n]
                from_arrs.append(numpy.array(v.regrid(to_grid)))
            finally:
                f_from.close()
    finally:
        if grid_file is not None:
            grid_file.close()
    a = numpy.vstack(to_arrs) - numpy.vstack(from_arrs)
    if timed:
        return time() - t0
    return a.mean(1).mean(1)

def regrid (n, view = None, timed = False, do_one = regrid_one):
    """Regrid the first n records, chunk by chunk, serially or through a view.

    n: number of records to cover in both fs_from and fs_to (globals).
    view: object whose map method distributes the work; None runs everything
        locally with the builtin map.  When a view is given, fs_from, fs_to
        and name are first pushed through the global direct view dv.
    timed: flag handed to every worker call; if true the per-chunk results
        are returned as they come back from map.
    do_one: worker called once per (from_chunk, to_chunk, timed) tuple.

    Returns the mapped results unchanged if timed is true, otherwise the
    results joined with numpy.hstack.
    """
    if view is None:
        mapper = map
    else:
        # the workers read these as globals, so they must exist remotely
        dv.push(dict(fs_from = fs_from, fs_to = fs_to, name = name))
        mapper = view.map
    sources = in_chunks(fs_from, n)
    targets = in_chunks(fs_to, n)
    jobs = zip(sources, targets, [timed] * len(sources))
    results = mapper(do_one, jobs)
    if not timed:
        return numpy.hstack(results)
    return results

In [4]:
def get_sizes ():
    """Count the records available in the source and in the target files.

    For each of the globals fs_from and fs_to, every file is opened with
    cdms2.open and the length of the first axis of the variable `name`
    (a global) is added up.

    Returns a two-element list: [total for fs_from, total for fs_to].
    """
    def count (path):
        # length of the first axis of the variable in one file
        handle = cdms2.open(path)
        length = handle[name].shape[0]
        handle.close()
        return length

    totals = []
    for paths in (fs_from, fs_to):
        totals.append(sum(count(path) for path in paths))
    return totals

In [17]:
# Inputs for the benchmarks.
# NOTE(review): the right-hand sides are missing in this copy of the notebook;
# all three must be filled in before any cell below can run.
# Source files: a sequence indexed by position whose items are passed to cdms2.open.
fs_from = 
# Target files, used the same way; the target grid is read from these.
fs_to = 
# Key used to look up the variable in every opened file (f[name]).
name =

In [18]:
# Benchmark the regrid-only worker over as many records as both file sets provide.
fn = regrid_one
n = min(get_sizes())
print n
# Serial baseline: view=None makes regrid use the builtin map.
%timeit regrid(n, None, False, fn)
# Same work mapped through the direct view dv.
%timeit regrid(n, dv, False, fn)
# Load-balanced variant, currently disabled.
#%timeit regrid(n, lv, False, fn)
# NOTE(review): regrid_serial / regrid_parallel are not defined in the visible
# cells; this disabled check would need to be rewritten in terms of regrid.
#print (regrid_serial(n) == regrid_parallel(n)).all()


34675
1 loops, best of 3: 170 s per loop
1 loops, best of 3: 50 s per loop

In [6]:
# Compare the two views: summed per-chunk worker time against wall-clock time.
n = min(get_sizes())
print(n)
for view in (lv, dv):
    t0 = time()
    result = regrid(n, view, timed = True)
    s = sum(result)
    dt = time() - t0
    # columns: summed worker time, wall-clock time, their ratio, the view used
    print('%s %s %s %s' % (s, dt, s / dt, view))


34675
230.216465712 58.1636180878 3.95808364886 <LoadBalancedView None>
93.7215733528 31.3307859898 2.99135723513 <DirectView [0, 1, 2, 3]>