Autoencoders


Intro

Exploratory notebook on autoencoders. It includes toy implementations and tests of related techniques.

Most of the examples are implemented with Keras/TensorFlow 2.0.

Resources

Building Autoencoders in Keras


In [ ]:
# Basic libraries import
import time
import numpy as np
import pandas as pd
import pdb
import sys
import os
from pathlib import Path
import seaborn as sns
import yaml
import functools
from datetime import datetime
from tqdm.notebook import tqdm
import cv2

# Keras (public tf.keras API; the tensorflow.python.* modules are private)
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.layers import Activation, Dense
from tensorflow.keras import backend as K
from tensorflow.keras import optimizers
from tensorflow.keras import datasets
from tensorflow.keras.initializers import *
from tensorflow.keras.callbacks import *

# Plotting
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import animation
plt.rcParams['animation.ffmpeg_path'] = str(Path.home() / "anaconda3/envs/image-processing/bin/ffmpeg")

sns.set_style("dark")
sns.set_context("paper")

%matplotlib notebook

# Local utils
%load_ext autoreload
%autoreload 2

from Autoencoder import Autoencoder
from VAE import VAE
from ds_utils import image_processing
from ds_utils.plot_utils import plot_sample_imgs

Autoencoder

The goal of an autoencoder is to learn a compressed, distributed representation of a dataset. In the most general case the autoencoder is then required to reconstruct the original input as accurately as possible (that is, to minimize the reconstruction error). Training this way implicitly performs feature extraction and learning, which often outperforms handcrafted features.

For a single-layer feedforward net this can be achieved by using a hidden size smaller than the input size and training with a loss that measures how well the net reconstructs the input data. If the hidden size is equal to or larger than the input size, the net can trivially learn the identity mapping.
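The effect of the bottleneck can be illustrated without any training. The sketch below is a stand-in under stated assumptions, not the notebook's Keras model: it considers a bias-free linear autoencoder, for which the best reconstruction at a given hidden size is the truncated SVD of the data matrix. The helper name `best_linear_reconstruction_error` and the random data are hypothetical.

```python
import numpy as np

def best_linear_reconstruction_error(x, hidden_size):
    """Lowest MSE a bias-free linear autoencoder can reach with `hidden_size` units.

    The optimum is the rank-`hidden_size` truncated SVD of the data matrix.
    """
    _, _, vt = np.linalg.svd(x, full_matrices=False)
    encoder = vt[:hidden_size].T   # (input_dim, hidden_size)
    decoder = vt[:hidden_size]     # (hidden_size, input_dim)
    reconstruction = (x @ encoder) @ decoder
    return float(np.mean((x - reconstruction) ** 2))

rng = np.random.default_rng(0)
x = rng.normal(size=(200, 8))      # hypothetical data: 200 samples, 8 features

errors = {h: best_linear_reconstruction_error(x, h) for h in (2, 4, 8)}
for h, err in errors.items():
    print(f"hidden_size={h}: reconstruction MSE = {err:.4f}")
```

With `hidden_size` equal to the input size the error drops to numerically zero, since the net can represent the identity mapping; smaller hidden sizes force a lossy compression.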

Additional concepts:

  • sparsity and regularization
  • Denoising Autoencoders (DAE): trained to map a corrupted version of the input to the original, uncorrupted one
  • Variational Autoencoders (VAE)

In [ ]:
data_folder = Path.home() / "Documents/datasets"

Numbers Encoding

An autoencoder that tries to learn a compressed (possibly binary) representation of one-hot encoded numbers. For example, with five numbers:

1 = 00001
2 = 00010
3 = 00100
4 = 01000
5 = 10000
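
As a point of reference for what a compressed binary representation could look like, the sketch below builds the shortest fixed-length binary code for `n` distinct one-hot vectors, which needs ceil(log2(n)) bits. This is only an illustration: the function name `binary_codes` is hypothetical, and nothing guarantees that the trained network converges to such a code.

```python
import math
import numpy as np

def binary_codes(n):
    """Return an (n, n_bits) array with one distinct binary code per row."""
    n_bits = max(1, math.ceil(math.log2(n)))
    # bit b of integer i, most significant bit first
    return np.array([[(i >> b) & 1 for b in reversed(range(n_bits))]
                     for i in range(n)])

codes = binary_codes(10)
print(codes.shape)  # (10, 4)
```

For ten one-hot numbers, four hidden units would suffice in principle, so a hidden size of `input_dim//2 = 5` leaves some slack.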


In [ ]:
# create one-hot encoded numbers
input_dim = 10
nums = np.eye(input_dim)[np.arange(input_dim)]
nums

In [ ]:
# model parameters
hidden_size = input_dim//2

# Keras model
model = Sequential()
model.add(Dense(hidden_size, input_dim=input_dim, activation=K.sigmoid))
model.add(Dense(input_dim, activation=K.sigmoid))
          
# compile model
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

In [ ]:
# fit model
model.fit(nums, nums, epochs=100)

In [ ]:
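model.summary()
# name of the hidden (first Dense) layer; auto-generated names such as
# 'dense_2' depend on how many layers were created earlier in the session
layer_name = model.layers[0].name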

In [ ]:
# hidden layer weights
sns.heatmap(model.get_layer(layer_name).get_weights()[0])
plt.show()

In [ ]:
# get hidden layer output building "intermediate model"
intermediate_layer_model = Model(inputs=model.input,
                                 outputs=model.get_layer(layer_name).output)
intermediate_output = intermediate_layer_model.predict(nums)

In [ ]:
intermediate_output

In [ ]:
# predictions
sns.heatmap(model.predict(nums[np.array([1,2,3,5,6])]))
plt.show()

MNIST

Train autoencoder on the MNIST dataset.


In [ ]:
from tensorflow.keras.datasets import mnist

In [ ]:
(X_train, y_train), (X_test, y_test) = mnist.load_data()

In [ ]:
# flatten 28*28 images to a 784 vector for each image
num_pixels = X_train.shape[1] * X_train.shape[2]
# get only subset of images
num_images = 1000
X_train = X_train[:num_images].reshape(num_images, num_pixels).astype('float32')
X_test = X_test[:num_images].reshape(num_images, num_pixels).astype('float32')

In [ ]:
# normalize inputs from 0-255 to 0-1
X_train = X_train / 255
X_test = X_test / 255

In [ ]:
# Keras model
model = Sequential()
model.add(Dense(512, input_dim=num_pixels, activation=K.relu))
model.add(Dense(256, activation=K.relu))
model.add(Dense(512, activation=K.relu))
model.add(Dense(num_pixels, activation=K.relu))
          
# compile model
model.compile(loss='mean_squared_error', optimizer='adam')

In [ ]:
model.summary()

In [ ]:
model.fit(X_train, X_train, batch_size=100, epochs=10)

In [ ]:
# show original test example
plt.imshow(X_test[5].reshape(28, 28), cmap='gray')

In [ ]:
# show predicted results
pred = model.predict(X_test[5].reshape(1, num_pixels))
plt.imshow(pred.reshape(28, 28), cmap='gray')

In [ ]:
# pick random sample indexes from the test subset (they index X_test below)
plot_side = 5
sample_indexes = np.random.choice(X_test.shape[0], plot_side*plot_side)

In [ ]:
# show several original test examples
plot_sample_imgs(lambda _: X_test[sample_indexes], (28, 28), plot_side=plot_side)

In [ ]:
# show predicted results
plot_sample_imgs(lambda _: model.predict(X_test[sample_indexes]), (28, 28), plot_side=plot_side)

[TODO] Denoising Autoencoder (DAE)

Improves on the vanilla version by training on inputs that have been corrupted with some form of noise, while the target output remains the original, uncorrupted input.
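
Since this section is still a TODO, the following is only a minimal sketch of the corruption step, assuming additive Gaussian noise on inputs scaled to [0, 1]. The function name `add_gaussian_noise`, the `noise_factor` value and the random stand-in data are all hypothetical; other noise types (for example masking pixels) would work as well.

```python
import numpy as np

def add_gaussian_noise(x, noise_factor=0.3, rng=None):
    """Corrupt inputs in [0, 1] with additive Gaussian noise, then clip back to [0, 1]."""
    rng = np.random.default_rng() if rng is None else rng
    noisy = x + noise_factor * rng.normal(size=x.shape)
    return np.clip(noisy, 0.0, 1.0)

# stand-in for normalized, flattened images (hypothetical data: 4 images of 784 pixels)
rng = np.random.default_rng(0)
x_clean = rng.random((4, 784))
x_noisy = add_gaussian_noise(x_clean, noise_factor=0.3, rng=rng)

# a denoising autoencoder would then be trained with the noisy images as input
# and the clean images as target, e.g. model.fit(x_noisy, x_clean, ...)
```

The same autoencoder architecture as in the MNIST section could be reused; only the training pairs change.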

Variational Autoencoder (VAE)

Just one constraint separates a normal autoencoder from a variational one: forcing "it to generate latent vectors that roughly follow a unit Gaussian distribution". Generation then amounts to sampling a latent vector and feeding it to the decoder.
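
The `VAE` class used below is not shown in this notebook, so its exact loss is unknown. A common way to express the unit Gaussian constraint, sketched here in NumPy under that caveat, is a KL divergence term between the encoder's Gaussian and N(0, I), combined with the reparameterization trick for sampling. The function names and the log-variance parameterization are assumptions for illustration.

```python
import numpy as np

def sample_latent(z_mean, z_log_var, rng):
    """Reparameterization trick: z = mean + sigma * eps, with eps ~ N(0, I)."""
    eps = rng.normal(size=z_mean.shape)
    return z_mean + np.exp(0.5 * z_log_var) * eps

def kl_to_unit_gaussian(z_mean, z_log_var):
    """KL( N(mean, sigma^2) || N(0, I) ) per sample, summed over latent dimensions."""
    return -0.5 * np.sum(1 + z_log_var - z_mean ** 2 - np.exp(z_log_var), axis=-1)

rng = np.random.default_rng(0)
z_mean = np.array([[0.0, 0.0], [1.0, -1.0]])      # hypothetical encoder outputs
z_log_var = np.array([[0.0, 0.0], [0.5, -0.5]])

z = sample_latent(z_mean, z_log_var, rng)
kl = kl_to_unit_gaussian(z_mean, z_log_var)
print(kl)
```

The KL term is zero when the encoder already outputs a unit Gaussian (first row) and grows as the mean or variance moves away from it (second row). In training it would be added to the reconstruction loss.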

Resources:


In [ ]:
# load model config
with open('configs/vae_config.yaml', 'r') as f:
    config = yaml.safe_load(f)
HIDDEN_DIM = config['model']['encoder']['latent_dim']
IMG_SHAPE = config['data']['input_shape']
IMG_IS_BW = IMG_SHAPE[2] == 1
PLOT_IMG_SHAPE = IMG_SHAPE[:2] if IMG_IS_BW else IMG_SHAPE
config

Data


In [ ]:
# load Fashion MNIST dataset
((X_train, y_train), (X_test, y_test)) = datasets.fashion_mnist.load_data()
X_train = (X_train/255.)[:, :, :, np.newaxis]  # or np.expand_dims(X_train, axis=-1)
X_test = (X_test/255.)[:, :, :, np.newaxis]    # ?? .astype('float32')

In [ ]:
print(X_train[0].shape)
print(X_train[0].max())
print(X_train[0].min())

print(X_train.shape)
#print(y_train.shape)

img_shape = X_train[0].shape
assert img_shape == tuple(config['data']['input_shape'])

Model


In [ ]:
# Instantiate VAE
vae = VAE(config['data']['input_shape'], config)

In [ ]:
# test encoder
encoder_out = vae.encoder.predict(X_train[0:1])
np.array(encoder_out).shape

In [ ]:
# test decoder
decoder_out = vae.decoder.predict(encoder_out[2])
np.array(decoder_out).shape

In [ ]:
# plot random generated image
plt.imshow(vae.decoder.predict([np.random.randn(1, HIDDEN_DIM)])[0]
           .reshape(PLOT_IMG_SHAPE), cmap='gray' if IMG_IS_BW else 'jet')
plt.show()

In [ ]:
vae.decoder.summary()

Train


In [ ]:
# setup model directory for checkpoint and tensorboard logs
model_name = "vae_celeba"
model_dir = Path.home() / "Documents/models/tf_playground/autoencoders" / model_name
model_dir.mkdir(exist_ok=True, parents=True)
log_dir = model_dir / "logs" / datetime.now().strftime("%Y%m%d-%H%M%S")

In [ ]:
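# train using tf.data.Dataset
# NOTE: celeba_train_ds and celeba_test_ds are not defined in this notebook;
# they must be created beforehand, otherwise use the numpy cell below
nb_epochs = 2000
vae.train(train_ds=vae.setup_dataset(celeba_train_ds),
          validation_ds=vae.setup_dataset(celeba_test_ds),
          nb_epochs=nb_epochs,
          log_dir=log_dir,
          checkpoint_dir=None,
          is_tfdataset=True)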

In [ ]:
# train using pure numpy data
nb_epochs = 2000
vae.train(train_ds=X_train[:5000],
          validation_ds=X_test[:100],
          nb_epochs=nb_epochs,
          log_dir=log_dir,
          checkpoint_dir=None)

In [ ]:
export_dir = model_dir / 'export'
export_dir.mkdir(exist_ok=True)
vae.model.save(str(export_dir / (datetime.now().strftime("%Y%m%d-%H%M%S") + '.h5')))

In [ ]:
# plot VAE results
plot_side = 5
plot_sample_imgs(lambda x: vae.model.predict(X_train[:plot_side*plot_side]), 
                 img_shape=PLOT_IMG_SHAPE,
                 plot_side=plot_side)

In [ ]:
# plot decoder results
plot_side = 5
plot_sample_imgs(lambda x: vae.decoder.predict([np.random.randn(plot_side*plot_side, HIDDEN_DIM)]), 
                 img_shape=PLOT_IMG_SHAPE,
                 plot_side=plot_side)

Explore Latent Space

Latent Space Interpolation

Animation of the continuous interpolation between two distinct examples.
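
The interpolation itself is a straight line between the two latent vectors. The sketch below shows just that step with NumPy, using made-up latent vectors of size 4; the helper name `interpolate_latents` is hypothetical and no decoder is involved.

```python
import numpy as np

def interpolate_latents(z_start, z_end, nb_frames):
    """Return nb_frames latent vectors evenly spaced on the segment z_start -> z_end."""
    weights = np.linspace(0.0, 1.0, nb_frames).reshape(-1, 1)
    return (1.0 - weights) * z_start + weights * z_end

z_start = np.zeros((1, 4))          # hypothetical latent vectors of size 4
z_end = np.ones((1, 4))
path = interpolate_latents(z_start, z_end, nb_frames=5)
print(path.shape)  # (5, 4)
```

Each row of `path` could then be decoded into one animation frame, assuming the decoder accepts a batch of latent vectors as it does elsewhere in this notebook.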


In [ ]:
%matplotlib notebook

start_idx = np.random.randint(len(X_train))
end_idx = np.random.randint(len(X_train))

# Get latent vector for start and end, and compute diff
z_start = vae.encoder.predict(X_train[start_idx:start_idx+1])[2]
z_end = vae.encoder.predict(X_train[end_idx:end_idx+1])[2]
z_diff = z_end - z_start

# setup plot
nb_frames = 50
fig, ax = plt.subplots(dpi=100, figsize=(5, 4))
im = ax.imshow(X_train[start_idx].reshape(PLOT_IMG_SHAPE), cmap='gray' if IMG_IS_BW else 'jet')
plt.axis('off')

def animate(i, z_start, z_diff, nb_frames):
    # interpolate from the fixed start point, so frames do not drift when replayed
    z = z_start + z_diff * (i + 1) / nb_frames
    im.set_data(vae.decoder.predict(z).reshape(PLOT_IMG_SHAPE))

ani = animation.FuncAnimation(fig, animate, frames=nb_frames, interval=100, 
                              fargs=[z_start, z_diff, nb_frames])

Animation of interpolation across multiple samples
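
With several samples, the path through latent space becomes piecewise linear: each global frame index maps to a segment between two consecutive latent vectors and a step within that segment. A minimal sketch of that mapping, with hypothetical 2-D latent vectors and a hypothetical helper name `latent_at_frame`:

```python
import numpy as np

def latent_at_frame(i, z_s, nb_transition_frames):
    """Latent vector for global frame i on the piecewise-linear path through z_s."""
    segment, step = divmod(i, nb_transition_frames)
    z_start, z_end = z_s[segment], z_s[segment + 1]
    return z_start + (z_end - z_start) * step / nb_transition_frames

# three hypothetical 2-D latent vectors acting as keyframes
z_s = [np.array([[0.0, 0.0]]), np.array([[1.0, 0.0]]), np.array([[1.0, 2.0]])]
nb_transition_frames = 4
# (len(z_s) - 1) segments, so the last valid frame index is nb_frames - 1
nb_frames = (len(z_s) - 1) * nb_transition_frames

frames = [latent_at_frame(i, z_s, nb_transition_frames) for i in range(nb_frames)]
```

Limiting the frame count to `(nb_samples - 1) * nb_transition_frames` keeps `segment + 1` within the list, which is why the cell below computes `nb_frames` that way.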


In [ ]:
%matplotlib inline

In [ ]:
render_dir = Path.home() / 'Documents/videos/vae' / "vae_celeba"

nb_samples = 30
nb_transition_frames = 10
nb_frames = min(2000, (nb_samples-1)*nb_transition_frames)

# random list of z vectors
z_s = [vae.encoder.predict(X_train[idx:idx+1])[2] for idx in np.random.randint(len(X_train), size=nb_samples)]
#z_s = [np.random.randn(1, HIDDEN_DIM) for idx in np.random.randint(len(X_train), size=nb_samples)]

# setup plot
dpi = 100
fig, ax = plt.subplots(dpi=dpi, figsize=(PLOT_IMG_SHAPE[0] / dpi, PLOT_IMG_SHAPE[1] / dpi))
fig.subplots_adjust(left=0,right=1,bottom=0,top=1)
im = ax.imshow(X_train[start_idx].reshape(PLOT_IMG_SHAPE))
plt.axis('off')

def animate(i, vae, z_s, nb_transition_frames):
    z_start = z_s[i//nb_transition_frames]
    z_end = z_s[i//nb_transition_frames+1]
    z_diff = z_end - z_start
    cur_z = z_start + (z_diff/nb_transition_frames)*(i%nb_transition_frames)
    im.set_data(vae.decoder.predict(cur_z).reshape(PLOT_IMG_SHAPE))

ani = animation.FuncAnimation(fig, animate, frames=nb_frames, interval=1, 
                              fargs=[vae, z_s, nb_transition_frames])

if render_dir:
    render_dir.mkdir(parents=True, exist_ok=True)
    ani.save(str(render_dir / (datetime.now().strftime("%Y%m%d-%H%M%S") + '.mp4')), 
             animation.FFMpegFileWriter(fps=30))

Animation of results produced by continuous variation of a specific feature of the latent vector
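
The traversal can be sketched independently of the model: keep a latent vector fixed and sweep a single component over a range of values. The helper name `traverse_dimension` and the 3-D latent vector are hypothetical; the sketch returns a batch rather than mutating the vector in place.

```python
import numpy as np

def traverse_dimension(z, idx, vals):
    """Return a batch of copies of z where only component idx takes each value in vals."""
    batch = np.repeat(z, len(vals), axis=0)   # (len(vals), latent_dim); z itself is untouched
    batch[:, idx] = vals
    return batch

z = np.array([[0.1, 0.2, 0.3]])              # hypothetical latent vector, shape (1, 3)
vals = np.linspace(-1.0, 1.0, 5)
batch = traverse_dimension(z, idx=1, vals=vals)
print(batch.shape)  # (5, 3)
```

Decoding each row of `batch` would show how that single latent feature affects the generated image while all others stay constant.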


In [ ]:
render_dir = Path.home() / 'Documents/videos/vae' / "vae_celeba_idxs"

nb_transition_frames = 150

# latent vector of a random sample
rand_idx = np.random.randint(len(X_train))
z_start = vae.encoder.predict(X_train[rand_idx:rand_idx+1])[2]
vals = np.linspace(-1., 1., nb_transition_frames)

# setup plot
dpi = 100
fig, ax = plt.subplots(dpi=dpi, figsize=(PLOT_IMG_SHAPE[0] / dpi, PLOT_IMG_SHAPE[1] / dpi))
fig.subplots_adjust(left=0,right=1,bottom=0,top=1)
#fig, ax = plt.subplots(dpi=100, figsize=(5, 4))
im = ax.imshow(X_train[rand_idx].reshape(PLOT_IMG_SHAPE))
plt.axis('off')

def animate(i, vae, z_start, idx, vals):
    z_start[0][idx] = vals[i]
    im.set_data(vae.decoder.predict(z_start).reshape(PLOT_IMG_SHAPE))

# render at most 100 latent dimensions, without indexing past the latent size
for z_idx in range(min(100, HIDDEN_DIM)):
    ani = animation.FuncAnimation(fig, animate, frames=nb_transition_frames, interval=10, 
                                  fargs=[vae, z_start.copy(), z_idx, vals])

    if render_dir:
        render_dir.mkdir(parents=True, exist_ok=True)
        ani.save(str(render_dir / 'idx{}.mp4'.format(z_idx)), animation.FFMpegFileWriter(fps=30))