An example of distribution approximation using Generative Adversarial Networks in TensorFlow.
Based on the blog post by Eric Jang: http://blog.evjang.com/2016/06/generative-adversarial-nets-in.html, and of course the original GAN paper by Ian Goodfellow et al.: https://arxiv.org/abs/1406.2661
In [1]:
import argparse
import numpy as np
from scipy.stats import norm
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib import animation, rc
import seaborn as sns
from IPython.display import HTML
Fix the numpy and tensorflow random seeds so that the results are reproducible
In [2]:
# One shared seed for both NumPy (data/noise sampling) and TensorFlow
# (variable initialisation) so repeated runs give the same results.
seed = 42
np.random.seed(seed)
tf.set_random_seed(seed)
Create an object that produces the 'true' data distribution - this is the distribution that we will try to approximate with the generator
In [3]:
class DataDistribution(object):
    """The 'true' data distribution: a Gaussian with mean -1 and std 1."""

    def __init__(self):
        self.mu, self.sigma = -1, 1

    def sample(self, N):
        """Return N draws from the Gaussian, sorted in ascending order."""
        return np.sort(np.random.normal(self.mu, self.sigma, N))
Create an object that produces the generator input noise distribution
In [4]:
class GeneratorDistribution(object):
    """Noise source for the generator: a jittered, evenly spaced grid."""

    def __init__(self, range):
        # Half-width of the interval; samples start from [-range, range].
        self.range = range

    def sample(self, N):
        """Return N evenly spaced points on [-range, range] plus small noise.

        Each point is shifted by an independent uniform draw from
        [0, 0.01).
        """
        grid = np.linspace(-self.range, self.range, N)
        jitter = np.random.random(N) * 0.01
        return grid + jitter
Both the generator and discriminator need to be differentiable so that gradients can flow through the networks, and we can then train them using gradient descent. In the original GAN paper both networks were multilayer perceptrons, and so this is the network structure that we use here. Each MLP consists of 3 layers and uses the tanh nonlinearity.
In [5]:
def mlp(input, h_dim):
    """Build two tanh layers of width `h_dim` on top of `input`.

    Variables are created with tf.get_variable, so they live in whatever
    variable scope is active at call time.

    Returns:
        (activation, params): the output of the second tanh layer and the
        list [w0, b0, w1, b1] of the variables that produced it.
    """
    zeros = tf.constant_initializer(0.0)
    gaussian = tf.random_normal_initializer()

    # (weight name, bias name, fan-in) per layer; the variables are created
    # in the order w0, b0, w1, b1 before any layer op is built.
    layer_specs = (
        ('w0', 'b0', input.get_shape()[1]),
        ('w1', 'b1', h_dim),
    )
    params = []
    for w_name, b_name, fan_in in layer_specs:
        params.append(
            tf.get_variable(w_name, [fan_in, h_dim], initializer=gaussian))
        params.append(
            tf.get_variable(b_name, [h_dim], initializer=zeros))

    activation = input
    for weight, bias in zip(params[0::2], params[1::2]):
        activation = tf.tanh(tf.matmul(activation, weight) + bias)
    return activation, params
def generator(input, h_dim):
    """Generator network: the shared MLP body followed by a linear output.

    Returns:
        (output, params): the single-column linear output and the list of
        all variables used (MLP variables followed by g_w, g_b).
    """
    hidden, hidden_params = mlp(input, h_dim)
    out_w = tf.get_variable(
        'g_w', [h_dim, 1], initializer=tf.random_normal_initializer())
    out_b = tf.get_variable(
        'g_b', [1], initializer=tf.constant_initializer(0.0))
    output = tf.matmul(hidden, out_w) + out_b
    return output, hidden_params + [out_w, out_b]
def discriminator(input, h_dim):
    """Discriminator network: the shared MLP body plus a sigmoid output.

    Returns:
        (output, params): the single-column sigmoid output and the list of
        all variables used (MLP variables followed by d_w, d_b).
    """
    hidden, hidden_params = mlp(input, h_dim)
    out_w = tf.get_variable(
        'd_w', [h_dim, 1], initializer=tf.random_normal_initializer())
    out_b = tf.get_variable(
        'd_b', [1], initializer=tf.constant_initializer(0.0))
    logits = tf.matmul(hidden, out_w) + out_b
    return tf.sigmoid(logits), hidden_params + [out_w, out_b]
