In [1]:
infile1 = "/g/data1/r87/drstree/CMIP5/GCM/CSIRO-QCCCE/CSIRO-Mk3-6-0/historicalMisc/mon/ocean/thetao/r1i1p1/thetao_Omon_CSIRO-Mk3-6-0_historicalMisc_r1i1p1_185001-185912.nc"
infile2 = "/g/data1/r87/drstree/CMIP5/GCM/CSIRO-QCCCE/CSIRO-Mk3-6-0/historicalMisc/mon/ocean/thetao/r1i1p1/thetao_Omon_CSIRO-Mk3-6-0_historicalMisc_r1i1p1_186001-186912.nc"
allfiles = "/g/data1/r87/drstree/CMIP5/GCM/CSIRO-QCCCE/CSIRO-Mk3-6-0/historicalMisc/mon/ocean/thetao/r1i1p1/thetao_Omon_CSIRO-Mk3-6-0_historicalMisc_r1i1p1_*.nc"
From the documentation: xarray uses Dask, which divides arrays into many small pieces, called chunks, each of which is presumed to be small enough to fit into memory.
Unlike NumPy, which has eager evaluation, operations on dask arrays are lazy. Operations queue up a series of tasks mapped over blocks, and no computation is performed until you actually ask values to be computed (e.g., to print results to your screen or write to disk).
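To see the lazy behaviour in isolation, here's a minimal sketch with a small made-up dask array (the shape and chunk sizes are arbitrary):
import dask.array as da

lazy = da.ones((4000, 4000), chunks=(1000, 1000))  # no data in memory yet
lazy_mean = lazy.mean(axis=0)                       # still just a task graph
result = lazy_mean.compute()                        # computation happens here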
In [2]:
import xarray
In [3]:
ds = xarray.open_mfdataset(allfiles)  # chunks={'lev': 1, 'time': 1956}
Note that with the default chunking each input file ends up in its own chunk. You can't change this with the chunks option (i.e. the commented code above would still leave the data chunked along the time axis, as well as the level axis), so you have to rechunk later on (see below).
In [5]:
print(ds)
In [6]:
ds.nbytes * (2 ** -30)
Out[6]:
So there's 16.4 GB of data, according to the bytes-to-gigabytes conversion that Stephan Hoyer uses in this blog post.
In [7]:
ds.chunks
Out[7]:
You can re-chunk your data like so, where each number gives the size of an individual chunk along that dimension. This is useful when, for example, you want each chunk to contain the entire time axis.
In [8]:
rechunked = ds.chunk({'time': 1956, 'lev': 1})
In [9]:
rechunked.chunks
Out[9]:
From the documentation: You can convert an xarray data structure from lazy dask arrays into eager, in-memory numpy arrays using the load() method (i.e. ds.load()), or extract a plain numpy array directly via the values attribute or numpy.asarray().
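As a quick sketch of those three routes (beware: each one pulls the full ~16 GB variable into memory, so this is for illustration only):
import numpy

eager_da = ds['thetao'].load()           # DataArray now backed by an in-memory numpy array
eager_np = ds['thetao'].values           # plain numpy.ndarray
eager_np2 = numpy.asarray(ds['thetao'])  # equivalent plain numpy.ndarray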
In [10]:
import numpy
In [10]:
darray = ds['thetao']
In [11]:
print(darray)
In [12]:
climatology_eager = darray.values.mean(axis=0)
In [11]:
climatology_lazy = ds.mean('time')
In [12]:
%%time
climatology_lazy.to_netcdf("/g/data/r87/dbi599/lazy.nc")
Notice that the computation used only 25 seconds of wall clock time but 47 seconds of CPU time, so dask is clearly spreading the work across (at least) two cores.
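As an aside, the amount of parallelism can be controlled explicitly. A minimal sketch, assuming a dask version new enough to have dask.config.set (older releases used dask.set_options instead), and with a made-up output filename:
import dask

# restrict the local threaded scheduler to two workers for this computation only
with dask.config.set(scheduler='threads', num_workers=2):
    climatology_lazy.to_netcdf("/g/data/r87/dbi599/lazy_2workers.nc")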
The simple numpy functions are available through xarray (see the xarray docs for the notes on import xarray.ufuncs as xu), so doing something like a mean or standard deviation is trivial. For more complex functions you need to use the map_blocks() method associated with dask arrays. Below I'll try this for the task of fitting a cubic polynomial:
In [13]:
%matplotlib inline
In [14]:
import matplotlib.pyplot as plt
x = [2.53240, 1.91110, 1.18430, 0.95784, 0.33158,
-0.19506, -0.82144, -1.64770, -1.87450, -2.2010]
y = [-2.50400, -1.62600, -1.17600, -0.87400, -0.64900,
-0.477000, -0.33400, -0.20600, -0.10100, -0.00600]
coefficients = numpy.polyfit(x, y, 3)
polynomial = numpy.poly1d(coefficients)
xs = numpy.arange(-2.2, 2.6, 0.1)
ys = polynomial(xs)
plt.plot(x, y, 'o')
plt.plot(xs, ys)
plt.ylabel('y')
plt.xlabel('x')
plt.show()
In [15]:
def cubic_fit(data_series):
    """Fit a cubic polynomial to a 1D numpy array."""
    x = numpy.arange(0, len(data_series))
    coefficients = numpy.polyfit(x, data_series, 3)
    polynomial = numpy.poly1d(coefficients)
    return polynomial(x)

def cubic_fit_ds(array):
    """Fit a cubic polynomial along the first (time) axis of an array block."""
    return numpy.apply_along_axis(cubic_fit, 0, array)
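As a quick sanity check (not part of the original workflow), the helper can be exercised on a small made-up block before mapping it over the dask chunks:
test_block = numpy.random.rand(24, 2, 3, 3)  # toy (time, lev, lat, lon) block
fitted = cubic_fit_ds(test_block)
print(fitted.shape)  # same shape as the input block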
In [16]:
import dask.array as da
#dask_array = da.from_array(rechunked['thetao'], chunks=(1956, 1, 189, 192))
dask_array = rechunked['thetao'].data
In [17]:
print(dask_array)
In [18]:
cubic_data = dask_array.map_blocks(cubic_fit_ds)
In [19]:
cubic_data.chunks
Out[19]:
In [20]:
cubic_data.shape
Out[20]:
In [21]:
new_ds = xarray.Dataset({'thetao': (('time', 'lev', 'lat', 'lon',), cubic_data)})
In [22]:
file_nums = range(0,31)
paths = ['/g/data/r87/dbi599/dask_%s.nc' %f for f in file_nums]
xarray.save_mfdataset(new_ds, paths)
In [49]:
ds_small = xarray.open_mfdataset(infile2)
In [50]:
print(ds_small)
In [53]:
type(ds_small['thetao'].data)
Out[53]:
In [54]:
cubic_data_small = ds_small['thetao'].data.map_blocks(cubic_fit_ds)
In [58]:
file_nums = range(0,1)
paths = ['/g/data/r87/dbi599/dask_%s.nc' %f for f in file_nums]
xarray.save_mfdataset([cubic_data_small], paths)
How do you write a dask array to a netCDF file??? Perhaps manually convert each chunk to a numpy array / xarray DataArray?
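One possible approach (just a sketch, not verified here, and the output filename is made up) is to wrap the dask array back into an xarray object, reusing the dims and coords of the source variable, and let to_netcdf() work through the task graph:
# Hypothetical: wrap the lazy result in a DataArray so xarray can write it
cubic_darray = xarray.DataArray(cubic_data_small,
                                dims=ds_small['thetao'].dims,
                                coords=ds_small['thetao'].coords,
                                name='thetao')
# to_netcdf triggers the computation and writes the result chunk by chunk
cubic_darray.to_dataset().to_netcdf('/g/data/r87/dbi599/cubic_small.nc')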
In [2]:
import iris
In [3]:
cube = iris.load_cube([infile1, infile2])
In [7]:
history = []
def edit_attributes(cube, field, filename):
    cube.attributes.pop('creation_date', None)
    cube.attributes.pop('tracking_id', None)
    history.append(cube.attributes['history'])
    cube.attributes.pop('history', None)
In [8]:
cubes = iris.load([infile1, infile2], 'sea_water_potential_temperature', callback=edit_attributes)
In [9]:
print(cubes)
In [10]:
#iris.util.unify_time_units(cubes)
cubes = cubes.concatenate_cube()
In [11]:
print(cubes)
In [12]:
print(len(history))
print(history)
In [14]:
for i, x_slice in enumerate(cubes.slices(['time', 'latitude', 'longitude'])):
    print(i, x_slice.shape)
In [17]:
coord_names = [coord.name() for coord in cubes.coords()]
In [18]:
print(coord_names)
In [20]:
cubes.aux_coords
Out[20]:
In [29]:
str(cubes.coord('time').units)
Out[29]: