In [1]:
"""
*** README (assumes familiarity with Jupyter notebook) ***
1. To begin, create a dataset by calling prep(window), this pre-processes the collected accelerometer+gyroscope data
in /data in the following way:
i. For all key position data, call combine_setState_createFeatures()
2. To run tests, it is useful to create test data. This is done with prep_test(file_to_test) - this function omits
setting the state, but the name tells you what the expected outcome should be (see also state value mappings)
3. Next, we add the HMM for smoothing. Using domain-specific knowledge, we set up an emission and transition
probability matrix, which captures the likelihood of all movement between the total number of states.
Y. Optional, use the state_reconciler on data involving stand ups to readjust the predicted states. Particularly
aimed at fixing OCG / YMOUNT errors
def main(training_window=40):
prepped_data = prep(training_window)
ALGORITHM STEPS:
1) The first (standing up detection) RF classifier prediction output sets the 'avg_stand'
feature in the dataframe passed to (2)
2) The main (position state) RF classifier runs and outputs predicted states
3) The "reconcile function" looks through the predicted states and for OCG and YMOUNT looks for any stand up
motions detected in the vicinity. It corrects accordingly
4) The HMM smooths the results to create clean sequences
** In future, step 1 could be replicated with other isolated movements (e.g. bridge, shrimp, sit up)
TODO: is it possible to attempt to detect stand ups using a different time window?
"""
Out[1]:
In [2]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
from hmmlearn import hmm
from sklearn import cross_validation
from sklearn.cross_validation import KFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.grid_search import GridSearchCV, RandomizedSearchCV
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.preprocessing import PolynomialFeatures
from rolltec_features import create_features
from manage_state import set_state, set_stand_state, state_reconciler
from algorithm_tests import trial, trial_standup, test_model, test_model_stand
from utilities import (convert_to_words, print_full, get_position_stats, combine_csv, resolve_acc_gyro,
blank_filter, concat_data)
# Sequence length constant; not referenced in the cells visible here.
# NOTE(review): presumably measured in samples -- confirm units.
TIME_SEQUENCE_LENGTH = 50

# Shared scikit-learn feature expander, configured for degree 1 with a bias
# column and without restricting to interaction terms.
polynomial_features = PolynomialFeatures(
    degree=1,
    include_bias=True,
    interaction_only=False,
)
In [4]:
def update_df(df, index, new_values, reach=8, step=20):
    """Overlay `new_values` onto df['state'] at evenly spaced rows around `index`.

    `reach` rows before `index` and `reach` rows after it are overwritten, each
    `step` index labels apart. The row at `index` itself is not touched.

    Args:
        df: DataFrame with a 'state' column; it is modified in place.
        index: index label the overlay is centred on.
        new_values: sequence of at least 2 * reach state values.
        reach: number of rows to overwrite on each side of `index`.
        step: spacing, in index labels, between overwritten rows (default 20,
            the value that was previously hard-coded).

    Returns:
        The same `df` object that was passed in.

    Target labels that do not exist in `df.index` (e.g. when `index` is close
    to the start or end of the frame) are skipped. Assigning through `.loc`
    to a missing label would otherwise append a brand-new row to the frame.
    """
    # First half: new_values[0] lands on the farthest row before `index`,
    # new_values[reach - 1] on the row `step` before it.
    for x in range(reach):
        label = index - (reach - x) * step
        if label in df.index:
            df.loc[label, 'state'] = new_values[x]

    # Second half: new_values[reach] lands on the FARTHEST row after `index`
    # and new_values[2 * reach - 1] on the nearest one.
    # NOTE(review): this mirrors the first half rather than continuing the
    # sequence forward in time; ordering kept as-is -- confirm it is intended.
    for y in range(reach):
        label = index + (reach - y) * step
        if label in df.index:
            df.loc[label, 'state'] = new_values[y + reach]

    return df
In [5]:
def combine_setState_createFeatures(directory, state, window=40, stand=0):
    """Build a labelled, feature-engineered training frame for one position.

    Convenience wrapper that chains four steps:
      1. combine_csv(directory)        -- merge the CSV files of `directory`
      2. set_state(..., state)         -- label the rows with the movement state
      3. set_stand_state(..., stand)   -- label the rows with the stand-up flag
      4. create_features(..., window)  -- add features using the given window

    The state label is applied once more to the output of create_features
    before returning.
    NOTE(review): presumably create_features does not carry the state column
    through -- confirm why the second set_state call is needed.
    """
    raw = combine_csv(directory)
    labelled = set_stand_state(set_state(raw, state), stand)
    featured = create_features(labelled, window)
    return set_state(featured, state)
In [6]:
def prep(window=40):
    """Prepare the raw sensor data for training.

    `window` is the size of the sliding selection window applied to the time
    series. The data was collected at 25Hz, so a window of 40 yields combined
    data windows of 1.6 seconds.

    Returns the concatenation of every per-position training frame after it
    has been passed through blank_filter (which removes NaN).
    """
    # (raw-data directory, state label, stand flag), in processing order.
    sources = [
        ('your_mount_raw_data', 'your_mount', 0),                      # 1 your mount
        ('your_side_control_raw_data', 'your_side_control', 0),        # 2 your side control
        ('your_closed_guard_raw_data', 'your_closed_guard', 0),        # 3 your closed guard
        ('your_back_control_raw_data', 'your_back_control', 0),        # 4 your back control
        ('opponent_mount_and_opponent_side_control_raw_data',
         'opponent_mount_or_sc', 0),                                   # 5 opponent mount / side control
        ('opponent_closed_guard_raw_data', 'opponent_closed_guard', 0),  # 6 opponent closed guard
        ('opponent_back_control_raw_data', 'opponent_back_control', 0),  # 7 opponent back control
        ('non_jj_raw_data', 'non_jj', 0),                              # 8 "non jiu-jitsu" motion
        # 9 "stand up" motion: labelled as opponent closed guard with stand=1
        ('standing_up_raw_data', 'opponent_closed_guard', 1),
    ]

    frames = [
        combine_setState_createFeatures(directory, state, window, stand)
        for directory, state, stand in sources
    ]

    # blank_filter removes NaN rows from the combined frame
    return blank_filter(concat_data(frames))
In [7]:
def prep_test(test_file, window=40):
    """Load one test recording and turn it into a feature frame.

    Unlike prep(), no state label is set: the file name indicates the
    expected outcome.

    Args:
        test_file: file name inside 'data/test_cases/'.
        window: sliding-window size passed to create_features. Defaults to 40
            (the value that was previously hard-coded); pass the same window
            that was given to prep() so test features match training features.

    Returns:
        The feature frame after blank_filter has been applied.
    """
    path = 'data/test_cases/' + test_file
    raw = pd.read_csv(path, index_col=None, header=0)
    resolved = resolve_acc_gyro(raw)
    featured = create_features(resolved, _window=window, test=True)
    return blank_filter(featured)
