Exploratory notebook on Generative Adversarial Networks (GANs). It includes toy example implementations and tests of related techniques and topics.
A GAN is an architecture that learns by pitting two networks against each other. The goal is to learn parameters that produce a distribution close to the distribution of our dataset (the true distribution).
In [ ]:
import time
from PIL import Image
import numpy as np
import pdb
import os
import sys
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import animation
from keras.models import Sequential
from keras.layers.core import Reshape, Dense, Dropout, Activation, Flatten
from keras.layers.convolutional import Convolution2D, MaxPooling2D, ZeroPadding2D, UpSampling2D
from keras.layers.advanced_activations import LeakyReLU
from keras import backend as K
from keras import optimizers
from keras.layers.normalization import BatchNormalization
from keras.datasets import mnist
from tqdm import tqdm_notebook as tqdm
%matplotlib notebook
sns.set_style("dark")
sys.path.append(os.path.join(os.getcwd(), os.pardir))
from utils.plot_utils import plot_sample_imgs
from utils.generative_utils import NoiseDistribution, set_trainable
RES_DIR = os.path.join(*[os.pardir]*2, 'data', 'deep_learning')
%load_ext autoreload
%autoreload 2
Example adapted from Aylien blog.
Check also here for Keras code
In [ ]:
class GaussianDistribution:
    """Target 1D Gaussian distribution that the toy GAN tries to imitate."""

    def __init__(self, mu=4, sigma=0.5):
        # mean and standard deviation handed to np.random.normal
        self.sigma = sigma
        self.mu = mu

    def sample(self, N):
        """Draw N values from the distribution and return them sorted ascending."""
        return np.sort(np.random.normal(self.mu, self.sigma, N))
In [ ]:
class GeneratorNoiseDistribution:
    """Input noise for the toy generator: an evenly spaced grid plus a small jitter."""

    def __init__(self, vrange):
        # the grid spans [-vrange, vrange] before the jitter is added
        self.vrange = vrange

    def sample(self, N):
        """Return N grid points over [-vrange, vrange], each shifted by uniform noise in [0, 0.01)."""
        grid = np.linspace(-self.vrange, self.vrange, N)
        jitter = np.random.random(N) * 0.01
        return grid + jitter
In [ ]:
def generator(input_dim, hidden_size):
    """Build the toy generator network.

    One softplus hidden layer of `hidden_size` units followed by a linear
    layer that maps back to `input_dim` outputs.
    """
    return Sequential([
        Dense(hidden_size, input_dim=input_dim, activation=K.softplus),
        Dense(input_dim),
    ])
In [ ]:
def discriminator(input_dim, hidden_size):
    """Build the toy discriminator network.

    Three tanh layers, each `hidden_size * 2` units wide, followed by a
    single sigmoid unit that outputs the probability of the input being real.
    """
    width = hidden_size * 2
    net = Sequential()
    # the first layer also declares the input size
    net.add(Dense(width, input_dim=input_dim, activation=K.tanh))
    for _ in range(2):
        net.add(Dense(width, activation=K.tanh))
    net.add(Dense(1, activation=K.sigmoid))
    return net
In [ ]:
# init distributions
gaussian_d = GaussianDistribution()
generator_d = GeneratorNoiseDistribution(8)
In [ ]:
# init GAN components
d = discriminator(1, 128)
g = generator(1, 128)
In [ ]:
# discriminator model
# RMSprop with gradient-value clipping and a small learning-rate decay
optimizer = optimizers.RMSprop(lr=0.0008, clipvalue=1.0, decay=6e-8)
# alias of `d` (same object): compiling it lets the discriminator be trained on its own
discriminator_model = d
# binary cross-entropy on the sigmoid output; accuracy is reported next to the loss
discriminator_model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
In [ ]:
# adversarial model: generator stacked on the discriminator, trained so that
# the discriminator labels generated samples as real.
# Freeze the discriminator before compiling the stacked model so that the
# adversarial step only updates the generator. Keras fixes the set of trainable
# weights at compile time, so `discriminator_model` (compiled earlier, while
# `d` was still trainable) keeps training the discriminator as before.
d.trainable = False
optimizer = optimizers.RMSprop(lr=0.0004, clipvalue=1.0, decay=3e-8)
adversarial_model = Sequential()
adversarial_model.add(g)
adversarial_model.add(d)
adversarial_model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
In [ ]:
batch_size = 64
fig, ax = plt.subplots(dpi=100, figsize=(5, 4))
# fixed reference sample of the target distribution, redrawn on every frame
true_dist = np.reshape(gaussian_d.sample(1000), (1000, 1))
plt.show()


def animate(step):
    """Run one GAN training step and redraw the generated vs. true densities.

    Called by FuncAnimation with the frame index `step`. Relies on the
    module-level `g`, `discriminator_model`, `adversarial_model`,
    `gaussian_d`, `generator_d`, `batch_size`, `fig` and `true_dist`.
    """
    # generate data
    # first we sample from the true distribution, then we generate some
    # "fake" data by feeding noise to the generator
    true_sample = np.reshape(gaussian_d.sample(batch_size), (batch_size, 1))
    # explicit (batch_size, 1) shape, same layout as the adversarial step below
    noise = np.reshape(generator_d.sample(batch_size), (batch_size, 1))
    fake_sample = g.predict(noise)

    # train discriminator
    # feed true and fake samples with respective labels (1, 0) to the discriminator
    x = np.reshape(np.concatenate((true_sample, fake_sample)), (batch_size*2, 1))
    y = np.ones([batch_size*2, 1])
    y[batch_size:, :] = 0
    d_loss = discriminator_model.train_on_batch(x, y)

    # train GAN
    # feed noise to the model and expect true (1) response from discriminator,
    # which is in turn fed with data generated by the generator
    noise = np.reshape(generator_d.sample(batch_size), (batch_size, 1))
    y = np.ones([batch_size, 1])
    a_loss = adversarial_model.train_on_batch(noise, y)

    # both models were compiled with metrics=['accuracy'], so each result is [loss, accuracy]
    log_mesg = "%d: [D loss: %f, acc: %f]" % (step, d_loss[0], d_loss[1])
    log_mesg = "%s [A loss: %f, acc: %f]" % (log_mesg, a_loss[0], a_loss[1])
    # the message used to be built and then dropped; emit it
    print(log_mesg)

    # plot
    fig.clf()
    fake = sns.distplot(fake_sample)
    fake.set_xlim([0, 8])
    fake.set_ylim([0, 3])
    sns.distplot(true_dist)
    # use pyplot directly: `sns.plt` is not part of seaborn's namespace in current releases
    plt.text(3, 2, "Epoch {}, a_loss {:.3f}".format(step, a_loss[0]))


anim = animation.FuncAnimation(fig, animate, 200, repeat=False)
In [ ]:
# sample a fresh noise batch, shaped (batch_size, 1) like the batch fed to the
# adversarial training step, and run it through the generator
noise = np.reshape(generator_d.sample(batch_size), (batch_size, 1))
fake_sample = g.predict(noise)
In [ ]:
# density plot of the generated samples
sns.distplot(fake_sample)
# `sns.plt` is not available in current seaborn releases; use the pyplot module imported as `plt`
plt.show()
Example adapted from MNIST Generative Adversarial Model in Keras
In [ ]:
noise_d = NoiseDistribution()
In [ ]:
input_dim = 100
img_shape = (28,28,1)
In [ ]:
def generator_model(input_dim, n_channels=128, init_side=7):
    """Build the DCGAN generator: latent vector in, single-channel image out.

    A dense layer expands the `input_dim` latent vector into an
    `init_side x init_side x n_channels` feature map. Two upsample +
    convolution stages follow, halving the channel count each time, and a
    final 1x1 convolution with a sigmoid produces one output channel.
    With the default `init_side=7` the two upsampling steps give 28x28 images.

    Written against the Keras 2 layer signatures (tuple kernel sizes,
    `padding=`, no `mode` on BatchNormalization) used by the later cells of
    this notebook.
    """
    m = Sequential()
    m.add(Dense(init_side*init_side*n_channels, input_dim=input_dim, activation=LeakyReLU()))
    m.add(BatchNormalization())
    m.add(Reshape((init_side, init_side, n_channels)))
    m.add(UpSampling2D())
    m.add(Convolution2D(n_channels//2, (3, 3), padding='same', activation=LeakyReLU()))
    m.add(BatchNormalization())
    m.add(UpSampling2D())
    m.add(Convolution2D(n_channels//4, (3, 3), padding='same', activation=LeakyReLU()))
    m.add(BatchNormalization())
    # TODO: try tanh here; sigmoid matches training images scaled to [0, 1]
    m.add(Convolution2D(1, (1, 1), padding='same', activation='sigmoid'))
    return m
In [ ]:
g = generator_model(input_dim=input_dim, n_channels=512)
g.summary()
In [ ]:
# plot random generated image
plt.imshow(g.predict(noise_d.sample((1, input_dim)))[0]
.reshape(28, 28))
plt.show()
In [ ]:
def discriminator_model(input_shape, init_filters=64):
    """Build the DCGAN discriminator: image in, probability out.

    Two strided 5x5 convolutions (`init_filters`, then `init_filters*2`
    filters) are flattened into a 256-unit dense layer and a single sigmoid
    unit.

    Written against the Keras 2 layer signatures (tuple kernel sizes,
    `strides=`, `padding=`) used by the later cells of this notebook.
    """
    m = Sequential()
    m.add(Convolution2D(init_filters, (5, 5), strides=(2, 2), input_shape=input_shape,
                        padding='same', activation=LeakyReLU(0.2)))
    # TODO: evaluate max pooling (pool_size=2) and dropout between the conv layers
    m.add(Convolution2D(init_filters*2, (5, 5), strides=(2, 2), padding='same',
                        activation=LeakyReLU(0.2)))
    # TODO: evaluate a third, unstrided convolution with init_filters*4 filters
    m.add(Flatten())
    m.add(Dense(256, activation=LeakyReLU()))
    m.add(Dense(1, activation='sigmoid'))
    return m
In [ ]:
d = discriminator_model(input_shape=(28,28,1), init_filters=256)
d.summary()
In [ ]:
# print prediction for random image
d.predict(g.predict(noise_d.sample((1, input_dim))))
In [ ]:
# init GAN components
g = generator_model(input_dim)
d = discriminator_model(img_shape)
# compile generator
# (left disabled: in the visible cells the generator is only trained through the stacked `gan` model)
#g_optimizer = optimizers.Adam(lr=0.0001)
#g.compile(loss='binary_crossentropy', optimizer=g_optimizer)
# compile discriminator
d_optimizer = optimizers.Adam(lr=0.001)
d.compile(loss='binary_crossentropy', optimizer=d_optimizer)
In [ ]:
# build adversarial model: the generator stacked on the discriminator
gan = Sequential()
gan.add(g)
gan.add(d)
# NOTE(review): `d` is not frozen before this compile; the training loop toggles
# it through set_trainable(), whose effect on an already compiled model is not
# visible in this notebook — confirm the discriminator really stays fixed
# during the generator step
gan_optimizer = optimizers.Adam(lr=0.0001)
gan.compile(loss='binary_crossentropy', optimizer=gan_optimizer)
In [ ]:
gan.summary()
In [ ]:
def generator_fun(num_samples):
    """Generate `num_samples` images by feeding fresh noise to the generator `g`."""
    # `generator` is the toy model *builder* defined earlier in this notebook,
    # not a model, so it has no predict(); the generator model is `g`
    return g.predict(noise_d.sample((num_samples, input_dim)))
In [ ]:
# load mnist data using Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# reshape and normalize train data
# append a trailing channel axis so each image matches img_shape
X_train = np.expand_dims(X_train, axis=-1)
# scale by 1/255 (assuming 8-bit pixel values, this gives [0, 1], the range of the generator's sigmoid output)
X_train = X_train.astype('float32')/255
print(X_train.shape)
print(y_train.shape)
In [ ]:
def train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch):
    """Run one discriminator update on a half-real, half-generated batch.

    `batch_size` training images are drawn at random (with replacement) from
    `X_train` and labelled 1; the same number of images is produced by `g`
    from `noise_d` samples and labelled 0. `epoch` is accepted but not used.

    Returns whatever `d.train_on_batch` returns for that batch.
    """
    # real half: random indexes into the training set
    picked = np.random.randint(0, len(X_train), batch_size)
    real_images = X_train[picked, :, :, :]

    # generated half
    generated_images = g.predict(noise_d.sample((batch_size, input_dim)))

    # real images first, generated second; labels follow the same order
    inputs = np.concatenate((real_images, generated_images))
    labels = np.concatenate((np.ones([batch_size, 1]), np.zeros([batch_size, 1])))

    # presumably re-enables training of the discriminator's layers — see utils.generative_utils
    set_trainable(d, True, None, None)
    return d.train_on_batch(inputs, labels)
In [ ]:
def train(d, g, gan, noise_d, input_dim, X_train, batch_size=32, n_epochs=1, add_epoch=0):
    """Alternate discriminator and generator updates for `n_epochs` steps.

    Each step runs one discriminator update, then one update of the stacked
    `gan` model on fresh noise with all-ones targets. Every 10th step a sheet
    of generated images is saved under data/mnist_gan, numbered
    `epoch + add_epoch`. Relies on the module-level `generator_fun` and
    `img_shape`.

    Returns a dict with the per-step losses under 'g' and 'd'.
    """
    d_history, g_history = [], []
    for epoch in range(n_epochs):
        # discriminator step
        d_history.append(
            train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch))

        # generator step through the stacked model: noise in, "real" (1) expected out
        set_trainable(d, False, None, None)
        latent = noise_d.sample((batch_size, input_dim))
        targets = np.ones([batch_size, 1])
        g_history.append(gan.train_on_batch(latent, targets))

        if epoch % 10 != 0:
            continue
        file_name = 'mnist_gen{}.jpg'.format(epoch+add_epoch)
        plot_sample_imgs(generator_fun, img_shape[:2],
                         savepath=os.path.join('data', 'mnist_gan', file_name))
    return {'g': g_history, 'd': d_history}
In [ ]:
# pretrain discriminator
batch_size = 128
n_epochs = 1
for epoch in range(n_epochs):
train_discriminator(d, g, noise_d, input_dim, X_train, batch_size, epoch)
In [ ]:
#plt.ion()
# interactive plotting switched off for the training run
plt.ioff()
# overwrite the learning rate of both already-compiled optimizers in place
K.set_value(d.optimizer.lr, 1e-3)
K.set_value(gan.optimizer.lr, 1e-3)
# add_epoch offsets the index used in the names of the saved sample images
losses = train(d, g, gan, noise_d, input_dim, X_train,
               batch_size=256, n_epochs=1000, add_epoch=120)
In [ ]:
# plot random generated image
plt.imshow(g.predict(np.random.randn(input_dim).reshape(1, -1))
.reshape(28, 28))
plt.show()
In [ ]:
# show a real training image for comparison; `true_sample` only exists as a
# local variable inside the training functions, so index the training set directly
plt.imshow(X_train[2].reshape(28, 28))
plt.show()
In [ ]:
# NOTE(review): scratch cell — no module-level `y` is assigned in the visible
# cells, and `noise` is whatever an earlier cell left behind; confirm both before running
gan.test_on_batch(noise, y)
In [ ]:
# NOTE(review): scratch cell — performs one extra training step on the stacked
# model; `noise` and `y` must already exist at module level (no `y` is assigned
# in the visible cells)
gan.train_on_batch(noise, y)
In [ ]:
from keras.layers import *
from keras.models import *
from keras.optimizers import *
from keras.initializers import *
from keras.callbacks import *
from keras.utils.generic_utils import Progbar
In [ ]:
noise_d = NoiseDistribution()
In [ ]:
input_dim = 100
img_shape = (28,28,1)
num_classes = 10
In [ ]:
def generator_deconv_block(filters, block_input, kernel_size=(3, 3), strides=(1, 1)):
    """Standard "deconvolution" block used in the generator.

    Applies UpSampling2D, a same-padded convolution with `filters` filters of
    size `kernel_size`, LeakyReLU and BatchNormalization to `block_input`,
    and returns the resulting tensor.
    """
    block = UpSampling2D()(block_input)
    # honour the requested kernel size (it used to be hard-coded to (3, 3),
    # silently ignoring the argument)
    block = Convolution2D(filters, kernel_size, strides=strides, padding='same')(block)
    block = LeakyReLU()(block)
    block = BatchNormalization()(block)
    return block
In [ ]:
def generator_model(input_dim, voc_size, init_filters=128, init_side=7, num_deconv_blocks=2):
    """Build the class-conditioned generator.

    Unlike the basic DCGAN generator, this model takes two inputs: the prior
    sample (noise vector of length `input_dim`) and an integer image class.
    The class is embedded into an `input_dim`-long vector and multiplied
    element-wise with the noise. Two dense layers, a reshape to
    `init_side x init_side x init_filters`, `num_deconv_blocks` deconvolution
    blocks (halving the filters each time) and a tanh convolution with one
    output channel follow.

    Returns a Model with inputs `[noise, class]`.
    """
    # class branch: integer label -> embedding vector
    input_class = Input(shape=(1, ), dtype='int32')
    class_embedding = Embedding(voc_size, input_dim)(input_class)
    class_vector = Flatten()(class_embedding)

    # noise branch
    noise = Input(shape=(input_dim, ))

    # hadamard product of the two branches
    merged = multiply([noise, class_vector])

    # dense stem
    feat = Dense(1024)(merged)
    feat = LeakyReLU()(feat)
    feat = Dense(init_side*init_side*init_filters)(feat)
    feat = LeakyReLU()(feat)
    feat = BatchNormalization()(feat)
    feat = Reshape((init_side, init_side, init_filters))(feat)

    # upsampling stack: init_filters/2, init_filters/4, ...
    for level in range(1, num_deconv_blocks + 1):
        feat = generator_deconv_block(init_filters//(2**level), block_input=feat, kernel_size=(5, 5))

    image = Convolution2D(1, (2, 2), padding='same', activation='tanh')(feat)
    return Model(inputs=[noise, input_class], outputs=image)
In [ ]:
# instantiate generate model
gen = generator_model(input_dim=input_dim, voc_size=10, init_filters=128)
gen.summary()
In [ ]:
# plot random generated image
plt.imshow(gen.predict([noise_d.sample((1, input_dim)), np.array([7])])[0]
.reshape(28, 28))
plt.show()
In [ ]:
def discriminator_conv_block(filters, block_input, kernel_size=(3, 3), strides=(1, 1), pool_size=None):
    """Standard convolution block used in the discriminator.

    Applies a same-padded convolution, LeakyReLU and BatchNormalization to
    `block_input`; when `pool_size` is given (truthy) a MaxPool2D layer with
    that pool size is appended. Returns the resulting tensor.
    """
    stages = [
        Convolution2D(filters, kernel_size, strides=strides, padding='same'),
        LeakyReLU(),
        BatchNormalization(),
    ]
    # optional pooling
    if pool_size:
        stages.append(MaxPool2D(pool_size=pool_size))

    out = block_input
    for layer in stages:
        out = layer(out)
    return out
In [ ]:
def discriminator_model(input_shape, num_classes, init_filters=32, num_conv_blocks=3):
    """Build the two-headed discriminator.

    Unlike the basic DCGAN discriminator, this model takes an image and
    outputs both a linear authenticity score and a softmax prediction over
    `num_classes` image classes. The image goes through `num_conv_blocks`
    convolution blocks (doubling the filters each time, no pooling) before
    being flattened and fed to the two output layers.

    Returns a Model with outputs `[authenticity, class]`.
    """
    image_in = Input(shape=input_shape)

    hidden = image_in
    for depth in range(num_conv_blocks):
        hidden = discriminator_conv_block(init_filters*(2**depth), block_input=hidden, pool_size=None)
    flat = Flatten()(hidden)

    # two heads sharing the same features
    authenticity = Dense(1, activation='linear')(flat)
    class_probs = Dense(num_classes, activation='softmax')(flat)
    return Model(inputs=[image_in], outputs=[authenticity, class_probs])
In [ ]:
# instantiate discriminator model
dis = discriminator_model(input_shape=img_shape, num_classes=10, init_filters=32)
dis.summary()
In [ ]:
# print prediction for random image
dis.predict(gen.predict([noise_d.sample((1, input_dim)), np.array([3])]))
In [ ]:
def d_loss(y_true, y_pred):
    """Loss for the authenticity output: mean of `y_true * y_pred`.

    The training code passes targets of -1 and +1, so the target's sign
    decides whether the mean prediction is pushed up or down.
    """
    signed_scores = y_true * y_pred
    return K.mean(signed_scores)
In [ ]:
# init GAN components
gen = generator_model(input_dim=input_dim, voc_size=num_classes, init_filters=128)
dis = discriminator_model(input_shape=img_shape, num_classes=num_classes, init_filters=32)
# compile discriminator
# one loss per output, in output order: d_loss for the authenticity score,
# sparse categorical cross-entropy for the class prediction
dis.compile(loss=[d_loss, 'sparse_categorical_crossentropy'],
            optimizer=RMSprop(lr=1e-4))
In [ ]:
# Build adversarial model
# fresh symbolic inputs for the stacked model: noise vector and integer class
noise = Input(shape=(input_dim, ))
input_class = Input(shape=(1, ), dtype='int32')
# generator output is fed straight into the discriminator, which yields both of its outputs
out_autheticity, out_class = dis(gen(inputs=[noise, input_class]))
gan = Model(inputs=[noise, input_class], outputs=[out_autheticity, out_class])
# same pair of losses as the discriminator, applied to the stacked model's outputs
gan.compile(loss=[d_loss, 'sparse_categorical_crossentropy'],
            optimizer=RMSprop(lr=1e-4))
In [ ]:
gan.summary()
In [ ]:
# given that generator uses tanh activation function,
# we need to process its output to make it a valid image
#deprocess = lambda img : np.transpose((img/2+0.5).clip(0,1), (1,2,0))
def deprocess(img):
    """Rescale an array from the [-1, 1] range to [0, 1], clipping values outside it."""
    rescaled = img/2 + 0.5
    return rescaled.clip(0, 1)
In [ ]:
def generator_fun(num_samples):
    """Generate `num_samples` displayable images of randomly chosen classes.

    Fresh noise and uniformly drawn class labels are fed to `gen`; the
    output is passed through `deprocess` before being returned.
    """
    latent = noise_d.sample((num_samples, input_dim))
    labels = np.random.randint(0, num_classes, num_samples)
    return deprocess(gen.predict([latent, labels]))
In [ ]:
# load mnist data using Keras
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# normalize to -1..1 range and reshape
# (x - 127.5) / 127.5 maps pixel values onto -1..1 (assuming 0..255 input),
# the same range as the generator's tanh output
X_train = (X_train.astype(np.float32) - 127.5) / 127.5
# append a trailing channel axis so each image matches img_shape
X_train = np.expand_dims(X_train, axis=-1)
print(X_train.shape)
print(y_train.shape)
In [ ]:
def train_discriminator(dis, gen, noise_d, input_dim, num_classes, X_train, Y_train, batch_size, epoch):
    """Run one discriminator update: one real batch, then one generated batch.

    Before training, every weight array of every layer in `dis` is clipped to
    [-0.01, 0.01]. The real batch (random rows of `X_train` with their labels
    from `Y_train`) is trained with authenticity target -1; the generated
    batch (from `gen`, with random classes) with authenticity target +1.
    `epoch` is accepted but not used.

    Returns the tuple `(dis_true_loss, dis_fake_loss)` as returned by the two
    `train_on_batch` calls.
    """
    # clip weights
    for layer in dis.layers:
        layer.set_weights([np.clip(w, -0.01, 0.01) for w in layer.get_weights()])

    # generate data
    # first we sample from the true distribution (mnist dataset), then we generate some
    # "fake" images by feeding noise to the generator
    # generate random indexes for train data
    batch_idxs = np.random.randint(0, len(X_train), batch_size)
    # collect the images and labels corresponding to the generated indexes
    true_sample = X_train[batch_idxs]
    # use the labels passed in as an argument rather than the global `y_train`
    true_sample_classes = Y_train[batch_idxs]

    # train on true samples
    dis_true_loss = dis.train_on_batch(true_sample,
                                       [-np.ones(batch_size), true_sample_classes])

    # generate fake sample
    noise = noise_d.sample((batch_size, input_dim))
    generated_classes = np.random.randint(0, num_classes, batch_size)
    fake_sample = gen.predict([noise, generated_classes.reshape(-1, 1)])

    # train on fake samples
    dis_fake_loss = dis.train_on_batch(fake_sample,
                                       [np.ones(batch_size), generated_classes])
    return dis_true_loss, dis_fake_loss
In [ ]:
def train(dis, gen, gan, noise_d, input_dim, num_classes, X_train, Y_train,
          batch_size=32, n_epochs=1, add_epochs=0):
    """Alternate discriminator and generator updates for `n_epochs` steps.

    Each step runs several discriminator updates followed by one update of the
    stacked `gan` model. The discriminator gets 40 updates during the first 15
    steps of every 1000 and on every 500th step (counted as
    `epoch + add_epochs`, so the schedule continues across repeated calls),
    and 5 updates otherwise. Every 10th step a sheet of generated images is
    saved under RES_DIR/data/mnist_wgan. Relies on the module-level
    `generator_fun`, `img_shape` and `RES_DIR`.

    Returns a dict of per-step losses under 'gan', 'dis_fake_loss' and
    'dis_true_loss' (for the discriminator, only the last update of each step
    is recorded).
    """
    losses = {'gan': [], 'dis_fake_loss': [], 'dis_true_loss': []}
    for epoch in tqdm(range(n_epochs), desc='Training GAN'):
        # global step across calls; computed first because `%` binds tighter
        # than `+`, so `epoch+add_epochs % 1000` only reduced `add_epochs`
        step = epoch + add_epochs
        if step % 1000 < 15 or step % 500 == 0:  # 15 times in 1000, every 500th
            d_iters = 40
        else:
            d_iters = 5

        # train discriminator
        set_trainable(dis, True, None, None)
        for d_epoch in range(d_iters):
            dis_true_loss, dis_fake_loss = train_discriminator(dis, gen, noise_d, input_dim, num_classes,
                                                               X_train, Y_train, batch_size, epoch)
        losses['dis_fake_loss'].append(dis_fake_loss)
        losses['dis_true_loss'].append(dis_true_loss)
        set_trainable(dis, False, None, None)

        # train GAN
        # feed noise and random classes to the stacked model and ask for the
        # "real" authenticity target (-1) plus the classes that were requested
        noise = noise_d.sample((batch_size, input_dim))
        generated_classes = np.random.randint(0, num_classes, batch_size)
        gan_loss = gan.train_on_batch(
            [noise, generated_classes.reshape((-1, 1))],
            [-np.ones(batch_size), generated_classes])
        losses['gan'].append(gan_loss)

        if epoch % 10 == 0:
            plot_sample_imgs(generator_fun, img_shape[:2],
                             savepath=os.path.join(RES_DIR, 'data', 'mnist_wgan',
                                                   'mnist_gen{}.jpg'.format(epoch+add_epochs)))
    return losses
In [ ]:
add_epochs = 0
In [ ]:
#plt.ion()
plt.ioff()
n_epochs = 500
losses = train(dis, gen, gan, noise_d, input_dim, num_classes,
X_train, y_train,
batch_size=64, n_epochs=n_epochs, add_epochs=add_epochs)
add_epochs += n_epochs
In [ ]:
def plot_losses(losses):
    """Plot column 2 of the recorded 'gan' losses on a new figure.

    The discriminator curves ('dis_fake_loss', 'dis_true_loss') are currently
    not plotted.
    """
    plt.figure()
    gan_history = np.array(losses['gan'])
    # NOTE(review): column 2 is presumably the class-prediction loss of the
    # two-output model rather than the total loss — confirm the label
    plt.plot(gan_history[:, 2], label='gan loss')
    plt.legend()
    plt.show()
In [ ]:
plot_losses(losses)
In [ ]:
np.array(losses['gan'])[:,0]
In [ ]:
import imageio
import os

# assemble the saved sample sheets into an animated GIF, ordered by the
# number embedded in each file name
RES_DIR = os.path.join(os.pardir, os.pardir, 'data', 'deep_learning')
dir_path = os.path.join(RES_DIR, 'data', 'mnist_wgan', 'test_2')
# (path, number) pairs; the number is what sits between the 9-character
# prefix and the 4-character extension of the file name
filenames = [(os.path.join(dir_path, filename), int(filename[9:-4]))
             for filename in os.listdir(dir_path)]
ordered = sorted(filenames, key=lambda x: x[1])
images = [imageio.imread(path) for path, _ in ordered]
imageio.mimsave(os.path.join(RES_DIR, 'data', 'mnist_wgan', 'wgan.gif'), images)
In [ ]: