In [1]:
import numpy as np
import dask.async
import dask.array as da
import sys
sys.path.insert(0, '../..')
%load_ext autoreload
%autoreload 1
%aimport allel
%aimport allel.model
%aimport allel.model.dask
allel.__version__
Out[1]:
In [2]:
g = allel.GenotypeArray([
[[0, 0], [0, 1], [-1, -1]],
[[0, 2], [1, 1], [-1, -1]],
[[1, 0], [2, 1], [-1, -1]],
[[2, 2], [-1, -1], [-1, -1]],
[[-1, -1], [-1, -1], [-1, -1]]
])
g
Out[2]:
In [3]:
# count alleles, no mask
expect_no_mask = g.count_alleles()
expect_no_mask
Out[3]:
In [6]:
m = np.array([
[True, False, False],
[False, False, False],
[False, True, False],
[False, False, True],
[True, False, True]
])
g.mask = m
In [7]:
# count alleles, with mask
expect_with_mask = g.count_alleles()
expect_with_mask
Out[7]:
In [8]:
assert not np.array_equal(expect_no_mask, expect_with_mask)
In [14]:
def dask_count_alleles(g, max_allele=None):
"""Count alleles, no mask."""
gd = da.from_array(np.asarray(g), chunks=(2, 2, None))
# if max_allele not specified, precompute
if max_allele is None:
max_allele = gd.max().compute()
# block mapping function
def f(block):
bg = allel.GenotypeArray(block)
return bg.count_alleles(max_allele=max_allele)[:, None, :]
# determine output chunks - preserve dim0; change dim1, dim2
chunks = (gd.chunks[0], (1,)*len(gd.chunks[1]), (max_allele+1,))
# map blocks and reduce
out = gd.map_blocks(f, chunks=chunks).sum(axis=1)
return out
In [15]:
ac = dask_count_alleles(g)
ac
Out[15]:
In [17]:
assert np.array_equal(expect_no_mask, ac.compute())
In [23]:
def dask_count_alleles_with_mask(g, m, max_allele=None):
"""Count alleles, ignoring masked genotype calls."""
gd = da.from_array(np.asarray(g), chunks=(2, 2, None))
md = da.from_array(m, chunks=gd.chunks[:2])
# if max_allele not specified, precompute
if max_allele is None:
max_allele = gd.max().compute()
# block mapping function
def f(block, bmask):
bg = allel.GenotypeArray(block)
bg.mask = bmask[:, :, 0]
return bg.count_alleles(max_allele=max_allele)[:, None, :]
# determine output chunks - preserve dim0; change dim1, dim2
chunks = (gd.chunks[0], (1,)*len(gd.chunks[1]), (max_allele+1,))
# map blocks and reduce
out = da.map_blocks(f, gd, md[:, :, None], chunks=chunks).sum(axis=1)
return out
In [24]:
ac_with_mask = dask_count_alleles_with_mask(g, m)
ac_with_mask
Out[24]:
In [25]:
ac_with_mask.compute()
Out[25]:
In [26]:
assert np.array_equal(expect_with_mask, ac_with_mask.compute())
In [ ]: