In [2]:
import torch
import torch.nn as nn
import torchvision.models as models
from utils import Dataset
import torch.nn.functional as F

In [105]:
# how to test if this kwinners implementation is doing the right thing?
# I can test it directly in a model
# or try to implement the same class in a more simple setting
# let's do the simple setting

from sklearn import datasets

iris = datasets.load_iris()
x = torch.tensor(iris.data, dtype=torch.float)
y = torch.tensor(iris.target, dtype=torch.long)
x.shape, y.shape


In [8]:
dataset = Dataset(config=dict(dataset_name='MNIST', data_dir='~/nta/results'))

In [107]:
# build up a small neural network
inputs = []

def init_weights():
    W1 = torch.randn((4,10), requires_grad=True)
    b1 = torch.zeros(10, requires_grad=True)
    W2 = torch.randn((10,3), requires_grad=True)
    b2 = torch.zeros(3, requires_grad=True)
    return [W1, b1, W2, b2]

# torch cross_entropy is log softmax activation + negative log likelihood (quick check after this cell)
loss_func = F.cross_entropy

# simple feedforward model; `parameters` is the global list set before training
def model(input):
    W1, b1, W2, b2 = parameters
    x = input @ W1 + b1
    x = F.relu(x)
    x = x @ W2 + b2
    return x
  
# calculate accuracy
def accuracy(out, y):
    preds = torch.argmax(out, dim=1)
    return (preds == y).float().mean().item()
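
A quick sanity check of the comment above (a sketch added here, not part of the original notebook): F.cross_entropy on raw logits should match F.nll_loss applied to F.log_softmax of the same logits.

In [ ]:
# verify: cross_entropy(logits, y) == nll_loss(log_softmax(logits), y)
logits = torch.randn(5, 3)
targets = torch.tensor([0, 2, 1, 1, 0])
a = F.cross_entropy(logits, targets)
b = F.nll_loss(F.log_softmax(logits, dim=1), targets)
print(torch.allclose(a, b))  # expected: True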

In [108]:
from sklearn.model_selection import StratifiedKFold
cv = StratifiedKFold(n_splits=3)

In [111]:
# train
lr = 0.01
epochs = 1000
for train, test in cv.split(x, y):
    x_train, y_train = x[train], y[train] 
    x_test, y_test = x[test], y[test] 
    parameters = init_weights()
    print("Accuracy before training: {:.4f}".format(accuracy(model(x), y)))
    for epoch in range(epochs):
        loss = loss_func(model(x_train), y_train)
        if epoch % (epochs // 5) == 0:
            print("Loss: {:.8f}".format(loss.item()))
        # backpropagate
        loss.backward()
        with torch.no_grad():
            for param in parameters:
                # update weights with plain gradient descent
                param -= lr * param.grad
                # zero the gradients so they don't accumulate across steps
                param.grad.zero_()

    print("Training Accuracy after training: {:.4f}".format(accuracy(model(x_train), y_train)))
    print("Test Accuracy after training: {:.4f}".format(accuracy(model(x_test), y_test)))
    print("---------------------------")


Accuracy before training: 0.2733
Loss: 11.21857452
Loss: 0.41098920
Loss: 0.25873697
Loss: 0.20796219
Loss: 0.18057358
Training Accuracy after training: 0.9192
Test Accuracy after training: 1.0000
---------------------------
Accuracy before training: 0.3333
Loss: 13.61304092
Loss: 0.34098408
Loss: 0.24114083
Loss: 0.18845302
Loss: 0.15532117
Training Accuracy after training: 0.9596
Test Accuracy after training: 0.9216
---------------------------
Accuracy before training: 0.3333
Loss: 29.98571205
Loss: 0.25740978
Loss: 0.18164127
Loss: 0.15090778
Loss: 0.13390265
Training Accuracy after training: 0.9608
Test Accuracy after training: 0.9792
---------------------------

The model seems to be fitting the data nicely. Actions:

  • Test accuracy - DONE
  • Repeat the experiment with a held out test set, still holds? - DONE
  • Replace ReLU with k-Winners - is k-Winners working? - TODO
  • Extend to a larger dataset, MNIST
  • Replace ReLU with a class
  • Extend to larger models, CNNs
  • Run similar tests for both ReLU and k-Winners - do the results hold?
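
Side note (a sketch, not in the original notebook): the manual update in the training cell above is the same step torch.optim.SGD performs, so the loop could equivalently be written with an optimizer.

In [ ]:
# equivalent training step using torch.optim.SGD instead of the manual update
# (assumes `parameters`, `model`, `loss_func`, `x_train`, `y_train` from the cells above)
optimizer = torch.optim.SGD(parameters, lr=0.01)

loss = loss_func(model(x_train), y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()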

In [42]:
import torch
from torch import nn
from torchvision import models

class KWinners(nn.Module):

    def __init__(self, k=10):
        super(KWinners, self).__init__()

        self.duty_cycle = None
        self.k = k
        self.beta = 100
        self.T = 1000
        self.current_time = 0

    def forward(self, x):

        # initialize duty cycle (tracked per activation; assumes a fixed input shape)
        if self.duty_cycle is None:
            self.duty_cycle = torch.zeros_like(x)

        # keep track of the number of past iterations
        if self.current_time < self.T:
            self.current_time += 1

        # calculating the threshold and updating the duty cycle
        # should not be part of the autograd graph
        tx = x.clone().detach()
        # no need to calculate gradients
        with torch.no_grad():
            # apply boosting before selecting the winners
            boosting = self._calculate_boosting()
            tx *= boosting
            # get dynamic threshold and the mask of the k winning units
            # nonzero_mask = torch.nonzero(tx) # will need for sparse weights
            threshold = self._get_threshold(tx)
            mask = tx > threshold
            # update duty cycle with this iteration's winners
            self._update_duty_cycle(mask)

        return x * mask.float()

    def _get_threshold(self, x):
        """Calculate dynamic theshold""" 
        abs_x = torch.abs(x).view(-1)
        pos = abs_x.size()[0] - self.k
        threshold, _ = torch.kthvalue(abs_x, pos)

        return threshold

    def _update_duty_cycle(self, mask):
        """Update duty cycle""" 
        time = min(self.T, self.current_time)
        self.duty_cycle *= (time-1)/time
        self.duty_cycle += mask.float() / time

    def _calculate_boosting(self):
        """Calculate boosting according to the formula in the spatial pooling paper"""
        mean_duty_cycle = torch.mean(self.duty_cycle)
        diff_duty_cycle = self.duty_cycle - mean_duty_cycle
        # units with a below-average duty cycle are boosted up
        boosting = (-self.beta * diff_duty_cycle).exp()

        return boosting
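
A minimal sketch (not in the original notebook) of the next TODO: dropping the KWinners layer into the simple iris model in place of F.relu. The names kwinner_model and kw are hypothetical; it assumes the parameters, init_weights, accuracy, x_train and y_train defined in the cells above.

In [ ]:
# swap F.relu for the KWinners layer in the simple feedforward model
kw = KWinners(k=10)

def kwinner_model(input):
    W1, b1, W2, b2 = parameters
    x = input @ W1 + b1
    x = kw(x)  # k-winners activation instead of ReLU
    x = x @ W2 + b2
    return x

parameters = init_weights()
out = kwinner_model(x_train)
print(out.shape, accuracy(out, y_train))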

In [ ]: