Profile array copy via dask threaded scheduler

This notebook profiles a very simple array copy operation, using synthetic data.


In [1]:
import sys
sys.path.insert(0, '..')
import zarr
print('zarr', zarr.__version__)
from zarr import blosc
import numpy as np
import h5py
import bcolz
# don't let bcolz use multiple threads internally, we want to 
# see whether dask can make good use of multiple CPUs
bcolz.set_nthreads(1)
import multiprocessing
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
from cachey import nbytes
import bokeh
from bokeh.io import output_notebook
output_notebook()


zarr 1.0.1.dev18+dirty
Loading BokehJS ...

In [2]:
import tempfile
import operator
from functools import reduce
from zarr.util import human_readable_size


def h5fmem(**kwargs):
    """Convenience function to create an in-memory HDF5 file."""

    # need a file name even tho nothing is ever written
    fn = tempfile.mktemp()

    # file creation args
    kwargs['mode'] = 'w'
    kwargs['driver'] = 'core'
    kwargs['backing_store'] = False

    # open HDF5 file
    h5f = h5py.File(fn, **kwargs)

    return h5f


def h5d_diagnostics(d):
    """Print some diagnostics on an HDF5 dataset."""
    
    print(d)
    nbytes = reduce(operator.mul, d.shape) * d.dtype.itemsize
    cbytes = d._id.get_storage_size()
    if cbytes > 0:
        ratio = nbytes / cbytes
    else:
        ratio = np.inf
    r = '  compression: %s' % d.compression
    r += '; compression_opts: %s' % d.compression_opts
    r += '; shuffle: %s' % d.shuffle
    r += '\n  nbytes: %s' % human_readable_size(nbytes)
    r += '; nbytes_stored: %s' % human_readable_size(cbytes)
    r += '; ratio: %.1f' % ratio
    r += '; chunks: %s' % str(d.chunks)
    print(r)

In [3]:
def profile_dask_copy(src, dst, chunks, num_workers=multiprocessing.cpu_count(), dt=0.1, lock=True):
    dsrc = da.from_array(src, chunks=chunks)
    with Profiler() as prof, ResourceProfiler(dt=dt) as rprof:
        da.store(dsrc, dst, num_workers=num_workers, lock=lock)
    visualize([prof, rprof], min_border_top=60, min_border_bottom=60)

NumPy arrays


In [4]:
# a1 = np.arange(400000000, dtype='i4')
a1 = np.random.normal(2000, 1000, size=200000000).astype('u2')
a1


Out[4]:
array([1314, 2727, 2905, ..., 1958, 1325, 1971], dtype=uint16)

In [5]:
human_readable_size(a1.nbytes)


Out[5]:
'381.5M'

In [6]:
a2 = np.empty_like(a1)

In [7]:
chunks = 2**20,  # 4M

In [8]:
%time a2[:] = a1


CPU times: user 56 ms, sys: 36 ms, total: 92 ms
Wall time: 91.7 ms

In [9]:
profile_dask_copy(a1, a2, chunks, lock=True, dt=.01)



In [10]:
profile_dask_copy(a1, a2, chunks, lock=False, dt=.01)


Zarr arrays (in-memory)


In [11]:
z1 = zarr.array(a1, chunks=chunks, compression='blosc', 
                compression_opts=dict(cname='lz4', clevel=1, shuffle=2))
z1


Out[11]:
zarr.core.Array((200000000,), uint16, chunks=(1048576,), order=C)
  compression: blosc; compression_opts: {'clevel': 1, 'cname': 'lz4', 'shuffle': 2}
  nbytes: 381.5M; nbytes_stored: 318.2M; ratio: 1.2; initialized: 191/191
  store: builtins.dict

In [12]:
z2 = zarr.empty_like(z1)
z2


Out[12]:
zarr.core.Array((200000000,), uint16, chunks=(1048576,), order=C)
  compression: blosc; compression_opts: {'clevel': 1, 'cname': 'lz4', 'shuffle': 2}
  nbytes: 381.5M; nbytes_stored: 294; ratio: 1360544.2; initialized: 0/191
  store: builtins.dict

In [13]:
profile_dask_copy(z1, z2, chunks, lock=True, dt=.02)



In [14]:
profile_dask_copy(z1, z2, chunks, lock=False, dt=0.02)



In [15]:
# for comparison, using blosc internal threads
%timeit -n3 -r5 z2[:] = z1


3 loops, best of 5: 251 ms per loop

In [17]:
%prun z2[:] = z1


 

Without the dask lock, we get better CPU utilisation.

HDF5 datasets (in-memory)


In [16]:
h5f = h5fmem()
h5f


Out[16]:
<HDF5 file "tmp0kuotgrr" (mode r+)>

In [17]:
h1 = h5f.create_dataset('h1', data=a1, chunks=chunks, compression='lzf', shuffle=True)
h5d_diagnostics(h1)


<HDF5 dataset "h1": shape (200000000,), type "<u2">
  compression: lzf; compression_opts: None; shuffle: True
  nbytes: 381.5M; nbytes_stored: 357.4M; ratio: 1.1; chunks: (1048576,)

In [18]:
h2 = h5f.create_dataset('h2', shape=h1.shape, chunks=h1.chunks, 
                        compression=h1.compression, compression_opts=h1.compression_opts, 
                        shuffle=h1.shuffle)
h5d_diagnostics(h2)


<HDF5 dataset "h2": shape (200000000,), type "<f4">
  compression: lzf; compression_opts: None; shuffle: True
  nbytes: 762.9M; nbytes_stored: 0; ratio: inf; chunks: (1048576,)

In [19]:
profile_dask_copy(h1, h2, chunks, lock=True, dt=0.1)



In [20]:
profile_dask_copy(h1, h2, chunks, lock=False, dt=0.1)


Bcolz carrays (in-memory)


In [27]:
c1 = bcolz.carray(a1, chunklen=chunks[0],
                  cparams=bcolz.cparams(cname='lz4', clevel=1, shuffle=2))
c1


Out[27]:
carray((200000000,), uint16)
  nbytes := 381.47 MB; cbytes := 318.98 MB; ratio: 1.20
  cparams := cparams(clevel=1, shuffle=2, cname='lz4', quantize=0)
  chunklen := 1048576; chunksize: 2097152; blocksize: 16384
[1314 2727 2905 ..., 1958 1325 1971]

In [28]:
c2 = bcolz.zeros(a1.shape, chunklen=chunks[0], dtype=a1.dtype, 
                 cparams=bcolz.cparams(cname='lz4', clevel=1, shuffle=2))
c2


Out[28]:
carray((200000000,), uint16)
  nbytes := 381.47 MB; cbytes := 2.00 MB; ratio: 190.73
  cparams := cparams(clevel=1, shuffle=2, cname='lz4', quantize=0)
  chunklen := 1048576; chunksize: 2097152; blocksize: 4096
[0 0 0 ..., 0 0 0]

In [29]:
profile_dask_copy(c1, c2, chunks, lock=True, dt=0.05)



In [30]:
# not sure it's safe to use bcolz without a lock, but what the heck...
profile_dask_copy(c1, c2, chunks, lock=False, dt=0.05)



In [31]:
# for comparison
%timeit -n3 -r5 c2[:] = c1


3 loops, best of 5: 649 ms per loop

In [32]:
# for comparison
%timeit -n3 -r5 c1.copy()


3 loops, best of 5: 557 ms per loop

In [ ]: