In [1]:
import numpy as np
import xray
import dask.array as daskarray
from matplotlib import pyplot as plt
%matplotlib inline

In [2]:
import resource
resource.setrlimit(resource.RLIMIT_NOFILE, (4096 ,4096 ))
resource.getrlimit(resource.RLIMIT_NOFILE)


Out[2]:
(4096, 4096)

In [4]:
import xgcm

In [5]:
iters = range(2073840, 2384880, 480)
ddir = '/data/scratch/rpa/channel_moc/GCM/run'
ds = xgcm.open_mdsdataset(ddir, iters, deltaT=900).chunk()
ds


xgcm/mdsxray.py:201: UserWarning: Not sure what to do with rlev = L
  warnings.warn("Not sure what to do with rlev = " + rlev)
xgcm/mdsxray.py:201: UserWarning: Not sure what to do with rlev = X
  warnings.warn("Not sure what to do with rlev = " + rlev)
Out[5]:
<xray.Dataset>
Dimensions:  (X: 200, Xp1: 200, Y: 400, Yp1: 400, Z: 40, Zl: 40, Zp1: 41, Zu: 40, time: 648)
Coordinates:
  * Xp1      (Xp1) >f4 0.0 5000.0 10000.0 15000.0 20000.0 25000.0 30000.0 ...
  * Zl       (Zl) >f4 0.0 -10.0 -20.0 -30.0 -42.0 -56.0 -72.0 -91.0 -113.0 ...
  * Yp1      (Yp1) >f4 0.0 5000.0 10000.0 15000.0 20000.0 25000.0 30000.0 ...
  * Zp1      (Zp1) >f4 0.0 -10.0 -20.0 -30.0 -42.0 -56.0 -72.0 -91.0 -113.0 ...
  * Y        (Y) >f4 2500.0 7500.0 12500.0 17500.0 22500.0 27500.0 32500.0 ...
  * X        (X) >f4 2500.0 7500.0 12500.0 17500.0 22500.0 27500.0 32500.0 ...
  * Z        (Z) >f4 -5.0 -15.0 -25.0 -36.0 -49.0 -64.0 -81.5 -102.0 -126.0 ...
  * Zu       (Zu) >f4 -10.0 -20.0 -30.0 -42.0 -56.0 -72.0 -91.0 -113.0 ...
  * time     (time) int64 1866456000 1866888000 1867320000 1867752000 ...
Data variables:
    YC       (Y, X) >f4 2500.0 2500.0 2500.0 2500.0 2500.0 2500.0 2500.0 ...
    YG       (Yp1, Xp1) >f4 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    rA       (Y, X) >f4 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 ...
    dxG      (Yp1, X) >f4 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 ...
    dxC      (Y, Xp1) >f4 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 ...
    XC       (Y, X) >f4 2500.0 7500.0 12500.0 17500.0 22500.0 27500.0 ...
    XG       (Yp1, Xp1) >f4 0.0 5000.0 10000.0 15000.0 20000.0 25000.0 ...
    Depth    (Y, X) >f4 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    drC      (Zp1) >f4 5.0 10.0 10.0 11.0 13.0 15.0 17.5 20.5 24.0 28.0 33.0 ...
    drF      (Z) >f4 10.0 10.0 10.0 12.0 14.0 16.0 19.0 22.0 26.0 30.0 36.0 ...
    HFacS    (Z, Yp1, X) >f4 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    HFacW    (Z, Y, Xp1) >f4 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    HFacC    (Z, Y, X) >f4 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    rAw      (Y, Xp1) >f4 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 ...
    rAs      (Yp1, X) >f4 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 ...
    dyG      (Y, Xp1) >f4 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 ...
    rAz      (Yp1, Xp1) >f4 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 2.5e+07 ...
    dyC      (Yp1, X) >f4 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 5000.0 ...
    iter     (time) int64 2073840 2074320 2074800 2075280 2075760 2076240 ...
    THETA    (time, Z, Y, X) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    PHIHYD   (time, Z, Y, X) float32 0.07848 0.07848 0.07848 0.07848 0.07848 ...
    UVEL     (time, Z, Y, Xp1) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    VVEL     (time, Z, Yp1, X) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
    WVEL     (time, Zl, Y, X) float32 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 ...
