In [2]:
from IPython import parallel
from glob import glob
from datetime import datetime
import __builtin__  # keep a handle on the builtin sum, in case %pylab shadows it with numpy.sum
from matplotlib import pylab as plt
from mpl_toolkits.basemap import Basemap

c = parallel.Client()
dv = c[:]
dv.block = True
with dv.sync_imports():
    import numpy
    from netCDF4 import Dataset, date2num
    import cdms2

data_dir = '/data/cmip5/output1/MOHC/HadGEM2-ES/rcp85/mon/atmos/' \
           'Amon/r1i1p1/v20111215/tas/'
fs = glob(data_dir + 'tas_Amon_HadGEM2-ES_rcp85_r1i1p1_*-*.nc')
t0 = datetime(2010, 1, 1)
t1 = datetime(2010, 2, 1)


importing numpy on engine(s)
importing Dataset,date2num from netCDF4 on engine(s)
importing cdms2 on engine(s)
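
The DirectView dv is used below to farm the work out: the file list is scattered across the engines, shared values are pushed, and each engine runs a reader on its share. A minimal sketch of that pattern, with a hypothetical work list that is not part of the analysis:

In [ ]:
# sketch of the scatter / apply pattern used later (hypothetical `work` list)
work = range(8)
dv.scatter('chunk', work)    # each engine receives a slice of `work` as `chunk`

def count_chunk():
    return len(chunk)        # runs on an engine, so it sees that engine's `chunk`

print dv.apply(count_chunk)  # with dv.block = True this returns one count per engine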

In [3]:
with dv.sync_imports():
    import cf


importing cf on engine(s)

In [4]:
def get_data_netcdf4():
    # read tas slices in [t0, t1) from each file with netCDF4
    datasets = []
    for f in fs:
        d = Dataset(f)
        time = d.variables['time']
        n = len(time)
        if n > 0:
            this_t0 = date2num(t0, time.units, time.calendar)
            this_t1 = date2num(t1, time.units, time.calendar)
            if time[n - 1] >= this_t0 and time[0] < this_t1:
                # add data for each time in the right range
                lat = d.variables['lat']
                lon = d.variables['lon']
                tas = d.variables['tas']
                for i_t, t in enumerate(time[:]):
                    if this_t0 <= t < this_t1:
                        datasets.append((t, lon[:], lat[:], tas[i_t,:,:]))
        d.close()
    return datasets

def get_time_range_cf(time):
    # express the [t0, t1) bounds (0 and 1 months since 2010-1-1) in the
    # units and calendar of the file's time coordinate
    f = cf.Coordinate()
    f._data = cf.Data(numpy.array([0, 0]))
    u = cf.Units('months since 2010-1-1')
    u.calendar = time.calendar
    f.override_Units(u)
    f.slice[:] = [0, 1]
    f.units = time.units
    return f.array

def get_data_cf():
    # read tas slices in [t0, t1) from each file with cf-python
    fields = cf.read(fs)
    datasets = []
    for f in fields:
        if f.name() == 'air_temperature':
            s = f.space
            time = s['dim0']
            if time.size == 0:
                continue
            else:
                t_1d = time.size == 1
            this_t0, this_t1 = get_time_range_cf(time)
            time = time.varray
            if t_1d:
                time = [time]
            if time[-1] >= this_t0 and time[0] < this_t1:
                # add data for each time in the right range
                lat = s['dim1']
                lon = s['dim2']
                for i_t, t in enumerate(time):
                    if this_t0 <= t < this_t1:
                        if t_1d:
                            tas = f.slice[:,:].array
                        else:
                            tas = f.slice[i_t,:,:].array[0]
                        datasets.append((t, lon.array, lat.array, tas))
    cf.close()
    return datasets

def get_data_cdms2():
    # read tas slices in [t0, t1) from each file with cdms2 (cdat_lite)
    datasets = []
    for fn in fs:
        d = cdms2.open(fn)
        tas = d('tas')
        time, lat, lon = tas.getAxisList()
        if len(time) > 0:
            this_t0 = date2num(t0, time.units, time.calendar)
            this_t1 = date2num(t1, time.units, time.calendar)
            if time[-1] >= this_t0 and time[0] < this_t1:
                # add data for each time in the right range
                for i_t, t in enumerate(time):
                    if this_t0 <= t < this_t1:
                        tas_data = numpy.array(tas[i_t,:,:])
                        datasets.append((t, lon[:], lat[:], tas_data))
        d.close()
    return datasets

def get_data(method, in_parallel):
    if method == 0:
        reader = get_data_netcdf4
    elif method == 1:
        reader = get_data_cf
    else: # method == 2
        reader = get_data_cdms2
    if in_parallel:
        dv.scatter('fs', fs)           # each engine works on its share of the file list
        dv.push({'t0': t0, 't1': t1})  # the engines also need the time bounds
        if method == 1:
            dv['get_time_range_cf'] = get_time_range_cf
        sum = __builtin__.sum  # the builtin sum (a %pylab session shadows it with numpy.sum)
        datasets = sum(dv.apply(reader), [])  # concatenate the per-engine lists
    else:
        datasets = reader()
    return datasets
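
Each reader returns a flat list of (time, lon, lat, tas) tuples for the timesteps falling in [t0, t1). A quick serial sanity check of the netCDF4 path (assuming the files listed above are readable):

In [ ]:
# hedged sanity check: count and shape of the tuples one reader returns
sample = get_data(0, False)               # netCDF4 reader, run serially
print len(sample), 'timestep(s) in [t0, t1)'
t, lon, lat, tas = sample[0]
print tas.shape == (len(lat), len(lon))   # each tas slice is stored as (lat, lon)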

In [6]:
# time some data processing in serial and parallel
all_datasets = []
for method in xrange(3):
    method_name = ('netcdf', 'cf', 'cdat_lite')[method]
    # check the method is supported: referencing the name raises NameError
    # locally or a CompositeError on the engines if the library is missing
    try:
        if method == 0:
            Dataset
            dv.execute('Dataset')
        elif method == 1:
            cf
            dv.execute('cf')
        else: # method == 2
            cdms2
            dv.execute('cdms2')
    except (NameError, parallel.CompositeError):
        print 'unsupported method:', method_name
        continue
    # run computation
    for in_parallel in (False, True):
        print method_name, ('serial', 'parallel')[in_parallel]
        # %timeit repeats the statement, so several copies of each result end up
        # in all_datasets; the consistency check below covers all of them
        %timeit all_datasets.append(get_data(method, in_parallel))
# check all methods return the same data
print 'results the same:',
print all(all([(x == y).all() if isinstance(x, numpy.ndarray) else x == y \
               for x, y in zip(d1, d2)] \
              for d1, d2 in zip(all_datasets[i], all_datasets[i + 1])) \
          for i in xrange(len(all_datasets) - 1))
if all_datasets:
    datasets = all_datasets[0]
del all_datasets


netcdf serial
100 loops, best of 3: 18.2 ms per loop
netcdf parallel
10 loops, best of 3: 45.5 ms per loop
cf serial
1 loops, best of 3: 435 ms per loop
cf parallel
1 loops, best of 3: 430 ms per loop
cdat_lite serial
1 loops, best of 3: 1.57 s per loop
cdat_lite parallel
1 loops, best of 3: 1.12 s per loop
results the same: True

In [1]:
# standard plot
num_levels = 100
x, y, z = datasets[0][1:]
plot = plt.contourf(x, y, z, num_levels)
plt.colorbar()
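
The plot reads more easily with labels; a small sketch (the only assumptions beyond the data above are the axis names and the CMIP5 meaning of tas, near-surface air temperature in kelvin):

In [ ]:
# labelled version of the plot above (a sketch)
t, lon, lat, tas = datasets[0]
plt.contourf(lon, lat, tas, num_levels)
plt.colorbar()
plt.xlabel('longitude (degrees east)')
plt.ylabel('latitude (degrees north)')
plt.title('HadGEM2-ES RCP8.5 near-surface air temperature (K)')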

In [1]:
# Mercator projection
lon, lat, tas = datasets[0][1:]
m = Basemap(projection = 'merc', llcrnrlat = -80, urcrnrlat = 80,
            llcrnrlon = lon[0], urcrnrlon = lon[-1], lat_ts = 0)
m.drawcoastlines()
#m.drawparallels(range(-90, 90, 30))
#m.drawmeridians(range(0, 360, 30))
x, y = m(*numpy.meshgrid(lon, lat))
m.contourf(x, y, tas, 100)
plt.colorbar()
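
The same field drops straight into other Basemap projections; for example, a Robinson view (a sketch reusing lon, lat and tas from above; lon_0 = 180 centres the 0-360 degree longitudes of these files):

In [ ]:
# Robinson projection of the same field (a sketch)
m = Basemap(projection = 'robin', lon_0 = 180)
m.drawcoastlines()
x, y = m(*numpy.meshgrid(lon, lat))
m.contourf(x, y, tas, 100)
plt.colorbar()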