In [32]:
import pandas as pd
import numpy as np
import scipy.stats as stats
import scipy.signal as signal
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tpot import TPOTClassifier
EPOCH_LENGTH = 440  # 440 samples ~ 2 seconds at the Muse's 220 Hz sampling rate
VARIANCE_THRESHOLD = 600  # epochs with variance above this get discarded as noise
In [33]:
# Data has been collected, let's import it
open_data = pd.read_csv("../Muse Data/DanoThursdayOpenRawEEG0.csv", header=0, index_col=False)
closed_data = pd.read_csv("../Muse Data/DanoThursdayClosedRawEEG1.csv", header=0, index_col=False)
In [34]:
# Unfortunately, I haven't come up with a good way to feed multi-dimensional data (i.e. all 4 channels) into sklearn yet.
# To get around this, we'll drop everything except Channel 1's EEG data so everything works (see the sketch below)
open_array = open_data['Channel 1']
closed_array = closed_data['Channel 1']
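(If you do want all four channels later, one workable route is to flatten each epoch's channels into a single feature row. A rough sketch, assuming the CSVs name their columns 'Channel 1' through 'Channel 4':)
channels = [open_data['Channel %d' % i] for i in range(1, 5)]  # column names are an assumption
n_epochs = min(c.size for c in channels) // EPOCH_LENGTH
# One flat row per epoch: shape (n_epochs, 4 * EPOCH_LENGTH)
multi_features = np.hstack(
    [c.values[:n_epochs * EPOCH_LENGTH].reshape(n_epochs, EPOCH_LENGTH)
     for c in channels])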
In [35]:
# Prune a few rows from the tail of these arrays so that their lengths are divisible by our desired epoch length
open_overflow = open_array.size % EPOCH_LENGTH
if open_overflow:
    open_array = open_array[:-open_overflow]
closed_overflow = closed_array.size % EPOCH_LENGTH
if closed_overflow:
    closed_array = closed_array[:-closed_overflow]
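A quick sanity check that the trim worked (my addition, not an original cell):
assert open_array.size % EPOCH_LENGTH == 0
assert closed_array.size % EPOCH_LENGTH == 0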
In [37]:
'''
Split each channel array into epochs EPOCH_LENGTH samples long
np.array_split breaks a single array into a given number of sub-arrays,
so asking for size // EPOCH_LENGTH of them yields chunks of 440 contiguous samples each
np.stack puts multiple arrays on top of each other along an axis
here it stacks all the 440-sample epochs we created on top of each other as different rows in a matrix
'''
split_open_data = np.stack(np.array_split(open_array, open_array.size // EPOCH_LENGTH), axis=0)
split_closed_data = np.stack(np.array_split(closed_array, closed_array.size // EPOCH_LENGTH), axis=0)
# Transform data into a 2D DataFrame ( n epochs x EPOCH_LENGTH samples )
open_df = pd.DataFrame(split_open_data)
closed_df = pd.DataFrame(split_closed_data)
open_df.shape[0]
Out[37]:
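As a sanity check on the epoching, here are the same two calls on a toy 8-sample array with an epoch length of 4 (my sketch, not an original cell):
toy = np.arange(8)
print(np.stack(np.array_split(toy, toy.size // 4), axis=0))
# [[0 1 2 3]
#  [4 5 6 7]]  <- each row is one contiguous 4-sample epoch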
In [31]:
# Remove epochs with too much variance
def removeNoise(df):
    noisy_epochs = []
    for index, row in df.iterrows():
        if np.var(row) > VARIANCE_THRESHOLD:
            print('variance ', np.var(row))
            noisy_epochs.append(index)
    return df.drop(noisy_epochs)
open_df = removeNoise(open_df)
closed_df = removeNoise(closed_df)
closed_df.shape
Out[31]:
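For what it's worth, the same filter can be written without the explicit loop. A roughly equivalent sketch (same row-wise np.var as above):
open_df = open_df[open_df.apply(np.var, axis=1) <= VARIANCE_THRESHOLD]
closed_df = closed_df[closed_df.apply(np.var, axis=1) <= VARIANCE_THRESHOLD]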
In [39]:
'''
Create a combined DataFrame with the closed- and open-eye epochs stacked on top of each other (epochs x EPOCH_LENGTH)
The first closed_df.shape[0] labels will be 1s, indicating eyes closed, and the rest will be 0s
'''
combined_df = pd.concat([closed_df, open_df], axis=0, ignore_index=True)
labels = np.append(np.ones(closed_df.shape[0]), np.zeros(open_df.shape[0]))
# Create a sklearn train test split with this big combined df
X_train, X_test, y_train, y_test = train_test_split(combined_df, labels,
                                                    train_size=0.75,
                                                    test_size=0.25)
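One optional tweak, since the two classes may have unequal epoch counts after noise removal: passing stratify keeps the class ratio the same in train and test (a sketch of a standard sklearn option):
X_train, X_test, y_train, y_test = train_test_split(
    combined_df, labels, train_size=0.75, test_size=0.25, stratify=labels)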
In [8]:
# Create a TPOTClassifier that will run for 10 generations
my_tpot = TPOTClassifier(generations=10)
# Fit this baby! Takes a long time to run
my_tpot.fit(X_train, y_train)
# See what kind of score we get
print(my_tpot.score(X_test, y_test))
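Side note: TPOT takes a verbosity argument if you want progress output during the long fit (sketch):
my_tpot = TPOTClassifier(generations=10, verbosity=2)  # verbosity=2 prints per-generation progress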
In [9]:
# Holy crap! That's really good (or perhaps broken). Let's export the pipeline and see what TPOT came up with
my_tpot.export('exported_pipeline.py')
Here's what it came up with:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.svm import LinearSVC
# NOTE: Make sure that the class is labeled 'class' in the data file
tpot_data = np.recfromcsv('PATH/TO/DATA/FILE', delimiter='COLUMN_SEPARATOR', dtype=np.float64)
features = np.delete(tpot_data.view(np.float64).reshape(tpot_data.size, -1), tpot_data.dtype.names.index('class'), axis=1)
training_features, testing_features, training_classes, testing_classes = \
    train_test_split(features, tpot_data['class'], random_state=42)
exported_pipeline = LinearSVC(C=25.0, dual=False, penalty="l1", tol=0.1)
exported_pipeline.fit(training_features, training_classes)
results = exported_pipeline.predict(testing_features)
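A score that high always deserves suspicion. One quick check (my addition, using sklearn's DummyClassifier) is to compare against a majority-class baseline on the same split:
from sklearn.dummy import DummyClassifier
baseline = DummyClassifier(strategy='most_frequent')
baseline.fit(X_train, y_train)
print(baseline.score(X_test, y_test))  # TPOT's score should clear this by a wide margin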
In [80]:
# Let's import some new data and test this classifier out
new_open_data = pd.read_csv("../Muse Data/DanoEyesOpenRawEEG0.csv", header=0, index_col=False)
new_closed_data = pd.read_csv("../Muse Data/DanoEyesClosedRawEEG1.csv", header=0, index_col=False)
In [89]:
# Get channel 1 data
open_array = new_open_data['Channel 1']
closed_array = new_closed_data['Channel 1']
# Prune a few rows from the tail of these arrays so that their lengths are divisible by our desired epoch length
open_overflow = open_array.size % EPOCH_LENGTH
if open_overflow:
    open_array = open_array[:-open_overflow]
closed_overflow = closed_array.size % EPOCH_LENGTH
if closed_overflow:
    closed_array = closed_array[:-closed_overflow]
# Split into epochs of EPOCH_LENGTH contiguous samples
split_open_data = np.stack(np.array_split(open_array, open_array.size // EPOCH_LENGTH), axis=0)
split_closed_data = np.stack(np.array_split(closed_array, closed_array.size // EPOCH_LENGTH), axis=0)
# Transform data into a 2D DataFrame ( n epochs x EPOCH_LENGTH samples )
open_df = pd.DataFrame(split_open_data)
closed_df = pd.DataFrame(split_closed_data)
# Remove noise
open_df = removeNoise(open_df)
closed_df = removeNoise(closed_df)
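This is the second time through the same trim/epoch/de-noise steps, so a small helper (a sketch composed from the functions above) would cut the duplication:
def preprocess(series):
    # Trim the tail so the length divides evenly into epochs
    overflow = series.size % EPOCH_LENGTH
    if overflow:
        series = series[:-overflow]
    # Epoch into (n epochs x EPOCH_LENGTH) rows, then drop noisy epochs
    epochs = np.stack(np.array_split(series, series.size // EPOCH_LENGTH), axis=0)
    return removeNoise(pd.DataFrame(epochs))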
In [90]:
new_combined_df = pd.concat([closed_df, open_df], axis=0, ignore_index=True)
new_labels = np.append(np.ones(closed_df.shape[0]), np.zeros(open_df.shape[0]))
# Create a sklearn train test split with this big combined df
X_train, X_test, y_train, y_test = train_test_split(new_combined_df, new_labels,
                                                    train_size=0.75,
                                                    test_size=0.25)
In [91]:
# Since we're doing this by hand in the notebook, I'll just re-run the meaty parts of the exported pipeline on the new data
from sklearn.svm import LinearSVC
training_features, testing_features, training_classes, testing_classes = \
    train_test_split(new_combined_df, new_labels, random_state=42)
exported_pipeline = LinearSVC(C=25.0, dual=False, penalty="l1", tol=0.1)
exported_pipeline.fit(training_features, training_classes)
results = exported_pipeline.predict(testing_features)
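Before scoring, it's worth looking past a single accuracy number. A confusion matrix (sketch using sklearn.metrics) shows which class the errors fall on:
from sklearn.metrics import confusion_matrix
print(confusion_matrix(testing_classes, results))  # rows: true class, cols: predicted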
In [95]:
# Let's check the accuracy of this guy
exported_pipeline.score(testing_features, testing_classes)
Out[95]:
Welp, there it is again