Attributes:
    history: Some made up attribute

In [6]:
ds.WVEL


Out[6]:
<xray.DataArray 'WVEL' (time: 648, Zl: 40, Y: 400, X: 200)>
dask.array<rechunk-5, shape=(648, 40, 400, 200), chunks=((1, 1, 1, ..., 1, 1), (40,), (400,), (200,)), dtype=float32>
Coordinates:
  * Zl       (Zl) >f4 0.0 -10.0 -20.0 -30.0 -42.0 -56.0 -72.0 -91.0 -113.0 ...
  * time     (time) int64 1866456000 1866888000 1867320000 1867752000 ...
  * Y        (Y) >f4 2500.0 7500.0 12500.0 17500.0 22500.0 27500.0 32500.0 ...
  * X        (X) >f4 2500.0 7500.0 12500.0 17500.0 22500.0 27500.0 32500.0 ...
Attributes:
    units: Vertical Component of Velocity (r_units/s)
    description: m/s

In [7]:
u = ds['UVEL']
u2mean = (u**2).mean()
% time u2mean.load()


CPU times: user 21 s, sys: 42 s, total: 1min 2s
Wall time: 1min 25s
Out[7]:
<xray.DataArray 'UVEL' ()>
array(0.01956135767934698)

In [8]:
v = ds['VVEL']
ubar = u.mean(dim=['Xp1', 'time'])
vbar = v.mean(dim=['X', 'time'])
uprime = u - ubar
vprime = v - vbar
# cheat because u and v are on different grids
upvp = (uprime.data * vprime.data).mean(axis=[0,-1])

In [9]:
%time upvpc = upvp.compute()
### old results
# CPU times: user 1min 11s, sys: 54 s, total: 2min 5s
# Wall time: 29.1 s


CPU times: user 1min 56s, sys: 1min 20s, total: 3min 17s
Wall time: 2min 10s

In [10]:
plt.pcolormesh(upvpc)
plt.colorbar()
plt.contour(ubar, 10, colors='k')


Out[10]:
<matplotlib.contour.QuadContourSet instance at 0x7efa48d4af80>

In [253]:
def pow_spec_2d(x, axis=[-1,-2]):
    # x and time
    f = np.fft.fftn(x, axes=axis)
    return np.real(f * f.conj())

# operate on a dataarray
def xray_isotropic_power_spectrum(da, kiso=None, axis=('Y','X')):
    axis_num = [da.get_axis_num(a) for a in axis]
    # see if there are other coords to leave alone
    other_axis = set(da.dims).difference(axis)
    if other_axis:
        other_axis_num = [da.get_axis_num for a in other_axis]
    
    # calcualte wavenumbers from axes
    N = [da.shape[n] for n in axis_num]
    delta_x = [np.diff(da[a])[0] for a in axis]
    k = [ np.fft.fftfreq(Nx, dx) for (Nx, dx) in zip(N, delta_x)]
    kk = np.array(np.meshgrid(k[1], k[0]))
    k2 = (kk**2).sum(axis=0)
    
    # set up wavenumber range
    if kiso is None:
        # no isotropic wavenumber grid specified
        kidx = np.argmin( np.array([l.max() for l in k]) )
        kiso = k[kidx][:N[kidx]/2]
    Niso = len(kiso)
    bins = np.digitize(k2.ravel(), kiso**2)
    
    # do fft
    #def iso_ps(q):
    f = np.fft.fftn(da.values, axes=axis_num)
    # sum isotropically
    fiso = np.bincount(bins,
                       weights=np.real(f*f.conj()).ravel(),
                       minlength=Niso)[:Niso]
    # replace zeros with nans
    fiso[fiso==0.] = np.nan
    return xray.DataArray(fiso, coords={'kiso': kiso})

In [202]:
s = set(sst.dims)
sd = s.difference(('Y', 'X'))
assert sd

In [254]:
kiso = np.linspace(0,(5e3)**-1, 100)
Uspec = xray_isotropic_power_spectrum(ds['UVEL'][0,0], kiso=kiso, axis=('Y','Xp1'))
Vspec = xray_isotropic_power_spectrum(ds['VVEL'][0,0], kiso=kiso, axis=('Yp1','X'))
KEspec = 0.5*(Uspec + Vspec)
SSTspec = xray_isotropic_power_spectrum(ds['THETA'][0,0], kiso=kiso, axis=('Y','X'))

plt.loglog(kiso, KEspec)
plt.loglog(kiso, SSTspec)
plt.ylim([1e4,1e10])


Out[254]:
(10000.0, 10000000000.0)

In [188]:
sst = ds['THETA'].isel(Z=0)
sstgb = sst.groupby('time')
sst_ps = sstgb.apply( lambda q: xray_isotropic_power_spectrum(q, kiso=kiso, axis=('Y','X')) )
u_ps = ds['UVEL'].isel(Z=0)

In [187]:
plt.figure(figsize=(12,8))
plt.pcolormesh(np.ma.masked_invalid(np.log10(sst_ps.values).T))
plt.clim((6,12))



In [168]:
plt.loglog(kiso, sst_ps.mean(dim='time'))


Out[168]:
[<matplotlib.lines.Line2D at 0x7fe4739fd2d0>]

In [10]:
psurf = ds['PHIHYD'][:,0]
q = psurf.data
print repr(q)


dask.array<x_22, shape=(648, 400, 200), chunks=((1, 1, 1, ..., 1, 1), (400,), (200,)), dtype=float32>

In [279]:
def iso_power_spec(sst):
    xvars = set(['X', 'Xp1'])
    yvars = set(['Y', 'Yp1'])
    xvar = xvars.intersection(sst.dims).pop()
    yvar = yvars.intersection(sst.dims).pop()
    ssta = sst - sst.mean(dim=xvar)
    hwin = np.hanning(ds[yvar].shape[0])
    window = xray.DataArray(daskarray.from_array(hwin, hwin.shape), coords={yvar: ds[yvar]})
    sst_win = ssta * window
    return sst_win.groupby('time').apply( lambda q: xray_isotropic_power_spectrum(q, kiso=kiso, axis=(yvar,xvar)) )

In [280]:
%time t_ps = iso_power_spec(ds['THETA'].isel(Z=0))
%time u_ps = iso_power_spec(ds['UVEL'].isel(Z=0))
%time v_ps = iso_power_spec(ds['VVEL'].isel(Z=0))

In [307]:
%time w_ps = iso_power_spec(ds['WVEL'].isel(Zl=5))


CPU times: user 12.6 s, sys: 1.25 s, total: 13.9 s
Wall time: 29.8 s

In [281]:
eke_ps = 0.5 * (u_ps + v_ps)

In [256]:
from matplotlib.colors import LogNorm
plt.figure(figsize=(12,8))
pc = xray.plot.pcolormesh(ssta_ps.T, norm=LogNorm())



In [312]:
plt.figure(figsize=(12,5))

ax = plt.subplot(111)
xray.plot.plot(eke_ps.mean(dim='time'))
xray.plot.plot(t_ps.mean(dim='time'))
xray.plot.plot(1e6*w_ps.mean(dim='time'))
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_ylim([1e4,1e9])

x0 = np.array([1e-5, 1e-4])
plt.plot(x0, 1e-7*x0**(-3), 'k-')
plt.plot(x0, x0**(-5./3), 'k--')
plt.legend(['EKE', 'SST', 'W', r'$k^{-3}$', r'$k^{-5/3}$'])
plt.grid()



In [227]:
plt.loglog(kiso, ssta_ps.mean(dim='time'))


Out[227]:
[<matplotlib.lines.Line2D at 0x7fe471f8c2d0>]

In [14]:
%time q2dps = q.map_blocks( lambda x: pow_spec_2d(x), dtype=q.dtype).mean(axis=0).compute()
print q2dps.shape


CPU times: user 3.69 s, sys: 1.26 s, total: 4.95 s
Wall time: 1.85 s
(400, 200)

