In [1]:
# Imports and environment report: print library versions up front so the
# benchmark timings below can be interpreted against specific releases.
import numpy as np
import dask
print('dask', dask.__version__)
import dask.array as da
import h5py
print('h5py', h5py.__version__)
import bcolz
print('bcolz', bcolz.__version__)
import sys
# Make the development scikit-allel checkout importable — presumably the
# repository root is two directories up from this notebook (TODO confirm).
sys.path.insert(0, '../..')
# Auto-reload only the modules marked with %aimport (autoreload mode 1),
# so edits to the allel code are picked up without restarting the kernel.
%reload_ext autoreload
%autoreload 1
%aimport allel.model.dask
%aimport allel.model.chunked
%aimport allel.model.ndarray
%aimport allel.model
%aimport allel
print('scikit-allel', allel.__version__)


dask 0.7.5
h5py 2.5.0
bcolz 0.12.1
scikit-allel 0.21.0.dev0

In [2]:
# Open the Ag1000G phase 1 AR3 PASS-variant callset, read-only.
# NOTE(review): hardcoded absolute path — reproducible only on this host;
# consider a configurable data directory.
callset = h5py.File('/data/coluzzi/ag1000g/data/phase1/release/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.h5', mode='r')
callset


Out[2]:
<HDF5 file "ag1000g.phase1.ar3.pass.h5" (mode r)>

In [3]:
# Wrap the chromosome 3L genotype dataset as a chunked array. This does not
# load data into memory — operations stream over the underlying HDF5 chunks
# (per Out[3]: ~13.7G uncompressed, backed by an h5py Dataset).
genotype = allel.model.chunked.GenotypeChunkedArray(callset['3L/calldata/genotype'])
genotype


Out[3]:
GenotypeChunkedArray((9643193, 765, 2), int8, nbytes=13.7G, cbytes=548.0M, cratio=25.7, cname=gzip, clevel=3, shuffle=False, chunks=(6553, 10, 2), data=h5py._hl.dataset.Dataset)
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...

h5py (in-memory HDF5) storage


In [4]:
# copy the first 2 million rows into memory to use as benchmarking data:
# in-memory HDF5 storage with zlib level-1 compression and explicit
# (5000, 100, 2) chunking (confirmed by Out[4]: cname=gzip, clevel=1).
# NOTE(review): the name `gc` shadows the stdlib garbage-collector module —
# fine here, but avoid importing `gc` later in this kernel.
gc = genotype.copy(storage='hdf5mem_zlib1', stop=2000000, chunks=(5000, 100, 2))
gc


Out[4]:
GenotypeChunkedArray((2000000, 765, 2), int8, nbytes=2.8G, cbytes=110.3M, cratio=26.4, cname=gzip, clevel=1, shuffle=False, chunks=(5000, 100, 2), data=h5py._hl.dataset.Dataset)
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...


In [5]:
# how long does it take to read the data? A full-array max() reduction has
# to visit (and decompress) every chunk, so this approximates raw
# read/decompress throughput of the hdf5mem storage.
%time gc.max()


CPU times: user 6.56 s, sys: 0 ns, total: 6.56 s
Wall time: 6.56 s
Out[5]:
3

In [6]:
# compute count_alleles block-wise in series for comparison with dask —
# this is the single-threaded baseline for the parallel timings below.
%time gc.count_alleles(max_allele=3)


CPU times: user 9.06 s, sys: 4 ms, total: 9.07 s
Wall time: 8.98 s
Out[6]:
AlleleCountsChunkedArray((2000000, 4), int32, nbytes=30.5M, cbytes=5.4M, cratio=5.6, cname=blosclz, clevel=5, shuffle=True, chunks=(32768, 4), data=bcolz.carray_ext.carray)
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...


In [7]:
# Expose the same in-memory HDF5 copy as a dask-backed genotype array;
# per Out[7] the dask chunksize (5000, 100, 2) matches the storage chunks.
gd = allel.model.dask.GenotypeDaskArray.from_array(gc)
gd


Out[7]:
GenotypeDaskArray<from-ar..., shape=(2000000, 765, 2), dtype=int8, chunksize=(5000, 100, 2)>
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...


In [8]:
# Build the allele-count task graph lazily; the actual work happens in the
# .compute() calls below, where the worker count is varied.
ac = gd.count_alleles(max_allele=3)
ac


Out[8]:
AlleleCountsDaskArray<atop-26..., shape=(2000000, 4), dtype=int64, chunksize=(5000, 4)>
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...


In [9]:
# Execute the graph with 1 worker — single-threaded dask baseline.
%time _ = ac.compute(num_workers=1)


CPU times: user 10.9 s, sys: 576 ms, total: 11.5 s
Wall time: 11.1 s

In [10]:
# Same computation with 2 workers.
%time _ = ac.compute(num_workers=2)


CPU times: user 11.4 s, sys: 1.1 s, total: 12.5 s
Wall time: 7.13 s

In [11]:
# Same computation with 3 workers — wall time barely improves over 2,
# per the recorded output above/below (see In[10]/In[11] timings).
%time _ = ac.compute(num_workers=3)


CPU times: user 12.3 s, sys: 1.34 s, total: 13.6 s
Wall time: 7.24 s

In [12]:
# Same computation with 6 workers — no further speedup on the h5py-backed
# data; presumably reads serialize on the HDF5 layer (TODO confirm).
%time _ = ac.compute(num_workers=6)


CPU times: user 12.6 s, sys: 1.5 s, total: 14.1 s
Wall time: 7.44 s

bcolz (in-memory carray) storage


In [13]:
# Rebind `gc` to a fresh copy of the same 2M rows, this time in an
# in-memory bcolz carray with its default chunking/compression
# (per Out[13]: blosclz, clevel=5, shuffle, chunks=(1370, 765, 2)).
gc = genotype.copy(storage='bcolzmem', stop=2000000)
gc


Out[13]:
GenotypeChunkedArray((2000000, 765, 2), int8, nbytes=2.8G, cbytes=149.6M, cratio=19.5, cname=blosclz, clevel=5, shuffle=True, chunks=(1370, 765, 2), data=bcolz.carray_ext.carray)
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...


In [14]:
# Read-throughput benchmark on the bcolz copy — compare with In[5].
%time gc.max()


CPU times: user 4.66 s, sys: 104 ms, total: 4.76 s
Wall time: 3.19 s
Out[14]:
3

In [15]:
# compute count_alleles block-wise in series for comparison — the
# single-threaded baseline on bcolz storage (compare with In[6]).
%time _ = gc.count_alleles(max_allele=3)


CPU times: user 7.03 s, sys: 160 ms, total: 7.19 s
Wall time: 5.51 s

In [16]:
# Expose the bcolz copy as a dask-backed genotype array; per Out[16] the
# dask chunksize (1370, 765, 2) again matches the storage chunks.
gd = allel.model.dask.GenotypeDaskArray.from_array(gc)
gd


Out[16]:
GenotypeDaskArray<from-ar..., shape=(2000000, 765, 2), dtype=int8, chunksize=(1370, 765, 2)>
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...


In [17]:
# Rebuild the lazy allele-count graph over the bcolz-backed array;
# computed with varying worker counts in the cells below.
ac = gd.count_alleles(max_allele=3)
ac


Out[17]:
AlleleCountsDaskArray<atop-55..., shape=(2000000, 4), dtype=int64, chunksize=(1370, 4)>
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...


In [18]:
# Execute the graph with 1 worker — single-threaded dask baseline on bcolz.
%time _ = ac.compute(num_workers=1)


CPU times: user 7.66 s, sys: 1.72 s, total: 9.38 s
Wall time: 7.07 s

In [19]:
# Same computation with 2 workers.
%time _ = ac.compute(num_workers=2)


CPU times: user 8.35 s, sys: 1.88 s, total: 10.2 s
Wall time: 4.36 s

In [20]:
# Same computation with 3 workers.
%time _ = ac.compute(num_workers=3)


CPU times: user 8.3 s, sys: 1.86 s, total: 10.2 s
Wall time: 3.26 s

In [21]:
# Same computation with 6 workers — unlike the h5py case, bcolz keeps
# scaling (wall time drops through 6 workers per the recorded timings).
%time _ = ac.compute(num_workers=6)


CPU times: user 8.52 s, sys: 1.52 s, total: 10 s
Wall time: 2.25 s

In [ ]: