In [1]:
import numpy as np
import dask
print('dask', dask.__version__)
import dask.array as da
import h5py
print('h5py', h5py.__version__)
import sys
sys.path.insert(0, '../..')
import allel
print('scikit-allel', allel.__version__)
In [2]:
import bcolz
print('bcolz', bcolz.__version__)
In [3]:
bcolz.blosc_version()
Out[3]:
In [4]:
bcolz.detect_number_of_cores()
Out[4]:
Let's use some real data...
In [2]:
callset = h5py.File('/data/coluzzi/ag1000g/data/phase1/release/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.h5', mode='r')
callset
Out[2]:
In [3]:
genotype = allel.model.chunked.GenotypeChunkedArray(callset['3L/calldata/genotype'])
genotype
Out[3]:
Copy the first 2 million rows into a bcolz carray to use for benchmarking.
In [ ]:
g = genotype.copy(stop=2000000)
g
Out of interest, what chunk size did bcolz choose?
In [8]:
# number of array items per chunk: rows per chunk times samples times ploidy
g.data.chunklen * g.shape[1] * g.shape[2]
Out[8]:
How long does it take to decompress the data?
In [4]:
def toarray(x):
    # force full decompression into a numpy array (result is discarded)
    np.asarray(x)
In [10]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()
How long does it take to compute the maximum value?
In [8]:
def time_max(x):
    # decompress first, so only the max computation itself is timed
    x = np.asarray(x)
    %time x.max()
In [12]:
time_max(g)
Check that scikit-allel's chunked implementation of max() behaves as expected - the implementation is not threaded, so total time should be roughly the time to decompress the data plus the time to compute the max.
In [13]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time g.max()
    print()
Now see how dask behaves.
In [10]:
gd = allel.GenotypeDaskArray.from_array(g)
gd
Out[10]:
In [15]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        %time gd.max().compute(num_workers=m)
        print()
Note especially the case with 4 blosc threads and 4 dask threads: the total wall time is less than the time required to decompress the data with the same number of blosc threads plus the time required to compute the maximum. So dask is able to overlap some work, even though bcolz does not release the GIL.
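To make the comparison concrete, here is a minimal sketch of the check being described, timing each stage separately with time.perf_counter() rather than %time; g and gd are the arrays defined above, and wall() is a hypothetical helper introduced just for this sketch.
In [ ]:
import time

def wall(f):
    # wall-clock seconds for a single call of f
    t0 = time.perf_counter()
    f()
    return time.perf_counter() - t0

bcolz.blosc_set_nthreads(4)
t_decompress = wall(lambda: np.asarray(g))                  # decompression alone
a = np.asarray(g)
t_max = wall(lambda: a.max())                               # computation alone
t_combined = wall(lambda: gd.max().compute(num_workers=4))  # dask does both

# if dask overlaps decompression with computation, the combined wall
# time should come in below the sum of the two separate times
print(t_combined < t_decompress + t_max)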
In [16]:
bcolz.blosc_set_nthreads(4)
%timeit -n1 -r5 gd.max().compute(num_workers=4)
Try a slightly more compute-intensive task. First the non-parallel version.
In [17]:
bcolz.blosc_set_nthreads(4)
%time g.count_alleles()
Out[17]:
Now dask.
In [11]:
bcolz.blosc_set_nthreads(4)
%time gd.count_alleles().compute(num_workers=4)
Out[11]:
In [12]:
bcolz.blosc_set_nthreads(1)
%time gd.count_alleles().compute(num_workers=4)
Out[12]:
Hack bcolz to reinstate nogil sections around blosc_decompress...
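For reference, here is a minimal Cython sketch of the kind of change involved (illustrative only, not the actual bcolz source; decompress_chunk is a hypothetical helper). The C-level blosc_decompress call is declared nogil and wrapped in a with nogil block, so other Python threads can run while a chunk is decompressed:

# illustrative Cython sketch, not the actual bcolz source
cdef extern from "blosc.h":
    int blosc_decompress(const void *src, void *dest, size_t destsize) nogil

cdef int decompress_chunk(const void *src, void *dest, size_t destsize):
    cdef int nbytes
    with nogil:
        # GIL released here: dask worker threads can proceed while
        # blosc decompresses this chunk
        nbytes = blosc_decompress(src, dest, destsize)
    return nbytes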
In [7]:
import bcolz
bcolz.__version__
Out[7]:
In [8]:
bcolz.blosc_version()
Out[8]:
Check decompression time is unaffected.
In [9]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()
Check that scikit-allel's chunked implementation is unaffected (it processes chunks from a single thread, so releasing the GIL should make no difference).
In [10]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time g.max()
    print()
Now see if dask does any better...
In [11]:
gd = allel.GenotypeDaskArray.from_array(g)
gd
Out[11]:
In [12]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        %time gd.max().compute(num_workers=m)
        print()
In [13]:
bcolz.blosc_set_nthreads(1)
%timeit -r5 gd.max().compute(num_workers=4)
In [14]:
bcolz.blosc_set_nthreads(1)
%timeit -r5 gd.max().compute(num_workers=8)
In [17]:
bcolz.blosc_set_nthreads(2)
%timeit -r5 gd.max().compute(num_workers=2)
In [15]:
bcolz.blosc_set_nthreads(4)
%timeit -r5 gd.max().compute(num_workers=4)
Try the more compute-intensive operation again.
In [19]:
bcolz.blosc_set_nthreads(1)
%time gd.count_alleles().compute(num_workers=4)
Out[19]:
In [20]:
bcolz.blosc_set_nthreads(4)
%time gd.count_alleles().compute(num_workers=4)
Out[20]:
In [5]:
import bcolz
bcolz.__version__
Out[5]:
In [6]:
bcolz.blosc_version()
Out[6]:
In [7]:
g = genotype.copy(stop=2000000)
g
Out[7]:
In [8]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()
In [ ]:
gd = allel.GenotypeDaskArray.from_array(g)
gd
Out[ ]:
In [ ]:
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        %time gd.max().compute(num_workers=m)
        print()
In [10]:
bcolz.blosc_set_nthreads(1)
Out[10]:
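Finally, check that other dask computations (type conversion, reductions over different axes, chained element-wise arithmetic) run correctly against the bcolz-backed genotype array.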
In [11]:
gd.astype('f4').max().compute()
Out[11]:
In [12]:
gd.mean(axis=1).sum().compute(num_workers=4)
Out[12]:
In [13]:
((gd + gd) * gd).std(axis=0).compute()
Out[13]: