Expression Chunking

We evaluate computations out-of-core by breaking expressions on entire datasets into two pieces

  1. What to do on each chunk of a dataset
  2. What to do on the concatenation of computed chunks

This works well when the computation includes some sort of reduction

Simple example

Given a long vector of integers, we compute its sum by

  1. Computing the sum of each 1,000,000-element chunk
  2. Computing the sum of the aggregated results

In [1]:
from blaze import *
from blaze.expr.split import split

x = Symbol('x', '1000000000 * int')
chunk = Symbol('chunk', '1000000 * int')

split(x, x.sum(), chunk=chunk)


Out[1]:
((chunk, sum(chunk, keepdims=True)), (aggregate, sum(aggregate)))
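
Given these two pieces, a driver only needs to map the chunk expression over blocks of the data and apply the aggregate expression to the concatenation of the results. A minimal sketch of that loop in plain NumPy (the chunked helper here is hypothetical, not Blaze's actual scheduler):

    import numpy as np

    def chunked(data, chunk_size, on_chunk, on_aggregate):
        # apply the per-chunk expression to each block of the data,
        # then the aggregate expression to the concatenated results
        parts = [on_chunk(data[i:i + chunk_size])
                 for i in range(0, len(data), chunk_size)]
        return on_aggregate(np.concatenate(parts))

    v = np.arange(10_000_000)
    total = chunked(v, 1_000_000,
                    lambda c: c.sum(keepdims=True),
                    lambda agg: agg.sum())
    assert total == v.sum()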

We reason about the shapes of all of the pieces so that intermediates may be preallocated


In [2]:
(chunk, chunk_expr), (aggregate, aggregate_expr) = split(x, x.sum(), chunk=chunk)

In [3]:
chunk_expr.dshape


Out[3]:
dshape("1 * int32")

In [4]:
aggregate.dshape


Out[4]:
dshape("1000 * int32")

More complex cases

Computing a chunked sum is simple. Let's consider some more complex cases

Count

When computing the count, we count each chunk and then sum the results


In [5]:
split(x, count(x), chunk=chunk)


Out[5]:
((chunk, count(chunk, keepdims=True)), (aggregate, sum(aggregate)))

Standard deviation

When computing the standard deviation, each chunk reports its count, sum, and sum of squares, and the aggregation step combines these three partial sums into the final result


In [6]:
split(x, x.std(), chunk=chunk)


Out[6]:
((chunk,
  summary(n=count(chunk), x=sum(chunk), x2=sum(chunk ** 2), keepdims=True)),
 (aggregate,
  sqrt((sum(aggregate.x2) / (sum(aggregate.n) * 1.0)) - ((sum(aggregate.x) / (sum(aggregate.n) * 1.0)) ** 2))))
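
The aggregate expression relies on the identity std(x) = sqrt(mean(x**2) - mean(x)**2), which is what lets each chunk reduce to just three running sums. A quick sanity check of that identity in plain NumPy:

    import numpy as np

    v = np.random.rand(10_000)
    n, s, s2 = len(v), v.sum(), (v ** 2).sum()  # the three per-chunk quantities
    assert np.isclose(np.sqrt(s2 / n - (s / n) ** 2), v.std())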

Split-apply-combine

Split-apply-combine operations are also reductive


In [7]:
t = Symbol('t', 'var * {name: string, amount: int}')

In [8]:
split(t, by(t.name, avg=t.amount.mean()))


Out[8]:
((chunk,
  by(chunk.name, avg_count=count(chunk.amount), avg_total=sum(chunk.amount))),
 (aggregate,
  by(aggregate.name, avg=(1.0 * sum(aggregate.avg_total)) / sum(aggregate.avg_count))))
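
The mean decomposes into a count and a total per group: each chunk reports both, and the aggregation step re-reduces them and divides. Here is the same decomposition written by hand against pandas (an illustrative sketch over assumed chunked DataFrames, not code Blaze generates):

    import pandas as pd

    def on_chunk(df):
        # per-chunk piece: count and total of amount, per name
        return df.groupby('name')['amount'].agg(avg_count='count',
                                                avg_total='sum')

    def on_aggregate(parts):
        # combine piece: re-reduce the partial results, then divide
        agg = pd.concat(parts).groupby(level='name').sum()
        return agg['avg_total'] / agg['avg_count']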

We are smart enough to cut the expression at the right place, deciding which operations should occur on each chunk and which must occur after the aggregation. In the example below, the filter and the per-group sums run on every chunk, while the division by ten is deferred to the aggregate so that it happens exactly once per group


In [9]:
t2 = t[t.amount > 1000]

In [10]:
split(t, by(t2.name, total=t2.amount.sum() / 10))


Out[10]:
((chunk,
  by(chunk[chunk.amount > 1000].name, total=sum(chunk[chunk.amount > 1000].amount))),
 (aggregate, by(aggregate.name, total=sum(aggregate.total) / 10)))

N-Dimensions

This works equally well in N dimensions


In [11]:
x = Symbol('x', '1000000 * 2000000 * float64')
chunk = Symbol('chunk', '1000 * 1000 * float64')

In [12]:
split(x, sum(2*x, axis=0), chunk=chunk)


Out[12]:
((chunk, sum(2 * chunk, keepdims=True, axis=(0,))),
 (aggregate, sum(aggregate, axis=(0,))))

The datashapes at each stage of the computation show how the pieces fit together: each 1000 × 1000 chunk reduces to a 1 × 1000 block, those blocks assemble into a 1000 × 2000000 aggregate (one row per chunk-row of the original array), and the final reduction over axis 0 yields a 2000000-element result


In [13]:
(chunk, chunk_expr), (agg, agg_expr) = split(x, sum(2*x, axis=0), chunk=chunk)

In [14]:
chunk.dshape, chunk_expr.dshape, agg.dshape, agg_expr.dshape


Out[14]:
(dshape("1000 * 1000 * float64"),
 dshape("1 * 1000 * float64"),
 dshape("1000 * 2000000 * float64"),
 dshape("2000000 * float64"))

Practical Example: Combine NumPy and HDF5

NumPy is a computationally rich in-memory container. h5py is a computationally poor on-disk container. We use chunking to layer NumPy's computational power on top of HDF5 files.

Create a simple data set


In [15]:
!rm foo.hdf5


rm: cannot remove ‘foo.hdf5’: No such file or directory

In [16]:
import h5py
import numpy as np

f = h5py.File('foo.hdf5', 'a')  # 'a': create the file if it doesn't exist

In [17]:
x = np.arange(20*24, dtype='f4').reshape((20, 24))

In [18]:
d = f.create_dataset('/x', shape=x.shape, dtype=x.dtype,
                     fillvalue=0.0, chunks=(4, 6))  # stored on disk in 4 x 6 chunks
d[:] = x

In [19]:
d


Out[19]:
<HDF5 dataset "x": shape (20, 24), type "<f4">

Combining NumPy and HDF5

Consider the following expression in NumPy


In [20]:
(x + 1).sum(axis=0)


Out[20]:
array([ 4580.,  4600.,  4620.,  4640.,  4660.,  4680.,  4700.,  4720.,
        4740.,  4760.,  4780.,  4800.,  4820.,  4840.,  4860.,  4880.,
        4900.,  4920.,  4940.,  4960.,  4980.,  5000.,  5020.,  5040.], dtype=float32)

We can't do this on h5py datasets


In [21]:
(d + 1).sum(axis=0)  # d is an h5py dataset


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-21-fcd5b640e7f4> in <module>()
----> 1 (d + 1).sum(axis=0)  # d is an h5py dataset

TypeError: unsupported operand type(s) for +: 'Dataset' and 'int'

But if we wrap the dataset in Blaze, we can do this computation via chunking


In [22]:
b = Data(d)

In [23]:
compute((b + 1).sum(axis=0))  # b is an h5py dataset wrapped by Blaze


Out[23]:
array([ 4580.,  4600.,  4620.,  4640.,  4660.,  4680.,  4700.,  4720.,
        4740.,  4760.,  4780.,  4800.,  4820.,  4840.,  4860.,  4880.,
        4900.,  4920.,  4940.,  4960.,  4980.,  5000.,  5020.,  5040.], dtype=float32)
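
This is roughly the two-pass pattern from earlier, written by hand against the session above (a sketch; the 4-row block size is just for illustration):

    # per-chunk piece: compute on each block read from the file
    parts = [(d[i:i + 4] + 1).sum(axis=0, keepdims=True)
             for i in range(0, d.shape[0], 4)]
    # aggregate piece: reduce the stacked partial results
    np.concatenate(parts).sum(axis=0)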

Cleanup


In [24]:
f.close()

In [25]:
!rm foo.hdf5