In [1]:
import numpy as np
import dask
print('dask', dask.__version__)
import dask.array as da
import h5py
print('h5py', h5py.__version__)
import sys
sys.path.insert(0, '../..')
import allel
print('scikit-allel', allel.__version__)


dask 0.7.5
h5py 2.5.0
scikit-allel 0.20.1

In [2]:
import bcolz
print('bcolz', bcolz.__version__)


bcolz 0.12.1

In [3]:
bcolz.blosc_version()


Out[3]:
('1.4.1', '$Date:: 2014-07-08 #$')

In [4]:
bcolz.detect_number_of_cores()


Out[4]:
4

Let's use some real data...


In [2]:
callset = h5py.File('/data/coluzzi/ag1000g/data/phase1/release/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.h5', mode='r')
callset


Out[2]:
<HDF5 file "ag1000g.phase1.ar3.pass.h5" (mode r)>

In [3]:
genotype = allel.model.chunked.GenotypeChunkedArray(callset['3L/calldata/genotype'])
genotype


Out[3]:
GenotypeChunkedArray((9643193, 765, 2), int8, nbytes=13.7G, cbytes=548.0M, cratio=25.7, cname=gzip, clevel=3, shuffle=False, chunks=(6553, 10, 2), data=h5py._hl.dataset.Dataset)
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...

Copy the first 2 million rows into a bcolz carray to use for benchmarking.


In [ ]:
g = genotype.copy(stop=2000000)
g
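For orientation, the copy above is roughly equivalent to reading the HDF5-backed genotypes block by block and appending each block to a bcolz carray. A minimal sketch (not the actual implementation; it relies on the bcolz and numpy imports above, an illustrative block length, and the assumption that slices of the chunked array come back as NumPy arrays):

def copy_to_carray(src, stop, blen=100000):
    # seed the carray with the first block, then append the remaining blocks
    out = bcolz.carray(np.asarray(src[:blen]), expectedlen=stop)
    for i in range(blen, stop, blen):
        out.append(np.asarray(src[i:min(i + blen, stop)]))
    return out

# hypothetical usage:
# g = allel.model.chunked.GenotypeChunkedArray(copy_to_carray(genotype, 2000000))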

Out of interest, what chunk size did bcolz choose? (The product below is the number of int8 values per chunk, which for one-byte data is also the chunk size in bytes, i.e. roughly 2 MB.)


In [8]:
g.data.chunklen * g.shape[1] * g.shape[2]


Out[8]:
2096100

How long does it take to decompress the data?


In [4]:
def toarray(x):
    # force a full decompression; the result is deliberately discarded so
    # that the %time cells below do not display the (large) array
    np.asarray(x)

In [10]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()


--- blosc threads: 1 ---
CPU times: user 1.79 s, sys: 164 ms, total: 1.95 s
Wall time: 1.95 s

--- blosc threads: 2 ---
CPU times: user 2 s, sys: 232 ms, total: 2.23 s
Wall time: 1.43 s

--- blosc threads: 4 ---
CPU times: user 2.42 s, sys: 208 ms, total: 2.63 s
Wall time: 1.43 s

--- blosc threads: 8 ---
CPU times: user 2.31 s, sys: 292 ms, total: 2.6 s
Wall time: 1.47 s

How long does it take to compute the maximum value?


In [8]:
def time_max(x):
    # decompress up front so that only the max computation itself is timed
    x = np.asarray(x)
    %time x.max()

In [12]:
time_max(g)


CPU times: user 2.96 s, sys: 0 ns, total: 2.96 s
Wall time: 2.95 s

Check that scikit-allel's chunked implementation of max() behaves as expected. The implementation is not threaded, so the total time should be roughly the time to decompress the data plus the time to compute the maximum.
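For orientation, a blockwise max over a chunked container might look roughly like the sketch below (not the scikit-allel implementation; it relies on the numpy import above and an illustrative block length):

def blockwise_max(x, blen=10000):
    # reduce block by block: decompress each block into a NumPy array,
    # take its max, and fold the per-block maxima together
    result = None
    for i in range(0, x.shape[0], blen):
        block = np.asarray(x[i:i + blen])  # decompression happens here
        m = block.max()
        result = m if result is None else max(result, m)
    return result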


In [13]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time g.max()
    print()


--- blosc threads: 1 ---
CPU times: user 5.02 s, sys: 0 ns, total: 5.02 s
Wall time: 5.01 s

--- blosc threads: 2 ---
CPU times: user 5.79 s, sys: 56 ms, total: 5.85 s
Wall time: 4.94 s

--- blosc threads: 4 ---
CPU times: user 6.08 s, sys: 92 ms, total: 6.17 s
Wall time: 4.73 s

--- blosc threads: 8 ---
CPU times: user 6.17 s, sys: 140 ms, total: 6.31 s
Wall time: 4.73 s

Now see how dask behaves.


In [10]:
gd = allel.GenotypeDaskArray.from_array(g)
gd


Out[10]:
GenotypeDaskArray<from-ar..., shape=(2000000, 765, 2), dtype=int8, chunksize=(1370, 765, 2)>
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...
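For orientation, the conversion above is roughly equivalent to wrapping the underlying bcolz carray in a plain dask array, with dask chunks aligned to the carray's own compression chunks. A minimal sketch (not the actual from_array implementation):

import dask.array as da

# g.data is the underlying bcolz carray; use its chunk length as the
# dask chunk size along the first axis
d = da.from_array(g.data, chunks=(g.data.chunklen,) + g.shape[1:])
d.max().compute(num_workers=4)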


In [15]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        %time gd.max().compute(num_workers=m)
        print()


--- blosc threads: 1 ; dask threads: 1 ---
CPU times: user 7.48 s, sys: 896 ms, total: 8.37 s
Wall time: 8.21 s

--- blosc threads: 1 ; dask threads: 2 ---
CPU times: user 6.83 s, sys: 1.57 s, total: 8.4 s
Wall time: 5.29 s

--- blosc threads: 1 ; dask threads: 4 ---
CPU times: user 7.32 s, sys: 1.85 s, total: 9.17 s
Wall time: 4.16 s

--- blosc threads: 1 ; dask threads: 8 ---
CPU times: user 7.54 s, sys: 1.13 s, total: 8.67 s
Wall time: 4.02 s

--- blosc threads: 2 ; dask threads: 1 ---
CPU times: user 7.69 s, sys: 1.8 s, total: 9.49 s
Wall time: 7.94 s

--- blosc threads: 2 ; dask threads: 2 ---
CPU times: user 7.07 s, sys: 1.76 s, total: 8.83 s
Wall time: 4.53 s

--- blosc threads: 2 ; dask threads: 4 ---
CPU times: user 7.75 s, sys: 1.54 s, total: 9.29 s
Wall time: 3.9 s

--- blosc threads: 2 ; dask threads: 8 ---
CPU times: user 7.56 s, sys: 1.88 s, total: 9.44 s
Wall time: 4 s

--- blosc threads: 4 ; dask threads: 1 ---
CPU times: user 7.62 s, sys: 2 s, total: 9.62 s
Wall time: 7.55 s

--- blosc threads: 4 ; dask threads: 2 ---
CPU times: user 7.32 s, sys: 1.8 s, total: 9.12 s
Wall time: 4.63 s

--- blosc threads: 4 ; dask threads: 4 ---
CPU times: user 7.79 s, sys: 1.99 s, total: 9.78 s
Wall time: 3.54 s

--- blosc threads: 4 ; dask threads: 8 ---
CPU times: user 7.85 s, sys: 1.76 s, total: 9.61 s
Wall time: 4.01 s

--- blosc threads: 8 ; dask threads: 1 ---
CPU times: user 7.45 s, sys: 2.04 s, total: 9.49 s
Wall time: 7.33 s

--- blosc threads: 8 ; dask threads: 2 ---
CPU times: user 7.39 s, sys: 1.94 s, total: 9.33 s
Wall time: 4.67 s

--- blosc threads: 8 ; dask threads: 4 ---
CPU times: user 7.74 s, sys: 2.05 s, total: 9.79 s
Wall time: 3.61 s

--- blosc threads: 8 ; dask threads: 8 ---
CPU times: user 7.79 s, sys: 1.9 s, total: 9.69 s
Wall time: 3.84 s

See especially the case with 4 blosc threads and 4 dask threads. The wall time there (~3.5 s) is less than the sum of the time required to decompress the data with 4 blosc threads (~1.4 s above) and the time required to compute the maximum on uncompressed data (~3 s), so dask is able to overlap some work, even though bcolz does not release the GIL around decompression.


In [16]:
bcolz.blosc_set_nthreads(4)
%timeit -n1 -r5 gd.max().compute(num_workers=4)


1 loops, best of 5: 3.43 s per loop

Try a slightly more compute-intensive task. First the non-parallel version.


In [17]:
bcolz.blosc_set_nthreads(4)
%time g.count_alleles()


CPU times: user 15.7 s, sys: 244 ms, total: 16 s
Wall time: 13.1 s
Out[17]:
AlleleCountsChunkedArray((2000000, 4), int32, nbytes=30.5M, cbytes=5.4M, cratio=5.6, cname=blosclz, clevel=5, shuffle=True, chunks=(32768, 4), data=bcolz.carray_ext.carray)
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...
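For orientation, count_alleles tallies, for each variant, how many calls of each allele index (0 = reference, 1-3 = alternates here) occur across all samples and both haplotypes; the table above shows the first few rows. A rough pure-NumPy sketch of a per-block count (not the scikit-allel implementation; assumes the np import above):

def count_alleles_block(gblock, max_allele=3):
    # gblock: (variants, samples, ploidy) array of allele indices,
    # with negative values denoting missing calls
    ac = np.zeros((gblock.shape[0], max_allele + 1), dtype='i4')
    for allele in range(max_allele + 1):
        ac[:, allele] = (gblock == allele).sum(axis=(1, 2))
    return ac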

Now dask.


In [11]:
bcolz.blosc_set_nthreads(4)
%time gd.count_alleles().compute(num_workers=4)


CPU times: user 19.7 s, sys: 4.11 s, total: 23.8 s
Wall time: 8.6 s
Out[11]:
AlleleCountsArray((2000000, 4), dtype=int64)
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...


In [12]:
bcolz.blosc_set_nthreads(1)
%time gd.count_alleles().compute(num_workers=4)


CPU times: user 19.3 s, sys: 3.73 s, total: 23 s
Wall time: 9.51 s
Out[12]:
AlleleCountsArray((2000000, 4), dtype=int64)
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...

with nogil

Hack bcolz to reinstate nogil sections around blosc_decompress...
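For reference, the change amounts to releasing the GIL around the call into the blosc C library inside bcolz's Cython extension. A schematic Cython sketch, not the actual bcolz diff (variable names are illustrative; it assumes blosc_decompress has been declared nogil):

# schematic Cython: release the GIL while the C library decompresses
with nogil:
    nret = blosc_decompress(src, dest, destsize)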


In [7]:
import bcolz
bcolz.__version__


Out[7]:
'0.12.2.dev1+dirty'

In [8]:
bcolz.blosc_version()


Out[8]:
('1.4.1', '$Date:: 2014-07-08 #$')

Check that decompression time is unaffected.


In [9]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()


--- blosc threads: 1 ---
CPU times: user 1.75 s, sys: 164 ms, total: 1.91 s
Wall time: 1.91 s

--- blosc threads: 2 ---
CPU times: user 1.86 s, sys: 304 ms, total: 2.16 s
Wall time: 1.39 s

--- blosc threads: 4 ---
CPU times: user 2.34 s, sys: 264 ms, total: 2.6 s
Wall time: 1.46 s

--- blosc threads: 8 ---
CPU times: user 2.29 s, sys: 264 ms, total: 2.56 s
Wall time: 1.46 s

Check that scikit-allel's chunked implementation of max() is unaffected.


In [10]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time g.max()
    print()


--- blosc threads: 1 ---
CPU times: user 5.04 s, sys: 0 ns, total: 5.04 s
Wall time: 5.03 s

--- blosc threads: 2 ---
CPU times: user 5.77 s, sys: 44 ms, total: 5.81 s
Wall time: 4.9 s

--- blosc threads: 4 ---
CPU times: user 6.06 s, sys: 108 ms, total: 6.16 s
Wall time: 4.76 s

--- blosc threads: 8 ---
CPU times: user 6.1 s, sys: 152 ms, total: 6.25 s
Wall time: 4.86 s

Now see if dask does any better...


In [11]:
gd = allel.GenotypeDaskArray.from_array(g)
gd


Out[11]:
GenotypeDaskArray<from-ar..., shape=(2000000, 765, 2), dtype=int8, chunksize=(1370, 765, 2)>
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...


In [12]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        %time gd.max().compute(num_workers=m)
        print()


--- blosc threads: 1 ; dask threads: 1 ---
CPU times: user 7.13 s, sys: 828 ms, total: 7.96 s
Wall time: 7.84 s

--- blosc threads: 1 ; dask threads: 2 ---
CPU times: user 6.91 s, sys: 1.74 s, total: 8.65 s
Wall time: 4.58 s

--- blosc threads: 1 ; dask threads: 4 ---
CPU times: user 7.68 s, sys: 1.98 s, total: 9.65 s
Wall time: 3.36 s

--- blosc threads: 1 ; dask threads: 8 ---
CPU times: user 7.62 s, sys: 888 ms, total: 8.5 s
Wall time: 3.27 s

--- blosc threads: 2 ; dask threads: 1 ---
CPU times: user 7.42 s, sys: 1.88 s, total: 9.3 s
Wall time: 7.94 s

--- blosc threads: 2 ; dask threads: 2 ---
CPU times: user 7.2 s, sys: 1.82 s, total: 9.01 s
Wall time: 4.35 s

--- blosc threads: 2 ; dask threads: 4 ---
CPU times: user 7.52 s, sys: 1.64 s, total: 9.15 s
Wall time: 3.66 s

--- blosc threads: 2 ; dask threads: 8 ---
CPU times: user 7.7 s, sys: 1.77 s, total: 9.46 s
Wall time: 3.78 s

--- blosc threads: 4 ; dask threads: 1 ---
CPU times: user 7.44 s, sys: 624 ms, total: 8.06 s
Wall time: 6.44 s

--- blosc threads: 4 ; dask threads: 2 ---
CPU times: user 7.28 s, sys: 1.74 s, total: 9.01 s
Wall time: 4.4 s

--- blosc threads: 4 ; dask threads: 4 ---
CPU times: user 7.55 s, sys: 2 s, total: 9.55 s
Wall time: 3.74 s

--- blosc threads: 4 ; dask threads: 8 ---
CPU times: user 7.7 s, sys: 1.78 s, total: 9.48 s
Wall time: 3.99 s

--- blosc threads: 8 ; dask threads: 1 ---
CPU times: user 7.52 s, sys: 1.96 s, total: 9.48 s
Wall time: 7.37 s

--- blosc threads: 8 ; dask threads: 2 ---
CPU times: user 7.2 s, sys: 680 ms, total: 7.88 s
Wall time: 3.97 s

--- blosc threads: 8 ; dask threads: 4 ---
CPU times: user 7.77 s, sys: 2.04 s, total: 9.8 s
Wall time: 3.65 s

--- blosc threads: 8 ; dask threads: 8 ---
CPU times: user 7.69 s, sys: 1.88 s, total: 9.57 s
Wall time: 3.87 s


In [13]:
bcolz.blosc_set_nthreads(1)
%timeit -r5 gd.max().compute(num_workers=4)


1 loops, best of 5: 3.08 s per loop

In [14]:
bcolz.blosc_set_nthreads(1)
%timeit -r5 gd.max().compute(num_workers=8)


1 loops, best of 5: 3.24 s per loop

In [17]:
bcolz.blosc_set_nthreads(2)
%timeit -r5 gd.max().compute(num_workers=2)


1 loops, best of 5: 3.53 s per loop

In [15]:
bcolz.blosc_set_nthreads(4)
%timeit -r5 gd.max().compute(num_workers=4)


1 loops, best of 5: 3.63 s per loop

Try the more compute-intensive operation again.


In [19]:
bcolz.blosc_set_nthreads(1)
%time gd.count_alleles().compute(num_workers=4)


CPU times: user 20.1 s, sys: 3.96 s, total: 24.1 s
Wall time: 7.82 s
Out[19]:
AlleleCountsArray((2000000, 4), dtype=int64)
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...


In [20]:
bcolz.blosc_set_nthreads(4)
%time gd.count_alleles().compute(num_workers=4)


CPU times: user 19.9 s, sys: 3.92 s, total: 23.8 s
Wall time: 8.54 s
Out[20]:
AlleleCountsArray((2000000, 4), dtype=int64)
0 1 2 3
0 1527 3 0 0
1 1529 1 0 0
2 1528 2 0 0
3 1528 2 0 0
4 1526 4 0 0

...

with nogil, with c-blosc 1.7.0


In [5]:
import bcolz
bcolz.__version__


Out[5]:
'0.12.2.dev1+dirty'

In [6]:
bcolz.blosc_version()


Out[6]:
('1.7.1.dev', '$Date:: 2015-07-05 #$')

In [7]:
g = genotype.copy(stop=2000000)
g


Out[7]:
GenotypeChunkedArray((2000000, 765, 2), int8, nbytes=2.8G, cbytes=156.8M, cratio=18.6, cname=blosclz, clevel=5, shuffle=True, chunks=(1370, 765, 2), data=bcolz.carray_ext.carray)
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...


In [8]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()


--- blosc threads: 1 ---
CPU times: user 5.36 s, sys: 768 ms, total: 6.12 s
Wall time: 6.11 s

--- blosc threads: 2 ---
CPU times: user 7.26 s, sys: 888 ms, total: 8.14 s
Wall time: 4.8 s

--- blosc threads: 4 ---
CPU times: user 9.41 s, sys: 888 ms, total: 10.3 s
Wall time: 3.99 s

--- blosc threads: 8 ---
CPU times: user 9.21 s, sys: 936 ms, total: 10.1 s
Wall time: 4.1 s


In [ ]:
gd = allel.GenotypeDaskArray.from_array(g)
gd


Out[ ]:
GenotypeDaskArray<from-ar..., shape=(2000000, 765, 2), dtype=int8, chunksize=(1370, 765, 2)>
0 1 2 3 4 ... 760 761 762 763 764
0 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
1 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
2 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
3 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0
4 0/0 0/0 0/0 0/0 0/0 ... 0/0 0/0 0/0 0/0 0/0

...


In [ ]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        %time gd.max().compute(num_workers=m)
        print()


--- blosc threads: 1 ; dask threads: 1 ---
CPU times: user 11 s, sys: 1.6 s, total: 12.6 s
Wall time: 12.4 s

--- blosc threads: 1 ; dask threads: 2 ---
CPU times: user 12.4 s, sys: 1.45 s, total: 13.9 s
Wall time: 7.37 s

--- blosc threads: 1 ; dask threads: 4 ---
CPU times: user 15.3 s, sys: 1.24 s, total: 16.5 s

Try to reproduce segfaults...


In [10]:
bcolz.blosc_set_nthreads(1)


Out[10]:
1

In [11]:
gd.astype('f4').max().compute()


Out[11]:
3.0

In [12]:
gd.mean(axis=1).sum().compute(num_workers=4)


Out[12]:
170767.96339869278

In [13]:
((gd + gd) * gd).std(axis=0).compute()


Out[13]:
array([[ 0.50495372,  0.84406405],
       [ 0.48935613,  0.81991694],
       [ 0.48457914,  0.8209112 ],
       ..., 
       [ 0.50302364,  0.84650751],
       [ 0.49943217,  0.8405899 ],
       [ 0.5033905 ,  0.84399184]])
