Profile allele count from genotype data via dask.array


In [3]:
# Make the development checkout of zarr in the parent directory importable
# ahead of any installed version.
import sys
sys.path.insert(0, '..')
import zarr
# Record which zarr build produced the results below (provenance).
print('zarr', zarr.__version__)
# NOTE(review): `blosc` is not referenced directly below — presumably imported
# to ensure the blosc compression backend is loaded; confirm it is needed.
from zarr import blosc
import numpy as np
import h5py
import multiprocessing
import dask
import dask.array as da
# Dask diagnostics: task-level profiler, CPU/memory sampler, and the
# bokeh-based renderer used to display their results inline.
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
from cachey import nbytes
import bokeh
from bokeh.io import output_notebook
# Load BokehJS so profiling plots render inline in the notebook.
output_notebook()
from functools import reduce
import operator
import allel


zarr 1.0.1.dev18+dirty
Loading BokehJS ...

In [4]:
# Open the Ag1000G phase 1 (AR3 release) variant callset read-only and take a
# handle on the chromosome-arm 3R genotype dataset (no data read yet).
# NOTE(review): absolute local path — this cell only runs on the original
# machine; consider a configurable data directory.
callset = h5py.File('/data/coluzzi/ag1000g/data/phase1/release/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.h5',
                    mode='r')
genotype = callset['3R/calldata/genotype']
# Display the dataset repr (shape/dtype) as the cell output.
genotype


Out[4]:
<HDF5 dataset "genotype": shape (13167162, 765, 2), type "|i1">

In [5]:
# Copy the genotype calls from HDF5 into an in-memory zarr array.
# N.B., the native HDF5 chunks are really too small for this workload, so
# widen the chunking along the samples dimension by a factor of 20.
chunks = (genotype.chunks[0], genotype.chunks[1] * 20, genotype.chunks[2])
genotype_zarr = zarr.array(
    genotype,
    chunks=chunks,
    compression='blosc',
    compression_opts=dict(cname='lz4', clevel=1, shuffle=2),
)
# Show the array summary (compression ratio, storage) as the cell output.
genotype_zarr


Out[5]:
zarr.core.Array((13167162, 765, 2), int8, chunks=(6553, 200, 2), order=C)
  compression: blosc; compression_opts: {'clevel': 1, 'cname': 'lz4', 'shuffle': 2}
  nbytes: 18.8G; nbytes_stored: 683.2M; ratio: 28.1; initialized: 8040/8040
  store: builtins.dict

We want to perform an allele count. Below we compare serial and parallel implementations, and compare working directly from HDF5 on disk versus from Zarr in memory.


In [6]:
%%time
# linear implementation from HDF5 on disk
# Baseline timing: serial (single pass) allele count over the full dataset,
# reading from the HDF5 file. Compare with the dask runs further down.
allel.GenotypeChunkedArray(genotype).count_alleles()


CPU times: user 1min 50s, sys: 512 ms, total: 1min 51s
Wall time: 1min 50s
Out[6]:
AlleleCountsChunkedArray((13167162, 4), int32, chunks=(65536, 4))
nbytes: 200.9M; cbytes: 38.3M; cratio: 5.2;
compression: blosc; compression_opts: cparams(clevel=5, shuffle=1, cname='lz4', quantize=0);
data: bcolz.carray_ext.carray
0 1 2 3
0 1523 5 0 0
1 1527 1 0 0
2 1527 1 0 0
3 1527 1 0 0
4 1527 1 0 0

...


In [8]:
%%time
# linear implementation from zarr in memory
# (although blosc can use multiple threads internally)
# Same serial allele count as above, but sourced from the in-memory zarr
# copy — isolates storage-backend cost from the counting algorithm.
allel.GenotypeChunkedArray(genotype_zarr).count_alleles()


CPU times: user 2min 27s, sys: 2.14 s, total: 2min 29s
Wall time: 1min 23s
Out[8]:
AlleleCountsChunkedArray((13167162, 4), int32, chunks=(65536, 4))
nbytes: 200.9M; cbytes: 38.3M; cratio: 5.2;
compression: blosc; compression_opts: cparams(clevel=5, shuffle=1, cname='lz4', quantize=0);
data: bcolz.carray_ext.carray
0 1 2 3
0 1523 5 0 0
1 1527 1 0 0
2 1527 1 0 0
3 1527 1 0 0
4 1527 1 0 0

...


In [21]:
# multi-threaded implementation from HDF5 on disk
# Wrap the HDF5 dataset as a dask-backed genotype array, using the same
# enlarged chunks as the zarr copy so the two parallel runs are comparable.
gd = allel.model.dask.GenotypeDaskArray.from_array(genotype, chunks=chunks)
# max_allele=3 fixes the output width at 4 allele columns (0..3), matching
# the serial results above.
ac = gd.count_alleles(max_allele=3)
# Profile the task stream and resource usage (ResourceProfiler samples at
# dt=1 second intervals) while computing with 8 worker threads.
# NOTE(review): 8 is a magic number — `multiprocessing` is imported above,
# presumably with cpu_count() in mind; confirm.
with Profiler() as prof, ResourceProfiler(dt=1) as rprof:
    ac.compute(num_workers=8)
# Render both profiles; trailing ';' suppresses the repr output.
visualize([prof, rprof], min_border_bottom=60, min_border_top=60);



In [23]:
# multi-threaded implementation from zarr in memory
# Same parallel allele count as the previous cell, but reading from the
# in-memory zarr array instead of HDF5 on disk.
gdz = allel.model.dask.GenotypeDaskArray.from_array(genotype_zarr, chunks=chunks)
acz = gdz.count_alleles(max_allele=3)
# Profile task execution and CPU/memory usage with the same settings as the
# HDF5 run so the visualizations are directly comparable.
with Profiler() as prof, ResourceProfiler(dt=1) as rprof:
    acz.compute(num_workers=8)
visualize([prof, rprof], min_border_bottom=60, min_border_top=60);



In [ ]: