In [8]:
%run homework_modules.ipynb
In [9]:
import torch
from torch.autograd import Variable
import numpy as np
import unittest
In [3]:
def generate_idx_pairs(labels):
classes_size = labels.size()[0]
classes_eq = (labels.repeat(classes_size, 1) == labels.view(-1, 1).repeat(1, classes_size)).data
pos_inds = (classes_eq == 1).nonzero()
pos_idx_pairs = pos_inds[pos_inds[:,0] < pos_inds[:,1]]
pos_inds_1 = pos_idx_pairs[:, 0]
pos_inds_2 = pos_idx_pairs[:, 1]
    neg_inds = (classes_eq == 0).nonzero()
neg_idx_pairs = neg_inds[neg_inds[:,0] < neg_inds[:,1]]
neg_inds_1 = neg_idx_pairs[:, 0]
neg_inds_2 = neg_idx_pairs[:, 1]
return pos_inds_1, pos_inds_2, neg_inds_1, neg_inds_2
def calc_dist(feat, pids):
pos_inds_1, pos_inds_2, neg_inds_1, neg_inds_2 = generate_idx_pairs(pids)
if torch.cuda.is_available() :
pos_inds_1, pos_inds_2, neg_inds_1, neg_inds_2 = pos_inds_1.cuda(), pos_inds_2.cuda(), neg_inds_1.cuda(), neg_inds_2.cuda()
p = torch.sqrt(torch.sum(torch.pow(feat[pos_inds_1] - feat[pos_inds_2], 2), dim=1))
n = torch.sqrt(torch.sum(torch.pow(feat[neg_inds_1] - feat[neg_inds_2], 2), dim=1))
return p,n
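# Worked example for the helpers above: with labels = [0, 0, 1], generate_idx_pairs yields
# the single positive pair (0, 1) and the negative pairs (0, 2) and (1, 2); for
# feat = [[0, 0], [3, 4], [6, 8]] calc_dist then returns the Euclidean distances
# p = [5.0] and n = [10.0, 5.0].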
class ContrastiveLoss(torch.nn.Module):
    def __init__(self, m, mode='sum'):
        super(ContrastiveLoss, self).__init__()
        self.m = m
        self.mode = mode
    def forward(self, d, y):  # labels are 0 and 1
l = y*torch.pow(d, 2) + (1-y)*torch.pow(torch.nn.functional.relu(self.m-d), 2)
if self.mode == 'sum':
return torch.sum(l)
elif self.mode == 'mean':
return torch.mean(l)
else:
            raise NotImplementedError("mode '{}' is not implemented".format(self.mode))
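# For reference, forward() above implements the pairwise contrastive loss
# l(d, y) = y * d^2 + (1 - y) * max(0, m - d)^2, where d is the distance between the two
# samples, y = 1 for a positive (same-class) pair, y = 0 for a negative pair, and m is the margin.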
class ContrastiveCombinedLoss(torch.nn.Module):
def __init__(self, m, mode = 'mean'):
super(ContrastiveCombinedLoss, self).__init__()
self.criterion = ContrastiveLoss(m, mode)
def forward(self, feat, pids):
p,n = calc_dist(feat, pids)
ploss = self.criterion(p, torch.ones(len(p)))
nloss = self.criterion(n, torch.zeros(len(n)))
        return 0.5 * ploss + 0.5 * nloss
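# A quick sanity check of the combined loss on a toy batch (a sketch, not part of the tests
# below). Guarded to CPU-only runs because calc_dist above moves the index tensors, but not
# the features, to CUDA.
if not torch.cuda.is_available():
    _feat = torch.autograd.Variable(torch.randn(8, 4), requires_grad=True)
    _pids = torch.autograd.Variable(torch.LongTensor([0, 0, 1, 1, 2, 2, 3, 3]))
    _loss = ContrastiveCombinedLoss(m=10)(_feat, _pids)
    _loss.backward()  # fills _feat.grad with d(loss)/d(feat)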
In [13]:
%run homework_modules.ipynb
class TestLayers(unittest.TestCase):
def test_Linear(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in, n_out = 2, 3, 4
for _ in range(100):
# layers initialization
torch_layer = torch.nn.Linear(n_in, n_out)
custom_layer = Linear(n_in, n_out)
custom_layer.W = torch_layer.weight.data.numpy()
custom_layer.b = torch_layer.bias.data.numpy()
layer_input = np.random.uniform(-10, 10, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(-10, 10, (batch_size, n_out)).astype(np.float32)
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
# 3. check layer parameters grad
custom_layer.accGradParameters(layer_input, next_layer_grad)
weight_grad = custom_layer.gradW
bias_grad = custom_layer.gradb
torch_weight_grad = torch_layer.weight.grad.data.numpy()
torch_bias_grad = torch_layer.bias.grad.data.numpy()
self.assertTrue(np.allclose(torch_weight_grad, weight_grad, atol=1e-6))
self.assertTrue(np.allclose(torch_bias_grad, bias_grad, atol=1e-6))
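    # For reference, the checks above correspond to y = x @ W^T + b (torch.nn.Linear stores
    # W with shape (n_out, n_in)), with gradients dL/dx = dL/dy @ W, dL/dW = (dL/dy)^T @ x
    # and dL/db = sum of dL/dy over the batch dimension.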
def test_SoftMax(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
torch_layer = torch.nn.Softmax(dim=1)
custom_layer = SoftMax()
layer_input = np.random.uniform(-10, 10, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.random((batch_size, n_in)).astype(np.float32)
next_layer_grad /= next_layer_grad.sum(axis=-1, keepdims=True)
next_layer_grad = next_layer_grad.clip(1e-5,1.)
next_layer_grad = 1. / next_layer_grad
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-5))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-5))
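    # For reference, with y = softmax(x) applied row-wise, the input gradient checked above is
    # dL/dx_i = y_i * (dL/dy_i - sum_j dL/dy_j * y_j).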
def test_LogSoftMax(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
torch_layer = torch.nn.LogSoftmax(dim=1)
custom_layer = LogSoftMax()
layer_input = np.random.uniform(-10, 10, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.random((batch_size, n_in)).astype(np.float32)
next_layer_grad /= next_layer_grad.sum(axis=-1, keepdims=True)
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
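    # For reference, with y = logsoftmax(x) applied row-wise, the input gradient checked above is
    # dL/dx_i = dL/dy_i - softmax(x)_i * sum_j dL/dy_j.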
def test_BatchNormalization(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 32, 16
for _ in range(100):
# layers initialization
            alpha = 0.9
custom_layer = BatchNormalization(alpha)
custom_layer.train()
torch_layer = torch.nn.BatchNorm1d(n_in, eps=custom_layer.EPS, momentum=1.-alpha, affine=False)
custom_layer.moving_mean = torch_layer.running_mean.numpy().copy()
custom_layer.moving_variance = torch_layer.running_var.numpy().copy()
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
# 1. check layer output
custom_layer_output_train = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output_train, atol=1e-6))
# 2. check layer input grad
custom_layer_grad_train = custom_layer.updateGradInput(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
            # please don't increase the `atol` parameter: it is guaranteed that the batch norm
            # layer can be implemented within a tolerance of 1e-5
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad_train, atol=1e-5))
# 3. check moving mean
self.assertTrue(np.allclose(custom_layer.moving_mean, torch_layer.running_mean.numpy()))
            # we don't check moving_variance because PyTorch uses a slightly different formula:
            # it accumulates the moving average of the unbiased variance (i.e. var * N / (N - 1))
#self.assertTrue(np.allclose(custom_layer.moving_variance, torch_layer.running_var.numpy()))
# 4. check evaluation mode
custom_layer.moving_variance = torch_layer.running_var.numpy().copy()
custom_layer.evaluate()
custom_layer_output = custom_layer.updateOutput(layer_input)
torch_layer.eval()
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
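    # Note on the checks above: in train mode the layer is expected to normalize with the
    # per-batch statistics, (x - batch_mean) / sqrt(batch_var + EPS), while in eval mode it
    # should use the accumulated moving_mean / moving_variance, which is why those buffers are
    # synchronised with the torch layer before switching modes.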
def test_Sequential(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
alpha = 0.9
torch_layer = torch.nn.BatchNorm1d(n_in, eps=BatchNormalization.EPS, momentum=1.-alpha, affine=True)
torch_layer.bias.data = torch.from_numpy(np.random.random(n_in).astype(np.float32))
custom_layer = Sequential()
bn_layer = BatchNormalization(alpha)
bn_layer.moving_mean = torch_layer.running_mean.numpy().copy()
bn_layer.moving_variance = torch_layer.running_var.numpy().copy()
custom_layer.add(bn_layer)
scaling_layer = ChannelwiseScaling(n_in)
scaling_layer.gamma = torch_layer.weight.data.numpy()
scaling_layer.beta = torch_layer.bias.data.numpy()
custom_layer.add(scaling_layer)
custom_layer.train()
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.backward(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-5))
# 3. check layer parameters grad
weight_grad, bias_grad = custom_layer.getGradParameters()[1]
torch_weight_grad = torch_layer.weight.grad.data.numpy()
torch_bias_grad = torch_layer.bias.grad.data.numpy()
self.assertTrue(np.allclose(torch_weight_grad, weight_grad, atol=1e-6))
self.assertTrue(np.allclose(torch_bias_grad, bias_grad, atol=1e-6))
def test_Dropout(self):
np.random.seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
p = np.random.uniform(0.3, 0.7)
layer = Dropout(p)
layer.train()
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
# 1. check layer output
layer_output = layer.updateOutput(layer_input)
self.assertTrue(np.all(np.logical_or(np.isclose(layer_output, 0),
np.isclose(layer_output*(1.-p), layer_input))))
# 2. check layer input grad
layer_grad1 = layer.updateGradInput(layer_input, next_layer_grad)
self.assertTrue(np.all(np.logical_or(np.isclose(layer_grad1, 0),
np.isclose(layer_grad1*(1.-p), next_layer_grad))))
# 3. check evaluation mode
layer.evaluate()
layer_output = layer.updateOutput(layer_input)
self.assertTrue(np.allclose(layer_output, layer_input))
# 4. check mask
p = 0.0
layer = Dropout(p)
layer.train()
layer_output2 = layer.updateOutput(layer_input)
self.assertTrue(np.allclose(layer_output2, layer_input))
p = 0.5
layer = Dropout(p)
layer.train()
layer_input = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
layer_output3 = layer.updateOutput(layer_input)
zeroed_elem_mask = np.isclose(layer_output3, 0)
layer_grad4 = layer.updateGradInput(layer_input, next_layer_grad)
self.assertTrue(np.all(zeroed_elem_mask == np.isclose(layer_grad4, 0)))
            # 5. the dropout mask should be generated independently for every element of the input matrix, not per row or column
batch_size, n_in = 1000, 1
p = 0.8
layer = Dropout(p)
layer.train()
layer_input = np.random.uniform(5, 10, (batch_size, n_in)).astype(np.float32)
layer_output = layer.updateOutput(layer_input)
self.assertTrue(np.sum(np.isclose(layer_output, 0)) != layer_input.size)
layer_input = layer_input.T
layer_output = layer.updateOutput(layer_input)
self.assertTrue(np.sum(np.isclose(layer_output, 0)) != layer_input.size)
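    # Note: the checks above assume "inverted" dropout, i.e. in train mode surviving activations
    # are scaled by 1 / (1 - p) so the expected value of the output matches the input, and eval
    # mode is the identity; this mirrors the behaviour of torch.nn.Dropout.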
def test_LeakyReLU(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
slope = np.random.uniform(0.01, 0.05)
torch_layer = torch.nn.LeakyReLU(slope)
custom_layer = LeakyReLU(slope)
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
def test_ELU(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
alpha = 1.0
torch_layer = torch.nn.ELU(alpha)
custom_layer = ELU(alpha)
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
def test_SoftPlus(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
torch_layer = torch.nn.Softplus()
custom_layer = SoftPlus()
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
next_layer_grad = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var)
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, next_layer_grad)
torch_layer_output_var.backward(torch.from_numpy(next_layer_grad))
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
def test_ClassNLLCriterionUnstable(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
torch_layer = torch.nn.NLLLoss()
custom_layer = ClassNLLCriterionUnstable()
layer_input = np.random.uniform(0, 1, (batch_size, n_in)).astype(np.float32)
layer_input /= layer_input.sum(axis=-1, keepdims=True)
            layer_input = layer_input.clip(custom_layer.EPS, 1. - custom_layer.EPS)  # keep probabilities strictly inside (0, 1)
target_labels = np.random.choice(n_in, batch_size)
target = np.zeros((batch_size, n_in), np.float32)
target[np.arange(batch_size), target_labels] = 1 # one-hot encoding
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input, target)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(torch.log(layer_input_var),
Variable(torch.from_numpy(target_labels), requires_grad=False))
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, target)
torch_layer_output_var.backward()
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
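    # Note: ClassNLLCriterionUnstable is fed raw probabilities (hence the torch reference is
    # applied to torch.log of the input), whereas ClassNLLCriterion below is fed
    # log-probabilities, matching the torch.nn.NLLLoss convention.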
def test_ClassNLLCriterion(self):
np.random.seed(42)
torch.manual_seed(42)
batch_size, n_in = 2, 4
for _ in range(100):
# layers initialization
torch_layer = torch.nn.NLLLoss()
custom_layer = ClassNLLCriterion()
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
layer_input = torch.nn.LogSoftmax(dim=1)(Variable(torch.from_numpy(layer_input))).data.numpy()
target_labels = np.random.choice(n_in, batch_size)
target = np.zeros((batch_size, n_in), np.float32)
target[np.arange(batch_size), target_labels] = 1 # one-hot encoding
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input, target)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var,
Variable(torch.from_numpy(target_labels), requires_grad=False))
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, target)
torch_layer_output_var.backward()
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
def test_ClassContrastiveCriterion(self):
np.random.seed(42)
torch.manual_seed(42)
        batch_size, n_in = 8, 4  # batch_size should exceed n_in (the number of classes) so the batch contains positive pairs
m = 10
for _ in range(100):
# layers initialization
            torch_layer = ContrastiveCombinedLoss(m=m)
            custom_layer = ClassContrastiveCriterion(M=m)
layer_input = np.random.uniform(-5, 5, (batch_size, n_in)).astype(np.float32)
target_labels = np.random.choice(n_in, batch_size)
# 1. check layer output
custom_layer_output = custom_layer.updateOutput(layer_input, target_labels)
layer_input_var = Variable(torch.from_numpy(layer_input), requires_grad=True)
torch_layer_output_var = torch_layer(layer_input_var,
Variable(torch.from_numpy(target_labels), requires_grad=False))
self.assertTrue(np.allclose(torch_layer_output_var.data.numpy(), custom_layer_output, atol=1e-6))
# 2. check layer input grad
custom_layer_grad = custom_layer.updateGradInput(layer_input, target_labels)
torch_layer_output_var.backward()
torch_layer_grad_var = layer_input_var.grad
self.assertTrue(np.allclose(torch_layer_grad_var.data.numpy(), custom_layer_grad, atol=1e-6))
def test_adam_optimizer(self):
state = {}
config = {'learning_rate': 1e-3, 'beta1': 0.9, 'beta2':0.999, 'epsilon':1e-8}
variables = [[np.arange(10).astype(np.float64)]]
gradients = [[np.arange(10).astype(np.float64)]]
adam_optimizer(variables, gradients, config, state)
self.assertTrue(np.allclose(state['m'][0], np.array([0. , 0.1, 0.2, 0.3, 0.4, 0.5,
0.6, 0.7, 0.8, 0.9])))
self.assertTrue(np.allclose(state['v'][0], np.array([0., 0.001, 0.004, 0.009, 0.016, 0.025,
0.036, 0.049, 0.064, 0.081])))
self.assertTrue(state['t'] == 1)
self.assertTrue(np.allclose(variables[0][0], np.array([0., 0.999, 1.999, 2.999, 3.999, 4.999,
5.999, 6.999, 7.999, 8.999])))
adam_optimizer(variables, gradients, config, state)
self.assertTrue(np.allclose(state['m'][0], np.array([0., 0.19, 0.38, 0.57, 0.76, 0.95, 1.14,
1.33, 1.52, 1.71])))
self.assertTrue(np.allclose(state['v'][0], np.array([0., 0.001999, 0.007996, 0.017991,
0.031984, 0.049975, 0.071964, 0.097951,
0.127936, 0.161919])))
self.assertTrue(state['t'] == 2)
self.assertTrue(np.allclose(variables[0][0], np.array([0., 0.998, 1.998, 2.998, 3.998, 4.998,
5.998, 6.998, 7.998, 8.998])))
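    # For reference, the expected values above are consistent with the standard Adam update
    # with bias correction (which adam_optimizer is assumed to implement):
    #   m_t = beta1 * m_{t-1} + (1 - beta1) * g
    #   v_t = beta2 * v_{t-1} + (1 - beta2) * g^2
    #   x  -= learning_rate * (m_t / (1 - beta1^t)) / (sqrt(v_t / (1 - beta2^t)) + epsilon)
    # e.g. at t = 1 with g = 9: m = 0.9, v = 0.081, and the step is 1e-3 * 9 / (9 + 1e-8) ~ 1e-3,
    # giving 9 - 0.001 = 8.999.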
suite = unittest.TestLoader().loadTestsFromTestCase(TestLayers)
unittest.TextTestRunner(verbosity=2).run(suite)