This notebook illustrates how empirical mode decomposition (EMD) decomposes a signal into multiple rhythmic components.
Outline
END THIS NOTEBOOK AND CREATE ANOTHER FOR MORE DETAILED STUFF
TODO
Some background
Load libraries
In [159]:
%config InlineBackend.figure_format = 'retina'
%matplotlib inline
import seaborn as sns
sns.set_style('white')
import numpy as np
import scipy as sp
from scipy import signal
import matplotlib.pyplot as plt
EMD functions
In [202]:
def emd(x, nIMF = 3, stoplim = .001):
"""Perform empirical mode decomposition to extract 'niMF' components out of the signal 'x'."""
r = x
t = np.arange(len(r))
imfs = np.zeros(nIMF,dtype=object)
for i in range(nIMF):
r_t = r
is_imf = False
while is_imf == False:
# Identify peaks and troughs
pks = sp.signal.argrelmax(r_t)[0]
trs = sp.signal.argrelmin(r_t)[0]
# Interpolate extrema
pks_r = r_t[pks]
fip = sp.interpolate.InterpolatedUnivariateSpline(pks,pks_r,k=3)
pks_t = fip(t)
trs_r = r_t[trs]
fitr = sp.interpolate.InterpolatedUnivariateSpline(trs,trs_r,k=3)
trs_t = fitr(t)
# Calculate mean
mean_t = (pks_t + trs_t) / 2
mean_t = _emd_complim(mean_t, pks, trs)
# Assess if this is an IMF (only look in time between peaks and troughs)
sdk = _emd_comperror(r_t, mean_t, pks, trs)
# if not imf, update r_t and is_imf
if sdk < stoplim:
is_imf = True
else:
r_t = r_t - mean_t
imfs[i] = r_t
r = r - imfs[i]
return imfs
def _emd_comperror(h, mean, pks, trs):
"""Calculate the normalized error of the current component"""
samp_start = np.max((np.min(pks),np.min(trs)))
samp_end = np.min((np.max(pks),np.max(trs))) + 1
return np.sum(np.abs(mean[samp_start:samp_end]**2)) / np.sum(np.abs(h[samp_start:samp_end]**2))
def _emd_complim(mean_t, pks, trs):
samp_start = np.max((np.min(pks),np.min(trs)))
samp_end = np.min((np.max(pks),np.max(trs))) + 1
mean_t[:samp_start] = mean_t[samp_start]
mean_t[samp_end:] = mean_t[samp_end]
return mean_t
Import data
In [203]:
minN = 7000
maxN = 9000
x = np.load('./exampledata.npy')
x = x[minN:maxN+1]
t = np.arange(0,len(x)*.001,.001)
Calculate and plot components
In [204]:
imfs = emd(x, nIMF = 3)
In [205]:
plt.figure(figsize=(12,12))
for i in range(len(imfs)):
plt.subplot(len(imfs),1,i+1)
plt.plot(t,x,color='0.5')
plt.plot(t,imfs[i],'k')
plt.ylim([-1000,1000])
WRITE DESCRIPTION OF COMPONENTS
EMD is an interative algorithm. Begin with a signal and repetitively modify it until it satisfies a certain criterion. Modify the original signal, and repeat the procedure to obtain each component.
The next section outlines the algorithm, broken down as:
Define decomposition parameters
In [179]:
nIMF = 3 # Number of components in which to decompose the signal
stoplim = .001 # Criterion to stop iteration and declare a signal to be a component
In [180]:
x_temp = x
pks = signal.argrelmax(x_temp)[0]
trs = signal.argrelmin(x_temp)[0]
In [181]:
# Plot
plt.figure(figsize=(15,3))
plt.plot(t,x_temp,'k')
plt.plot(t[pks],x_temp[pks],'r.')
plt.plot(t[trs],x_temp[trs],'b.')
plt.xlabel('Time (s)')
plt.ylabel('Voltage (uV)')
Out[181]:
In [182]:
x_pks = x[pks]
fip = sp.interpolate.InterpolatedUnivariateSpline(pks,x_pks,k=3)
pks_t = fip(range(len(x)))
x_trs = x_temp[trs]
fitr = sp.interpolate.InterpolatedUnivariateSpline(trs,x_trs,k=3)
trs_t = fitr(range(len(x)))
In [183]:
# Plot
plt.figure(figsize=(15,3))
plt.plot(t,x,'k')
plt.plot(t,pks_t,'r')
plt.plot(t,trs_t,'b')
plt.xlabel('Time (s)')
plt.ylabel('Voltage (uV)')
plt.ylim([-1000,1000])
Out[183]:
In [184]:
mean_t = (pks_t + trs_t) / 2
In [185]:
# Plot
plt.figure(figsize=(15,3))
plt.plot(t,x,'k')
plt.plot(t,mean_t,'r')
#plt.plot(t,x-mean_t,'g')
plt.xlabel('Time (s)')
plt.ylabel('Voltage (uV)')
plt.ylim([-1000,500])
Out[185]:
In [186]:
plt.plot(t,x-mean_t,'g')
Out[186]:
Notice the high frequency components of the signal are best preserved
WRITE INTUITION
In [171]:
samp_start = np.max((np.min(pks),np.min(trs)))
samp_end = np.min((np.max(pks),np.max(trs))) + 1
sdk = np.sum(np.abs(mean_t[samp_start:samp_end]**2)) / np.sum(np.abs(x_temp[samp_start:samp_end]**2))
if sdk < stoplim:
is_imf = True
else:
x_temp = x_temp - mean_t
is_imf = False
If the signal satisfies the criterion:
If the signal does not satisfy the criterion:
do each plot is 1 component and each row of subplots is 1 step
In [172]:
nIMF = 3
imfs = np.zeros(nIMF,dtype=object)
r = x
for i in range(nIMF):
r_t = r
is_imf = False
while is_imf == False:
# Identify peaks and troughs
pks = sp.signal.argrelmax(r_t)[0]
trs = sp.signal.argrelmin(r_t)[0]
# Interpolate extrema
pks_r = r_t[pks]
fip = sp.interpolate.InterpolatedUnivariateSpline(pks,pks_r,k=3)
pks_t = fip(range(len(r_t)))
trs_r = r_t[trs]
fitr = sp.interpolate.InterpolatedUnivariateSpline(trs,trs_r,k=3)
trs_t = fitr(range(len(r_t)))
# Calculate mean
mean_t = (pks_t + trs_t) / 2
# Assess if this is an IMF
sdk = _emd_stop(r_t, mean_t,pks,trs)
print sdk
# debug: make sure everything looks right, because getting weird interp error after a few iterations; error not converging?
plt.figure()
plt.subplot(3,1,1)
plt.plot(x)
plt.plot(r_t)
plt.plot(r)
plt.ylim((-1000,1000))
plt.subplot(3,1,2)
plt.plot(pks,pks_r,'k.',ms=10)
plt.plot(pks_t)
plt.plot(trs,trs_r,'k.',ms=10)
plt.plot(trs_t)
plt.plot(mean_t)
plt.ylim((-1000,1000))
plt.subplot(3,1,3)
plt.plot(r_t - mean_t)
plt.ylim((-1000,1000))
# if not imf, update r_t and is_imf
if sdk < stoplim:
is_imf = True
else:
r_t = r_t - mean_t
imfs[i] = r_t
r = r - imfs[i]
Advantages of EMD
Disadvantages of EMD
Description of pittman motivation, procedure, and results example with my data example with theta data or something that works better