This notebook profiles a very simple array copy operation, using synthetic data. The same chunk-wise copy is performed with dask over plain NumPy arrays, zarr arrays, HDF5 datasets (via h5py) and bcolz carrays, with and without a lock, to compare speed and CPU utilisation across backends.
In [1]:
import sys
sys.path.insert(0, '..')
import zarr
print('zarr', zarr.__version__)
from zarr import blosc
import numpy as np
import h5py
import bcolz
# don't let bcolz use multiple threads internally, we want to
# see whether dask can make good use of multiple CPUs
bcolz.set_nthreads(1)
import multiprocessing
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
from cachey import nbytes
import bokeh
from bokeh.io import output_notebook
output_notebook()
In [2]:
import tempfile
import operator
from functools import reduce
from zarr.util import human_readable_size
def h5fmem(**kwargs):
    """Convenience function to create an in-memory HDF5 file."""
    # need a file name even though nothing is ever written
    fn = tempfile.mktemp()
    # file creation args
    kwargs['mode'] = 'w'
    kwargs['driver'] = 'core'
    kwargs['backing_store'] = False
    # open HDF5 file
    h5f = h5py.File(fn, **kwargs)
    return h5f
def h5d_diagnostics(d):
    """Print some diagnostics on an HDF5 dataset."""
    print(d)
    nbytes = reduce(operator.mul, d.shape) * d.dtype.itemsize
    cbytes = d._id.get_storage_size()
    if cbytes > 0:
        ratio = nbytes / cbytes
    else:
        ratio = np.inf
    r = ' compression: %s' % d.compression
    r += '; compression_opts: %s' % d.compression_opts
    r += '; shuffle: %s' % d.shuffle
    r += '\n nbytes: %s' % human_readable_size(nbytes)
    r += '; nbytes_stored: %s' % human_readable_size(cbytes)
    r += '; ratio: %.1f' % ratio
    r += '; chunks: %s' % str(d.chunks)
    print(r)
In [3]:
def profile_dask_copy(src, dst, chunks, num_workers=multiprocessing.cpu_count(), dt=0.1, lock=True):
    """Copy src into dst chunk-wise via dask, visualising task and resource profiles."""
    dsrc = da.from_array(src, chunks=chunks)
    with Profiler() as prof, ResourceProfiler(dt=dt) as rprof:
        # lock=True serialises writes to dst; lock=False lets workers write concurrently
        da.store(dsrc, dst, num_workers=num_workers, lock=lock)
    visualize([prof, rprof], min_border_top=60, min_border_bottom=60)
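As a quick sanity check on the helper above, here is a minimal sketch (with a small, hypothetical array; the names src and dst are illustrative only, not part of the original run) confirming that da.store round-trips data exactly:
In [ ]:
# sanity check: da.store should reproduce the source exactly
src = np.arange(100, dtype='u2')
dst = np.empty_like(src)
da.store(da.from_array(src, chunks=10), dst, lock=False)
assert np.array_equal(src, dst)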
In [4]:
# a1 = np.arange(400000000, dtype='i4')
a1 = np.random.normal(2000, 1000, size=200000000).astype('u2')
a1
Out[4]:
In [5]:
human_readable_size(a1.nbytes)
Out[5]:
In [6]:
a2 = np.empty_like(a1)
In [7]:
chunks = 2**20,  # 1M items, i.e. 2M per chunk for the u2 data (4M for the commented-out i4 variant)
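For reference, the chunk size in bytes can be verified directly from the dtype:
In [ ]:
# bytes per chunk: 2**20 items * 2 bytes (u2)
human_readable_size(chunks[0] * a1.dtype.itemsize)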
In [8]:
%time a2[:] = a1
In [9]:
profile_dask_copy(a1, a2, chunks, lock=True, dt=.01)
In [10]:
profile_dask_copy(a1, a2, chunks, lock=False, dt=.01)
In [11]:
# blosc with lz4, light compression; shuffle=2 selects bit-shuffle
z1 = zarr.array(a1, chunks=chunks, compression='blosc',
                compression_opts=dict(cname='lz4', clevel=1, shuffle=2))
z1
Out[11]:
In [12]:
z2 = zarr.empty_like(z1)
z2
Out[12]:
In [13]:
profile_dask_copy(z1, z2, chunks, lock=True, dt=.02)
In [14]:
profile_dask_copy(z1, z2, chunks, lock=False, dt=0.02)
In [15]:
# for comparison, using blosc internal threads
%timeit -n3 -r5 z2[:] = z1
In [17]:
%prun z2[:] = z1
Without the dask lock, we get better CPU utilisation: zarr releases the GIL during compression and decompression, so multiple dask workers can make progress concurrently.
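As a further check on the lock's effect, the dask copy can also be timed directly, without the profiling overhead (a sketch along the same lines as the cells above, not part of the original run):
In [ ]:
# sketch: time the zarr-to-zarr dask copy with and without the lock
d = da.from_array(z1, chunks=chunks)
%time da.store(d, z2, lock=True)
%time da.store(d, z2, lock=False)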
In [16]:
h5f = h5fmem()
h5f
Out[16]:
In [17]:
h1 = h5f.create_dataset('h1', data=a1, chunks=chunks, compression='lzf', shuffle=True)
h5d_diagnostics(h1)
In [18]:
h2 = h5f.create_dataset('h2', shape=h1.shape, chunks=h1.chunks,
                        compression=h1.compression, compression_opts=h1.compression_opts,
                        shuffle=h1.shuffle)
h5d_diagnostics(h2)
In [19]:
profile_dask_copy(h1, h2, chunks, lock=True, dt=0.1)
In [20]:
profile_dask_copy(h1, h2, chunks, lock=False, dt=0.1)
Note that h5py holds the GIL during I/O, so little benefit from multiple dask workers is expected here, with or without the lock.
In [27]:
c1 = bcolz.carray(a1, chunklen=chunks[0],
                  cparams=bcolz.cparams(cname='lz4', clevel=1, shuffle=2))
c1
Out[27]:
In [28]:
c2 = bcolz.zeros(a1.shape, chunklen=chunks[0], dtype=a1.dtype,
                 cparams=bcolz.cparams(cname='lz4', clevel=1, shuffle=2))
c2
Out[28]:
In [29]:
profile_dask_copy(c1, c2, chunks, lock=True, dt=0.05)
In [30]:
# not sure it's safe to use bcolz without a lock, but what the heck...
profile_dask_copy(c1, c2, chunks, lock=False, dt=0.05)
In [31]:
# for comparison
%timeit -n3 -r5 c2[:] = c1
In [32]:
# for comparison
%timeit -n3 -r5 c1.copy()
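Finally, a minimal sketch (not part of the original run) verifying that all of the destination containers ended up with identical data:
In [ ]:
# all copies should match the original synthetic data
assert np.array_equal(a1, a2)
assert np.array_equal(a1, z2[:])
assert np.array_equal(a1, h2[:])
assert np.array_equal(a1, c2[:])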