Learning Snap

Train an LSTM model to detect finger snaps in Myo armband EMG data.


In [ ]:
from __future__ import print_function
import os
import sys
import numpy as np
import random
import string
import tensorflow as tf

from sklearn.decomposition import PCA
from sklearn.decomposition import FastICA

import outputer

Load the data. Each line of a log is either an "E" row (a timestamped sample from the 8 EMG channels) or an "A" row carrying an energy reading; a rising edge of that energy above a small threshold is recorded as a snap time.


In [ ]:
data_path = "../MyoSnap/testing"
data_files = [
    "nothing.csv"
]

for i in range(11):
    data_files.append("trial" + str(i) + ".csv")

data_sets = {}

snap_threshold = 0.001

for file_name in data_files:
    with open(os.path.join(data_path, file_name), 'r') as f:
        skip_count = 20 # Give signals time to stabilize
        start_time = None
        emg_data = []
        snap_times = []
        was_quiet = True
        snap_count = 0
        for line in f:
            if skip_count > 0:
                skip_count -= 1
            else:
                parts = line.strip().split(",")
                if parts[0] == "E":
                    emg_data.append((int(parts[1]), [int(v) for v in parts[2:]]))
                elif parts[0] == "A":
                    # A rising edge of the energy signal marks a snap.
                    energy = float(parts[2])
                    if was_quiet and energy > snap_threshold:
                        snap_times.append(int(parts[1]))
                        snap_count += 1
                    was_quiet = energy < snap_threshold
        name = file_name[:-4]
        data_sets[name] = (emg_data, snap_times)
        print(name, "samples:", len(emg_data), "snaps:", snap_count)
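
Before going further, a quick sanity check on one trial: confirm the timestamps are monotonic and get a feel for the spacing between consecutive EMG samples (the timestamp units are whatever the logger wrote, so this is only a relative check).

In [ ]:
emg, snaps = data_sets["trial0"]
timestamps = np.array([t for t, _ in emg])
deltas = np.diff(timestamps)
print("monotonic:", np.all(deltas >= 0))
print("median sample spacing:", np.median(deltas))
print("first snap times:", snaps[:3])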

In [ ]:
def npEMG(data_set):
    """Convert a list of (timestamp, channel values) EMG tuples into a 2D
    array with the timestamp in column 0."""
    emg_data = data_set[0]
    data = np.zeros([len(emg_data), len(emg_data[0][1]) + 1], dtype=np.float64)
    for i, entry in enumerate(emg_data):
        data[i][0] = entry[0]
        data[i][1:] = entry[1]
    return data

In [ ]:
emg2 = npEMG(data_sets["trial2"])

In [ ]:
emg2_norm = emg2[:,1:]/128 # drop the timestamp column, scale the 8-bit EMG values to roughly [-1, 1]

In [ ]:
print(np.min(emg2_norm, axis=0))
print(np.max(emg2_norm, axis=0))
print(np.average(emg2_norm, axis=0))

In [ ]:
pca = PCA(n_components=8)
pca.fit(emg2_norm)

print(pca.explained_variance_ratio_)
print(pca.components_)
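
The ratios above are per component; the cumulative sum shows how many components are needed to cover most of the variance (this only re-reads the fitted model, nothing new is computed):

In [ ]:
cumulative = np.cumsum(pca.explained_variance_ratio_)
print(cumulative)
print(int(np.argmax(cumulative >= 0.95)) + 1, "components reach 95% of the variance")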

In [ ]:
# Fit an ICA model to the data
ica = FastICA(random_state=42)
ica.fit(emg2_norm)

print(ica.components_)
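
For reference, the fitted ICA model can also project the EMG into its estimated source space; a minimal sketch of what the recovered sources look like:

In [ ]:
sources = ica.transform(emg2_norm)
print(sources.shape)
print(np.std(sources, axis=0))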

In [ ]:
class BatchGenerator(object):
    def __init__(self, emg, snaps, snap_duration, size, unrolls):
        self._emg = emg
        self._snaps = snaps
        self._snap_duration = snap_duration
        self._channels = len(emg[0][1])
        self._batch_size = size
        self._unrolls = unrolls
        # Start each stream in the batch at an evenly spaced offset into the recording.
        segment = len(self._emg) // size
        self._cursor = [offset * segment for offset in range(size)]
        self._last_batch = self._next_batch()
  
    def _next_batch(self):
        """Generate a single batch from the current cursor position in the data."""
        batch = np.zeros(shape=(self._batch_size, self._channels), dtype=np.float64)
        label = np.zeros(shape=(self._batch_size, 1), dtype=np.float64)
        for b in range(self._batch_size):
            entry = self._emg[self._cursor[b]]
            batch[b, :] = entry[1]
            self._cursor[b] = (self._cursor[b] + 1) % len(self._emg)
            # Label the sample positive if it falls inside a snap window.
            for snap in self._snaps:
                offset = entry[0] - snap
                if 0 < offset < self._snap_duration:
                    label[b][0] = 1.0
                    break
        return (batch / 128.0, label)
  
    def next(self):
        """Generate the next array of batches from the data. The array consists of
        the last batch of the previous array, followed by unrolls new ones.
        """
        batches = [self._last_batch]
        for step in range(self._unrolls):
            batches.append(self._next_batch())
        self._last_batch = batches[-1]
        return batches

In [ ]:
batcher_test = BatchGenerator(data_sets["trial0"][0], data_sets["trial0"][1], 100000, 10, 5)
print(batcher_test.next())
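
The printout above is hard to eyeball, so one more check: what fraction of the generated labels are positive. With a long snap window most samples after a snap get labeled 1.0, and this gives a feel for the class balance the classifier will see:

In [ ]:
batches = batcher_test.next()
labels = np.concatenate([b[1] for b in batches])
print("positive label fraction:", labels.mean())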

In [ ]:
def setup_graph(node_count, channel_count, label_count, batch_size, unrolls):
    graph = tf.Graph()
    with graph.as_default():
        gate_count = 4
        # Parameters:
        # Gates: input, previous output, and bias.
        input_weights = tf.Variable(tf.truncated_normal([channel_count, node_count * gate_count], -0.1, 0.1))
        output_weights = tf.Variable(tf.truncated_normal([node_count, node_count * gate_count], -0.1, 0.1))
        bias = tf.Variable(tf.zeros([1, node_count * gate_count]))
        # Variables saving state across unrollings.
        saved_output = tf.Variable(tf.zeros([batch_size, node_count]), trainable=False)
        saved_state = tf.Variable(tf.zeros([batch_size, node_count]), trainable=False)
        # Classifier weights and biases.
        w = tf.Variable(tf.truncated_normal([node_count, label_count], -0.1, 0.1))
        b = tf.Variable(tf.zeros([label_count]))

        # Definition of the cell computation.
        def lstm_cell(i, o, state):
            """Create a LSTM cell. See e.g.: http://arxiv.org/pdf/1402.1128v1.pdf
            Note that in this formulation, we omit the various connections between the
            previous state and the gates."""
            values = tf.matmul(i, input_weights) + tf.matmul(o, output_weights) + bias
            values = tf.split(1, gate_count, values)
            input_gate = tf.sigmoid(values[0])
            forget_gate = tf.sigmoid(values[1])
            update = values[2]
            state = forget_gate * state + input_gate * tf.tanh(update)
            output_gate = tf.sigmoid(values[3])
            return output_gate * tf.tanh(state), state

        # Input data.
        train_inputs = list()
        train_labels = list()
        for _ in range(unrolls):
            train_inputs.append(tf.placeholder(tf.float32, shape=[batch_size, channel_count]))
            train_labels.append(tf.placeholder(tf.float32, shape=[batch_size, label_count]))

        # Unrolled LSTM loop.
        outputs = list()
        output = saved_output
        state = saved_state
        for i in train_inputs:
            output, state = lstm_cell(i, output, state)
            outputs.append(output)

        # State saving across unrollings.
        with tf.control_dependencies([saved_output.assign(output), saved_state.assign(state)]):
            # Classifier. With a single output unit a softmax over the logits
            # is constant (always 1) and the loss is identically zero, so use
            # a sigmoid cross-entropy for the binary snap/no-snap label.
            logits = tf.nn.xw_plus_b(tf.concat(0, outputs), w, b)
            loss = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits, tf.concat(0, train_labels)))

        # Optimizer.
        global_step = tf.Variable(0)
        learning_rate = tf.train.exponential_decay(10.0, global_step, 5000, 0.1, staircase=True)
        optimizer = tf.train.GradientDescentOptimizer(learning_rate)
        gradients, v = zip(*optimizer.compute_gradients(loss))
        gradients, _ = tf.clip_by_global_norm(gradients, 1.25)
        optimizer = optimizer.apply_gradients(zip(gradients, v), global_step=global_step)

        # Predictions.
        train_prediction = tf.sigmoid(logits)
    
    return {
        "graph": graph,
        "batch_size": batch_size,
        "unrolls": unrolls,
        "train_inputs": train_inputs,
        "train_labels": train_labels,
        "optimizer": optimizer,
        "loss": loss,
        "train_prediction": train_prediction,
        "learning_rate": learning_rate
    }
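
For reference, lstm_cell above implements the standard LSTM update (without peephole connections from the previous state to the gates). The four gate pre-activations come from one fused matrix product that is then split four ways:

$$[v_i, v_f, v_g, v_o] = x_t W_x + h_{t-1} W_h + b$$
$$s_t = \sigma(v_f) \odot s_{t-1} + \sigma(v_i) \odot \tanh(v_g)$$
$$h_t = \sigma(v_o) \odot \tanh(s_t)$$

where $h_t$ is the cell output and $s_t$ the saved state carried across unrollings.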

In [ ]:
def run_lstm(setup, training, validation, snap_duration, step_count, report_every):
    train_batches = BatchGenerator(
        training[0], training[1],
        snap_duration,
        setup["batch_size"], setup["unrolls"]
    )
    # valid_batches = batcher(validation, 1, 1)
    with tf.Session(graph=setup["graph"]) as session:
        tf.initialize_all_variables().run()
        print('Initialized')
        mean_loss = 0
        for step in range(step_count + 1):
            batches = train_batches.next()
            feed_dict = {}
            for i in range(setup["unrolls"]):
                feed_dict[setup["train_inputs"][i]] = batches[i][0]
                feed_dict[setup["train_labels"][i]] = batches[i][1]
                
            targets = [
                setup["optimizer"],
                setup["loss"],
                setup["train_prediction"],
                setup["learning_rate"]
            ]

            _, l, predictions, lr = session.run(targets, feed_dict=feed_dict)

            mean_loss += l
            if step % report_every == 0:
                if step > 0:
                    mean_loss = mean_loss / report_every
                # The mean loss is an estimate of the loss over the last few batches.
                print('Average loss at step %d: %f learning rate: %f' % (step, mean_loss, lr))
                mean_loss = 0
                labels = np.concatenate([b[1] for b in batches])
                print(predictions.shape)
                print(labels)
                
                # Measure validation set perplexity.
                #valid_logprob = 0
                #for _ in range(len(validation)):
                #    b = valid_batches.next()
                #    predictions = setup["sample_prediction"].eval({setup["sample_input"]: b[0]})
                #    valid_logprob = valid_logprob + logprob(predictions, b[1])
                #print('Validation set perplexity: %.2f' % float(np.exp(valid_logprob / valid_size)))

In [ ]:
# 20 LSTM nodes, one input per EMG channel, 1 label, batch size 128, 100 unrolls.
setup = setup_graph(20, len(data_sets["trial0"][0][0][1]), 1, 128, 100)

In [ ]:
# Train on trial0 for 1000 steps, reporting every 100; samples within 100000
# timestamp ticks after a snap are labeled positive. trial10 is passed as the
# (currently unused) validation set.
run_lstm(setup, data_sets["trial0"], data_sets["trial10"], 100000, 1000, 100)
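
The validation pass is still stubbed out in run_lstm, so as a rough stand-in here is a sketch of the class balance each trial presents under the same 100000-tick snap window. snap_fraction is a hypothetical helper that mirrors the labeling rule in BatchGenerator, not part of the original pipeline:

In [ ]:
def snap_fraction(data_set, snap_duration=100000):
    # Fraction of EMG samples falling inside a snap window, mirroring
    # the labeling rule in BatchGenerator._next_batch.
    emg, snaps = data_set
    hits = sum(1 for t, _ in emg if any(0 < t - s < snap_duration for s in snaps))
    return hits / float(len(emg))

for name in ("trial0", "trial10"):
    print(name, "positive fraction:", snap_fraction(data_sets[name]))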