In [8]:
# Training set built with a 40-sample sliding window (the suffix records the window size).
training_data40 = prep(window=40)
In [9]:
# Load every general test recording in one pass; the file names describe the
# positions each recording is expected to contain.
(test_data1,
 test_data4,
 test_data5,
 test_data6,
 test_data7,
 test_data8,
 test_data9) = map(prep_test, (
    'test1_ymount_ycg.csv',
    'GL_TEST1_CS.csv',
    'GL_TEST2_CS.csv',
    'GL_TEST3_CS_very_still.csv',
    'GL_TEST1_UrsWearing.csv',
    'DIO_YCG_YMOUNT_YSC.csv',
    'DIO_OCG_YCG_YMOUNT_YBC.csv',
))
In [12]:
"""
This is for testing the effectiveness of the
standing motion detection. The datasets below with STAND in the
name refer to sequences involving a stand up.
This is effectively a tangent to the main algorithm, but one which could
be very useful and widely applicable.
"""
# --- Recording 1: run the full chain (stand-up detection, position classifier, reconciler) ---
test_data100 = prep_test('CS_OCG_STAND_OCG.csv')
#test_data101 = prep_test('URS_OCG_STAND_OCG1.csv')
# TODO: DEBUG If I move the test data creation out of this cell it causes an error
# Step 1 (per the README): the stand-up classifier sets the 'avg_stand' feature.
foo = trial_standup(training_data40, test_data100)
# HAVE TO BE CAREFUL WITH CREATING COPIES OF DF vs. actually changing the original - is creating bugs
# Step 2 (per the README): the main position-state classifier predicts 'state'.
step2 = trial(training_data40, foo)
print step2[['state', 'avg_stand']]
# step 3
# The reconciler adjusts predicted states using nearby stand-up detections (per the README).
# NOTE(review): unclear whether state_reconciler mutates step2 in place or returns a copy -- confirm.
fixed = state_reconciler(step2)
# --- Recording 2: steps 1 and 2 only (no reconciler) ---
test_data1000 = prep_test('URS_OCG_STAND_OCG3.csv')
baz = trial_standup(training_data40, test_data1000)
step2again = trial(training_data40, baz)
# pre_pre_smooth: un-reconciled predictions for recording 2 (URS_OCG_STAND_OCG3.csv).
pre_pre_smooth = step2again['state'].values
pre_pre_smooth_words = convert_to_words(pre_pre_smooth)
# pre_smooth: reconciled predictions for recording 1 (CS_OCG_STAND_OCG.csv); this is what the HMM cells consume.
# NOTE(review): pre_pre_smooth and pre_smooth come from DIFFERENT recordings, so comparing them
# later does not show a before/after of the reconciler on the same data -- confirm this is intended.
pre_smooth = fixed['state'].values
pre_smooth_words = convert_to_words(pre_smooth)
print pre_pre_smooth_words
print pre_smooth_words
In [14]:
# Number of HMM states: seven grappling positions plus a catch-all 'other'.
# NOTE(review): the state order is taken to be the value mapping documented
# next to the transition/emission matrices (0 = your_mount ... 7 = OTHER) -- confirm.
n_components = 8

# Initial state distribution: 0.05 for each of the first seven states and 0.65
# for the last one, assuming users will turn on the sensor whilst standing.
startprob = np.array([0.05] * 7 + [0.65])
In [15]:
# Transition probabilities between position states.
#
# State value mapping (row index = current state, column index = next state):
#   0 your_mount             4 opponent_mount_or_sc
#   1 your_side_control      5 opponent_closed_guard
#   2 your_closed_guard      6 opponent_back_control
#   3 your_back_control      7 OTHER
#
# Each row is a probability distribution over the next state and sums to 1;
# every state keeps itself with probability 0.8.
#
# columns: ymount   ysc    ycg    ybc  osc_mount ocg    obc   other
transmat = np.array([
    [0.800, 0.050, 0.010, 0.050, 0.001, 0.050, 0.001, 0.038],  # from your_mount
    [0.100, 0.800, 0.010, 0.010, 0.001, 0.050, 0.001, 0.028],  # from your_side_control
    [0.010, 0.050, 0.800, 0.010, 0.050, 0.001, 0.001, 0.078],  # from your_closed_guard
    [0.050, 0.010, 0.050, 0.800, 0.001, 0.010, 0.001, 0.078],  # from your_back_control
    [0.001, 0.050, 0.010, 0.001, 0.800, 0.050, 0.050, 0.038],  # from opponent_mount_or_sc
    [0.100, 0.050, 0.010, 0.010, 0.001, 0.800, 0.001, 0.028],  # from opponent_closed_guard
    [0.010, 0.050, 0.001, 0.010, 0.050, 0.001, 0.800, 0.078],  # from opponent_back_control
    [0.050, 0.010, 0.050, 0.078, 0.001, 0.010, 0.001, 0.800],  # from OTHER
])
In [40]:
# Emission probabilities of the HMM.
#
# State value mapping used for both rows and columns:
#   0 your_mount             4 opponent_mount_or_sc
#   1 your_side_control      5 opponent_closed_guard
#   2 your_closed_guard      6 opponent_back_control
#   3 your_back_control      7 OTHER
#
# NOTE(review): rows are taken to be the hidden (true) state and columns the
# state value observed from the classifier -- confirm against how `data` is built.
#
# The hand-tuned weights below are the single source of truth. The
# opponent_closed_guard row adds up to 0.900 rather than 1.000, so every row
# is normalised afterwards; rows that already sum to 1 are left unchanged.
#
# columns: ymount   ysc    ycg    ybc  osc_mount ocg    obc   other
_emission_weights = np.array([
    [0.500, 0.050, 0.010, 0.050, 0.001, 0.350, 0.001, 0.038],  # your_mount
    [0.100, 0.800, 0.010, 0.010, 0.001, 0.050, 0.001, 0.028],  # your_side_control
    [0.010, 0.050, 0.350, 0.010, 0.500, 0.001, 0.001, 0.078],  # your_closed_guard
    [0.050, 0.010, 0.050, 0.700, 0.001, 0.010, 0.101, 0.078],  # your_back_control
    [0.001, 0.050, 0.210, 0.050, 0.600, 0.050, 0.001, 0.038],  # opponent_mount_or_sc
    [0.400, 0.050, 0.010, 0.010, 0.001, 0.400, 0.001, 0.028],  # opponent_closed_guard (sums to 0.9)
    [0.010, 0.050, 0.001, 0.110, 0.050, 0.001, 0.700, 0.078],  # opponent_back_control
    [0.050, 0.010, 0.050, 0.078, 0.001, 0.010, 0.001, 0.800],  # OTHER
])

# Each row must be a probability distribution, so divide by the row totals.
emissionprob = _emission_weights / _emission_weights.sum(axis=1)[:, np.newaxis]
In [41]:
# Hidden Markov Model with multinomial (discrete) emissions.
# The parameters are assigned by hand from the matrices defined in the cells
# above; fit() is not called in the cells visible here.
model = hmm.MultinomialHMM(n_components=n_components, n_iter=10, verbose=False)

model.startprob_ = startprob        # initial state distribution
model.transmat_ = transmat          # state -> state transition matrix
model.emissionprob_ = emissionprob  # state -> observed symbol matrix
# model.n_features = 8
In [42]:
# Turn the reconciled state predictions into a column of observations,
# one row per sample, which is the layout handed to model.decode below.
# NOTE(review): a discrete-emission HMM expects integer symbols -- confirm the
# dtype of pre_smooth.
observations = np.array(pre_smooth)
n_samples = len(observations)
data = observations.reshape((n_samples, -1))
print(data)
In [43]:
# Smooth the observation sequence with Viterbi decoding and report the
# position statistics at each stage of the pipeline.
print('TEST 1')

# decode() returns (log-probability of the decoded path, decoded state sequence).
result = model.decode(data, algorithm='viterbi')
logprob, smoothed_states = result

# The label now matches the variable actually printed (pre_pre_smooth).
print('pre pre smooth: {}'.format(pre_pre_smooth))
# result[0] is a log-probability, not an accuracy percentage.
print('result log-probability: {}'.format(logprob))
print('final result: {}'.format(smoothed_states))

result_words = convert_to_words(smoothed_states)
print('=====================')
print('pre pre smooth words: {}'.format(pre_pre_smooth_words))
print('=====================')
print('result words: {}'.format(result_words))
print('\n')

print("pre pre smooth stats (before stand up detection)")
print(get_position_stats(pre_pre_smooth_words))
print('\n')

print("pre smooth stats (before HMM)")
print(get_position_stats(pre_smooth_words))
print('\n')

print('result stats')
print(get_position_stats(result_words))
print('******************')
In [ ]: