In [3]:
    
import sys
# prepend the parent directory so the development checkout of zarr is imported
# in preference to any installed version
sys.path.insert(0, '..')
import zarr
print('zarr', zarr.__version__)
# NOTE(review): `blosc` is not referenced again in this notebook — presumably
# imported to confirm the blosc compressor is available; verify and drop if not needed
from zarr import blosc
import numpy as np
import h5py
import multiprocessing
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
from cachey import nbytes
import bokeh
from bokeh.io import output_notebook
# enable inline bokeh plots (used later by dask's profile visualize())
output_notebook()
from functools import reduce
import operator
import allel
    
    
    
    
In [4]:
    
# Open the Ag1000G phase 1 (AR3 release) pass-filtered callset, read-only.
# NOTE(review): hardcoded absolute path — this cell only runs on the original
# analysis machine; consider a configurable DATA_DIR.
callset = h5py.File('/data/coluzzi/ag1000g/data/phase1/release/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.h5',
                    mode='r')
# genotype calls for chromosome arm 3R; h5py.Dataset, loaded lazily from disk
genotype = callset['3R/calldata/genotype']
genotype  # last expression: rich display of the dataset (shape/dtype/chunks)
    
    Out[4]:
In [5]:
    
# Copy the genotype data from HDF5 into an in-memory zarr array.
# The native HDF5 chunks are too small to be efficient here, so widen the
# chunking along the second (samples) dimension by a factor of 20.
h5_chunks = genotype.chunks
chunks = (h5_chunks[0], h5_chunks[1] * 20, h5_chunks[2])
# fast, lightly-compressed codec: lz4 at level 1 with bit-shuffle
blosc_opts = dict(cname='lz4', clevel=1, shuffle=2)
genotype_zarr = zarr.array(genotype, chunks=chunks, compression='blosc',
                           compression_opts=blosc_opts)
genotype_zarr  # rich display of the new zarr array
    
    Out[5]:
We want to perform an allele count. We compare serial and parallel implementations, and compare working directly from HDF5 on disk versus from a Zarr array in memory.
In [6]:
    
%%time
# linear implementation from HDF5 on disk
# (baseline serial timing; the single expression is both the timed work and
# the cell's displayed output)
allel.GenotypeChunkedArray(genotype).count_alleles()
    
    
    Out[6]:
In [8]:
    
%%time
# linear implementation from zarr in memory
# (although blosc can use multiple threads internally)
# compare this timing against the HDF5-on-disk cell above
allel.GenotypeChunkedArray(genotype_zarr).count_alleles()
    
    
    Out[8]:
In [21]:
    
# Multi-threaded allele count reading directly from the HDF5 dataset on disk.
# Use the larger zarr-style chunks rather than the small native HDF5 chunks.
gd = allel.model.dask.GenotypeDaskArray.from_array(genotype, chunks=chunks)
# max_allele=3 lets dask preallocate the count array without a data scan
ac = gd.count_alleles(max_allele=3)
# profile task scheduling and CPU/memory usage during the parallel compute
with Profiler() as task_prof, ResourceProfiler(dt=1) as resource_prof:
    ac.compute(num_workers=8)
visualize([task_prof, resource_prof], min_border_bottom=60, min_border_top=60);
    
    
In [23]:
    
# Multi-threaded allele count reading from the zarr array in memory,
# using the same chunking as the HDF5 run above for a fair comparison.
gdz = allel.model.dask.GenotypeDaskArray.from_array(genotype_zarr, chunks=chunks)
# max_allele=3 lets dask preallocate the count array without a data scan
acz = gdz.count_alleles(max_allele=3)
# profile task scheduling and CPU/memory usage during the parallel compute
with Profiler() as task_prof, ResourceProfiler(dt=1) as resource_prof:
    acz.compute(num_workers=8)
visualize([task_prof, resource_prof], min_border_bottom=60, min_border_top=60);
    
    
In [ ]: