In [1]:
try:
    # IPython.parallel was split out into the ipyparallel package in IPython 4.0;
    # fall back to the old location for older IPython installs.
    import ipyparallel as parallel
except ImportError:
    from IPython import parallel
c = parallel.Client()
dv = c[:]  # DirectView over every engine of the client
dv.block = True
# Same output as the original Python 2 `print a, b`, but also valid on Python 3.
print('%d %s' % (len(dv.targets), dv.targets))
lv = c.load_balanced_view()
lv.block = True
# Import these both locally and on every engine: the regridding kernels run
# remotely and refer to time, numpy and cdms2 as globals.
with dv.sync_imports():
    from time import time
    import numpy
    import cdms2
In [2]:
def in_chunks(files, tot=None, per=500):
    """Split the records of a sequence of files into chunks of `per` records.

    The files are treated as one concatenated series along the first axis of
    the variable named by the notebook global `name`. Each chunk is a list of
    ``(file_index, start, count)`` pieces that together cover up to `per`
    consecutive records; a chunk spans several files when a file boundary
    falls inside it.

    Parameters:
        files: sequence of paths accepted by cdms2.open.
        tot: total number of records to cover, or None to cover every file
            completely.
        per: number of records per chunk; only the last chunk may be shorter.

    Returns:
        List of chunks. No piece has a count of 0 and no chunk is empty, so
        every chunk can be handed straight to a regridding kernel.

    Raises:
        ValueError: if `per` is not positive (the split would never progress).
    """
    if per <= 0:
        raise ValueError('per must be positive, got %r' % (per,))
    chunks = []
    chunk = []   # pieces of the chunk currently being filled
    filled = 0   # records already in `chunk`
    got = 0      # records handed out over all chunks
    for i, path in enumerate(files):
        if tot is not None and got >= tot:
            break
        f = cdms2.open(path)
        try:
            n = f[name].shape[0]
        finally:
            f.close()
        j = 0  # next unread record in file i
        while j < n and (tot is None or got < tot):
            # Take as much as this file, this chunk and the overall total allow;
            # all three limits are positive here, so `take` is never 0.
            take = min(n - j, per - filled)
            if tot is not None:
                take = min(take, tot - got)
            chunk.append((i, j, take))
            j += take
            filled += take
            got += take
            if filled == per:
                chunks.append(chunk)
                chunk = []
                filled = 0
    # Keep a trailing partial chunk (files exhausted, or `tot` reached mid-chunk).
    if chunk:
        chunks.append(chunk)
    return chunks
In [3]:
def regrid_one(chunks):
    """Regrid one chunk of source data onto the target grid (benchmark kernel).

    Runs locally or on an engine; it reads the globals `fs_from`, `fs_to`,
    `name`, `cdms2` and `time` from the namespace it runs in.

    Parameters:
        chunks: ``(from_chunk, to_chunk, timed)``. from_chunk and to_chunk are
            lists of ``(file_index, start, count)`` pieces into `fs_from` and
            `fs_to`; `timed` selects the return value.

    Returns:
        The elapsed wall-clock seconds if `timed`, else an empty list. The
        regridded data itself is not returned; this kernel only exercises
        `regrid`.
    """
    from_chunk, to_chunk, timed = chunks
    if timed:
        t0 = time()
    # The target grid is read from the file holding the first target piece.
    f_to = cdms2.open(fs_to[to_chunk[0][0]])
    try:
        to_grid = f_to[name].getGrid()
        for i, start, n in from_chunk:
            f_from = cdms2.open(fs_from[i])
            try:
                v = f_from[name][start:start + n]
                v.regrid(to_grid)  # result intentionally unused (timing only)
            finally:
                f_from.close()
    finally:
        # Close the target file even if reading or regridding fails.
        f_to.close()
    if timed:
        return time() - t0
    return []
def regrid_one_diffmean(chunks):
    """Regrid one source chunk onto the target grid and compare it with the target data.

    Runs locally or on an engine; it reads the globals `fs_from`, `fs_to`,
    `name`, `cdms2`, `numpy` and `time` from the namespace it runs in.

    Parameters:
        chunks: ``(from_chunk, to_chunk, timed)``. from_chunk and to_chunk are
            lists of ``(file_index, start, count)`` pieces into `fs_from` and
            `fs_to`; `timed` selects the return value.

    Returns:
        The elapsed wall-clock seconds if `timed`. Otherwise an array with one
        value per record along the first axis: the unweighted mean over axes
        1 and 2 of (target data - regridded source data).
    """
    from_chunk, to_chunk, timed = chunks
    if timed:
        t0 = time()
    grid_file = None  # first target file; supplies the target grid
    try:
        to_arrs = []
        for i, start, n in to_chunk:
            f_to = cdms2.open(fs_to[i])
            if grid_file is None:
                # Kept open until regridding is done, in case the grid still
                # refers to the file. NOTE(review): confirm cdms2 needs this.
                grid_file = f_to
                to_grid = f_to[name].getGrid()
            try:
                to_arrs.append(numpy.array(f_to[name][start:start + n]))
            finally:
                # Every target file other than the grid file is done with now.
                if f_to is not grid_file:
                    f_to.close()
        from_arrs = []
        for i, start, n in from_chunk:
            f_from = cdms2.open(fs_from[i])
            try:
                v = f_from[name][start:start + n]
                from_arrs.append(numpy.array(v.regrid(to_grid)))
            finally:
                f_from.close()
    finally:
        if grid_file is not None:
            grid_file.close()
    a = numpy.vstack(to_arrs) - numpy.vstack(from_arrs)
    if timed:
        return time() - t0
    return a.mean(1).mean(1)
def regrid(n, view=None, timed=False, do_one=regrid_one):
    """Run `do_one` over matching source/target chunks covering `n` records.

    Parameters:
        n: number of records (along the first axis) taken from both `fs_from`
            and `fs_to`; both are split with in_chunks using the same total.
        view: IPython parallel view (e.g. `dv` or `lv`) whose `map` spreads the
            chunks over engines, or None to run serially in this process.
        timed: passed to every `do_one` call; when true, the per-chunk results
            are returned as a list instead of being concatenated.
        do_one: per-chunk kernel, called with ``(from_chunk, to_chunk, timed)``.

    Returns:
        The list of per-chunk results if `timed`, else those results joined
        with numpy.hstack.

    Note:
        Chunks are paired with zip, so if the two file lists yield a different
        number of chunks (n larger than one of them holds) the extra chunks
        are silently dropped.
    """
    if view is not None:
        # The kernels read these as globals on the engines. They are pushed
        # through `dv`, which spans every engine of the client, so they reach
        # whichever engines `view` schedules work on.
        dv.push({'fs_from': fs_from, 'fs_to': fs_to, 'name': name})
        do = view.map
    else:
        do = map
    from_chunks = in_chunks(fs_from, n)
    to_chunks = in_chunks(fs_to, n)
    # Materialize as lists: on Python 3 zip() and map() return lazy iterators,
    # which numpy.hstack does not accept. Identical behavior on Python 2.
    jobs = list(zip(from_chunks, to_chunks, [timed] * len(from_chunks)))
    rtn = list(do(do_one, jobs))
    if timed:
        return rtn
    return numpy.hstack(rtn)
In [4]:
def get_sizes():
    """Count the records in the source and target file lists.

    Uses the notebook globals `fs_from`, `fs_to` and `name`.

    Returns:
        ``[n_from, n_to]``: for each of fs_from and fs_to, the sum over its
        files of the first-axis length of variable `name`.
    """
    sizes = []
    for paths in (fs_from, fs_to):
        total = 0
        for path in paths:
            f = cdms2.open(path)
            try:
                total += f[name].shape[0]
            finally:
                # Close even if the variable is missing or unreadable.
                f.close()
        sizes.append(total)
    return sizes
In [17]:
# Fill these in before running the benchmarks below.
fs_from = []  # paths (for cdms2.open) of the files holding data on the source grid
fs_to = []    # paths of the files whose grid is the regridding target
name = ''     # name of the variable read from every file
assert fs_from and fs_to and name, 'set fs_from, fs_to and name before running the benchmarks'
In [18]:
fn = regrid_one
n = min(get_sizes())
print n
%timeit regrid(n, None, False, fn)
%timeit regrid(n, dv, False, fn)
#%timeit regrid(n, lv, False, fn)
#print (regrid_serial(n) == regrid_parallel(n)).all()
In [6]:
n = min(get_sizes())  # records available in both file lists
print(n)
for view in (lv, dv):
    t0 = time()
    result = regrid(n, view, True)  # list of per-chunk kernel durations
    s = sum(result)
    dt = time() - t0
    # s / dt relates summed kernel time to wall-clock time: roughly the
    # average number of chunks being processed at once.
    # Same output as the original Python 2 `print a, b, c, d`, also valid on Python 3.
    print('%s %s %s %s' % (s, dt, s / dt, view))