In [1]:
import numpy as np
import dask.async
import dask.array as da
import sys
sys.path.insert(0, '../..')
%load_ext autoreload
%autoreload 1
%aimport allel
%aimport allel.model
%aimport allel.model.dask
allel.__version__


Out[1]:
'0.20.0.feature_dask'

Count alleles, no mask


In [2]:
g = allel.GenotypeArray([
    [[0, 0], [0, 1], [-1, -1]],
    [[0, 2], [1, 1], [-1, -1]],
    [[1, 0], [2, 1], [-1, -1]],
    [[2, 2], [-1, -1], [-1, -1]],
    [[-1, -1], [-1, -1], [-1, -1]]
])
g


Out[2]:
GenotypeArray((5, 3, 2), dtype=int64)
0 1 2
0 0/0 0/1 ./.
1 0/2 1/1 ./.
2 1/0 2/1 ./.
3 2/2 ./. ./.
4 ./. ./. ./.

In [3]:
# count alleles, no mask
expect_no_mask = g.count_alleles()
expect_no_mask


Out[3]:
AlleleCountsArray((5, 3), dtype=int32)
0 1 2
0 3 1 0
1 1 2 1
2 1 2 1
3 0 0 2
4 0 0 0

In [6]:
m = np.array([
    [True, False, False],
    [False, False, False],
    [False, True, False],
    [False, False, True],
    [True, False, True]
])
g.mask = m

In [7]:
# count alleles, with mask
expect_with_mask = g.count_alleles()
expect_with_mask


Out[7]:
AlleleCountsArray((5, 3), dtype=int32)
0 1 2
0 1 1 0
1 1 2 1
2 1 1 0
3 0 0 2
4 0 0 0

In [8]:
assert not np.array_equal(expect_no_mask, expect_with_mask)

In [14]:
def dask_count_alleles(g, max_allele=None):
    """Count alleles, no mask."""
    
    gd = da.from_array(np.asarray(g), chunks=(2, 2, None))

    # if max_allele not specified, precompute
    if max_allele is None:
        max_allele = gd.max().compute()

    # block mapping function
    def f(block):
        bg = allel.GenotypeArray(block)
        return bg.count_alleles(max_allele=max_allele)[:, None, :]

    # determine output chunks - preserve dim0; change dim1, dim2
    chunks = (gd.chunks[0], (1,)*len(gd.chunks[1]), (max_allele+1,))

    # map blocks and reduce
    out = gd.map_blocks(f, chunks=chunks).sum(axis=1)
    
    return out

In [15]:
ac = dask_count_alleles(g)
ac


Out[15]:
dask.array<atop-70..., shape=(5, 3), dtype=None, chunksize=(2, 3)>

In [17]:
assert np.array_equal(expect_no_mask, ac.compute())

In [23]:
def dask_count_alleles_with_mask(g, m, max_allele=None):
    """Count alleles, ignoring masked genotype calls."""
    
    gd = da.from_array(np.asarray(g), chunks=(2, 2, None))
    md = da.from_array(m, chunks=gd.chunks[:2])

    # if max_allele not specified, precompute
    if max_allele is None:
        max_allele = gd.max().compute()

    # block mapping function
    def f(block, bmask):
        bg = allel.GenotypeArray(block)
        bg.mask = bmask[:, :, 0]
        return bg.count_alleles(max_allele=max_allele)[:, None, :]

    # determine output chunks - preserve dim0; change dim1, dim2
    chunks = (gd.chunks[0], (1,)*len(gd.chunks[1]), (max_allele+1,))

    # map blocks and reduce
    out = da.map_blocks(f, gd, md[:, :, None], chunks=chunks).sum(axis=1)
    
    return out

In [24]:
ac_with_mask = dask_count_alleles_with_mask(g, m)
ac_with_mask


Out[24]:
dask.array<atop-ed..., shape=(5, 3), dtype=None, chunksize=(2, 3)>

In [25]:
ac_with_mask.compute()


Out[25]:
array([[1, 1, 0],
       [1, 2, 1],
       [1, 1, 0],
       [0, 0, 2],
       [0, 0, 0]])

In [26]:
assert np.array_equal(expect_with_mask, ac_with_mask.compute())

In [ ]: