In [1]:
%load_ext autoreload
%autoreload 2
# Project-local preprocessing modules (not on PyPI): EMG signal prep,
# label generation, and raw session loading.
import prepare_EMG, prepare_outputs, prepare_data, pandas
EMG_Prep = prepare_EMG.EMG_preparer()  # windows CWT frames per expected phoneme (see usage below)
Output_Prep = prepare_outputs.output_preparer()  # word -> label frame with manner/place/height/vowel columns
Data_Prep = prepare_data.data_preparer()  # loads the recorded single-word EMG sessions
In [2]:
# Load the three recording sessions.  Downstream indexing shows each is a
# dict-like mapping word -> DataFrame with 'time' and 'voltage' columns.
singles_1 = Data_Prep.load_singletons(1)
singles_2 = Data_Prep.load_singletons(2)
singles_3 = Data_Prep.load_singletons(3)
# print(singles_1.keys())
First, we generate the phoneme and articulatory feature labels from each word. We'll use these to process the data in each file based on the length of the file and how many phonemes it should contain. We perform CWT, the continuous wavelet transform, on the EMG data to help differentiate MUAP activity. We size the CWT windows according to each file's length, under the assumption that phonemes are distributed evenly across the file.
In [3]:
from scipy import signal
import numpy as np
labels = {}
windows = {}
# One CWT width grid shared by every word and session.
CWT_WIDTHS = np.linspace(0.01, 10, 50)
for word in singles_1:
    try:
        # One label row per phoneme of the word.  Each word is recorded in all
        # three sessions, so the label block is tiled three times below to
        # line up with the three sets of windows.  (The original re-called
        # transform() per session; transform is assumed deterministic.)
        label = Output_Prep.transform(word)
        num_phonemes = label.shape[0]
        # CWT each session's voltage trace, then window it into one segment
        # per expected phoneme.  pd.concat replaces DataFrame.append, which
        # was removed in pandas 2.0.
        # NOTE(review): scipy.signal.cwt is deprecated (removed in SciPy 1.15);
        # consider PyWavelets if upgrading SciPy.
        session_windows = []
        for single in (singles_1, singles_2, singles_3):
            wt_out = signal.cwt(single[word]['voltage'], signal.ricker, CWT_WIDTHS)
            wt_frame = pandas.DataFrame(wt_out).T
            session_windows.append(EMG_Prep.process(wt_frame, num_phonemes, wavelets=True))
        # Assign both dicts only after all sessions succeed, so a failure
        # cannot leave labels[word] dangling without matching windows.
        windows[word] = pandas.concat(session_windows)
        labels[word] = pandas.concat([label, label, label])
    except Exception as inst:
        # Best-effort pass: a word missing from a session (or a too-short
        # recording) should not abort the loop; report it and continue.
        print(inst)
In [156]:
import pandas
%autoreload 2
y = pandas.DataFrame()
X = pandas.DataFrame()
for word in labels:
# append labels to the master label dataframe
label_frame = labels[word]
y = y.append(label_frame)
# Use phonemes to name each series in 'windows' for that word
window_frame = windows[word]
if len(label_frame.axes[0]):
window_frame = window_frame.rename_axis(lambda x: label_frame.axes[0][x])
X = X.append(window_frame)
else:
print('no labels for:',word)
print(y.head(),X.head())
In [5]:
# print(X.head(18), y.head(18))
from sklearn.preprocessing import scale,normalize
from sklearn.decomposition import PCA
# Standardize the features, project onto 10 principal components, then
# L2-normalize each row.
X_scaled = scale(X)
pca = PCA(n_components=10, random_state=9)
X_reduced = pca.fit_transform(X_scaled)
X_normalized = normalize(X_reduced)
# Column names 'pc-0'..'pc-9' are relied on downstream (e.g. the 'pc-0'
# forcing test), so build them explicitly -- rename_axis(mapper, axis=...)
# is deprecated and removed in modern pandas.
X_normalized = pandas.DataFrame(
    X_normalized,
    columns=['pc-' + str(i) for i in range(X_normalized.shape[1])],
)
# Show only the head; dumping the full frame bloats the notebook output.
print(X_normalized.head())
We use wavelet transformations to visualize EMG data for a few words, to see how they can vary from series to series and between one another. The X-axis of these graphs is file time (seconds); Y is the wavelet width relative to delta-t (the time between data points). Purple and green denote positive and negative phases, while color intensity corresponds to wave amplitude. Some patterns apparently correspond to specific phonemes, unifying words across different series. Variation between samples of a given word is also apparent.
In [126]:
from scipy import signal
import numpy as np
import matplotlib.pyplot as plt
# CWT scalograms of 'advice', one per session.  X: file time (s); Y: wavelet
# width; symmetric color limits so the positive/negative phases render with
# comparable intensity.  One loop replaces three copy-pasted blocks.
for single in (singles_1, singles_2, singles_3):
    sig = single['advice']['voltage']
    length = len(sig)
    dur = single['advice']['time'][length-1]  # last timestamp = recording duration
    widths = np.linspace(.01, 10, 50)
    wt_out = signal.cwt(sig, signal.ricker, widths)
    # print (wt_out, wt_out.shape)
    vmax = abs(wt_out).max()
    plt.imshow(wt_out, extent=[0, dur, 10, .01], cmap='PRGn', aspect='auto',
               vmax=vmax, vmin=-vmax)
    plt.show()
    wt_out_frame = pandas.DataFrame(wt_out).T  # kept for ad-hoc inspection
In [125]:
# CWT scalograms of 'aspiring', one per session (same presentation as the
# 'advice' cell above).  One loop replaces three copy-pasted blocks.
for single in (singles_1, singles_2, singles_3):
    sig = single['aspiring']['voltage']
    length = len(sig)
    dur = single['aspiring']['time'][length-1]  # last timestamp = recording duration
    widths = np.linspace(.01, 10, 50)
    wt_out = signal.cwt(sig, signal.ricker, widths)
    # print (wt_out, wt_out.shape)
    vmax = abs(wt_out).max()
    plt.imshow(wt_out, extent=[0, dur, 10, .01], cmap='PRGn', aspect='auto',
               vmax=vmax, vmin=-vmax)
    plt.show()
    wt_out_frame = pandas.DataFrame(wt_out).T  # kept for ad-hoc inspection
In [128]:
# CWT scalograms of 'weather', one per session.
# BUG FIX: the original plotted sig3 but computed `length = len(sig)` from the
# *previous cell's* signal, so `dur` (the plot's time extent) came from the
# wrong recording.  Measure the signal actually being plotted.
for single in (singles_1, singles_2, singles_3):
    sig3 = single['weather']['voltage']
    length = len(sig3)
    dur = single['weather']['time'][length-1]  # last timestamp = recording duration
    widths = np.linspace(.01, 10, 50)
    wt_out = signal.cwt(sig3, signal.ricker, widths)
    # print (wt_out, wt_out.shape)
    vmax = abs(wt_out).max()
    plt.imshow(wt_out, extent=[0, dur, 10, .01], cmap='PRGn', aspect='auto',
               vmax=vmax, vmin=-vmax)
    plt.show()
In [6]:
# Prepare lists of parameters for our GridSearch
# Hidden-layer grid: depth 2-4, uniform width 30-150 in steps of 30.
# Starting the range at 30 makes the original `if j:` zero-guard unnecessary.
layer_sizes = [(j,) * i for i in range(2, 5) for j in range(30, 180, 30)]
print('number layer sizes:', len(layer_sizes), 'here be layer sizes', layer_sizes)
# Next, our alpha values (MLPC L2-regularization strengths)
alphas = [0.0001, 1, 1000]
We set up the objects for performing grid search on each of the Articulatory Feature Extractor models. We also train untuned, stock MLPC models to serve as a performance baseline. We will compare the performance of these baseline, untuned models to our grid-searched models to determine whether grid search has in fact improved the model parameters for each AF extractor.
In [7]:
from sklearn.neural_network import MLPClassifier as MLPC
# Import other models to try for feature extraction
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.decomposition import PCA
from sklearn.feature_selection import SelectKBest
import copy
X_train, X_test, y_train, y_test = train_test_split(X_normalized, y, test_size=0.15, random_state=12)
# Optional feature-combination stage (currently disabled in the pipeline).
combined_features = FeatureUnion([
    ('pca', PCA(random_state=18)),
    ('kbest', SelectKBest(k=1))
])
pipeline = Pipeline([
    # ('features', combined_features),
    ('model', MLPC(random_state=12))
])
param_grid = {
    # 'features__pca__n_components':[10,20,50],
    'model__solver': ['adam'],
    'model__hidden_layer_sizes': layer_sizes,
    'model__activation': ['relu'],
    'model__alpha': alphas,
    'model__max_iter': [200]
}
# Template search; each AF cell below deep-copies it before fitting.
grid_search = GridSearchCV(pipeline, param_grid, n_jobs=-1)

def fit_baseline(feature, seed):
    """Train an untuned MLPC on one articulatory-feature column of y and
    return (fitted classifier, held-out accuracy)."""
    clf = MLPC(solver='adam', random_state=seed)
    clf.fit(X_train, y_train[feature])
    return clf, clf.score(X_test, y_test[feature])

# Untuned baselines, one per articulatory feature (seeds match the original
# runs so results are reproducible).  Replaces four copy-pasted blocks.
manner_classifier, m_score = fit_baseline('manner', 3)
place_classifier, p_score = fit_baseline('place', 6)
height_classifier, h_score = fit_baseline('height', 9)
vowel_classifier, v_score = fit_baseline('vowel', 12)
print('manner score:',m_score,'place score:',p_score,'height score:',h_score,'vowel score:',v_score)
# print(data_1_proc.head(50), trans_labels['manner'].head(50))
In [8]:
# Tuned AF extractors: one grid search per articulatory feature.  Each cell
# deep-copies the unfitted `grid_search` template so the four searches stay
# independent.  These cells are slow (full grid over layer_sizes x alphas),
# hence one feature per cell.
manner_classifier2 = copy.deepcopy(grid_search)
manner_classifier2.fit(X_train, y_train['manner'])
m_score2 = manner_classifier2.score(X_test, y_test['manner'])
print('manner score:',m_score2)
In [9]:
place_classifier2 = copy.deepcopy(grid_search)
place_classifier2.fit(X_train, y_train['place'])
p_score2 = place_classifier2.score(X_test, y_test['place'])
print('place score:',p_score2)
In [10]:
height_classifier2 = copy.deepcopy(grid_search)
height_classifier2.fit(X_train, y_train['height'])
h_score2 = height_classifier2.score(X_test, y_test['height'])
print('height score:',h_score2)
In [11]:
vowel_classifier2 = copy.deepcopy(grid_search)
vowel_classifier2.fit(X_train, y_train['vowel'])
v_score2 = vowel_classifier2.score(X_test, y_test['vowel'])
print('vowel score:',v_score2)
Training the solution model requires predicted AFs, produced by the optimized AFE models grid-searched in the previous step. Here is where those predictions are made and transformed into input features for the solution model, before the CV train-test split is performed.
In [16]:
from sklearn.preprocessing import LabelEncoder as LE
from sklearn.feature_extraction import DictVectorizer as DV
from sklearn.preprocessing import MultiLabelBinarizer as MLB
from sklearn.preprocessing import OneHotEncoder as OHE
from collections import Counter
# Predicted articulatory features from the tuned extractors; these become
# inputs to the phoneme (solution) model.
manner_inputs = manner_classifier2.predict(X_normalized)
place_inputs = place_classifier2.predict(X_normalized)
height_inputs = height_classifier2.predict(X_normalized)
vowel_inputs = vowel_classifier2.predict(X_normalized)
# We need to account for each value that each category of label can take on.
AF_COLUMNS = ['manner', 'place', 'height', 'vowel']
# Ordinal code per label value, numbered 0..k-1 within each category in
# first-seen order (matching the original Counter-based enumeration).
# NOTE(review): a value appearing in two categories would collide in this
# shared dict -- assumed disjoint vocabularies, as in the original.
feature_dict = {}
for col in AF_COLUMNS:
    for current, feature in enumerate(Counter(y[col]).keys()):
        feature_dict[feature] = current
# Replace the true labels with the extractors' predictions.
# BUG FIX: the original wrote via chained indexing
# (raw_inputs.iloc[row]['manner'] = ...), which pandas does not guarantee to
# propagate to the frame -- if it silently failed, the true labels leaked
# into the solution model's inputs.  Whole-column assignment is reliable and
# O(1) instead of O(rows).
raw_inputs = copy.deepcopy(y)
raw_inputs['manner'] = manner_inputs
raw_inputs['place'] = place_inputs
raw_inputs['height'] = height_inputs
raw_inputs['vowel'] = vowel_inputs
# Then, we transform the predicted labels with one-hot encoding after
# concatenating the AF outputs and Solution Model Inputs
num_labels = copy.deepcopy(raw_inputs)
for col in AF_COLUMNS:
    num_labels[col] = raw_inputs[col].map(feature_dict)
encoder = OHE()
new_labels = encoder.fit_transform(num_labels)
enc_labels = pandas.DataFrame(new_labels.toarray())
# Finally, we build our new input DataFrame with predicted AF's and processed EMG.
# Concatenate column-wise once instead of appending row-by-row (quadratic).
phoneme_labels = y.axes[0]
phoneme_inputs = pandas.concat(
    [X_normalized.reset_index(drop=True), enc_labels.reset_index(drop=True)],
    axis=1,
)
phoneme_inputs.index = X_normalized.index
# We're ready to split our solution model data for CV
pho_X_train, pho_X_test, pho_y_train, pho_y_test = train_test_split(phoneme_inputs, phoneme_labels, test_size=0.15, random_state=12)
In [20]:
# Benchmark: grid-search the same pipeline on the raw (un-reduced) EMG
# windows X, predicting phonemes directly without the AF-extractor stage.
# NOTE(review): execution counts (In [20] > In [17]) suggest this cell was
# last run AFTER the next cell redefined `param_grid`; on a fresh
# top-to-bottom run it would search the smaller AF grid instead -- confirm
# which grid is intended.
pho2_X_train, pho2_X_test, pho2_y_train, pho2_y_test = train_test_split(X,phoneme_labels, test_size=0.15, random_state=12)
benchmark_gs = GridSearchCV(pipeline, param_grid, n_jobs=-1)
benchmark_gs.fit(pho2_X_train, pho2_y_train)
pho2_score = benchmark_gs.score(pho2_X_test,pho2_y_test)
print(pho2_score)
In [17]:
# Solution-model grid: deeper (2-9 layers) but narrower (widths 60 and 90)
# than the AF grid.  Starting the range at 60 makes the `if j:` guard
# unnecessary; comprehension replaces the triple-nested loop.
pho_layer_sizes = [(j,) * i for i in range(2, 10) for j in range(60, 120, 30)]
print('number layer sizes:', len(pho_layer_sizes), 'here be layer sizes', pho_layer_sizes)
# Next, our alpha values
pho_alphas = [0.001, 0.1, 1, 1000]
# NOTE(review): this rebinds the module-level `param_grid`, so re-running any
# earlier cell that uses it will silently pick up this grid.
param_grid = {
    # 'features__pca__n_components':[10,20,50],
    'model__solver': ['adam'],
    'model__hidden_layer_sizes': pho_layer_sizes,
    'model__activation': ['relu'],
    'model__alpha': pho_alphas,
    'model__max_iter': [300]
}
pho_model_grid_search = GridSearchCV(pipeline, param_grid, n_jobs=-1)
# The Solution Model
phoneme_classifier = pho_model_grid_search
phoneme_classifier.fit(pho_X_train, pho_y_train)
# NOTE: despite the variable name, .score here is mean accuracy, not F1.
pho_train_f1 = phoneme_classifier.score(pho_X_train, pho_y_train)
print('phoneme classifier training score:',pho_train_f1)
In [19]:
# Held-out accuracy of the tuned solution model; compare with the training
# score above to gauge overfitting.
pho_test_score = phoneme_classifier.score(pho_X_test, pho_y_test)
print('phoneme model test score:',pho_test_score)
In [49]:
# Class-balance check: share of each phoneme among all samples, as a bar chart.
phonemes = Counter(phoneme_labels)
N = len(phonemes)
total = sum(phonemes.values())
# Convert counts to fractions in place.
for key in phonemes:
    phonemes[key] = phonemes[key] / total
    print(key, "represents", str(phonemes[key]*100)+"%", "of all samples")
ind = np.arange(N)  # the x locations for the groups
width = .66  # the width of the bars
fig, ax = plt.subplots()
rects1 = ax.bar(ind, phonemes.values(), width, color='xkcd:purple')
# add some text for labels, title and axes ticks
ax.set_ylabel('Phonemes')
ax.set_title('Phoneme instances by type')
ax.set_xticks(ind)
ax.set_xticklabels(phonemes.keys(), size='xx-small')
ax.legend('Phonemes')

def autolabel(rects):
    """
    Attach a text label above each bar displaying its height
    """
    for rect in rects:
        height = rect.get_height()
        # BUG FIX: the original formatted '%f' % int(height); every bar is a
        # fraction < 1, so all labels rendered as 0.000000.
        ax.text(rect.get_x() + rect.get_width()/2., 1.05*height,
                '%.3f' % height,
                ha='center', va='bottom')

autolabel(rects1)
plt.show()
In [87]:
# Word-length vs phoneme-count statistics.  Each labels[word] index holds the
# word's phoneme sequence tiled once per session (3x), so a single copy is the
# first third of the index.
num_phonemes = []
num_letters = []
for word in labels:
    label_length = len(labels[word].axes[0])
    # BUG FIX: label_length/3 is a float in Python 3 and cannot be used as a
    # slice bound; integer division is required.
    phonemes = labels[word].axes[0].values[0:label_length // 3]
    num_letters.append(len(word))
    num_phonemes.append(len(phonemes))
    # print(word, ",", labels[word].axes[0].values[0:label_length // 3])
print('average word length:', np.mean(num_letters), '+/-', np.std(num_letters))
print('average num phonemes:', np.mean(num_phonemes), '+/-', np.std(num_phonemes))
In [96]:
import random
# Dataset-level summary statistics across all three sessions.  The lists hold
# everything we need; the original's separate running-total variables were
# redundant (totals are just sums of the lists).
singles = [singles_1, singles_2, singles_3]
durations = []
dataframe_sizes = []
voltages = []
for single in singles:
    for word in single:
        length = len(single[word])
        durations.append(single[word]['time'][length-1])  # last timestamp = duration
        dataframe_sizes.append(np.sum(single[word].memory_usage()))
        voltages.append(np.mean(single[word]['voltage']))
print('total duration:', np.sum(durations), "seconds")
print('standard deviation of duration:', np.std(durations))
print('total dataframe mem use:', np.sum(dataframe_sizes))
print('standard deviation of dataframe mem usage:', np.std(dataframe_sizes))
print('average v:', np.mean(voltages), '+/-', np.std(voltages))
# Spot-check a handful of random session-2 words.
# NOTE(review): `random` is unseeded, so this sample differs on every run --
# seed it if reproducibility matters.
r_volts = []
r_durs = []
for i in range(6):
    r_key = random.choice(list(singles_2))
    length = len(singles_2[r_key])
    dur = singles_2[r_key]['time'][length-1]
    avg_v = np.mean(singles_2[r_key]['voltage'])
    std_v = np.std(singles_2[r_key]['voltage'])
    r_durs.append(dur)
    r_volts.append(avg_v)
    print(r_key, ',', avg_v, ',', std_v)
In [117]:
from sklearn.metrics import f1_score
import math
# Compare the benchmark (raw-EMG) and solution (AF-augmented) phoneme models:
# overall accuracy plus per-phoneme F1 and its standard error.
labels_list = list(set(pho_y_test.values))
labels_list.sort()
bm_score = benchmark_gs.score(pho2_X_test, pho2_y_test)
bm_f1 = f1_score(pho2_y_test, benchmark_gs.predict(pho2_X_test), average=None, labels=labels_list)
sol_score = phoneme_classifier.score(pho_X_test, pho_y_test)
sol_f1 = f1_score(pho_y_test, phoneme_classifier.predict(pho_X_test), average=None, labels=labels_list)
# BUG FIX: the original printed `labels` (the dict of per-word label frames),
# dumping every DataFrame into the output; `labels_list` was clearly intended.
print(bm_score, sol_score, len(pho2_y_test), np.std(bm_f1), np.std(sol_f1), labels_list)
print('benchmark f1:', bm_f1, 'std error:', np.std(bm_f1)/math.sqrt(len(pho2_y_test)))
print('solution f1:', sol_f1, 'std error:', np.std(sol_f1)/math.sqrt(len(pho_y_test)))
for label in range(len(labels_list)):
    print(labels_list[label], ',', bm_f1[label], ',', sol_f1[label])
In [131]:
phoneme_classifier.best_estimator_
Out[131]:
Tweaking optimal parameters slightly, we fit and test a slightly different version of the solution model to see how small changes in parameters affect performance. After that, we see how negative forcing, the uniform diminution of original values, affects the solution model's performance. Last is white noise testing, where Gaussian noise is added to each row.
In [139]:
# Parameter Sensitivity test
# Slightly perturbed architecture (7 layers of 60 units) around the tuned
# optimum, to see how small parameter changes affect performance.
# FIX: pin random_state so the result is reproducible, consistent with every
# other seeded model in this notebook (the original was unseeded).
sensitivity_1 = MLPC(hidden_layer_sizes=(60, 60, 60, 60, 60, 60, 60), alpha=.0001, random_state=12)
sensitivity_1.fit(pho_X_train, pho_y_train)
sens_1 = sensitivity_1.score(pho_X_test, pho_y_test)
print(sens_1)
In [155]:
# Forcing sensitivity test on one feature: scale the 'pc-0' column by a
# random factor in [0, 1).
pho_X_test_rf = copy.deepcopy(pho_X_test)
pho_X_test_rf['pc-0'] = pho_X_test_rf['pc-0'] * random.random()
rf_test = phoneme_classifier.score(pho_X_test_rf, pho_y_test)
# Forcing sensitivity test on a block of samples: scale rows 1-99 (df[1:100]
# slices rows) across all columns.
# BUG FIX: the original assigned from pho_X_test_rf here, so this frame also
# inherited the 'pc-0' forcing above, contaminating the measurement; force
# the clean copy instead.
pho_X_test_cf = copy.deepcopy(pho_X_test)
pho_X_test_cf[1:100] = pho_X_test_cf[1:100] * random.random()
cf_test = phoneme_classifier.score(pho_X_test_cf, pho_y_test)
# Random white noise sensitivity test: add Gaussian noise to the first 10
# columns (one 10-element noise vector, broadcast identically to every row).
pho_X_test_wn = copy.deepcopy(pho_X_test)
pho_X_test_wn.iloc[:, 0:10] = pho_X_test_wn.iloc[:, 0:10] + np.random.normal(0, 0.15, 10)
wn_test = phoneme_classifier.score(pho_X_test_wn, pho_y_test)
print('row-wise forcing score:', rf_test)
print('column-wise forcing score:', cf_test)
print('white noise addition:', wn_test)