Accelerometer Peak Finding

Example on how to extract peaks from a accelerometer signal captured by a gcdataconcepts device.

Requires Python (tested with 2.7 but should also work with 3.3).

Required external Python packages:

  • Numpy
  • Matplotlib
  • Pandas

(c) 2013 Wannes Meert, KU Leuven


In [18]:
%matplotlib inline
#%matplotlib gtk

In [19]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.core.display import display
import math

The accproc module contains some code to extract peaks. You are free to inspect the code the see which SciPy methods are useful for peak detection or to use an alternative implementation.


In [20]:
# The accproc scripts are loaded in a try-block such that you
# can update the code if necessary
def reloadaccproc():
    print("Reloading accproc")
    dreload(accproc,
            exclude=['pandas', 'scipy', 'scipy.optimize', 'scipy.signal',
                     'matplotlib', 'matplotlib.pyplot', 'numpy',
                     'sys', 'os', 'math', 'os.path',
                     '__builtin__', '__main__'])

try:
    accproc
    reloadaccproc()
except NameError:
    import accproc


Reloading accproc

Pre-Processing


In [21]:
data = accproc.readGCDCFormat("./Runs/Example/enkel/DATA-001.csv")
print("Before preprocessing")
display(data[0:10])

# Preprocessing translates the acceleration to units of g 
# and removes the gravitational component.
data = accproc.preprocessGCDC(data)
print("After preprocessing")
display(data[0:10])

data.plot()


Before preprocessing
Ax Ay Az
Time
2.999 976 254 -40
3.000 962 254 -38
3.002 978 256 -38
3.005 974 252 -36
3.007 972 252 -34
3.010 968 252 -46
3.013 976 254 -40
3.015 978 256 -38
3.018 968 254 -36
3.020 978 256 -52
Avg Ax :  1077.07 ->  1.05g
Avg Ay :  -383.10 -> -0.37g
Avg Az :   242.44 ->  0.24g
After preprocessing
Ax Ay Az Atotal
Time
2.999 -0.098704 0.622168 -0.275821 0.687687
3.000 -0.112376 0.622168 -0.273868 0.689003
3.002 -0.096751 0.624121 -0.273868 0.688398
3.005 -0.100658 0.620215 -0.271915 0.684643
3.007 -0.102611 0.620215 -0.269962 0.684160
3.010 -0.106517 0.620215 -0.281680 0.689461
3.013 -0.098704 0.622168 -0.275821 0.687687
3.015 -0.096751 0.624121 -0.273868 0.688398
3.018 -0.106517 0.622168 -0.271915 0.687297
3.020 -0.096751 0.624121 -0.287540 0.693950
Out[21]:
<matplotlib.axes.AxesSubplot at 0x113cac0d0>

Look at a short slice of the data (seconds 100 to 105)


In [22]:
dataslice = data[(100 < data.index) & (data.index < 105)]

plt.figure(figsize=plt.figaspect(0.33))
dataslice.plot()

plt.figure(figsize=plt.figaspect(0.33))
plt.title("Ax signal")
dataslice.Ax.plot()

plt.figure(figsize=plt.figaspect(0.33))
plt.title("Atotal signal")
dataslice.Atotal.plot()


Out[22]:
<matplotlib.axes.AxesSubplot at 0x114a70a50>
<matplotlib.figure.Figure at 0x11509f8d0>

Calibration

The signals should be close to zero (no acceleration) when the test subject is not running.


In [23]:
calslice = data[(20 < data.index) & (data.index < 35)]

plt.figure(figsize=plt.figaspect(0.33))
calslice.plot()


Out[23]:
<matplotlib.axes.AxesSubplot at 0x114d7bb10>
<matplotlib.figure.Figure at 0x113c96ed0>

Find Peaks

When loading ankle data, Ax and Az are the most relevant ones.


In [24]:
print(accproc.detectPeaksGCDC.__doc__)


Return the peaks found in `columnname`

    Check the code of this function to know what methods are available in,
    for example, scipy to extract peaks.

    Keyword arguments:
    data -- DataFrame object
    columname -- Name of column
                 (default: "Atotal")
    detection -- Specify type of detection algorithm as dict:
                 - 'type'      : One of:
                                 * 'simple'
                                 * 'cwt' (Wavelet Convolution)
                                 (default: 'simple')
                 - 'delta'     : Minimal diff between peak and surrounding
                                 points
                                 (for simple, default: 0.9)
                 - 'lookahead' : At a sampling rate of 400Hz there can be
                                 maximally one peak for every lookahead/400
                                 seconds.
                                 (for simple, default: 20)
                 - 'widths'    : 1D array of widths in steps to use for
                                 calculating the CWT matrix. See
                                 scipy.signal.find_peaks_cwt.
                                 (for cwt, default: np.arange(60,100,10))
    smooth -- Smooth signal first with the following settings as a dict:
              - 'type'    : Comma separated list of
                            * 'hilbert' (Hilbert enveloppe)
                            * 'sg' (Savitzky-Golay)
                            * 'butter' (Butterworth)
                            (default: butter)
              - 'correct' : Use the peaks in the original signal but based
                            on the smoothed signal
                            (default: False)
              - 'dist'    : Max distance between peaks from smoothed and
                            non-smoothed (if correct is true).
                            (default: 0.05)
              - 'fs'      : Sampling frequency
                            (for Butterworth, default: 400)
              - 'fcol'    : Low pass cut-off frequency
                            (for Butterwort, default: 10)
              - 'window'  : Window over which to smooth (in steps)
                            (for Savitzky-Golay, default: 31)
              (default: None)
    plot -- Show plot of results (default: False)

    Returns pairs of (x,y) for peaks
    

Ax


In [25]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Ax with Envelope+Butterworth filter (5Hz)")
_max = accproc.detectPeaksGCDC(dataslice, columnname="Ax",
                                     detection={'delta': 1.0},
                                     smooth={'type': 'hilbert,butter',
                                             'fcol': 5,
                                             'correct': True},
                                     plot=True, verbose=True)


Ay

Ay is the forward axis for the ankle sensor, not ideal to measure steps.

Az


In [26]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Az with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
_ = accproc.detectPeaksGCDC(dataslice, columnname="Az",
                            detection={'lookahead':40, 'delta':0.4},
                            smooth={'type':'hilbert,butter',
                                    'fcol':10,
                                    'window':31, 'correct':True},
                            plot=True, verbose=True)


Atotal


In [27]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Atotal with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
_ = accproc.detectPeaksGCDC(dataslice, columnname="Atotal",
                            detection={'type':'simple',
                                       'lookahead':50, 'delta':1.0},
                            smooth={'type':'hilbert,butter',
                                    'fcol':5,
                                    'correct':True, 'dist':0.1},
                            plot=True, verbose=True)


Alternatively you can try to use peak detection using wavelet convolution which is more powerful but harder to calculate. (See also scipy.signal.find_peaks_cwt)


In [28]:
plt.figure(figsize=plt.figaspect(0.33))
plt.title("Atotal with Hilbert+Butter filter (enveloppe+lowpass@10Hz), correcting and cwt")
_ = accproc.detectPeaksGCDC(dataslice, columnname="Atotal",
                            detection={'type':'cwt',
                                       'widths':np.arange(60,160,10)},
                            plot=True, verbose=True)


Peak statistics


In [29]:
def getSteptimes(peaks):
    """Calculate the time between every two peaks.
    
    Keyword arguments:
    peaks -- List of times of peaks
    """
    steptimes = []
    for i in range(len(peaks)-1):
        steptimes.append(peaks[i+1][0]-peaks[i][0])
    return steptimes

def calcStatisticsForAnkle(data, plot=False):
    """Calculate statistics for the ankle.
    
       Only Ax, Az and Atotal are used.
       
       Keyword arguments:
       data -- DataFrame object
       plot -- Visualize peak detection
    """
    if plot:
        plt.figure(figsize=plt.figaspect(0.33))
        plt.title("Ax with Hilbert+Butter filter (enveloppe+lowpass@6Hz) and correcting")
    
    Ax_max = accproc.detectPeaksGCDC(data, columnname="Ax",
                                     detection={'delta':0.7},
                                     smooth={'type':'hilbert,butter', 'fcol':6, 'correct':True},
                                     plot=plot, verbose=True)

    if plot:
        plt.figure(figsize=plt.figaspect(0.33))
        plt.title("Az with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
    
    Az_max = accproc.detectPeaksGCDC(data, columnname="Az",
                                     detection={'lookahead':40, 'delta':0.4},
                                     smooth={'type':'hilbert,butter', 'fcol':10, 'correct':True},
                                     plot=plot, verbose=True)
    
    if plot:
        plt.figure(figsize=plt.figaspect(0.33))
        plt.title("Atotal with Hilbert+Butter filter (enveloppe+lowpass@10Hz) and correcting")
    
    At_max = accproc.detectPeaksGCDC(data, columnname="Atotal",
                                     detection={'type':'simple',
                                                'lookahead':50, 'delta':1.0},
                                     smooth={'type':'hilbert,butter',
                                             'fcol':5,
                                             'correct':True, 'dist':0.1},
                                     plot=plot, verbose=True)
    
    Ax_xm,Ax_ym = zip(*Ax_max)
    Az_xm,Az_ym = zip(*Az_max)
    At_xm,At_ym = zip(*At_max)
    
    Ax_steptimes = getSteptimes(Ax_max)
    Az_steptimes = getSteptimes(Az_max)
    At_steptimes = getSteptimes(At_max)
    
    def stats(A,ym,steptimes):
        return {'avg' : np.mean(A.values),
                'avgPeak' : np.mean(ym),
                'stdPeak' : np.std(ym),
                'avgSteptime' : np.mean(steptimes),
                'stdSteptime' : np.std(steptimes)}
    
    r = {'Ax': stats(data.Ax, Ax_ym, Ax_steptimes),
         'Az': stats(data.Az, Az_ym, Az_steptimes),
         'Atotal': stats(data.Atotal, At_ym, At_steptimes)}
    return r

def printStatistics(stats):
    """Pretty print generated statistics.
    
    Keyword arguments:
    stats -- Object as returned by calcStatisticsForAnkle
    """
    for signalname,struct in stats.iteritems():
        print("\n{}\n----------".format(signalname))
        for statname,value in struct.iteritems():
            print("{:<20} : {}".format(statname, value))

Take a larger dataslice to calculate some statistics.


In [30]:
dataslice2 = data[(60 < data.index) & (data.index < 120)]

stats = calcStatisticsForAnkle(dataslice2, plot=True)
printStatistics(stats)


Ax
----------
avgSteptime          : 0.704261904762
avg                  : 0.0595518566325
stdPeak              : 1.03158732224
avgPeak              : 2.64458603814
stdSteptime          : 0.22732005542

Az
----------
avgSteptime          : 0.845405797101
avg                  : 0.0872569582803
stdPeak              : 0.456660356657
avgPeak              : 1.955596291
stdSteptime          : 0.100440813755

Atotal
----------
avgSteptime          : 0.833985714286
avg                  : 1.21678020408
stdPeak              : 0.865144567447
avgPeak              : 4.11417406593
stdSteptime          : 0.0309277927979


In [31]:
import time, platform, IPython
print 'Last run in Python %s (%s), Numpy %s and IPython %s \non %s, %s.'  \
%(platform.python_version(), platform.architecture()[0], np.__version__, IPython.__version__, \
platform.platform(), time.asctime(time.localtime(time.time())))


Last run in Python 2.7.2 (64bit), Numpy 1.8.0.dev-5c944b9 and IPython 1.0.0 
on Darwin-12.5.0-x86_64-i386-64bit, Sun Oct  6 21:35:37 2013.