Next we create a gradient descent optimizer (using exponential learning rate decay). The same optimizer parameters are used when training both the discriminator and generator networks.
In [6]:
def optimizer(loss, var_list, num_epochs):
    """Create a gradient-descent training op with staircase learning-rate decay.

    The learning rate starts at 0.01 and is multiplied by 0.95 every
    `num_epochs // 4` optimisation steps (at least every step).

    Args:
        loss: scalar tensor to minimise.
        var_list: variables to update, passed straight to `minimize`
            (callers in this file also pass None).
        num_epochs: total number of training steps; sets the decay period.

    Returns:
        The op returned by `GradientDescentOptimizer.minimize`; running it
        performs one update and increments the internal step counter.
    """
    initial_learning_rate = 0.01
    decay = 0.95
    # Guard against a zero decay period when num_epochs < 4.
    num_decay_steps = max(1, num_epochs // 4)
    # Step counter only: keep it out of the trainable-variables collection
    # so it is never picked up when var_list is None.
    batch = tf.Variable(0, trainable=False)
    learning_rate = tf.train.exponential_decay(
        initial_learning_rate,
        batch,
        num_decay_steps,
        decay,
        staircase=True
    )
    train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(
        loss,
        global_step=batch,
        var_list=var_list
    )
    return train_op
The following function is called every anim_frame_every epochs to capture a single snapshot of p(d), the discriminator's decision boundary and p(g):
In [7]:
# Snapshots captured during training; each entry is a tuple
# (d_sample, ds, gs, loss_d, loss_g) appended by plot_distributions().
anim_frames = []


def _evaluate_in_batches(session, fetch, placeholder, values, batch_size):
    """Run `fetch` over every entry of `values`, `batch_size` rows per call.

    The placeholders in this file have a fixed batch dimension, so a final
    short chunk is padded by repeating its last value and the padded
    outputs are discarded.

    Returns:
        An array of shape (len(values), 1) holding one output row per input.
    """
    total = len(values)
    results = np.zeros((total, 1))
    for start in range(0, total, batch_size):
        chunk = values[start:start + batch_size]
        valid = len(chunk)
        if valid < batch_size:
            chunk = np.pad(chunk, (0, batch_size - valid), mode='edge')
        outputs = session.run(fetch, {
            placeholder: np.reshape(chunk, (batch_size, 1))
        })
        results[start:start + valid] = outputs[:valid]
    return results


def plot_distributions(GAN, session, loss_d, loss_g):
    """Capture one animation frame of the current training state.

    Appends (d_sample, ds, gs, loss_d, loss_g) to the module-level
    `anim_frames` list, where

    * d_sample - 100000 samples from the true data distribution,
    * ds - discriminator output (GAN.D1) on an even grid over
      [-GAN.gen.range, GAN.gen.range],
    * gs - generator output (GAN.G) for the same even grid fed as noise.

    Args:
        GAN: the model instance (provides data, gen, batch_size and the
            D1/G/x/z graph handles).
        session: the session used to evaluate the networks.
        loss_d: discriminator loss to store with the frame.
        loss_g: generator loss to store with the frame.
    """
    num_points = 100000
    span = GAN.gen.range

    # p(data)
    d_sample = GAN.data.sample(num_points)

    # decision boundary: every grid point is evaluated, including the final
    # partial batch.
    xs = np.linspace(-span, span, num_points)
    ds = _evaluate_in_batches(session, GAN.D1, GAN.x, xs, GAN.batch_size)

    # p(generator)
    zs = np.linspace(-span, span, num_points)
    gs = _evaluate_in_batches(session, GAN.G, GAN.z, zs, GAN.batch_size)

    anim_frames.append((d_sample, ds, gs, loss_d, loss_g))
In [8]:
class GAN(object):
    """A 1-D GAN: MLP generator and discriminator plus their training loop.

    Args:
        data: object exposing `mu`, `sigma` and `sample(N)` for the true
            data distribution.
        gen: object exposing `range` and `sample(N)` for the generator's
            input noise.
        num_epochs: number of adversarial training steps.
    """

    def __init__(self, data, gen, num_epochs):
        self.data = data
        self.gen = gen
        self.num_epochs = num_epochs
        self.log_every = 100        # print the losses every N steps
        self.anim_frame_every = 5   # capture an animation frame every N steps
        self.batch_size = 128
        self.mlp_hidden_size = 4
        self._create_model()

    def _create_model(self):
        """Build the pretraining, generator and discriminator graphs."""
        # In order to make sure that the discriminator is providing useful
        # gradient information to the generator from the start, we pretrain
        # a discriminator-shaped network to regress onto the density of the
        # data distribution with a squared-error loss. The network for this
        # pretraining step is scoped as D_pre.
        with tf.variable_scope('D_pre'):
            self.pre_input = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.pre_labels = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            D, self.pre_theta = discriminator(self.pre_input, self.mlp_hidden_size)
            self.pre_loss = tf.reduce_mean(tf.square(D - self.pre_labels))
            self.pre_opt = optimizer(self.pre_loss, None, self.num_epochs)

        # This defines the generator network - it takes samples from a noise
        # distribution as input, and passes them through an MLP.
        with tf.variable_scope('G'):
            self.z = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.G, theta_g = generator(self.z, self.mlp_hidden_size)

        # The discriminator tries to tell the difference between samples from
        # the true data distribution (self.x) and the generated samples
        # (self.G).
        #
        # Here we create two copies of the discriminator network (that share
        # parameters), as you cannot use the same network with different
        # inputs in TensorFlow.
        with tf.variable_scope('D') as scope:
            self.x = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.D1, self.theta_d1 = discriminator(self.x, self.mlp_hidden_size)
            scope.reuse_variables()
            self.D2, self.theta_d2 = discriminator(self.G, self.mlp_hidden_size)

        # Define the loss for discriminator and generator networks (see the
        # original paper for details), and create optimizers for both.
        self.loss_d = tf.reduce_mean(-tf.log(self.D1) - tf.log(1 - self.D2))
        self.loss_g = tf.reduce_mean(-tf.log(self.D2))
        self.opt_d = optimizer(self.loss_d, self.theta_d2, self.num_epochs)
        self.opt_g = optimizer(self.loss_g, theta_g, self.num_epochs)

    def train(self):
        """Pretrain the discriminator, then alternate D and G updates.

        Prints the losses every `log_every` steps and records an animation
        frame via plot_distributions() every `anim_frame_every` steps.
        """
        with tf.Session() as session:
            # NOTE(review): later TensorFlow 1.x releases spell this
            # tf.global_variables_initializer() - confirm against the
            # installed version before changing.
            tf.initialize_all_variables().run()

            # discriminator pretraining: inputs are uniform on [-5, 5) and
            # the targets are the data pdf evaluated at those inputs.
            num_pretrain_steps = 1000
            for step in range(num_pretrain_steps):
                d = (np.random.random(self.batch_size) - 0.5) * 10.0
                labels = norm.pdf(d, loc=self.data.mu, scale=self.data.sigma)
                session.run([self.pre_loss, self.pre_opt], {
                    self.pre_input: np.reshape(d, (self.batch_size, 1)),
                    self.pre_labels: np.reshape(labels, (self.batch_size, 1))
                })
            self.weightsD = session.run(self.pre_theta)

            # copy weights from pre-training over to new D network
            for target, value in zip(self.theta_d2, self.weightsD):
                session.run(target.assign(value))

            for step in range(self.num_epochs):
                # update discriminator
                x = self.data.sample(self.batch_size)
                z = self.gen.sample(self.batch_size)
                loss_d, _ = session.run([self.loss_d, self.opt_d], {
                    self.x: np.reshape(x, (self.batch_size, 1)),
                    self.z: np.reshape(z, (self.batch_size, 1))
                })

                # update generator
                z = self.gen.sample(self.batch_size)
                loss_g, _ = session.run([self.loss_g, self.opt_g], {
                    self.z: np.reshape(z, (self.batch_size, 1))
                })

                if step % self.log_every == 0:
                    print('{}: {}\t{}'.format(step, loss_d, loss_g))

                if step % self.anim_frame_every == 0:
                    plot_distributions(self, session, loss_d, loss_g)
In [9]:
# Build the GAN around the true data distribution and a noise source of
# range 8, then train it; training fills the module-level anim_frames list
# that the animation cells below read.
num_epochs = 1000
model = GAN(DataDistribution(), GeneratorDistribution(range=8), num_epochs)
model.train()
Setting up the plot and initializing three separate lines for p(d), the discriminator's decision boundary and p(g):
In [10]:
# Figure for the distribution animation: fixed axes, three initially empty
# curves and a text label that the animation callbacks update per frame.
f, ax = plt.subplots(figsize=(6, 4))
f.suptitle('1D Generative Adversarial Network', fontsize=15)
ax.set_ylabel('Probability')
ax.set_xlim(-6, 6)
ax.set_ylim(0, 1.4)
line_d, line_ds, line_g = (
    ax.plot([], [], label=curve_label)[0]
    for curve_label in ('p_d', 'decision boundary', 'p_g')
)
frame_text = ax.text(
    0.02, 0.95, '',
    horizontalalignment='left',
    verticalalignment='top',
    transform=ax.transAxes,
)
ax.legend()
Out[10]:
Animating the frames we captured during training:
In [11]:
def init():
    """Reset the three curves and the frame label before the animation runs.

    Returns the artists that the animation should redraw.
    """
    for curve in (line_d, line_ds, line_g):
        curve.set_data([], [])
    frame_text.set_text('Start')
    return (line_d, line_ds, line_g, frame_text)
def animate(i):
    """Draw captured frame `i`: data histogram, decision surface, generator histogram.

    Args:
        i: index into the module-level `anim_frames` list.

    Returns:
        The artists that were updated, for blitting.
    """
    d_sample, ds, gs = anim_frames[i][:3]
    bins = np.linspace(-5, 5, 50)

    # `density=True` replaces the deprecated `normed=True`; with equal-width
    # bins the two produce the same normalised histogram.
    histd, _ = np.histogram(d_sample, bins=bins, density=True)
    line_d.set_data(bins[1:], histd)

    # The decision surface was evaluated on an even grid over
    # [-gen.range, gen.range], so it has to be plotted against that same
    # grid rather than a hard-coded interval.
    x = np.linspace(-model.gen.range, model.gen.range, len(ds))
    line_ds.set_data(x, ds)

    histg, _ = np.histogram(gs, bins=bins, density=True)
    line_g.set_data(bins[1:], histg)

    frame_text.set_text('Timestep = %.1d/%.1d' % (i, len(anim_frames)))
    return (line_d, line_ds, line_g, frame_text)
# One animation frame per snapshot captured during training; the result is
# rendered to HTML5 video and displayed as the cell output.
anim = animation.FuncAnimation(f, animate, init_func=init,
                               frames=len(anim_frames), blit=True)
HTML(anim.to_html5_video())
Out[11]:
Now let's visualize the training loss as well:
In [12]:
# Figure for the loss animation: fixed axes and two initially empty curves
# that the loss animation callbacks fill in.
f_loss, ax_loss = plt.subplots(figsize=(6, 3))
f_loss.suptitle('Training Loss', fontsize=15)
ax_loss.set_xlabel('Epochs')
ax_loss.set_ylabel('Loss')
ax_loss.set_xlim(-100, num_epochs)
ax_loss.set_ylim(0.5, 2)
line_loss_d, line_loss_g = (
    ax_loss.plot([], [], label=curve_label)[0]
    for curve_label in ('loss_d', 'loss_g')
)
ax_loss.legend()
Out[12]:
In [13]:
def init_loss():
    """Reset both loss curves before the loss animation runs.

    Only artists belonging to the loss figure are touched; the frame label
    of the distribution figure is left alone.

    Returns the artists that the animation should redraw.
    """
    for curve in (line_loss_d, line_loss_g):
        curve.set_data([], [])
    return (line_loss_d, line_loss_g)
def animate_loss(i):
    """Draw the discriminator and generator losses up to captured frame `i`.

    Args:
        i: index into the module-level `anim_frames` list; frames 0..i
            inclusive are plotted so the final frame shows the final loss.

    Returns:
        The artists that were updated, for blitting.
    """
    frames = anim_frames[:i + 1]
    # Frames are captured every `anim_frame_every` training steps, so the
    # frame index is scaled by that interval to get the step on the x axis.
    step_size = model.anim_frame_every
    xs = [index * step_size for index in range(len(frames))]
    ys_d = [frame[3] for frame in frames]
    ys_g = [frame[4] for frame in frames]
    line_loss_d.set_data(xs, ys_d)
    line_loss_g.set_data(xs, ys_g)
    return (line_loss_d, line_loss_g)
# One animation frame per snapshot captured during training; the result is
# rendered to HTML5 video and displayed as the cell output.
anim_loss = animation.FuncAnimation(f_loss, animate_loss, init_func=init_loss,
                                    frames=len(anim_frames), blit=True)
HTML(anim_loss.to_html5_video())
Out[13]:
Generative models are an active area of research for us here at AYLIEN. Here are some open challenges that we've encountered, and encourage you to work on if you're interested in this line of work:
If you have interesting solutions for any of these, we would love to hear from you, so please feel free to submit a pull request, or reach out to us directly at hello@aylien.com.