Working with the MNIST dataset is no fun, so I modified the code a bit to support the CIFAR-10 dataset.
In [32]:
import os
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
import torch.distributions as td
from torchvision import datasets, transforms
from torchvision.utils import make_grid
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
# Training hyperparameters and run configuration.
BATCH_SIZE = 128
EPOCHS = 50
LOG_INTERVAL = 100  # print the training loss every LOG_INTERVAL batches
SEED = 42
DATA_DIR = '/vulcan/scratch/kampta/data/cifar'

# Seed PyTorch's global RNG and pick the GPU when one is available.
torch.manual_seed(SEED)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# exist_ok=True creates the directory only when it is missing, without the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(DATA_DIR, exist_ok=True)
In [23]:
# Extra DataLoader options for CUDA runs.
# NOTE: the loaders below do not receive these; they set num_workers themselves.
kwargs = {'num_workers': 1, 'pin_memory': True} if torch.cuda.is_available() else {}

# Convert images to tensors, then shift/scale every channel with mean 0.5 and
# std 0.5 (the plotting code later undoes this with x * 0.5 + 0.5).
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# CIFAR-10 train and test splits, downloaded into DATA_DIR when absent.
train_set = datasets.CIFAR10(
    root=DATA_DIR,
    train=True,
    download=True,
    transform=transform,
)
test_set = datasets.CIFAR10(
    root=DATA_DIR,
    train=False,
    download=True,
    transform=transform,
)

# Only the training loader shuffles; the test loader keeps dataset order.
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
)
test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=2,
)
In [11]:
class VAE(nn.Module):
    """Convolutional variational autoencoder for 3x32x32 images.

    The encoder maps an image batch to the mean and log-variance of a
    512-dimensional diagonal Gaussian. The decoder maps a 512-dimensional
    latent vector back to a 3x32x32 image.
    """

    def __init__(self):
        super(VAE, self).__init__()

        # Encoder: the two stride-2 convolutions halve the spatial size twice
        # (32x32 -> 16x16 -> 8x8) and the last one outputs 16 channels, so the
        # flattened feature map has 8 * 8 * 16 values, matching fc1.
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(32)
        self.conv4 = nn.Conv2d(32, 16, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn4 = nn.BatchNorm2d(16)

        self.fc1 = nn.Linear(8 * 8 * 16, 512)
        self.fc_bn1 = nn.BatchNorm1d(512)
        self.fc21 = nn.Linear(512, 512)  # head producing mu
        self.fc22 = nn.Linear(512, 512)  # head producing log-variance

        # Decoder: mirrors the encoder, using stride-2 transposed convolutions
        # to upsample 8x8 -> 16x16 -> 32x32.
        self.fc3 = nn.Linear(512, 512)
        self.fc_bn3 = nn.BatchNorm1d(512)
        self.fc4 = nn.Linear(512, 8 * 8 * 16)
        self.fc_bn4 = nn.BatchNorm1d(8 * 8 * 16)

        self.conv5 = nn.ConvTranspose2d(16, 32, kernel_size=3, stride=2, padding=1, output_padding=1, bias=False)
        self.bn5 = nn.BatchNorm2d(32)
        self.conv6 = nn.ConvTranspose2d(32, 32, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn6 = nn.BatchNorm2d(32)
        self.conv7 = nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1, bias=False)
        self.bn7 = nn.BatchNorm2d(16)
        self.conv8 = nn.ConvTranspose2d(16, 3, kernel_size=3, stride=1, padding=1, bias=False)

        self.relu = nn.ReLU()

    def encode(self, x):
        """Return ``(mu, logvar)`` of the approximate posterior for batch ``x``."""
        conv1 = self.relu(self.bn1(self.conv1(x)))
        conv2 = self.relu(self.bn2(self.conv2(conv1)))
        conv3 = self.relu(self.bn3(self.conv3(conv2)))
        # Flatten the 16x8x8 feature map for the fully connected layers.
        conv4 = self.relu(self.bn4(self.conv4(conv3))).view(-1, 8 * 8 * 16)

        fc1 = self.relu(self.fc_bn1(self.fc1(conv4)))
        return self.fc21(fc1), self.fc22(fc1)

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` with ``eps`` drawn from a standard normal."""
        std = torch.exp(0.5*logvar)
        eps = torch.randn_like(std)
        return mu + eps*std

    def decode(self, z):
        """Map latent vectors ``z`` to images of shape ``(-1, 3, 32, 32)``."""
        fc3 = self.relu(self.fc_bn3(self.fc3(z)))
        # Reshape to a 16-channel 8x8 map before the transposed convolutions.
        fc4 = self.relu(self.fc_bn4(self.fc4(fc3))).view(-1, 16, 8, 8)

        conv5 = self.relu(self.bn5(self.conv5(fc4)))
        conv6 = self.relu(self.bn6(self.conv6(conv5)))
        conv7 = self.relu(self.bn7(self.conv7(conv6)))
        # No activation on the output layer.
        return self.conv8(conv7).view(-1, 3, 32, 32)

    def forward(self, x):
        """Return ``(reconstruction, mu, logvar)`` for the input batch ``x``."""
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar
In [25]:
def loss_function(recon_x, x, mu, logvar):
    """Return the reconstruction error plus the KL divergence for one batch.

    Both terms are summed over every element and over the batch; nothing is
    averaged here.
    """
    # Squared-error reconstruction term.
    reconstruction = F.mse_loss(recon_x, x, reduction='sum')

    # KL term in closed form, following Appendix B of
    # Kingma and Welling, "Auto-Encoding Variational Bayes", ICLR 2014
    # (https://arxiv.org/abs/1312.6114):
    #   KLD = -0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
    kl_divergence = -0.5 * kl_terms.sum()

    return reconstruction + kl_divergence
def train(epoch):
    """Run one training pass over ``train_loader`` and print progress.

    Relies on the module-level ``model``, ``optimizer``, ``train_loader``,
    ``device`` and ``LOG_INTERVAL``. ``epoch`` is only used in the log lines.
    """
    model.train()
    num_samples = len(train_loader.dataset)
    num_batches = len(train_loader)
    running_loss = 0

    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(device)

        optimizer.zero_grad()
        recon_batch, mu, logvar = model(data)
        loss = loss_function(recon_batch, data, mu, logvar)
        loss.backward()
        batch_loss = loss.item()
        running_loss += batch_loss
        optimizer.step()

        if batch_idx % LOG_INTERVAL != 0:
            continue

        # Periodic progress line: samples seen so far, percent of batches
        # done, and the per-sample loss of the current batch.
        batch_len = len(data)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, batch_idx * batch_len, num_samples,
            100. * batch_idx / num_batches,
            batch_loss / batch_len))

    # Epoch summary: summed loss divided by the dataset size.
    print('====> Epoch: {} Average loss: {:.4f}'.format(
        epoch, running_loss / num_samples))
def test(epoch):
    """Evaluate on ``test_loader`` and print the average per-sample loss.

    Relies on the module-level ``model``, ``test_loader`` and ``device``.
    ``epoch`` is accepted but not used.
    """
    model.eval()
    total = 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for data, _ in test_loader:
            data = data.to(device)
            recon_batch, mu, logvar = model(data)
            batch_loss = loss_function(recon_batch, data, mu, logvar)
            total += batch_loss.item()

    average = total / len(test_loader.dataset)
    print('====> Test set loss: {:.4f}'.format(average))
def count_parameters(model):
    """Return the number of scalar values in the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        # Frozen parameters (requires_grad=False) are not counted.
        if param.requires_grad:
            total += param.numel()
    return total
In [33]:
# Build the model and its optimizer.
model = VAE().to(device)
print(f"# Parameters: {count_parameters(model)}")
optimizer = optim.Adam(model.parameters(), lr=1e-2)

# Fixed latent vectors and a fixed batch of test images, reused every epoch so
# the plots are comparable across epochs.
fixed_z = torch.randn(8, 512).to(device)
fixed_x, _ = next(iter(test_loader))
fixed_x = fixed_x[:8].to(device)
# Undo the (0.5, 0.5) normalisation for display.
fixed_x_unnormalized = ((fixed_x*0.5) + 0.5)

for epoch in range(1, EPOCHS + 1):
    train(epoch)
    test(epoch)

    fig, axs = plt.subplots(3, 8, figsize=(20, 10))
    with torch.no_grad():
        # Reconstruction of the fixed test images.
        recon_x, *_ = model(fixed_x)
        recon_x = torch.clamp((recon_x*0.5) + 0.5, 0, 1)

        # Generated samples decoded from the fixed latent vectors.
        fixed_z_gen = model.decode(fixed_z).cpu()
        fixed_z_gen = torch.clamp((fixed_z_gen*0.5) + 0.5, 0, 1)

    # Row 0: originals, row 1: reconstructions, row 2: generated samples.
    image_rows = (fixed_x_unnormalized.cpu(), recon_x.cpu(), fixed_z_gen)
    for row_axes, images in zip(axs, image_rows):
        for ax, image in zip(row_axes, images):
            # imshow expects height x width x channels.
            ax.imshow(image.permute(1, 2, 0).numpy())
            ax.set_axis_off()
    plt.show()