Following the design of Tensorflow Distributions, good folks at PyTorch also created Torch Distributions package of their own to have some simple parameterizable standard probability distributions. Since I work mostly with PyTorch at the moment, I decide to write some simple examples and scenarios where this package can reduce the amount of code I need to write.
Let's start with the official VAE example provided by the pytorch team here.
In [11]:
import os
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
import torch.distributions as td
from torchvision import datasets, transforms
from torchvision.utils import make_grid
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
BATCH_SIZE = 128
EPOCHS = 10
LOG_INTERVAL = 50
SEED = 42
DATA_DIR = '/vulcan/scratch/kampta/data/mnist'
torch.manual_seed(SEED)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if not os.path.exists(DATA_DIR):
os.makedirs(DATA_DIR)
In [8]:
kwargs = {'num_workers': 1, 'pin_memory': True} if torch.cuda.is_available() else {}
train_loader = torch.utils.data.DataLoader(
datasets.MNIST(DATA_DIR, train=True, download=True,
transform=transforms.ToTensor()),
batch_size=BATCH_SIZE, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(DATA_DIR, train=False, transform=transforms.ToTensor()),
batch_size=BATCH_SIZE, shuffle=True, **kwargs)
In [9]:
class VAE(nn.Module):
def __init__(self):
super(VAE, self).__init__()
self.fc1 = nn.Linear(784, 400)
self.fc21 = nn.Linear(400, 20)
self.fc22 = nn.Linear(400, 20)
self.fc3 = nn.Linear(20, 400)
self.fc4 = nn.Linear(400, 784)
def encode(self, x):
h1 = F.relu(self.fc1(x))
return self.fc21(h1), self.fc22(h1)
def reparameterize(self, mu, logvar):
std = torch.exp(0.5*logvar)
eps = torch.randn_like(std)
return mu + eps*std
def decode(self, z):
h3 = F.relu(self.fc3(z))
return torch.sigmoid(self.fc4(h3))
def forward(self, x):
mu, logvar = self.encode(x.view(-1, 784))
z = self.reparameterize(mu, logvar)
return self.decode(z), mu, logvar
# Reconstruction + KL divergence losses summed over all elements and batch
def loss_function(recon_x, x, mu, logvar):
BCE = F.binary_cross_entropy(recon_x, x.view(-1, 784), reduction='sum')
# see Appendix B from VAE paper:
# Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
# https://arxiv.org/abs/1312.6114
# 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return BCE + KLD
def train(epoch):
model.train()
train_loss = 0
for batch_idx, (data, _) in enumerate(train_loader):
data = data.to(device)
optimizer.zero_grad()
recon_batch, mu, logvar = model(data)
loss = loss_function(recon_batch, data, mu, logvar)
loss.backward()
train_loss += loss.item()
optimizer.step()
if batch_idx % LOG_INTERVAL == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader),
loss.item() / len(data)))
print('====> Epoch: {} Average loss: {:.4f}'.format(
epoch, train_loss / len(train_loader.dataset)))
def test(epoch):
model.eval()
test_loss = 0
with torch.no_grad():
for i, (data, _) in enumerate(test_loader):
data = data.to(device)
recon_batch, mu, logvar = model(data)
test_loss += loss_function(recon_batch, data, mu, logvar).item()
test_loss /= len(test_loader.dataset)
print('====> Test set loss: {:.4f}'.format(test_loss))
def count_parameters(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad)
In [10]:
model = VAE().to(device)
print(f"# Parameters: {count_parameters(model)}")
optimizer = optim.Adam(model.parameters(), lr=1e-3)
fixed_z = torch.randn(8, 20).to(device)
fixed_x, _ = next(iter(test_loader))
fixed_x = fixed_x[:8].to(device)
for epoch in range(1, EPOCHS + 1):
train(epoch)
test(epoch)
fig, axs = plt.subplots(3, 8, figsize=(20, 10))
with torch.no_grad():
# Reconstruction
recon_x, *_ = model(fixed_x)
for h in range(8):
axs[0][h].imshow(fixed_x.cpu().numpy()[h].reshape(28, 28))
axs[0][h].set_axis_off()
for h in range(8):
axs[1][h].imshow(recon_x.cpu().numpy()[h].reshape(28, 28))
axs[1][h].set_axis_off()
# Generated samples
fixed_z_gen = model.decode(fixed_z).cpu().numpy()
for h in range(8):
axs[2][h].imshow(fixed_z_gen[h].reshape(28, 28))
axs[2][h].set_axis_off()
plt.show()
In [25]:
class VAE(nn.Module):
def __init__(self):
super(VAE, self).__init__()
self.fc1 = nn.Linear(784, 400)
self.fc21 = nn.Linear(400, 20)
self.fc22 = nn.Linear(400, 20)
self.fc3 = nn.Linear(20, 400)
self.fc4 = nn.Linear(400, 784)
def encode(self, x):
h1 = F.relu(self.fc1(x))
return self.fc21(h1), self.fc22(h1)
def decode(self, z):
h3 = F.relu(self.fc3(z))
return torch.sigmoid(self.fc4(h3))
def forward(self, x):
mu, logvar = self.encode(x.view(-1, 784))
std = torch.exp(0.5*logvar)
z = td.normal.Normal(mu, std)
z_hat = z.rsample()
return self.decode(z_hat), z
# Reconstruction + KL divergence losses summed over all elements and batch
def loss_function(recon_x, x, z):
BCE = F.binary_cross_entropy(recon_x, x.view(-1, 784), reduction='sum')
# see Appendix B from VAE paper:
# Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
# https://arxiv.org/abs/1312.6114
# 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
standard_normal = td.normal.Normal(torch.zeros_like(z.loc), torch.ones_like(z.scale))
KLD = torch.sum(td.kl.kl_divergence(z, standard_normal))
return BCE + KLD
def train(epoch):
model.train()
train_loss = 0
for batch_idx, (data, _) in enumerate(train_loader):
data = data.to(device)
optimizer.zero_grad()
recon_batch, z = model(data)
loss = loss_function(recon_batch, data, z)
loss.backward()
train_loss += loss.item()
optimizer.step()
if batch_idx % LOG_INTERVAL == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader),
loss.item() / len(data)))
print('====> Epoch: {} Average loss: {:.4f}'.format(
epoch, train_loss / len(train_loader.dataset)))
def test(epoch):
model.eval()
test_loss = 0
with torch.no_grad():
for i, (data, _) in enumerate(test_loader):
data = data.to(device)
recon_batch, z = model(data)
test_loss += loss_function(recon_batch, data, z).item()
test_loss /= len(test_loader.dataset)
print('====> Test set loss: {:.4f}'.format(test_loss))
def count_parameters(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad)
In [29]:
model = VAE().to(device)
print(f"# Parameters: {count_parameters(model)}")
optimizer = optim.Adam(model.parameters(), lr=1e-3)
fixed_z = torch.randn(8, 20).to(device)
fixed_x, _ = next(iter(test_loader))
fixed_x = fixed_x[:8].to(device)
for epoch in range(1, EPOCHS + 1):
train(epoch)
test(epoch)
fig, axs = plt.subplots(3, 8, figsize=(20, 10))
with torch.no_grad():
# Reconstruction
recon_x, *_ = model(fixed_x)
for h in range(8):
axs[0][h].imshow(fixed_x.cpu().numpy()[h].reshape(28, 28))
axs[0][h].set_axis_off()
for h in range(8):
axs[1][h].imshow(recon_x.cpu().numpy()[h].reshape(28, 28))
axs[1][h].set_axis_off()
# Generated samples
fixed_z_gen = model.decode(fixed_z).cpu().numpy()
for h in range(8):
axs[2][h].imshow(fixed_z_gen[h].reshape(28, 28))
axs[2][h].set_axis_off()
plt.show()