In [8]:
%time qrsc = qrs.compute()
# doesn't get beyond 100% cpu


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-8-7cdeb622734e> in <module>()
----> 1 get_ipython().magic(u'time qrsc = qrs.compute()')

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2305         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2306         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2307         return self.run_line_magic(magic_name, magic_arg_s)
   2308 
   2309     #-------------------------------------------------------------------------

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2226                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2227             with self.builtin_trap:
-> 2228                 result = fn(*args,**kwargs)
   2229             return result
   2230 

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    191     # but it's overkill for just that one bit of state.
    192     def magic_deco(arg):
--> 193         call = lambda f, *a, **k: f(*a, **k)
    194 
    195         if callable(arg):

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)
   1164         else:
   1165             st = clock2()
-> 1166             exec(code, glob, local_ns)
   1167             end = clock2()
   1168             out = None

<timed exec> in <module>()

/home/rpa/dask/dask/array/core.pyc in compute(self, **kwargs)
    720     @wraps(compute)
    721     def compute(self, **kwargs):
--> 722         result, = compute(self, **kwargs)
    723         return result
    724 

/home/rpa/dask/dask/array/core.pyc in compute(*args, **kwargs)
    511     dsk = merge(*[arg.dask for arg in args])
    512     keys = [arg._keys() for arg in args]
--> 513     results = get(dsk, keys, **kwargs)
    514 
    515     results2 = tuple(concatenate3(x) if arg.shape else unpack_singleton(x)

/home/rpa/dask/dask/array/core.pyc in get(dsk, keys, get, **kwargs)
   1270     """
   1271     get = get or _globals['get'] or threaded.get
-> 1272     dsk2 = optimize(dsk, keys, **kwargs)
   1273     return get(dsk2, keys, **kwargs)
   1274 

/home/rpa/dask/dask/array/optimization.pyc in optimize(dsk, keys, **kwargs)
     22     dsk4 = fuse(dsk3)
     23     dsk5 = valmap(rewrite_rules.rewrite, dsk4)
---> 24     dsk6 = inline_functions(dsk5, fast_functions=fast_functions)
     25     return dsk6
     26 

/home/rpa/dask/dask/optimize.pyc in inline_functions(dsk, fast_functions, inline_constants)
    185               and dependents[k]]
    186     if keys:
--> 187         return inline(dsk, keys, inline_constants=inline_constants)
    188     else:
    189         return dsk

/home/rpa/dask/dask/optimize.pyc in inline(dsk, keys, inline_constants)
    154             continue
    155         for item in keys & get_dependencies(dsk, key):
--> 156             val = subs(val, item, keysubs[item])
    157         rv[key] = val
    158     return rv

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    298     for arg in task[1:]:
    299         if istask(arg):
--> 300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):
    302             arg = [subs(x, key, val) for x in arg]

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):
--> 302             arg = [subs(x, key, val) for x in arg]
    303         elif type(arg) is type(key) and arg == key:
    304             arg = val

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    293             pass
    294         if isinstance(task, list):
--> 295             return [subs(x, key, val) for x in task]
    296         return task
    297     newargs = []

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    293             pass
    294         if isinstance(task, list):
--> 295             return [subs(x, key, val) for x in task]
    296         return task
    297     newargs = []

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    298     for arg in task[1:]:
    299         if istask(arg):
--> 300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):
    302             arg = [subs(x, key, val) for x in arg]

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    297     newargs = []
    298     for arg in task[1:]:
--> 299         if istask(arg):
    300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):

/home/rpa/dask/dask/core.pyc in istask(x)
     25 
     26 
---> 27 def istask(x):
     28     """ Is x a runnable task?
     29 

KeyboardInterrupt: 

In [7]:
%time ps = (qrs.mean(axis=0)).compute()
# doesn't get beyond 100% cpu


---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-7-ffea121da70b> in <module>()
----> 1 get_ipython().magic(u'time ps = (qrs.mean(axis=0)).compute()')

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2305         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2306         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2307         return self.run_line_magic(magic_name, magic_arg_s)
   2308 
   2309     #-------------------------------------------------------------------------

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2226                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2227             with self.builtin_trap:
-> 2228                 result = fn(*args,**kwargs)
   2229             return result
   2230 

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    191     # but it's overkill for just that one bit of state.
    192     def magic_deco(arg):
--> 193         call = lambda f, *a, **k: f(*a, **k)
    194 
    195         if callable(arg):

/usr/local/anaconda/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)
   1164         else:
   1165             st = clock2()
-> 1166             exec(code, glob, local_ns)
   1167             end = clock2()
   1168             out = None

<timed exec> in <module>()

/home/rpa/dask/dask/array/core.pyc in compute(self, **kwargs)
    720     @wraps(compute)
    721     def compute(self, **kwargs):
--> 722         result, = compute(self, **kwargs)
    723         return result
    724 

/home/rpa/dask/dask/array/core.pyc in compute(*args, **kwargs)
    511     dsk = merge(*[arg.dask for arg in args])
    512     keys = [arg._keys() for arg in args]
--> 513     results = get(dsk, keys, **kwargs)
    514 
    515     results2 = tuple(concatenate3(x) if arg.shape else unpack_singleton(x)

/home/rpa/dask/dask/array/core.pyc in get(dsk, keys, get, **kwargs)
   1270     """
   1271     get = get or _globals['get'] or threaded.get
-> 1272     dsk2 = optimize(dsk, keys, **kwargs)
   1273     return get(dsk2, keys, **kwargs)
   1274 

/home/rpa/dask/dask/array/optimization.pyc in optimize(dsk, keys, **kwargs)
     22     dsk4 = fuse(dsk3)
     23     dsk5 = valmap(rewrite_rules.rewrite, dsk4)
---> 24     dsk6 = inline_functions(dsk5, fast_functions=fast_functions)
     25     return dsk6
     26 

/home/rpa/dask/dask/optimize.pyc in inline_functions(dsk, fast_functions, inline_constants)
    185               and dependents[k]]
    186     if keys:
--> 187         return inline(dsk, keys, inline_constants=inline_constants)
    188     else:
    189         return dsk

/home/rpa/dask/dask/optimize.pyc in inline(dsk, keys, inline_constants)
    154             continue
    155         for item in keys & get_dependencies(dsk, key):
--> 156             val = subs(val, item, keysubs[item])
    157         rv[key] = val
    158     return rv

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):
--> 302             arg = [subs(x, key, val) for x in arg]
    303         elif type(arg) is type(key) and arg == key:
    304             arg = val

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    298     for arg in task[1:]:
    299         if istask(arg):
--> 300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):
    302             arg = [subs(x, key, val) for x in arg]

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    298     for arg in task[1:]:
    299         if istask(arg):
--> 300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):
    302             arg = [subs(x, key, val) for x in arg]

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    300             arg = subs(arg, key, val)
    301         elif isinstance(arg, list):
--> 302             arg = [subs(x, key, val) for x in arg]
    303         elif type(arg) is type(key) and arg == key:
    304             arg = val

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    293             pass
    294         if isinstance(task, list):
--> 295             return [subs(x, key, val) for x in task]
    296         return task
    297     newargs = []

/home/rpa/dask/dask/core.pyc in subs(task, key, val)
    289         try:
    290             if task == key:
--> 291                 return val
    292         except ValueError:
    293             pass

KeyboardInterrupt: 

In [87]:
#plt.pcolormesh(np.fft.fftshift(np.log10(ps)))
plt.pcolormesh(np.fft.fftshift(np.log10(pow_spec_2d(ds['VVEL'][:,0,200,:]))))


Out[87]:
<matplotlib.collections.QuadMesh at 0x7f122daec7d0>

In [88]:
plt.pcolormesh(np.fft.fftshift(np.log10(pow_spec_2d(ds['VVEL'][:,20,200,:]))))


Out[88]:
<matplotlib.collections.QuadMesh at 0x7f121dcb05d0>

In [263]:
a = np.random.rand(20,20)

In [265]:
np.apply_over_axes(np.sum, a, [1,0])


Out[265]:
array([[ 190.86095905]])

In [277]:
s = set(['X']).intersection(sst.dims)

In [278]:
s.pop()


Out[278]:
'X'

In [ ]: