Intro

Exploratory notebook on Generative Adversarial Networks (GANs). It contains toy example implementations and tests of related techniques and topics.

Generative Adversarial Networks (GANs)

An architecture that learns by setting two networks in competition with each other. The goal is to learn parameters such that the generator produces a distribution close to the distribution of our dataset (the true distribution).

  • Discriminator: detects whether an image belongs to the target dataset or was produced by the generator
  • Generator: generates new examples that look like the training/target data, in order to fool the discriminator

In [ ]:
import time
from PIL import Image
import numpy as np
import pdb
import os
import sys
import seaborn as sns

import matplotlib
import matplotlib.pyplot as plt
from matplotlib import animation

from keras.models import Sequential
from keras.layers.core import Reshape, Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Convolution2D, MaxPooling2D, ZeroPadding2D, UpSampling2D
from keras.layers.advanced_activations import LeakyReLU
from keras import backend as K
from keras import optimizers
from keras.layers.normalization import BatchNormalization
from keras.datasets import mnist

from tqdm import tqdm_notebook as tqdm

%matplotlib notebook
# use seaborn's "dark" style for all figures
sns.set_style("dark")


# make the parent directory importable so the local `utils` package can be found
sys.path.append(os.path.join(os.getcwd(), os.pardir))
from utils.plot_utils import plot_sample_imgs
from utils.generative_utils import NoiseDistribution, set_trainable

# resource directory, two levels above the working directory: ../../data/deep_learning
RES_DIR = os.path.join(*[os.pardir]*2, 'data', 'deep_learning')

%load_ext autoreload
%autoreload 2

Gaussian Distribution Approximation (Keras)

Example adapted from Aylien blog.

Check also here for Keras code


In [ ]:
class GaussianDistribution:
    """Target 1D Gaussian distribution that the toy GAN tries to imitate."""

    def __init__(self, mu=4, sigma=0.5):
        # mean and standard deviation handed to np.random.normal
        self.mu, self.sigma = mu, sigma

    def sample(self, N):
        """Draw N values from the distribution and return them in ascending order."""
        return np.sort(np.random.normal(self.mu, self.sigma, N))

In [ ]:
class GeneratorNoiseDistribution:
    """Input noise for the toy generator: an evenly spaced grid with a small random jitter."""

    def __init__(self, vrange):
        # the grid spans [-vrange, vrange]
        self.vrange = vrange

    def sample(self, N):
        """Return N evenly spaced points in [-vrange, vrange], each shifted by uniform noise in [0, 0.01)."""
        grid = np.linspace(-self.vrange, self.vrange, N)
        jitter = 0.01 * np.random.random(N)
        return grid + jitter

In [ ]:
def generator(input_dim, hidden_size):
    """Build the toy generator network.

    The model is a single softplus hidden layer of `hidden_size` units
    followed by a linear layer that maps back to `input_dim` outputs.
    Returns the (uncompiled) Sequential model.
    """
    hidden = Dense(hidden_size, input_dim=input_dim, activation=K.softplus)
    output = Dense(input_dim)
    return Sequential([hidden, output])

In [ ]:
def discriminator(input_dim, hidden_size):
    """Build the toy discriminator network.

    Three tanh layers, each twice as wide as `hidden_size`, followed by a
    single sigmoid unit. Returns the (uncompiled) Sequential model.
    """
    width = hidden_size*2
    net = Sequential()
    net.add(Dense(width, input_dim=input_dim, activation=K.tanh))
    for _ in range(2):
        net.add(Dense(width, activation=K.tanh))
    net.add(Dense(1, activation=K.sigmoid))
    return net

In [ ]:
# init distributions
# target data: Gaussian with the class defaults (mu=4, sigma=0.5)
gaussian_d = GaussianDistribution()
# generator input: jittered grid over [-8, 8]
generator_d = GeneratorNoiseDistribution(8)

In [ ]:
# init GAN components
# both networks work on scalars (input_dim=1) with hidden_size=128
d = discriminator(1, 128)
g = generator(1, 128)

In [ ]:
# discriminator model
# compiled on its own so it can be trained directly on real/fake batches
optimizer = optimizers.RMSprop(lr=0.0008, clipvalue=1.0, decay=6e-8)
# NOTE: `discriminator_model` is an alias of `d`, not a copy
discriminator_model = d
discriminator_model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

In [ ]:
# adversarial model
# generator stacked on top of the discriminator; uses half the discriminator's
# learning rate and decay
optimizer = optimizers.RMSprop(lr=0.0004, clipvalue=1.0, decay=3e-8)
adversarial_model = Sequential()
adversarial_model.add(g)
adversarial_model.add(d)
adversarial_model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

In [ ]:
batch_size = 64

fig, ax = plt.subplots(dpi=100, figsize=(5, 4))
# fixed reference sample of the target distribution, redrawn on every frame
true_dist = np.reshape(gaussian_d.sample(1000), (1000, 1))
plt.show()

def animate(step):
    """Run one GAN training step and redraw the fake vs. true distributions.

    Called by FuncAnimation once per frame; `step` is the frame index and is
    only used for the on-plot label.
    """
    # generate data
    # first we sample from the true distribution, then we generate some
    # "fake" data by feeding noise to the generator
    true_sample = np.reshape(gaussian_d.sample(batch_size), (batch_size, 1))
    noise = generator_d.sample(batch_size)
    fake_sample = g.predict(noise)

    # train discriminator
    # feed true and fake samples with respective labels (1, 0) to the discriminator
    x = np.reshape(np.concatenate((true_sample, fake_sample)), (batch_size*2, 1))
    y = np.ones([batch_size*2, 1])
    y[batch_size:, :] = 0
    discriminator_model.train_on_batch(x, y)

    # train GAN
    # feed noise to the model and expect true (1) response from discriminator,
    # which is in turn fed with data generated by the generator
    # NOTE(review): nothing in this cell freezes `d`, so this step presumably
    # updates the discriminator weights as well -- confirm this is intended
    noise = np.reshape(generator_d.sample(batch_size), (batch_size, 1))
    y = np.ones([batch_size, 1])
    a_loss = adversarial_model.train_on_batch(noise, y)

    # plot
    fig.clf()
    fake = sns.distplot(fake_sample)
    fake.set_xlim([0,8])
    fake.set_ylim([0,3])
    sns.distplot(true_dist)
    # use matplotlib's pyplot directly: `sns.plt` is not part of seaborn's API
    # and is missing in newer seaborn releases
    plt.text(3, 2, "Epoch {}, a_loss {:.3f}".format(step, a_loss[0]))

# 200 frames, i.e. 200 training steps, played once
anim = animation.FuncAnimation(fig, animate, 200, repeat=False)

In [ ]:
# draw a fresh noise batch and generate fake samples with the current generator
noise = generator_d.sample(batch_size)
fake_sample = g.predict(noise)

In [ ]:
# distribution of the samples produced by the toy generator
sns.distplot(fake_sample)
# call pyplot directly: `sns.plt` is not part of seaborn's API and is missing
# in newer seaborn releases
plt.show()

MNIST GAN (Keras)


In [ ]:
# noise source for the MNIST generator; `noise_d.sample(shape)` is used below.
# NoiseDistribution comes from utils.generative_utils, so the distribution it
# draws from is not visible in this notebook
noise_d = NoiseDistribution()

In [ ]:
# length of the noise vector fed to the generator
input_dim = 100
# image shape as (height, width, channels)
img_shape = (28,28,1)

Generator Model


In [ ]:
def generator_model(input_dim, n_channels=128, init_side=7):
    """Build the DCGAN-style generator.

    A noise vector of length `input_dim` is projected by a dense layer,
    reshaped to an (init_side, init_side, n_channels) feature map and then
    upsampled twice, with a 3x3 convolution after each upsampling. A final
    1x1 convolution produces a single-channel image. With the default
    init_side=7 this is meant to yield a 28x28 b/w image.
    Returns the (uncompiled) Sequential model.
    """
    model = Sequential()

    # dense projection of the noise, reshaped into the initial feature map
    model.add(Dense(init_side*init_side*n_channels, input_dim=input_dim, activation=LeakyReLU()))
    model.add(BatchNormalization(mode=2))
    model.add(Reshape((init_side, init_side, n_channels)))

    # two upsample + convolution stages with n_channels//2 and n_channels//4 filters
    for divisor in (2, 4):
        model.add(UpSampling2D())
        model.add(Convolution2D(n_channels//divisor, 3, 3, border_mode='same', activation=LeakyReLU()))
        model.add(BatchNormalization(mode=2))

    # single-channel output; sigmoid is used here, tanh is noted as an open alternative
    model.add(Convolution2D(1, 1, 1, border_mode='same', activation='sigmoid'))
    return model

In [ ]:
# build a wider generator (512 initial channels) just to inspect its layer summary
g = generator_model(input_dim=input_dim, n_channels=512)
g.summary()

In [ ]:
# plot random generated image
# a single noise vector goes through the generator; the first (only) output
# is reshaped to 28x28 for display
plt.imshow(g.predict(noise_d.sample((1, input_dim)))[0]
           .reshape(28, 28))
plt.show()

Discriminator Model


In [ ]:
def discriminator_model(input_shape, init_filters=64):
    """Build the DCGAN-style discriminator.

    Two strided 5x5 convolutions (with `init_filters` and `init_filters*2`
    filters) are followed by flattening, a 256-unit dense layer and a single
    sigmoid unit that outputs a probability.
    Returns the (uncompiled) Sequential model.
    """
    # settings shared by both convolutions: stride 2, 'same' padding
    conv_options = dict(subsample=(2, 2), border_mode='same')

    net = Sequential()
    net.add(Convolution2D(init_filters, 5, 5, input_shape=input_shape,
                          activation=LeakyReLU(0.2), **conv_options))
    # open question from the original author: add max pooling and dropout here?
    net.add(Convolution2D(init_filters*2, 5, 5,
                          activation=LeakyReLU(0.2), **conv_options))
    net.add(Flatten())
    net.add(Dense(256, activation=LeakyReLU()))
    net.add(Dense(1, activation='sigmoid'))
    return net

In [ ]:
# build a wider discriminator (256 initial filters) just to inspect its layer summary
d = discriminator_model(input_shape=(28,28,1), init_filters=256)
d.summary()

In [ ]:
# print prediction for random image
# sanity check that the generator output can be fed to the discriminator
d.predict(g.predict(noise_d.sample((1, input_dim))))

GAN Model


In [ ]:
# init GAN components
# rebuild both networks with their default sizes, replacing the preview models
g = generator_model(input_dim)
d = discriminator_model(img_shape)

# compile generator
# (left disabled: the generator is not compiled on its own in this cell)
#g_optimizer = optimizers.Adam(lr=0.0001)
#g.compile(loss='binary_crossentropy', optimizer=g_optimizer)

# compile discriminator
d_optimizer = optimizers.Adam(lr=0.001)
d.compile(loss='binary_crossentropy', optimizer=d_optimizer)

In [ ]:
# build adversarial model
# generator followed by discriminator, compiled with a 10x smaller learning
# rate (0.0001) than the discriminator's own optimizer (0.001)
gan = Sequential()
gan.add(g)
gan.add(d)
gan_optimizer = optimizers.Adam(lr=0.0001)
gan.compile(loss='binary_crossentropy', optimizer=gan_optimizer)

In [ ]:
# show the layers of the stacked generator + discriminator model
gan.summary()

Train


In [ ]:
# sample `num_samples` noise vectors and run them through the generator model `g`
# (the name `generator` is the toy-example builder function, which has no `predict`)
generator_fun = lambda num_samples : g.predict(noise_d.sample((num_samples, input_dim)))

In [ ]:
# load mnist data using Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# reshape and normalize train data
# add a trailing channel axis, then scale pixel values by 1/255
X_train = np.expand_dims(X_train, axis=-1)
X_train = X_train.astype('float32')/255
print(X_train.shape)
print(y_train.shape)

In [ ]:
def train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch):
    """Train the discriminator on one batch made of real and generated images.

    `batch_size` real images are picked at random from `X_train` and the same
    number of fake images is produced by `g` from noise. The discriminator is
    then trained on the concatenation, with label 1 for the real half and 0
    for the generated half. `epoch` is accepted but not used.

    Returns whatever `d.train_on_batch` returns.
    """
    # real half: images at randomly drawn indexes of the training set
    real_idxs = np.random.randint(0, len(X_train), batch_size)
    real_images = X_train[real_idxs,:,:,:]
    # fake half: generator output for a fresh batch of noise
    fake_images = g.predict(noise_d.sample((batch_size, input_dim)))

    images = np.concatenate((real_images, fake_images))
    labels = np.concatenate((np.ones([batch_size, 1]), np.zeros([batch_size, 1])))

    # make sure the discriminator is marked trainable before updating it
    set_trainable(d, True, None, None)
    return d.train_on_batch(images, labels)

In [ ]:
def train(d, g, gan, noise_d, input_dim, X_train, batch_size=32, n_epochs=1, add_epoch=0):
    """Alternate discriminator and generator updates for `n_epochs` steps.

    Each step trains the discriminator on one mixed batch, marks it as not
    trainable, and then trains the stacked `gan` model on noise with target 1.
    Every 10th step a grid of generated samples is saved; `add_epoch` offsets
    the number used in the saved file name.

    Returns a dict with the per-step losses under the keys 'g' and 'd'.
    """
    g_history, d_history = [], []
    for epoch in range(n_epochs):
        # discriminator step
        d_history.append(train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch))
        set_trainable(d, False, None, None)

        # generator step: noise goes through the stacked model, which is asked
        # to answer "real" (1) for the generated images
        latent = noise_d.sample((batch_size, input_dim))
        real_labels = np.ones([batch_size, 1])
        g_history.append(gan.train_on_batch(latent, real_labels))

        if (epoch%10) != 0:
            continue
        img_name = 'mnist_gen{}.jpg'.format(epoch+add_epoch)
        plot_sample_imgs(generator_fun, img_shape[:2], savepath=os.path.join('data', 'mnist_gan', img_name))
    return {'g': g_history, 'd': d_history}

In [ ]:
# pretrain discriminator
# a single discriminator-only step on a batch of 128 real + 128 generated images
batch_size = 128
n_epochs = 1
for epoch in range(n_epochs):
    train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch)

In [ ]:
#plt.ion()
# turn matplotlib interactive mode off while training
plt.ioff()

# override the learning rate of both compiled optimizers to 1e-3
K.set_value(d.optimizer.lr, 1e-3)
K.set_value(gan.optimizer.lr, 1e-3)

# add_epoch=120 offsets the numbering of the saved sample images
losses = train(d, g, gan, noise_d, input_dim, X_train, 
               batch_size=256, n_epochs=1000, add_epoch=120)

In [ ]:
# plot random generated image
# NOTE: the noise here comes from np.random.randn directly, not from noise_d
plt.imshow(g.predict(np.random.randn(input_dim).reshape(1, -1))
           .reshape(28, 28))
plt.show()

In [ ]:
# show a real MNIST training image for comparison with the generated ones.
# X_train is indexed directly because `true_sample` only exists as a local
# variable inside the training functions, never at notebook scope
plt.imshow(X_train[2].reshape(28, 28))
plt.show()

In [ ]:
# evaluate the stacked model on a fresh noise batch labelled "real" (1), without
# updating any weights; the batch is built here so the cell does not depend on
# variables left over from other cells
noise = noise_d.sample((batch_size, input_dim))
y = np.ones([batch_size, 1])
gan.test_on_batch(noise, y)

In [ ]:
# one extra training step of the stacked model on a fresh noise batch labelled
# "real" (1); the batch is built here so the cell does not depend on variables
# left over from other cells
noise = noise_d.sample((batch_size, input_dim))
y = np.ones([batch_size, 1])
gan.train_on_batch(noise, y)

Wasserstein GAN


In [ ]:
from keras.layers import *
from keras.models import *
from keras.optimizers import *
from keras.initializers import *
from keras.callbacks import *
from keras.utils.generic_utils import Progbar

In [ ]:
# noise source for the WGAN generator; NoiseDistribution comes from
# utils.generative_utils, so the distribution it draws from is not visible here
noise_d = NoiseDistribution()

In [ ]:
# length of the noise vector fed to the generator
input_dim = 100
# image shape as (height, width, channels)
img_shape = (28,28,1)
# number of class labels used to condition the generator
num_classes = 10

Generator Model


In [ ]:
def generator_deconv_block(filters, block_input, kernel_size=(3, 3), strides=(1, 1)):
    """Standard "deconvolution" block used in the generator.

    Applies, in order: upsampling, a 'same'-padded convolution, LeakyReLU and
    batch normalization.

    Args:
        filters: number of convolution filters.
        block_input: tensor the block is applied to.
        kernel_size: convolution kernel size.
        strides: convolution strides.

    Returns:
        The output tensor of the block.
    """
    block = UpSampling2D()(block_input)
    # honour the caller's kernel_size instead of a hard-coded (3, 3)
    block = Convolution2D(filters, kernel_size, strides=strides, padding='same')(block)
    block = LeakyReLU()(block)
    block = BatchNormalization()(block)
    return block

In [ ]:
def generator_model(input_dim, voc_size, init_filters=128, init_side=7, num_deconv_blocks=2):
    """Build the class-conditional WGAN generator.

    Unlike the basic DCGAN generator, this model has two inputs: the prior
    sample (noise vector of length `input_dim`) and an integer image class.
    The class is embedded into a vector of the same length and multiplied
    element-wise with the noise. The result goes through two dense layers, is
    reshaped to an (init_side, init_side, init_filters) map and then through
    `num_deconv_blocks` deconvolution blocks, each with half the filters of
    the previous stage. A final tanh convolution outputs a single channel.

    Returns a Model with inputs [noise, class] and the image as output.
    """
    # class branch: integer label -> embedding vector of length input_dim
    class_in = Input(shape=(1, ), dtype='int32')
    class_embedding = Embedding(voc_size, input_dim)(class_in)
    class_vector = Flatten()(class_embedding)
    # noise branch
    noise_in = Input(shape=(input_dim, ))
    # element-wise (Hadamard) product conditions the noise on the class
    conditioned = multiply([noise_in, class_vector])

    # dense part
    net = Dense(1024)(conditioned)
    net = LeakyReLU()(net)

    net = Dense(init_side*init_side*init_filters)(net)
    net = LeakyReLU()(net)
    net = BatchNormalization()(net)
    net = Reshape((init_side, init_side, init_filters))(net)

    # convolutional part: the filter count is halved at every block
    for halvings in range(1, num_deconv_blocks + 1):
        net = generator_deconv_block(init_filters//(2**halvings), block_input=net, kernel_size=(5, 5))

    image = Convolution2D(1, (2, 2), padding='same', activation='tanh')(net)

    return Model(inputs=[noise_in, class_in], outputs=image)

In [ ]:
# instantiate generator model (10 classes) and inspect its layers
gen = generator_model(input_dim=input_dim, voc_size=10, init_filters=128)
gen.summary()

In [ ]:
# plot random generated image
# one noise vector conditioned on class 7; the first (only) output is
# reshaped to 28x28 for display
plt.imshow(gen.predict([noise_d.sample((1, input_dim)), np.array([7])])[0]
           .reshape(28, 28))
plt.show()

Discriminator Model


In [ ]:
def discriminator_conv_block(filters, block_input, kernel_size=(3, 3), strides=(1, 1), pool_size=None):
    """Standard convolution block used in the discriminator.

    Applies a 'same'-padded convolution, LeakyReLU and batch normalization to
    `block_input`; when `pool_size` is given (truthy), max pooling with that
    pool size is appended. Returns the output tensor of the block.
    """
    stages = [
        Convolution2D(filters, kernel_size, strides=strides, padding='same'),
        LeakyReLU(),
        BatchNormalization(),
    ]
    # optional max pooling at the end of the block
    if pool_size:
        stages.append(MaxPool2D(pool_size=pool_size))

    out = block_input
    for stage in stages:
        out = stage(out)
    return out

In [ ]:
def discriminator_model(input_shape, num_classes, init_filters=32, num_conv_blocks=3):
    """Build the WGAN discriminator with an auxiliary class output.

    Unlike the basic DCGAN discriminator, this model takes an image and
    returns two predictions: a linear score for the image authenticity and a
    softmax distribution over `num_classes` image classes. The image goes
    through `num_conv_blocks` convolution blocks (without pooling) whose
    filter count doubles at every block, starting from `init_filters`.

    Returns a Model with the image as input and [authenticity, class] outputs.
    """
    image_in = Input(shape=input_shape)

    feature_map = image_in
    for filters in (init_filters*(2**i) for i in range(num_conv_blocks)):
        feature_map = discriminator_conv_block(filters, block_input=feature_map, pool_size=None)

    flat = Flatten()(feature_map)

    # two heads sharing the same flattened features
    authenticity = Dense(1, activation='linear')(flat)
    class_probs = Dense(num_classes, activation='softmax')(flat)

    return Model(inputs=[image_in], outputs=[authenticity, class_probs])

In [ ]:
# instantiate discriminator model (10 classes) and inspect its layers
dis = discriminator_model(input_shape=img_shape, num_classes=10, init_filters=32)
dis.summary()

In [ ]:
# print prediction for random image
# generates one image conditioned on class 3 and feeds it to the discriminator,
# which returns both of its outputs (authenticity score and class prediction)
dis.predict(gen.predict([noise_d.sample((1, input_dim)), np.array([3])]))

GAN Model


In [ ]:
def d_loss(y_true, y_pred):
    """Loss for the authenticity output: mean of the element-wise product of targets and predictions."""
    weighted_scores = y_true * y_pred
    return K.mean(weighted_scores)

In [ ]:
# init GAN components
# rebuild both networks, replacing the preview models created above
gen = generator_model(input_dim=input_dim, voc_size=num_classes, init_filters=128)
dis = discriminator_model(input_shape=img_shape, num_classes=num_classes, init_filters=32)

# compile discriminator
# one loss per output: d_loss for the authenticity score,
# sparse categorical cross-entropy for the class prediction
dis.compile(loss=[d_loss, 'sparse_categorical_crossentropy'], 
            optimizer=RMSprop(lr=1e-4))

In [ ]:
# Build adversarial model

# the stacked model takes the generator inputs (noise and class) and returns
# the discriminator outputs computed on the generated image
noise = Input(shape=(input_dim, ))
input_class = Input(shape=(1, ), dtype='int32')
out_autheticity, out_class = dis(gen(inputs=[noise, input_class]))


# same losses and optimizer settings as the discriminator
gan = Model(inputs=[noise, input_class], outputs=[out_autheticity, out_class])
gan.compile(loss=[d_loss, 'sparse_categorical_crossentropy'], 
            optimizer=RMSprop(lr=1e-4))

In [ ]:
# show the layers of the stacked generator + discriminator model
gan.summary()

Train


In [ ]:
def deprocess(img):
    """Turn tanh-range generator output into a valid image.

    Values are rescaled with img/2 + 0.5 (so [-1, 1] maps to [0, 1]) and
    anything still outside [0, 1] is clipped.
    """
    rescaled = img/2 + 0.5
    return rescaled.clip(0, 1)

In [ ]:
def generator_fun(num_samples):
    """Generate `num_samples` displayable images with random classes.

    Draws noise and uniformly random class labels in [0, num_classes), runs
    them through `gen` and post-processes the output with `deprocess`.
    """
    latent = noise_d.sample((num_samples, input_dim))
    random_classes = np.random.randint(0, num_classes, num_samples)
    return deprocess(gen.predict([latent, random_classes]))

In [ ]:
# load mnist data using Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# normalize to -1..1 range and reshape
# (matches the tanh output range of the generator); a trailing channel axis is added
X_train = (X_train.astype(np.float32) - 127.5) / 127.5
X_train = np.expand_dims(X_train, axis=-1)
print(X_train.shape)
print(y_train.shape)

In [ ]:
def train_discriminator(dis, gen, noise_d, input_dim, num_classes, X_train, Y_train, batch_size, epoch):
    """Run one discriminator update: clip weights, then train on a real and a fake batch.

    Args:
        dis: compiled discriminator with two outputs (authenticity, class).
        gen: generator taking [noise, classes] as input.
        noise_d: noise source exposing `sample(shape)`.
        input_dim: length of each noise vector.
        num_classes: number of image classes; fake classes are drawn from [0, num_classes).
        X_train: training images.
        Y_train: class labels aligned with `X_train`.
        batch_size: number of real images and of fake images used.
        epoch: accepted but not used.

    Returns:
        Tuple (dis_true_loss, dis_fake_loss) with the results of the two
        `train_on_batch` calls.
    """
    # clip every weight of every layer to [-0.01, 0.01] before the update
    for layer in dis.layers:
        clipped = [np.clip(w, -0.01, 0.01) for w in layer.get_weights()]
        layer.set_weights(clipped)

    # real batch: images and labels at the same randomly drawn indexes
    batch_idxs = np.random.randint(0, len(X_train), batch_size)
    true_sample = X_train[batch_idxs]
    # use the Y_train argument (not a notebook-level global) so that the labels
    # always belong to the images passed in
    true_sample_classes = Y_train[batch_idxs]

    # train on true samples: authenticity target is -1
    dis_true_loss = dis.train_on_batch(true_sample,
                                       [-np.ones(batch_size), true_sample_classes])

    # fake batch: generator output for fresh noise and random classes
    noise = noise_d.sample((batch_size, input_dim))
    generated_classes = np.random.randint(0, num_classes, batch_size)
    fake_sample = gen.predict([noise, generated_classes.reshape(-1, 1)])

    # train on fake samples: authenticity target is +1
    dis_fake_loss = dis.train_on_batch(fake_sample,
                                       [np.ones(batch_size), generated_classes])
    return dis_true_loss, dis_fake_loss

In [ ]:
def train(dis, gen, gan, noise_d, input_dim, num_classes, X_train, Y_train,
          batch_size=32, n_epochs=1, add_epochs=0):
    """Train the WGAN for `n_epochs` generator steps.

    Every step runs several discriminator updates (with the discriminator
    marked trainable), then marks it as not trainable and runs one update of
    the stacked `gan` model. Every 10th step a grid of generated samples is
    saved under RES_DIR.

    Args:
        dis, gen, gan: discriminator, generator and stacked model.
        noise_d: noise source exposing `sample(shape)`.
        input_dim: length of each noise vector.
        num_classes: number of image classes.
        X_train, Y_train: training images and their class labels.
        batch_size: batch size for every update.
        n_epochs: number of generator steps to run.
        add_epochs: steps already run in earlier calls; offsets both the
            discriminator schedule and the numbering of saved images.

    Returns:
        Dict with the recorded losses under 'gan', 'dis_fake_loss' and
        'dis_true_loss'.
    """
    losses = {'gan':[], 'dis_fake_loss':[], 'dis_true_loss':[]}
    for epoch in tqdm(range(n_epochs), desc='Training GAN'):
        # global step across successive calls. The sum is computed first so the
        # modulo applies to it: `%` binds tighter than `+`, so
        # `epoch+add_epochs % 1000` would only reduce `add_epochs`.
        step = epoch + add_epochs
        # more discriminator updates during the first 15 steps of every 1000
        # and on every 500th step
        if step % 1000 < 15 or step % 500 == 0:
            d_iters = 40
        else:
            d_iters = 5

        # train discriminator
        set_trainable(dis, True, None, None)
        for d_epoch in range(d_iters):
            dis_true_loss, dis_fake_loss = train_discriminator(dis, gen, noise_d, input_dim, num_classes,
                                         X_train, Y_train, batch_size, epoch)
            losses['dis_fake_loss'].append(dis_fake_loss)
            losses['dis_true_loss'].append(dis_true_loss)

        set_trainable(dis, False, None, None)

        # train GAN
        # feed noise and random classes to the stacked model; the authenticity
        # target is -1, the same target the discriminator is given for real images
        noise = noise_d.sample((batch_size, input_dim))
        generated_classes = np.random.randint(0, num_classes, batch_size)

        gan_loss = gan.train_on_batch(
            [noise, generated_classes.reshape((-1, 1))],
            [-np.ones(batch_size), generated_classes])
        losses['gan'].append(gan_loss)

        # periodically save a grid of generated samples
        if epoch%10 == 0:
            plot_sample_imgs(generator_fun, img_shape[:2],
                             savepath=os.path.join(RES_DIR, 'data', 'mnist_wgan', 'mnist_gen{}.jpg'.format(epoch+add_epochs)))
    return losses

In [ ]:
# number of training steps run so far; passed to train() as an offset
add_epochs = 0

In [ ]:
#plt.ion()
# turn matplotlib interactive mode off while training
plt.ioff()

n_epochs = 500
losses = train(dis, gen, gan, noise_d, input_dim, num_classes, 
               X_train, y_train, 
               batch_size=64, n_epochs=n_epochs, add_epochs=add_epochs)
# keep a running total so that re-running this cell continues the numbering
# of the saved images (note: this makes the cell non-idempotent)
add_epochs += n_epochs

Plot Losses


In [ ]:
def plot_losses(losses):
    """Plot column 2 of the recorded 'gan' losses in a new figure.

    NOTE(review): with the two-output model, column 2 is presumably the
    class-prediction loss rather than the total loss -- confirm that the
    'gan loss' label is what is intended.
    """
    plt.figure()
    gan_history = np.array(losses['gan'])
    plt.plot(gan_history[:,2], label='gan loss')
    plt.legend()
    plt.show()

In [ ]:
# plot the losses recorded by the last training run
plot_losses(losses)

In [ ]:
# column 0 of the recorded gan losses, one value per training step
# (presumably the total loss of the stacked model -- confirm)
np.array(losses['gan'])[:,0]

Generate gif


In [ ]:
import imageio
import os
RES_DIR = os.path.join(os.pardir, os.pardir, 'data', 'deep_learning')
dir_path = os.path.join(RES_DIR, 'data', 'mnist_wgan', 'test_2')

# pair every file in the directory with the number embedded in its name:
# name[9:-4] drops a 9-character prefix and a 4-character suffix, which
# matches names of the form 'mnist_gen<N>.jpg'
filenames = [(os.path.join(dir_path, name), int(name[9:-4])) for name in os.listdir(dir_path)]

# read the frames in increasing numeric order and write them out as one gif
ordered = sorted(filenames, key=lambda pair: pair[1])
images = [imageio.imread(path) for path, _ in ordered]
imageio.mimsave(os.path.join(RES_DIR, 'data', 'mnist_wgan', 'wgan.gif'), images)

In [ ]: