In [1]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
from torchvision import datasets
import torch
import torch.nn as nn
import torch.optim as optim
In [2]:
train_data = datasets.FashionMNIST('data', download=True, train=True)
# we need FloatTensors as input
train_X = train_data.data.float()
train_y = train_data.targets
test_data = datasets.FashionMNIST('data', download=True, train=False)
test_X = test_data.data.float()
test_y = test_data.targets
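The pixel values are stored as integers between 0 and 255. We keep them unscaled here, but a common optional preprocessing step is to scale them to [0, 1], which usually makes optimization behave better; a minimal sketch, not applied in the rest of this notebook:
train_X = train_X / 255.0
test_X = test_X / 255.0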
In [3]:
labels = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']
idx = np.random.randint(len(train_X))
sample_X = train_X[idx].numpy()
sample_y = train_y[idx].numpy()
print("Label: {}".format(labels[sample_y]))
plt.imshow(sample_X, 'gray')
Out[3]:
In [4]:
train_X.size()
Out[4]:
In [5]:
np.unique(train_y.numpy(), return_counts=True)
Out[5]:
In [6]:
print("Before flattening:")
print("Train size:", train_X.size(), train_y.size())
print("Test size:", test_X.size(), test_y.size())
train_X = train_X.view(-1, 28 * 28)
test_X = test_X.view(-1, 28 * 28)
print("\nAfter flattening:")
print("Train size:", train_X.size(), train_y.size())
print("Test size:", test_X.size(), test_y.size())
In [7]:
all_idx = np.arange(len(train_X))
np.random.shuffle(all_idx)
train_idx = all_idx[:50000]
dev_idx = all_idx[50000:]
print("The overlap between train and dev should be an empty set:", set(train_idx) & set(dev_idx))
print("")
dev_X = train_X[dev_idx]
dev_y = train_y[dev_idx]
train_X = train_X[train_idx]
train_y = train_y[train_idx]
print("Train size:", train_X.size(), train_y.size())
print("Dev size:", dev_X.size(), dev_y.size())
print("Test size:", test_X.size(), test_y.size())
In [8]:
class BatchedIterator:
def __init__(self, X, y, batch_size):
self.X = X
self.y = y
self.batch_size = batch_size
def iterate_once(self):
for start in range(0, len(self.X), self.batch_size):
end = start + self.batch_size
yield self.X[start:end], self.y[start:end]
Testing the iterator:
In [9]:
train_iter = BatchedIterator(train_X, train_y, 33333)
for batch in train_iter.iterate_once():
print(batch[0].size(), batch[1].size())
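Note that this iterator always yields the batches in the same order. In practice the training data is usually reshuffled at every epoch; a minimal sketch of such a variant (the ShufflingBatchedIterator name is ours, and it is not used below):
class ShufflingBatchedIterator(BatchedIterator):
    def iterate_once(self):
        # draw a fresh random permutation for every pass over the data
        perm = torch.randperm(len(self.X))
        for start in range(0, len(self.X), self.batch_size):
            idx = perm[start:start + self.batch_size]
            yield self.X[idx], self.y[idx]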
In [10]:
class SimpleClassifier(nn.Module):
def __init__(self, input_dim, output_dim, hidden_dim):
super().__init__()
self.input_layer = nn.Linear(input_dim, hidden_dim)
self.relu = nn.ReLU()
self.output_layer = nn.Linear(hidden_dim, output_dim)
def forward(self, X):
h = self.input_layer(X)
h = self.relu(h)
out = self.output_layer(h)
return out
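The same one-hidden-layer architecture could also be written with nn.Sequential; a sketch (the make_classifier helper is hypothetical and not used below):
def make_classifier(input_dim, output_dim, hidden_dim):
    return nn.Sequential(
        nn.Linear(input_dim, hidden_dim),
        nn.ReLU(),
        nn.Linear(hidden_dim, output_dim),
    )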
In [11]:
model = SimpleClassifier(
input_dim=train_X.size(1),
output_dim=10,
hidden_dim=50
)
model
Out[11]:
In [12]:
for n, p in model.named_parameters():
print(n, p.size())
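We can also count the trainable parameters. With the sizes above this should come to 784×50 + 50 + 50×10 + 10 = 39,760; a quick check:
n_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("Trainable parameters:", n_params)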
In [13]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
In [14]:
test_pred = model(test_X).max(axis=1)[1]
test_acc = torch.eq(test_pred, test_y).sum().float() / len(test_X)
test_acc
Out[14]:
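Since the model is untrained, the accuracy should be close to chance (10% for 10 balanced classes). Evaluation does not need gradients, so it can be wrapped in torch.no_grad() to save memory; a small helper sketching this (the accuracy function is our own and not used in the loops below):
def accuracy(model, X, y):
    # no gradients are needed for evaluation
    with torch.no_grad():
        pred = model(X).max(dim=1)[1]
        return torch.eq(pred, y).float().mean().item()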
In [15]:
batch_size = 1000
train_iter = BatchedIterator(train_X, train_y, batch_size)
dev_iter = BatchedIterator(dev_X, dev_y, batch_size)
test_iter = BatchedIterator(test_X, test_y, batch_size)
all_train_loss = []
all_dev_loss = []
all_train_acc = []
all_dev_acc = []
n_epochs = 10
for epoch in range(n_epochs):
# training loop
for bi, (batch_x, batch_y) in enumerate(train_iter.iterate_once()):
y_out = model(batch_x)
loss = criterion(y_out, batch_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# one train epoch finished, evaluate on the train and the dev set (NOT the test)
train_out = model(train_X)
train_loss = criterion(train_out, train_y)
all_train_loss.append(train_loss.item())
train_pred = train_out.max(axis=1)[1]
train_acc = torch.eq(train_pred, train_y).sum().float() / len(train_X)
    all_train_acc.append(train_acc.item())
dev_out = model(dev_X)
dev_loss = criterion(dev_out, dev_y)
all_dev_loss.append(dev_loss.item())
dev_pred = dev_out.max(axis=1)[1]
dev_acc = torch.eq(dev_pred, dev_y).sum().float() / len(dev_X)
    all_dev_acc.append(dev_acc.item())
print(f"Epoch: {epoch}\n train accuracy: {train_acc} train loss: {train_loss}")
print(f" dev accuracy: {dev_acc} dev loss: {dev_loss}")
In [16]:
test_pred = model(test_X).max(axis=1)[1]
test_acc = torch.eq(test_pred, test_y).sum().float() / len(test_X)
test_acc
Out[16]:
In [17]:
plt.plot(all_train_loss, label='train')
plt.plot(all_dev_loss, label='dev')
plt.legend()
Out[17]:
In [18]:
plt.plot(all_train_acc, label='train')
plt.plot(all_dev_acc, label='dev')
plt.legend()
Out[18]:
In [19]:
toy_X = train_X[:5]
toy_y = train_y[:5]
model = SimpleClassifier(
input_dim=train_X.size(1),
output_dim=10,
hidden_dim=500
)
optimizer = optim.Adam(model.parameters())
In [20]:
batch_size = 20
toy_train_iter = BatchedIterator(toy_X, toy_y, batch_size)
all_train_loss = []
all_dev_loss = []
all_train_acc = []
all_dev_acc = []
n_epochs = 20
for epoch in range(n_epochs):
# training loop
for bi, (batch_x, batch_y) in enumerate(toy_train_iter.iterate_once()):
y_out = model(batch_x)
loss = criterion(y_out, batch_y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# one train epoch finished, evaluate on the train and the dev set (NOT the test)
train_out = model(toy_X)
train_loss = criterion(train_out, toy_y)
all_train_loss.append(train_loss.item())
train_pred = train_out.max(axis=1)[1]
train_acc = torch.eq(train_pred, toy_y).sum().float() / len(toy_X)
    all_train_acc.append(train_acc.item())
dev_out = model(dev_X)
dev_loss = criterion(dev_out, dev_y)
all_dev_loss.append(dev_loss.item())
dev_pred = dev_out.max(axis=1)[1]
dev_acc = torch.eq(dev_pred, dev_y).sum().float() / len(dev_X)
    all_dev_acc.append(dev_acc.item())
In [21]:
fig, ax = plt.subplots(1, 2, figsize=(12, 5))
ax[0].set_title("Loss")
ax[1].set_title("Accuracy")
ax[0].set_xlabel("epoch")
ax[1].set_xlabel("epoch")
ax[0].plot(all_train_loss, label='train')
ax[0].plot(all_dev_loss, label='dev')
ax[1].plot(all_train_acc, label='train')
ax[1].plot(all_dev_acc, label='dev')
ax[0].legend()
ax[1].legend()
Out[21]:
In [22]:
class SimpleClassifier(nn.Module):
def __init__(self, input_dim, output_dim,
hidden_dim):
super().__init__()
self.input_layer = nn.Linear(
input_dim, hidden_dim)
# let's add some extra layers in a list
self.extra_layers = [
nn.Linear(hidden_dim, 100),
nn.ReLU(),
nn.Linear(100, hidden_dim),
nn.ReLU(),
]
self.relu = nn.ReLU()
self.output_layer = nn.Linear(
hidden_dim, output_dim)
def forward(self, X):
h = self.input_layer(X)
h = self.relu(h)
# passing through extra layers
for layer in self.extra_layers:
h = layer(h)
out = self.output_layer(h)
return out
Because the extra layers are stored in a plain Python list, they are not registered as submodules, so their parameters are not part of the model:
In [23]:
m = SimpleClassifier(4, 5, 6)
print(m)
print("Parameters:")
for name, param in m.named_parameters():
print("Name: {}, size: {}".format(name, param.size()))
In [24]:
class SimpleClassifier(nn.Module):
def __init__(self, input_dim, output_dim,
hidden_dim):
super().__init__()
self.input_layer = nn.Linear(
input_dim, hidden_dim)
# use ModuleList
self.extra_layers = nn.ModuleList([
nn.Linear(hidden_dim, 100),
nn.ReLU(),
nn.Linear(100, hidden_dim),
nn.ReLU(),
])
self.relu = nn.ReLU()
self.output_layer = nn.Linear(
hidden_dim, output_dim)
def forward(self, X):
h = self.input_layer(X)
h = self.relu(h)
# passing through extra layers
for layer in self.extra_layers:
h = layer(h)
out = self.output_layer(h)
return out
In [25]:
m = SimpleClassifier(4, 5, 6)
print(m)
print("Parameters:")
for name, param in m.named_parameters():
print("Name: {}, size: {}".format(name, param.size()))
In [26]:
use_cuda = torch.cuda.is_available()
print(use_cuda)
Moving things to the GPU manually:
Your code should handle this automatically, for example with a check like the following:
In [27]:
if use_cuda:
model = model.cuda()
criterion = criterion.cuda()
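A more flexible pattern (a sketch, not used above) is to create a torch.device once and move the model, the loss and every batch to it:
device = torch.device('cuda' if use_cuda else 'cpu')
model = model.to(device)
criterion = criterion.to(device)
# inside the training loop each batch would also be moved:
#     batch_x, batch_y = batch_x.to(device), batch_y.to(device)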