In [1]:
%load_ext cythonmagic

In [28]:
%%cython

from __future__ import division

import cython
cimport cython

from scipy.optimize import leastsq
from skimage import feature

import numpy as np
cimport numpy as np
import pandas as pd

cdef np.ndarray[np.double_t, ndim=2] find_gaussian_peaks_cython(np.ndarray[np.long_t, ndim=2] image,
                                np.double_t w_s=15,
                                np.double_t peak_radius=1.5,
                                np.double_t threshold=27.,
                                np.double_t max_peaks=1e4):
    """
    This function implements the Gaussian peak detection described
    in Segré et al. Nature Methods **5**, 8 (2008). It is based on a
    likelyhood ratio test on the presence or absence of a Gaussian
    peak on a patch moving over the input 2D image and a successive
    sub-pixel localization of the peaks by a least square fit.  This
    detection is followed recursively by a _deflation_ of the image
    from the detected peaks and a new detection, until no more peaks
    are found

    Parameters
    ----------

    image: a 2D array
        the input image
    w_s: int, optional
        the width of the sliding window over which the hypothesis ratio
        is computed :math:`w_s` in the article. It should be wide enough
        to contain some background to allow a proper noise evaluation.
    peak_radius: float, optional
        typical radius of the peaks to detect. It must be higher than one
        (as peaks less that a pixel wide would yield bogus results
    thershold: float, optional
        Criterium for a positive detection (i.e. the null hypothesis is false).
        Corresponds to the :mat:`\chi^2` parameter in the Constant False
        Alarm Rate section of the article supplementary text (p. 12).
        A higher `threshold` corresponds to a more stringent test.
        According to the authors, this parameters needs to be adjusted
        once for a given data set.

    Returns
    -------

    peaks: ndarray
        peaks is a Nx4 array, where N is the number of detected peaks in the
        image. Each line gives the x position, y position, width,
        and (background corrected) intensity of a detected peak (in that order).

    """
    cdef np.ndarray[np.long_t, ndim=2] peaks_coords
    cdef np.ndarray[np.long_t, ndim=2] d_image
    cdef np.ndarray[np.double_t, ndim=2] new_peaks
    cdef np.ndarray[np.double_t, ndim=2] peaks
    
    peaks_coords = glrt_detection(image, peak_radius,
                                  w_s, threshold)
    peaks = gauss_estimation(image, peaks_coords, w_s)
    d_image = image_deflation(image, peaks, w_s)
    peaks_coords = glrt_detection(d_image, peak_radius,
                                  w_s, threshold)
    while len(peaks_coords) > 0 and len(peaks) < max_peaks:
        new_peaks = gauss_estimation(d_image, peaks_coords, w_s)
        # in case the 2D gauss fit fails
        if len(new_peaks) < 1:
            break
        peaks.extend(new_peaks[:])
        d_image = image_deflation(d_image, new_peaks, w_s)
        peaks_coords = glrt_detection(d_image, peak_radius,
                                      w_s, threshold)
        return peaks


@cython.boundscheck(False) 
@cython.nonecheck(False)
@cython.cdivision(False)
cdef np.ndarray[np.long_t, ndim=2] image_deflation(np.ndarray[np.long_t, ndim=2] image,
                                                      np.ndarray[np.double_t, ndim=2] peaks, 
                                                      np.double_t w_s):
    """
    Substracts the detected Gaussian peaks from the input image and
    returns the deflated image.
    """
    cdef np.ndarray[np.double_t, ndim=1] deflated_peak
    cdef np.ndarray[np.long_t, ndim=2] d_image
    cdef np.double_t xc
    cdef np.double_t yc
    cdef np.double_t xc_rel
    cdef np.double_t yc_rel
    cdef np.double_t low_x
    cdef np.double_t low_y
    cdef np.double_t width
    cdef np.double_t I

    d_image = image.copy()
    for peak in peaks:
        xc, yc, width, I = peak
        xc_rel = w_s // 2 + xc - np.floor(xc)
        yc_rel = w_s // 2 + yc - np.floor(yc)
        low_x = int(xc - w_s // 2)
        low_y = int(yc - w_s // 2)
        params = xc_rel, yc_rel, width, I, 0
        deflated_peak = gauss_continuous(params, w_s)
        d_image[low_x:low_x + w_s,
                low_y:low_y + w_s] -= deflated_peak.reshape((w_s, w_s))
    return d_image


@cython.boundscheck(False) 
@cython.nonecheck(False)
@cython.cdivision(False)
cdef np.ndarray[np.double_t, ndim=2] gauss_estimation(np.ndarray[np.long_t, ndim=2] image,
                                                       np.ndarray[np.long_t, ndim=2] peaks_coords,
                                                       np.double_t w_s):
    """
    Least square fit of a 2D Gauss peaks (with radial symmetry)
    on regions of width `w_s` centered on each element
    of `peaks_coords`.

    Parameters:
    ----------
    image : 2D array
        a greyscale 2D input image.
    peaks_coords: iterable of pairs of int.
       The peaks_coords should contain `(x, y)` pairs
       corresponding to the approximate peak center,
       in pixels.
    """

    cdef np.ndarray[np.double_t, ndim=2] peaks
    cdef np.double_t low_x
    cdef np.double_t low_y
    cdef np.ndarray[np.double_t, ndim=1] params
    cdef np.int_t success
    cdef np.double_t xc
    cdef np.double_t yc
    cdef np.double_t width
    cdef np.double_t I
    cdef np.double_t bg
    cdef np.ndarray[np.long_t, ndim=2] patch

    tmp_peaks = []
    for coords in peaks_coords:
        low_x, low_y = coords - w_s // 2
        try:
            patch = image[low_x: low_x + w_s,
                          low_y: low_y + w_s]
            params, success =  gauss_estimate(patch, w_s)
            xc, yc, width, I, bg = params
            if success and I > 0 and width < w_s:
                tmp_peaks.append([xc + low_x, yc + low_y, width, I])
        except IndexError:
            continue
            
    peaks = np.array(tmp_peaks)
    return peaks


@cython.boundscheck(False) 
@cython.nonecheck(False)
@cython.cdivision(False)
cpdef np.ndarray[np.long_t, ndim=2] glrt_detection(np.ndarray[np.long_t, ndim=2] image, 
                                                   np.double_t r0,
                                                   np.double_t w_s, 
                                                   np.double_t threshold):
    """
    Implements the Generalized Likelyhood Ratio Test, by
    computing equation 4 in Segré et al. Supplementary Note (p. 12)
    in a window sliding other the image.

    Parameters:
    ----------
    image: array
        the 2D input image
    r0: float
        the detected Gaussian peak 1/e radius
    w_s: int
        Size of the sliding window over which the test is
        computed ( :math:`w_s` in the article).
    threshold: float
        Criterium for a positive detection (i.e. the null hypothesis is false).
        Corresponds to the :mat:`\chi^2` parameter in the Constant False
        Alarm Rate section of the article supplementary text (p. 12).
        A higher `threshold` corresponds to a more stringent test.
        According to the authors, this parameters needs to be adjusted
        once for a given data set.

    Returns:
    --------

    peaks_coords: array
        An Nx2 array containing (x, y) pairs of the detected peaks
        in integer pixel coordinates.
    """
    cdef np.int_t w
    cdef np.int_t h
    cdef np.ndarray[np.double_t, ndim=2] g_patch
    cdef np.float64_t g_squaresum
    cdef np.ndarray[np.float64_t, ndim=1] hmap
    cdef np.ndarray[np.float64_t, ndim=2] reshaped_hmap
    cdef np.ndarray[np.long_t, ndim=2] peaks_coords

    w = image.shape[0]
    h = image.shape[1]
    g_patch = gauss_patch(r0, w_s)    
    g_patch -= g_patch.mean()
    g_squaresum = np.sum(g_patch ** 2)
    hmap = np.array([hypothesis_map(image[i: i + w_s, j: j + w_s],
                                    g_patch, g_squaresum)
                     for i, j in np.ndindex((w - w_s, h - w_s))])
    reshaped_hmap = -2 * hmap.reshape((w - w_s, h - w_s))
    peaks_coords = feature.peak_local_max(reshaped_hmap, 3,
                                          threshold_abs=threshold)
    peaks_coords += w_s // 2
    return peaks_coords


@cython.boundscheck(False) 
@cython.nonecheck(False)
@cython.cdivision(False)
cdef np.float64_t hypothesis_map(np.ndarray[np.int_t, ndim=2] patch,
                                 np.ndarray[np.float64_t, ndim=2] g_patch,
                                 np.float64_t g_squaresum):
    """
    Computes the ratio for a given patch position.
    """
    cdef np.int_t w_s
    cdef np.ndarray[np.float64_t, ndim=2] multiplicative
    cdef np.float64_t intensity
    cdef np.float64_t normalisation
    cdef np.float64_t ratio
    
    w_s = patch.shape[0]
    multiplicative = g_patch * patch
    intensity = multiplicative.sum()
    normalisation = w_s * patch.std(keepdims=False)
    ratio = (w_s ** 2. / 2.) * np.log(1 - (intensity / normalisation) ** 2 / g_squaresum)
    return ratio


def gauss_estimate(patch, w_s):
    """
    Least square 2D gauss fit
    """    
    params0 = [w_s / 2., w_s / 2., 3., np.float(patch.max() - patch.min()), np.float(patch.min())]
    errfunc = lambda p: patch.flatten() - gauss_continuous(tuple(p), w_s)
    sq = leastsq(errfunc, params0, xtol=0.01, full_output=False)
    return sq


@cython.boundscheck(False) 
@cython.nonecheck(False)
@cython.cdivision(False)
cdef np.ndarray[np.double_t, ndim=1] gauss_continuous(tuple params, np.double_t w_s):
    """
    2D gauss function with a float center position
    """
    cdef np.double_t xc
    cdef np.double_t yc
    cdef np.double_t width
    cdef np.double_t I
    cdef np.double_t bg
    cdef np.ndarray[np.double_t, ndim=1] x
    cdef np.ndarray[np.double_t, ndim=1] y
    cdef np.ndarray[np.double_t, ndim=2] g_patch
    cdef np.ndarray[np.double_t, ndim=1] g_patch_flat
    
    xc, yc, width, I, bg = params
    xc = np.float(xc)
    yc = np.float(yc)
    x = np.exp(- (np.arange(0, w_s) - xc) ** 2 / width ** 2)
    y = np.exp(- (np.arange(0, w_s) - yc) ** 2 / width ** 2)
    g_patch = I * np.outer(x, y) + bg
    g_patch_flat = g_patch.flatten()

    return g_patch_flat

@cython.boundscheck(False) 
@cython.nonecheck(False)
@cython.cdivision(False) 
cdef np.ndarray[np.double_t, ndim=2] gauss_patch(np.double_t r0, np.double_t w_s):
    """
    Computes an w_s by w_s image with a
    power normalized  Gaussian peak with radial symmetry
    at its center.
    """
    cdef np.ndarray[np.double_t, ndim=1] x
    cdef np.ndarray[np.double_t, ndim=1] y
    cdef np.double_t A
    cdef np.ndarray[np.double_t, ndim=2] g_patch
    
    x = np.exp(- (np.arange(w_s, dtype='double') - w_s // 2) ** 2 / r0 ** 2)
    y = np.exp(- (np.arange(w_s, dtype='double') - w_s // 2) ** 2 / r0 ** 2)
    A = 1. / (np.sqrt(np.pi) * r0)
    g_patch = A * np.outer(x, y)
    return g_patch

In [29]:
import sys
sys.path.append("..")
from peak_detection.tifffile import TiffFile
from peak_detection.detection import _find_gaussian_peaks

detection_parameters = {'w_s': 10,
                        'peak_radius': 4.,
                        'threshold': 60.,
                        'max_peaks': 10
                        }

fname = 'sample.tif'
tf = TiffFile(fname)
frame = tf.asarray()[2][2]
frame = frame.astype(np.int)

In [30]:
%prun -l 50 peaks = find_gaussian_peaks_cython(frame, **detection_parameters)




In [26]:
%prun -l 10 peaks = _find_gaussian_peaks(frame, **detection_parameters)




In [31]:
%timeit peaks = find_gaussian_peaks_cython(frame, **detection_parameters)
%timeit peaks = _find_gaussian_peaks(frame, **detection_parameters)


1 loops, best of 3: 1.24 s per loop
1 loops, best of 3: 1.19 s per loop

In [ ]: