In [2]:
import torch
import torch.nn as nn
import torchvision.models as models
from utils import Dataset
import torch.nn.functional as F
In [105]:
# how to test if this kwinners implementation is doing the right thing?
# I can test it directly in a model
# or try to implement the same class in a more simple setting
# let's do the simple setting
from sklearn import datasets

iris = datasets.load_iris()
x = torch.tensor(iris.data, dtype=torch.float)
y = torch.tensor(iris.target, dtype=torch.long)
x.shape, y.shape
In [8]:
dataset = Dataset(config=dict(dataset_name='MNIST', data_dir='~/nta/results'))
In [107]:
# build up a small neural network
inputs = []
def init_weights():
    W1 = torch.randn((4, 10), requires_grad=True)
    b1 = torch.zeros(10, requires_grad=True)
    W2 = torch.randn((10, 3), requires_grad=True)
    b2 = torch.zeros(3, requires_grad=True)
    return [W1, b1, W2, b2]
# torch cross_entropy is log softmax activation + negative log likelihood
loss_func = F.cross_entropy
# simple feedforward model
def model(input):
    W1, b1, W2, b2 = parameters
    x = input @ W1 + b1
    x = F.relu(x)
    x = x @ W2 + b2
    return x
# calculate accuracy
def accuracy(out, y):
    preds = torch.argmax(out, dim=1)
    return (preds == y).float().mean().item()
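F.cross_entropy is log_softmax followed by nll_loss; a quick sanity check on a made-up batch of logits shows the two forms agree:
In [ ]:
# sanity check: cross_entropy == log_softmax + nll_loss (both reduce with the mean by default)
logits = torch.randn(5, 3)
targets = torch.randint(0, 3, (5,))
torch.allclose(F.cross_entropy(logits, targets),
               F.nll_loss(F.log_softmax(logits, dim=1), targets))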
In [108]:
from sklearn.model_selection import StratifiedKFold
cv = StratifiedKFold(n_splits=3)
In [111]:
# train
lr = 0.01
epochs = 1000
for train, test in cv.split(x, y):
    x_train, y_train = x[train], y[train]
    x_test, y_test = x[test], y[test]
    parameters = init_weights()
    print("Accuracy before training: {:.4f}".format(accuracy(model(x), y)))
    for epoch in range(epochs):
        loss = loss_func(model(x_train), y_train)
        if epoch % (epochs / 5) == 0:
            print("Loss: {:.8f}".format(loss.item()))
        # backpropagate
        loss.backward()
        with torch.no_grad():
            for param in parameters:
                # update weights
                param -= lr * param.grad
                # zero gradients
                param.grad.zero_()
    print("Training Accuracy after training: {:.4f}".format(accuracy(model(x_train), y_train)))
    print("Test Accuracy after training: {:.4f}".format(accuracy(model(x_test), y_test)))
    print("---------------------------")
The model seems to be overfitting nicely. Actions:
In [42]:
import torch
from torch import nn
from torchvision import models
class KWinners(nn.Module):
    def __init__(self, k=10):
        super(KWinners, self).__init__()
        self.duty_cycle = None
        self.k = k
        self.beta = 100
        self.T = 1000
        self.current_time = 0
    def forward(self, x):
        # initialize per-unit duty cycle (assumes input shaped (batch, units))
        if self.duty_cycle is None:
            self.duty_cycle = torch.zeros(x.shape[1:])
        # keep track of number of past iterations
        if self.current_time < self.T:
            self.current_time += 1
        # calculating threshold and updating duty cycle
        # should not be in the graph
        tx = x.clone().detach()
        # no need to calculate gradients
        with torch.set_grad_enabled(False):
            # boost activations based on how often each unit has won recently
            boosting = self._calculate_boosting()
            tx *= boosting
            # get threshold over the boosted activations
            # nonzero_mask = torch.nonzero(tx) # will need for sparse weights
            threshold = self._get_threshold(tx)
            # get mask of the k winners, then update the duty cycle with it
            mask = tx > threshold
            self._update_duty_cycle(mask)
        return x * mask
    def _get_threshold(self, x):
        """Calculate dynamic threshold"""
        abs_x = torch.abs(x).view(-1)
        # kth smallest absolute value such that only the top k activations exceed it
        pos = abs_x.size()[0] - self.k
        threshold, _ = torch.kthvalue(abs_x, pos)
        return threshold
    def _update_duty_cycle(self, mask):
        """Update duty cycle as a running average of how often each unit wins"""
        time = min(self.T, self.current_time)
        # duty_cycle(t) = ((t - 1) * duty_cycle(t - 1) + batch win frequency) / t
        self.duty_cycle *= (time - 1) / time
        self.duty_cycle += mask.float().mean(dim=0) / time
    def _calculate_boosting(self):
        """Calculate boosting according to the formula in the spatial pooling paper"""
        mean_duty_cycle = torch.mean(self.duty_cycle)
        diff_duty_cycle = self.duty_cycle - mean_duty_cycle
        # negative exponent: less-active units are boosted up, more-active units suppressed
        boosting = (-self.beta * diff_duty_cycle).exp()
        return boosting
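A minimal smoke test of the layer (batch shape and k are made up here): at most k activations in the whole batch should survive, since the threshold is taken over absolute values and negative activations never pass it.
In [ ]:
kw = KWinners(k=5)
sample = torch.randn(4, 10)
out = kw(sample)
# number of surviving activations (<= k) and the per-unit duty cycle
(out != 0).sum().item(), kw.duty_cycle.shape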
In [ ]: