Example on how to extract peaks from a accelerometer signal captured by a gcdataconcepts device.
Requires Python (tested with 2.7 but should also work with 3.3).
Required external Python packages:
(c) 2013 Wannes Meert, KU Leuven
In [18]:
%matplotlib inline
#%matplotlib gtk
In [19]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.core.display import display
import math
The accproc module contains some code to extract peaks. You are free to inspect the code the see which SciPy methods are useful for peak detection or to use an alternative implementation.
In [20]:
# The accproc scripts are loaded in a try-block such that you
# can update the code if necessary
def reloadaccproc():
print("Reloading accproc")
dreload(accproc,
exclude=['pandas', 'scipy', 'scipy.optimize', 'scipy.signal',
'matplotlib', 'matplotlib.pyplot', 'numpy',
'sys', 'os', 'math', 'os.path',
'__builtin__', '__main__'])
try:
accproc
reloadaccproc()
except NameError:
import accproc
In [21]:
data = accproc.readGCDCFormat("./Runs/Example/enkel/DATA-001.csv")
print("Before preprocessing")
display(data[0:10])
# Preprocessing translates the acceleration to units of g
# and removes the gravitational component.
data = accproc.preprocessGCDC(data)
print("After preprocessing")
display(data[0:10])
data.plot()
Out[21]:
Look at a short slice of the data (seconds 100 to 105)
In [22]:
dataslice = data[(100 < data.index) & (data.index < 105)]
plt.figure(figsize=plt.figaspect(0.33))
dataslice.plot()
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Ax signal")
dataslice.Ax.plot()
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Atotal signal")
dataslice.Atotal.plot()
Out[22]:
The signals should be close to zero (no acceleration) when the test subject is not running.
In [23]:
calslice = data[(20 < data.index) & (data.index < 35)]
plt.figure(figsize=plt.figaspect(0.33))
calslice.plot()
Out[23]:
When loading ankle data, Ax and Az are the most relevant ones.
In [24]:
print(accproc.detectPeaksGCDC.__doc__)
In [25]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Ax with Envelope+Butterworth filter (5Hz)")
_max = accproc.detectPeaksGCDC(dataslice, columnname="Ax",
detection={'delta': 1.0},
smooth={'type': 'hilbert,butter',
'fcol': 5,
'correct': True},
plot=True, verbose=True)
Ay is the forward axis for the ankle sensor, not ideal to measure steps.
In [26]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Az with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
_ = accproc.detectPeaksGCDC(dataslice, columnname="Az",
detection={'lookahead':40, 'delta':0.4},
smooth={'type':'hilbert,butter',
'fcol':10,
'window':31, 'correct':True},
plot=True, verbose=True)
In [27]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Atotal with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
_ = accproc.detectPeaksGCDC(dataslice, columnname="Atotal",
detection={'type':'simple',
'lookahead':50, 'delta':1.0},
smooth={'type':'hilbert,butter',
'fcol':5,
'correct':True, 'dist':0.1},
plot=True, verbose=True)
Alternatively you can try to use peak detection using wavelet convolution which is more powerful but harder to calculate. (See also scipy.signal.find_peaks_cwt)
In [28]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Atotal with Hilbert+Butter filter (enveloppe+lowpass@10Hz), correcting and cwt")
_ = accproc.detectPeaksGCDC(dataslice, columnname="Atotal",
detection={'type':'cwt',
'widths':np.arange(60,160,10)},
plot=True, verbose=True)
In [29]:
def getSteptimes(peaks):
"""Calculate the time between every two peaks.
Keyword arguments:
peaks -- List of times of peaks
"""
steptimes = []
for i in range(len(peaks)-1):
steptimes.append(peaks[i+1][0]-peaks[i][0])
return steptimes
def calcStatisticsForAnkle(data, plot=False):
"""Calculate statistics for the ankle.
Only Ax, Az and Atotal are used.
Keyword arguments:
data -- DataFrame object
plot -- Visualize peak detection
"""
if plot:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Ax with Hilbert+Butter filter (enveloppe+lowpass@6Hz) and correcting")
Ax_max = accproc.detectPeaksGCDC(data, columnname="Ax",
detection={'delta':0.7},
smooth={'type':'hilbert,butter', 'fcol':6, 'correct':True},
plot=plot, verbose=True)
if plot:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Az with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
Az_max = accproc.detectPeaksGCDC(data, columnname="Az",
detection={'lookahead':40, 'delta':0.4},
smooth={'type':'hilbert,butter', 'fcol':10, 'correct':True},
plot=plot, verbose=True)
if plot:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Atotal with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
At_max = accproc.detectPeaksGCDC(data, columnname="Atotal",
detection={'type':'simple',
'lookahead':50, 'delta':1.0},
smooth={'type':'hilbert,butter',
'fcol':5,
'correct':True, 'dist':0.1},
plot=plot, verbose=True)
Ax_xm,Ax_ym = zip(*Ax_max)
Az_xm,Az_ym = zip(*Az_max)
At_xm,At_ym = zip(*At_max)
Ax_steptimes = getSteptimes(Ax_max)
Az_steptimes = getSteptimes(Az_max)
At_steptimes = getSteptimes(At_max)
def stats(A,ym,steptimes):
return {'avg' : np.mean(A.values),
'avgPeak' : np.mean(ym),
'stdPeak' : np.std(ym),
'avgSteptime' : np.mean(steptimes),
'stdSteptime' : np.std(steptimes)}
r = {'Ax': stats(data.Ax, Ax_ym, Ax_steptimes),
'Az': stats(data.Az, Az_ym, Az_steptimes),
'Atotal': stats(data.Atotal, At_ym, At_steptimes)}
return r
def printStatistics(stats):
"""Pretty print generated statistics.
Keyword arguments:
stats -- Object as returned by calcStatisticsForAnkle
"""
for signalname,struct in stats.iteritems():
print("\n{}\n----------".format(signalname))
for statname,value in struct.iteritems():
print("{:<20} : {}".format(statname, value))
Take a larger dataslice to calculate some statistics.
In [30]:
dataslice2 = data[(60 < data.index) & (data.index < 120)]
stats = calcStatisticsForAnkle(dataslice2, plot=True)
printStatistics(stats)
In [31]:
import time, platform, IPython
print 'Last run in Python %s (%s), Numpy %s and IPython %s \non %s, %s.' \
%(platform.python_version(), platform.architecture()[0], np.__version__, IPython.__version__, \
platform.platform(), time.asctime(time.localtime(time.time())))