In [1]:
%load_ext cythonmagic

In [8]:
%%cython
# cython: cdivision=True

from __future__ import division

import numpy as np
from numpy import pi
cimport numpy as np

from libc.math cimport sqrt, exp, sin, cos, pow, M_PI

cimport cython
from cython.parallel import prange

# Defining data types: --------
# time:
DTYPE = np.float32                  # dtype of the output CWT coefficient array
ctypedef np.float32_t DTYPE_t

# wavelet transform:
# NOTE(review): the complex dtype appears unused in this cell -- possibly
# left over from a complex-wavelet (e.g. Morlet) variant.
DTYPEC = np.complex128
ctypedef np.complex128_t DTYPEC_t

# Index
ctypedef Py_ssize_t index_t

# Input data. Could be float or double.
ctypedef np.float64_t data_t
# ---------------------------

# some optimizations for the ricker wavelet
# cdef np.float64_t __ricker_const = 2 / (sqrt(3 * sqrt(M_PI)))
# Hardcoded value of 2 / sqrt(3 * sqrt(pi)) so it is a compile-time constant.
# NOTE(review): `inline` on a variable declaration is unusual Cython -- confirm
# the installed Cython version accepts it (plain `cdef` would be conventional).
cdef inline np.float64_t __ricker_const = 0.86732507058407748

cdef inline np.float64_t ricker(double t) nogil:
    '''
    Inlined Ricker ("Mexican hat") wavelet evaluated at a normalised point.

    NB: Normalization is assumed to have already occurred

    t: normalised variable (x/a)
    '''
    # Evaluate the square once; the expression is otherwise identical to
    # __ricker_const * (1 - t^2) * exp(-t^2 / 2).
    cdef double t_sq = t * t
    return __ricker_const * (1 - t_sq) * exp(-0.5 * t_sq)

# inlined min & max
# Inlined max of two C ints (no Python-object overhead, usable under nogil).
cdef inline int int_max(int a, int b) nogil:
    if a < b:
        return b
    return a
# Inlined min of two C ints (no Python-object overhead, usable under nogil).
cdef inline int int_min(int a, int b) nogil:
    if a > b:
        return b
    return a

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
cpdef np.ndarray[DTYPE_t, ndim=2] rwt(np.ndarray[data_t, ndim=1] x, np.ndarray[data_t, ndim=1] scales):
    '''Perform the Ricker continuous wavelet transform.

    Input:
    -----
        x:      1-D float64 np.ndarray. The data to be transformed.
        scales: 1-D float64 np.ndarray. Scales at which to perform the transformation.

    Returns:
    -------
        out: 2-D float32 np.ndarray of shape (len(scales), len(x)) -- the CWT
             of the input, one row per scale.
             (The return buffer is declared as DTYPE_t to match the float32
             array actually constructed below; it was previously mis-declared
             as data_t/float64.)
    '''

    cdef:
        int N = scales.shape[0]
        int M = x.shape[0]
        double s, t_norm
        index_t i, tau, k
        DTYPE_t integral_sum      # float32 accumulator, matching the output dtype
        int k_minus, k_plus
        int int_s

    # Output buffer: one row per scale, one column per sample.
    cdef np.ndarray[DTYPE_t, ndim = 2] out = np.empty((N, M), dtype=DTYPE)

    with nogil:
        for i in xrange(N):
            s = scales[i]
            # The truncation half-width depends only on the scale, so compute
            # it once per scale instead of once per sample.
            int_s = int(s)
            for tau in xrange(M):
                integral_sum = 0.0

                # Only samples near tau contribute because of the wavelet's
                # exponential decay. An 8-sigma window implies a truncation
                # error of about 8e-14.

                # Clamp the window to the valid index range:
                k_minus = int_max(tau - 8 * int_s, 1)
                k_plus = int_min(tau + 8 * int_s, M - 1)

                for k in xrange(k_minus + 1, k_plus):
                    t_norm = (k - tau) / s
                    integral_sum += ricker(t_norm) * x[k]

                # To match the trapezoidal rule the array end points enter
                # with half weight whenever the window reaches them.
                if k_minus == 1:
                    integral_sum += 0.5 * ricker(-tau / s) * x[0]

                if k_plus == M - 1:
                    t_norm = (M - 1 - tau) / s
                    integral_sum += 0.5 * ricker(t_norm) * x[M - 1]

                # Normalise by sqrt(scale) and store.
                out[i, tau] = integral_sum / sqrt(s)

    return out


@cython.wraparound(False)
@cython.boundscheck(False)
@cython.nonecheck(False)
@cython.cdivision(True)
@cython.wraparound(False)
@cython.boundscheck(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cpdef np.ndarray[DTYPE_t, ndim=2] par_rwt(np.ndarray[data_t, ndim=1] x, np.ndarray[data_t, ndim=1] scales):
    '''
    Parallelized Ricker CWT -- this has been verified as valid against the single threaded version

    :param x:      1-D float64 np.ndarray. The data to be transformed.
    :param scales: 1-D float64 np.ndarray. Scales at which to perform the transformation.

    :returns out:  2-D float32 np.ndarray, shape (len(scales), len(x)) -- the
                   Ricker CWT of the input, one row per scale.
                   (Return buffer declared as DTYPE_t to match the float32
                   array actually returned; it was previously mis-declared
                   as data_t/float64.)
    '''

    cdef:
        int num_scales = scales.shape[0]
        int num_data = x.shape[0]
        double scale
        index_t scale_idx

    # Output buffer: one row per scale.
    cdef np.ndarray[DTYPE_t, ndim = 2] out = np.empty((num_scales, num_data), dtype=DTYPE)

    with nogil:
        # Each scale is independent, so distribute the rows across threads.
        # `scale` is assigned inside the prange body and is therefore
        # thread-private per Cython's parallel semantics.
        for scale_idx in prange(num_scales):
            scale = scales[scale_idx]

            # void helper that fills row `scale_idx` of `out` in place.
            # warning: make sure the bounds here are correct
            _rwt_single(&x[0], &out[scale_idx, 0], scale, num_data)

    return out


@cython.wraparound(False)
@cython.boundscheck(False)
@cython.nonecheck(False)
@cython.cdivision(True)
@cython.wraparound(False)
@cython.boundscheck(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cdef inline void _rwt_single(data_t *x, DTYPE_t *out, data_t scale, int M) nogil:
    '''
    Inner loop of the Ricker CWT: fills one output row (one scale).

    x: data, pointer to first element
    out: output array, pointer to the row segment we want to write to
    scale: the scale at which to run the computation
    M: number of data points
    '''
    cdef:
        double t_norm
        index_t tau, k
        DTYPE_t integral_sum      # float32 accumulator, matching the output dtype
        int k_minus, k_plus
        int int_s

    # The truncation half-width depends only on the scale; hoist it out of
    # the per-sample loop (it was previously recomputed for every tau).
    int_s = int(scale)

    for tau in range(M):
        integral_sum = 0.0

        # Only samples near tau contribute because of the wavelet's
        # exponential decay. An 8-sigma window implies a truncation error
        # of about 8e-14.

        # Clamp the window to the valid index range:
        k_minus = int_max(tau - 8 * int_s, 1)
        k_plus = int_min(tau + 8 * int_s, M - 1)

        for k in xrange(k_minus + 1, k_plus):
            t_norm = (k - tau) / scale
            integral_sum += ricker(t_norm) * x[k]

        # To match the trapezoidal rule the array end points enter with
        # half weight whenever the window reaches them.
        if k_minus == 1:
            integral_sum += 0.5 * ricker(-tau / scale) * x[0]

        if k_plus == M - 1:
            t_norm = (M - 1 - tau) / scale
            integral_sum += 0.5 * ricker(t_norm) * x[M - 1]

        # Normalise by sqrt(scale) and store.
        out[tau] = integral_sum / sqrt(scale)

In [16]:
# Load tick data and plot the raw AAPL price slice we will transform.
from scipy import signal
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tables

# NOTE(review): hardcoded relative path -- this cell only runs from the
# original directory layout; consider a configurable DATA_DIR.
df = pd.io.pytables.read_hdf('../../../dev/py_vs_q/db/trades.h5', 'table')
df.reset_index(inplace=True)
# 1000 AAPL trade prices (rows 2000:3000) as a float64 array (see Out[18]).
prices = df[df['SYM_ROOT'] == 'AAPL']['PRICE'][2000:3000].values
plt.plot(prices)


Out[16]:
[<matplotlib.lines.Line2D at 0x7fe32b87c310>]

In [18]:
prices.dtype


Out[18]:
dtype('float64')

In [19]:
# Scales from 1 up to a tenth of the series length; squaring the
# coefficients gives a power (scalogram) view of the transform.
scales = np.arange(1, len(prices)/10, dtype=np.float64)

w_coefs = rwt(prices, scales)**2
plt.pcolor(w_coefs)
plt.colorbar()
plt.show()



In [9]:
# Synthetic benchmark input: 10k white-noise samples and 9 small scales.
# NOTE(review): no random seed -- timings are reproducible, values are not.
x = np.random.randn(10000).astype(np.float64)
scales = np.arange(1, 10, dtype=np.float64)

In [12]:
%%timeit

# Single-threaded baseline timing.
rwt(x, scales)


1 loops, best of 3: 316 ms per loop

In [13]:
%%timeit

# Parallel version timing. NOTE(review): essentially identical to the serial
# time above -- confirm the extension was compiled with OpenMP enabled,
# otherwise prange falls back to a sequential loop.
par_rwt(x, scales)


1 loops, best of 3: 315 ms per loop

In [78]:
# Cross-check: the parallel and serial implementations must agree bitwise,
# since each row is computed with the same accumulation order and precision.
ps = par_rwt(x, scales)
ss = rwt(x, scales)

# One assertion per scale row (9 scales were defined above).
for i in xrange(9):
    assert np.array_equal(ss[i], ps[i]), i

In [76]:
len(ps)


Out[76]:
9

In [67]:
# Profile the parallel CWT. Per the stats below, effectively all time is
# spent inside the compiled par_rwt call, so there is no Python-level
# overhead left to trim.
import pstats, cProfile

cProfile.runctx("par_rwt(x, scales)", globals(), locals(), "Profile.prof")

s = pstats.Stats("Profile.prof")
s.strip_dirs().sort_stats("time").print_stats()


Mon Jul 28 12:00:18 2014    Profile.prof

         4 function calls in 0.011 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.011    0.011    0.011    0.011 _cython_magic_3b6b6e25349a0cc1181d5827102bdfd0.pyx:116(par_rwt)
        1    0.000    0.000    0.011    0.011 <string>:1(<module>)
        1    0.000    0.000    0.011    0.011 {_cython_magic_3b6b6e25349a0cc1181d5827102bdfd0.par_rwt}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


Out[67]:
<pstats.Stats instance at 0x107b3e5f0>

In [ ]: