In [1]:
%load_ext cythonmagic
In [8]:
%%cython
# cython: cdivision=True
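# NOTE (build assumption): the prange loop in par_rwt below only uses multiple
# threads if this cell is compiled with OpenMP, e.g. by invoking the magic as
# %%cython --compile-args=-fopenmp --link-args=-fopenmp (flag names depend on
# the compiler). Without OpenMP, prange falls back to an ordinary serial loop.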
from __future__ import division
import numpy as np
from numpy import pi
cimport numpy as np
from libc.math cimport sqrt, exp, sin, cos, pow, M_PI
cimport cython
from cython.parallel import prange
# Defining data types: --------
# Output / accumulator precision:
DTYPE = np.float32
ctypedef np.float32_t DTYPE_t
# Complex coefficients (not used by the Ricker transform; kept for complex-valued wavelets):
DTYPEC = np.complex128
ctypedef np.complex128_t DTYPEC_t
# Index
ctypedef Py_ssize_t index_t
# Input data. Fixed to float64 here; change the typedef to accept float32 input instead.
ctypedef np.float64_t data_t
# ---------------------------
# Normalisation constant for the Ricker wavelet, 2 / sqrt(3 * sqrt(M_PI)),
# precomputed and hardcoded:
cdef np.float64_t __ricker_const = 0.86732507058407748
cdef inline np.float64_t ricker(double t) nogil:
    '''
    Inlined Ricker (Mexican hat) wavelet.
    t: normalised variable (x / a); the 1 / sqrt(a) scale normalisation is
    applied by the caller.
    '''
    return __ricker_const * (1 - t * t) * exp(-0.5 * t * t)
# inlined min & max
cdef inline int int_max(int a, int b) nogil: return a if a >= b else b
cdef inline int int_min(int a, int b) nogil: return a if a <= b else b
@cython.boundscheck(False)
@cython.wraparound(False)
@cython.cdivision(True)
cpdef np.ndarray[DTYPE_t, ndim=2] rwt(np.ndarray[data_t, ndim=1] x, np.ndarray[data_t, ndim=1] scales):
    '''Perform the Ricker continuous wavelet transform.
    Input:
    -----
    x: 1-D float64 np.ndarray. The data to be transformed.
    scales: 1-D float64 np.ndarray. Scales at which to perform the transformation.
    Returns:
    -------
    out: 2-D float32 np.ndarray of shape (len(scales), len(x)) holding the CWT coefficients.
    '''
    cdef:
        int N = scales.shape[0]
        int M = x.shape[0]
        double s, t_norm
        index_t i, tau, k
        DTYPE_t integral_sum
        int k_minus, k_plus
        int int_s
    # Allocate the output array.
    cdef np.ndarray[DTYPE_t, ndim=2] out = np.empty((N, M), dtype=DTYPE)
    with nogil:
        for i in range(N):
            s = scales[i]
            for tau in range(M):
                integral_sum = 0.0
                # Only samples near tau contribute because of the Gaussian decay
                # of the wavelet: truncating at 8 sigma leaves a tail of order
                # exp(-32) ~ 1e-14. Clamp the window to the data limits:
                int_s = int(s)
                k_minus = int_max(tau - 8 * int_s, 1)
                k_plus = int_min(tau + 8 * int_s, M - 1)
                for k in range(k_minus + 1, k_plus):
                    t_norm = (k - tau) / s
                    integral_sum += ricker(t_norm) * x[k]
                # To match the trapezoidal rule, the end points get half weight
                # when the window touches the data edges.
                if k_minus == 1:
                    integral_sum += 0.5 * ricker(-tau / s) * x[0]
                if k_plus == M - 1:
                    t_norm = (M - 1 - tau) / s
                    integral_sum += 0.5 * ricker(t_norm) * x[M - 1]
                # Normalise by sqrt(scale) and store.
                out[i, tau] = integral_sum / sqrt(s)
    return out
@cython.wraparound(False)
@cython.boundscheck(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cpdef np.ndarray[DTYPE_t, ndim=2] par_rwt(np.ndarray[data_t, ndim=1] x, np.ndarray[data_t, ndim=1] scales):
    '''
    Parallelized Ricker CWT. Verified to give the same output as the
    single-threaded version (see the equality checks below).
    :param x: 1-D float64 np.ndarray. The data to be transformed.
    :param scales: 1-D float64 np.ndarray. Scales at which to perform the transformation.
    :returns out: 2-D float32 np.ndarray of shape (len(scales), len(x)), the Ricker CWT of the input.
    '''
    cdef:
        int num_scales = scales.shape[0]
        int num_data = x.shape[0]
        double scale
        index_t scale_idx
    # Allocate the output array.
    cdef np.ndarray[DTYPE_t, ndim=2] out = np.empty((num_scales, num_data), dtype=DTYPE)
    with nogil:
        # Scales are independent, so each one can be processed on its own thread.
        for scale_idx in prange(num_scales):
            scale = scales[scale_idx]
            # _rwt_single writes row scale_idx of out in place;
            # &out[scale_idx, 0] points at the start of that row in the
            # freshly allocated (C-contiguous) output.
            _rwt_single(&x[0], &out[scale_idx, 0], scale, num_data)
    return out
@cython.wraparound(False)
@cython.boundscheck(False)
@cython.nonecheck(False)
@cython.cdivision(True)
cdef inline void _rwt_single(data_t *x, DTYPE_t *out, data_t scale, int M) nogil:
    '''
    Inner loop of the Ricker CWT for a single scale.
    x: pointer to the first element of the data
    out: pointer to the output row to be written
    scale: the scale at which to evaluate the transform
    M: number of data points
    '''
    cdef:
        double t_norm
        index_t tau, k
        DTYPE_t integral_sum
        int k_minus, k_plus
        int int_s
    for tau in range(M):
        integral_sum = 0.0
        # Only samples near tau contribute because of the Gaussian decay of the
        # wavelet: truncating at 8 sigma leaves a tail of order exp(-32) ~ 1e-14.
        # Clamp the window to the data limits:
        int_s = int(scale)
        k_minus = int_max(tau - 8 * int_s, 1)
        k_plus = int_min(tau + 8 * int_s, M - 1)
        for k in range(k_minus + 1, k_plus):
            t_norm = (k - tau) / scale
            integral_sum += ricker(t_norm) * x[k]
        # To match the trapezoidal rule, the end points get half weight when
        # the window touches the data edges.
        if k_minus == 1:
            integral_sum += 0.5 * ricker(-tau / scale) * x[0]
        if k_plus == M - 1:
            t_norm = (M - 1 - tau) / scale
            integral_sum += 0.5 * ricker(t_norm) * x[M - 1]
        # Normalise by sqrt(scale) and store.
        out[tau] = integral_sum / sqrt(scale)
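As a sanity check on the truncation and edge handling above, here is a minimal pure-NumPy sketch that mirrors the same loops. The name rwt_reference is not part of the original code; it accumulates in float64, so it is only expected to agree with the float32 Cython output up to rounding error, and it is far too slow for real use.
In [ ]:
import numpy as np
def rwt_reference(x, scales):
    '''Slow NumPy mirror of rwt: same 8-sigma truncation and trapezoidal
    edge weights as the Cython version above, but float64 throughout.'''
    const = 2.0 / np.sqrt(3.0 * np.sqrt(np.pi))
    psi = lambda t: const * (1.0 - t * t) * np.exp(-0.5 * t * t)
    M = len(x)
    out = np.empty((len(scales), M))
    for i, s in enumerate(scales):
        int_s = int(s)
        for tau in range(M):
            k_minus = max(tau - 8 * int_s, 1)
            k_plus = min(tau + 8 * int_s, M - 1)
            k = np.arange(k_minus + 1, k_plus)
            acc = np.sum(psi((k - tau) / s) * x[k])
            if k_minus == 1:
                acc += 0.5 * psi(-tau / s) * x[0]
            if k_plus == M - 1:
                acc += 0.5 * psi((M - 1 - tau) / s) * x[M - 1]
            out[i, tau] = acc / np.sqrt(s)
    return out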
In [16]:
from scipy import signal
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tables
df = pd.io.pytables.read_hdf('../../../dev/py_vs_q/db/trades.h5', 'table')
df.reset_index(inplace=True)
prices = df[df['SYM_ROOT'] == 'AAPL']['PRICE'][2000:3000].values
plt.plot(prices)
Out[16]:
In [18]:
prices.dtype
Out[18]:
In [19]:
scales = np.arange(1, len(prices)/10, dtype=np.float64)
w_coefs = rwt(prices, scales)**2
plt.pcolor(w_coefs)
plt.colorbar()
plt.show()
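The rows of w_coefs are indexed by scale and the columns by sample (see the rwt docstring), so the scalogram can also be drawn with labelled axes. A minimal sketch reusing w_coefs and scales from the cell above:
In [ ]:
plt.imshow(w_coefs, aspect='auto', origin='lower',
           extent=[0, w_coefs.shape[1], scales[0], scales[-1]])
plt.xlabel('sample index')
plt.ylabel('scale')
cb = plt.colorbar()
cb.set_label('squared CWT coefficient')
plt.show()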
In [9]:
x = np.random.randn(10000).astype(np.float64)
scales = np.arange(1, 10, dtype=np.float64)
In [12]:
%%timeit
rwt(x, scales)
In [13]:
%%timeit
par_rwt(x, scales)
In [78]:
ps = par_rwt(x, scales)
ss = rwt(x, scales)
for i in xrange(9):
assert np.array_equal(ss[i], ps[i]), i
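With the NumPy sketch defined after the Cython cell, rwt can also be checked against a float64 reference. Exact equality is not expected here, because rwt accumulates in float32 and the positive and negative lobes of the wavelet partially cancel, so the cell below just reports the largest deviation relative to the largest coefficient:
In [ ]:
ref = rwt_reference(x, scales)
np.max(np.abs(ref - ss)) / np.max(np.abs(ref))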
In [76]:
len(ps)
Out[76]:
In [67]:
import pstats, cProfile
cProfile.runctx("par_rwt(x, scales)", globals(), locals(), "Profile.prof")
s = pstats.Stats("Profile.prof")
s.strip_dirs().sort_stats("time").print_stats()
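cProfile only attributes time to the single Python-level call into par_rwt; the compiled loops inside the extension are invisible to it unless the module is rebuilt with Cython's profile=True directive, which adds overhead. For a rough picture of how the cost grows with input size, a plain timeit sweep is often enough; a small sketch with arbitrary sizes:
In [ ]:
import timeit
timings = {}
for n in (1000, 10000, 50000):
    xs = np.random.randn(n).astype(np.float64)
    timings[n] = timeit.timeit(lambda: par_rwt(xs, scales), number=3) / 3
timings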
Out[67]:
In [ ]: