An example of distribution approximation using Generative Adversarial Networks in TensorFlow.

Based on the blog post by Eric Jang: http://blog.evjang.com/2016/06/generative-adversarial-nets-in.html, and of course the original GAN paper by Ian Goodfellow et al.: https://arxiv.org/abs/1406.2661


In [1]:
import argparse
import numpy as np
from scipy.stats import norm
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib import animation, rc
import seaborn as sns
from IPython.display import HTML


/usr/local/lib/python2.7/site-packages/IPython/html.py:14: ShimWarning: The `IPython.html` package has been deprecated. You should import from `notebook` instead. `IPython.html.widgets` has moved to `ipywidgets`.
  "`IPython.html.widgets` has moved to `ipywidgets`.", ShimWarning)

Fix the numpy and tensorflow random seeds so that the results are reproducible


In [2]:
# One fixed seed is shared by NumPy and TensorFlow so repeated runs draw the
# same random numbers.
seed = 42
np.random.seed(seed)      # seeds NumPy's global random generator
tf.set_random_seed(seed)  # seeds TensorFlow's random ops for the default graph

Create an object that produces the 'true' data distribution - this is the distribution that we will try to approximate with the generator.


In [3]:
class DataDistribution(object):
    """Gaussian 'true' data distribution that the generator tries to imitate.

    Args:
        mu: Mean of the normal distribution. Defaults to -1.
        sigma: Standard deviation of the normal distribution. Defaults to 1.
    """

    def __init__(self, mu=-1, sigma=1):
        self.mu = mu
        self.sigma = sigma

    def sample(self, N):
        """Return ``N`` draws from N(mu, sigma), sorted in ascending order."""
        return np.sort(np.random.normal(self.mu, self.sigma, N))

Create an object that produces the generator input noise distribution


In [4]:
class GeneratorDistribution(object):
    """Noise source for the generator: a jittered, evenly spaced grid.

    Args:
        range: Half-width of the interval; samples are spread over
            [-range, range] before jitter is added.
    """

    def __init__(self, range):
        self.range = range

    def sample(self, N):
        """Return ``N`` evenly spaced points plus uniform jitter in [0, 0.01)."""
        grid = np.linspace(-self.range, self.range, N)
        jitter = np.random.random(N) * 0.01
        return grid + jitter

Both the generator and discriminator need to be differentiable so that gradients can flow through the networks, and we can then train them using gradient descent. In the original GAN paper both networks were multilayer perceptrons, and so this is the network structure that we use here. Each MLP consists of 3 layers and uses the tanh nonlinearity.


In [5]:
def mlp(input, h_dim):
    """Stack two fully connected tanh layers on top of ``input``.

    The variables 'w0', 'b0', 'w1' and 'b1' are obtained with
    ``tf.get_variable`` so they live in the caller's current variable scope.

    Args:
        input: Tensor whose second dimension is the feature size (it is fed
            to ``tf.matmul`` against a ``[features, h_dim]`` weight matrix).
        h_dim: Width of both hidden layers.

    Returns:
        A tuple ``(h1, [w0, b0, w1, b1])``: the activation of the second
        layer and the list of variables that were created.
    """
    zeros = tf.constant_initializer(0.0)
    gaussian = tf.random_normal_initializer()
    in_dim = input.get_shape()[1]

    # Variables are requested in the order w0, b0, w1, b1, before any layer op
    # is built, exactly as the graph was originally laid out.
    weights_0 = tf.get_variable('w0', [in_dim, h_dim], initializer=gaussian)
    bias_0 = tf.get_variable('b0', [h_dim], initializer=zeros)
    weights_1 = tf.get_variable('w1', [h_dim, h_dim], initializer=gaussian)
    bias_1 = tf.get_variable('b1', [h_dim], initializer=zeros)

    hidden_0 = tf.tanh(tf.matmul(input, weights_0) + bias_0)
    hidden_1 = tf.tanh(tf.matmul(hidden_0, weights_1) + bias_1)
    return hidden_1, [weights_0, bias_0, weights_1, bias_1]


def generator(input, h_dim):
    """Build the generator: the shared MLP body plus a linear output unit.

    Args:
        input: Noise tensor passed straight to ``mlp``.
        h_dim: Hidden width of the MLP.

    Returns:
        A tuple ``(h, params)`` where ``h`` is the un-squashed single-column
        output and ``params`` lists the MLP variables followed by the output
        weight 'g_w' and bias 'g_b'.
    """
    hidden, body_params = mlp(input, h_dim)

    out_weight = tf.get_variable(
        'g_w', [h_dim, 1], initializer=tf.random_normal_initializer())
    out_bias = tf.get_variable(
        'g_b', [1], initializer=tf.constant_initializer(0.0))

    output = tf.matmul(hidden, out_weight) + out_bias
    return output, body_params + [out_weight, out_bias]


def discriminator(input, h_dim):
    """Build the discriminator: the shared MLP body plus a sigmoid output unit.

    Args:
        input: Tensor of samples to classify, passed straight to ``mlp``.
        h_dim: Hidden width of the MLP.

    Returns:
        A tuple ``(h, params)`` where ``h`` is the sigmoid-squashed
        single-column output and ``params`` lists the MLP variables followed
        by the output weight 'd_w' and bias 'd_b'.
    """
    hidden, body_params = mlp(input, h_dim)

    out_weight = tf.get_variable(
        'd_w', [h_dim, 1], initializer=tf.random_normal_initializer())
    out_bias = tf.get_variable(
        'd_b', [1], initializer=tf.constant_initializer(0.0))

    logits = tf.matmul(hidden, out_weight) + out_bias
    return tf.sigmoid(logits), body_params + [out_weight, out_bias]

Next we create a gradient descent optimizer (using exponential learning rate decay). The same optimizer parameters are used when training both the discriminator and generator networks.


In [6]:
def optimizer(loss, var_list, num_epochs):
    """Create a gradient-descent training op with staircase learning-rate decay.

    The learning rate starts at 0.01 and is multiplied by 0.95 every
    ``num_epochs // 4`` steps (at least every step).

    Args:
        loss: Scalar tensor to minimise.
        var_list: Variables to update, or ``None`` to let ``minimize`` pick
            its default set.
        num_epochs: Total number of training steps; determines how often the
            learning rate decays.

    Returns:
        The op returned by ``GradientDescentOptimizer.minimize``.
    """
    initial_learning_rate = 0.01
    decay = 0.95
    # Guard against num_epochs < 4, which would otherwise give a decay period
    # of 0 and a division by zero inside exponential_decay.
    num_decay_steps = max(1, num_epochs // 4)
    # Step counter incremented by minimize(). It is bookkeeping rather than a
    # model parameter, so it is kept out of the trainable-variables collection.
    batch = tf.Variable(0, trainable=False)
    learning_rate = tf.train.exponential_decay(
        initial_learning_rate,
        batch,
        num_decay_steps,
        decay,
        staircase=True
    )
    train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(
        loss,
        global_step=batch,
        var_list=var_list
    )
    return train_op

Called every anim_frame_every epochs to capture a single snapshot of p(d), the discriminator's decision boundary, and p(g):


In [7]:
# Snapshots collected during training; each entry is the tuple
# (d_sample, ds, gs, loss_d, loss_g) appended by plot_distributions().
anim_frames = []


def _evaluate_in_batches(session, fetch, placeholder, values, batch_size):
    """Evaluate ``fetch`` for every entry of ``values``, one full batch at a time.

    The placeholder is fed exactly ``batch_size`` rows on every call, so a
    final partial batch is padded by repeating its last value and the padded
    outputs are discarded.

    Returns:
        Array of shape ``(len(values), 1)`` holding the fetched outputs.
    """
    total = len(values)
    outputs = np.zeros((total, 1))
    for start in range(0, total, batch_size):
        chunk = values[start:start + batch_size]
        valid = len(chunk)
        if valid < batch_size:
            chunk = np.pad(chunk, (0, batch_size - valid), mode='edge')
        result = session.run(fetch, {
            placeholder: np.reshape(chunk, (batch_size, 1))
        })
        outputs[start:start + valid] = result[:valid]
    return outputs


def plot_distributions(GAN, session, loss_d, loss_g):
    """Capture one animation frame and append it to ``anim_frames``.

    Args:
        GAN: Model exposing ``gen.range``, ``data.sample``, ``batch_size`` and
            the graph handles ``x``, ``D1``, ``z`` and ``G``.
        session: Session used to evaluate ``D1`` and ``G``.
        loss_d: Discriminator loss to store with the frame.
        loss_g: Generator loss to store with the frame.

    The stored frame is ``(d_sample, ds, gs, loss_d, loss_g)`` where ``ds`` is
    the discriminator output and ``gs`` the generator output, both evaluated
    on an even grid over ``[-gen.range, gen.range]``.
    """
    num_points = 100000
    grid = np.linspace(-GAN.gen.range, GAN.gen.range, num_points)

    # p(data)
    d_sample = GAN.data.sample(num_points)

    # decision boundary: discriminator output over the whole grid
    ds = _evaluate_in_batches(session, GAN.D1, GAN.x, grid, GAN.batch_size)

    # p(generator): generator output for the same grid used as noise input
    gs = _evaluate_in_batches(session, GAN.G, GAN.z, grid, GAN.batch_size)

    anim_frames.append((d_sample, ds, gs, loss_d, loss_g))

Building the GAN


In [8]:
class GAN(object):
    """One-dimensional GAN: builds the TensorFlow graph and runs training.

    Args:
        data: Source of real samples; must expose ``mu``, ``sigma`` and
            ``sample(N)``.
        gen: Source of generator noise; must expose ``sample(N)`` (and
            ``range``, which ``plot_distributions`` reads).
        num_epochs: Number of adversarial training steps; also forwarded to
            ``optimizer`` for the learning-rate schedule.
    """

    def __init__(self, data, gen, num_epochs):
        self.data = data
        self.gen = gen
        self.num_epochs = num_epochs
        self.log_every = 100         # print the losses every N steps
        self.anim_frame_every = 5    # capture an animation frame every N steps
        self.batch_size = 128
        self.mlp_hidden_size = 4
        self._create_model()

    def _create_model(self):
        """Build the pretraining, generator and discriminator sub-graphs."""
        # In order to make sure that the discriminator is providing useful gradient
        # information to the generator from the start, we're going to pretrain the
        # discriminator by regressing its output (squared error) onto target
        # labels fed in through pre_labels. We define the network for this
        # pretraining step scoped as D_pre.
        with tf.variable_scope('D_pre'):
            self.pre_input = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.pre_labels = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            D, self.pre_theta = discriminator(self.pre_input, self.mlp_hidden_size)
            self.pre_loss = tf.reduce_mean(tf.square(D - self.pre_labels))
            self.pre_opt = optimizer(self.pre_loss, None, self.num_epochs)

        # This defines the generator network - it takes samples from a noise
        # distribution as input, and passes them through an MLP.
        with tf.variable_scope('G'):
            self.z = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.G, theta_g = generator(self.z, self.mlp_hidden_size)

        # The discriminator tries to tell the difference between samples from the
        # true data distribution (self.x) and the generated samples (self.G).
        #
        # Here we create two copies of the discriminator network (that share parameters),
        # as you cannot use the same network with different inputs in TensorFlow.
        with tf.variable_scope('D') as scope:
            self.x = tf.placeholder(tf.float32, shape=(self.batch_size, 1))
            self.D1, self.theta_d1 = discriminator(self.x, self.mlp_hidden_size)
            scope.reuse_variables()
            self.D2, self.theta_d2 = discriminator(self.G, self.mlp_hidden_size)

        # Define the loss for discriminator and generator networks (see the original
        # paper for details), and create optimizers for both
        self.loss_d = tf.reduce_mean(-tf.log(self.D1) - tf.log(1 - self.D2))
        self.loss_g = tf.reduce_mean(-tf.log(self.D2))

        self.opt_d = optimizer(self.loss_d, self.theta_d2, self.num_epochs)
        self.opt_g = optimizer(self.loss_g, theta_g, self.num_epochs)

    def train(self):
        """Pretrain the discriminator, then alternate D and G updates.

        Losses are printed every ``log_every`` steps and an animation frame is
        captured through ``plot_distributions`` every ``anim_frame_every``
        steps.
        """
        with tf.Session() as session:
            tf.initialize_all_variables().run()

            # discriminator pretraining: inputs are uniform on [-5, 5) and the
            # targets are the data distribution's density at those points
            num_pretrain_steps = 1000
            # `range` instead of `xrange` so the loop also runs on Python 3
            for step in range(num_pretrain_steps):
                d = (np.random.random(self.batch_size) - 0.5) * 10.0
                labels = norm.pdf(d, loc=self.data.mu, scale=self.data.sigma)
                pretrain_loss, _ = session.run([self.pre_loss, self.pre_opt], {
                    self.pre_input: np.reshape(d, (self.batch_size, 1)),
                    self.pre_labels: np.reshape(labels, (self.batch_size, 1))
                })
            self.weightsD = session.run(self.pre_theta)

            # copy weights from pre-training over to new D network
            for variable, value in zip(self.theta_d2, self.weightsD):
                session.run(variable.assign(value))

            for step in range(self.num_epochs):
                # update discriminator
                x = self.data.sample(self.batch_size)
                z = self.gen.sample(self.batch_size)

                loss_d, _ = session.run([self.loss_d, self.opt_d], {
                    self.x: np.reshape(x, (self.batch_size, 1)),
                    self.z: np.reshape(z, (self.batch_size, 1))
                })

                # update generator
                z = self.gen.sample(self.batch_size)
                loss_g, _ = session.run([self.loss_g, self.opt_g], {
                    self.z: np.reshape(z, (self.batch_size, 1))
                })

                if step % self.log_every == 0:
                    print('{}: {}\t{}'.format(step, loss_d, loss_g))

                if step % self.anim_frame_every == 0:
                    plot_distributions(self, session, loss_d, loss_g)

Train the GAN


In [9]:
# Number of adversarial training steps (also reused later as the x-axis limit
# of the loss plot).
num_epochs = 1000

# Real data comes from DataDistribution's defaults; generator noise is spread
# over [-8, 8].
model = GAN(DataDistribution(), GeneratorDistribution(range=8), num_epochs)
model.train()


0: 1.7477325201	1.6593003273
100: 1.55887150764	0.716367125511
200: 1.47069978714	0.766529321671
300: 1.44156455994	0.754581212997
400: 1.42336750031	0.722344994545
500: 1.41921949387	0.71108520031
600: 1.40711009502	0.713812530041
700: 1.39862394333	0.716824114323
800: 1.39552330971	0.714896678925
900: 1.38714814186	0.714502751827

Visualizing the results

Setting up the plot and initializing three separate lines for p(d), the discriminator's decision boundary, and p(g):


In [10]:
# Figure and axes that the distribution animation draws into.
f, ax = plt.subplots(figsize=(6,4))
f.suptitle('1D Generative Adversarial Network', fontsize=15)
plt.ylabel('Probability')
ax.set_xlim(-6, 6)
ax.set_ylim(0, 1.4)
# Empty line artists; animate() fills in their data for every frame.
line_d, = ax.plot([], [], label='p_d')
line_ds, = ax.plot([], [], label='decision boundary')
line_g, = ax.plot([], [], label='p_g')
# Frame counter text, positioned in axes coordinates (top-left corner).
frame_text = ax.text(0.02, 0.95,'',horizontalalignment='left',verticalalignment='top', transform=ax.transAxes)
ax.legend()


Out[10]:
<matplotlib.legend.Legend at 0x11c7c0190>

Animating the frames we captured during training:


In [11]:
def init():
    """Clear the three distribution lines and reset the frame label.

    Used as the ``init_func`` of the distribution animation; returns the
    artists that were reset.
    """
    for artist in (line_d, line_ds, line_g):
        artist.set_data([], [])
    frame_text.set_text('Start')
    return line_d, line_ds, line_g, frame_text

def animate(i):
    """Draw frame ``i`` of the distribution animation.

    Reads ``anim_frames[i]`` and updates the data histogram, the
    discriminator's decision surface and the generator histogram, plus the
    frame counter text.

    Args:
        i: Index into ``anim_frames``.

    Returns:
        The tuple of artists that were updated.
    """
    d_sample, ds, gs = anim_frames[i][:3]
    bins = np.linspace(-5, 5, 50)

    # `density=True` replaces the deprecated `normed=True`, which newer NumPy
    # releases no longer accept; for these equal-width bins the result is the
    # same.
    histd, _ = np.histogram(d_sample, bins=bins, density=True)
    line_d.set_data(bins[1:], histd)

    # The decision surface was evaluated by plot_distributions() on an even
    # grid over [-gen.range, gen.range], so it must be drawn against that same
    # grid rather than the [-5, 5] histogram range.
    x = np.linspace(-model.gen.range, model.gen.range, len(ds))
    line_ds.set_data(x, np.ravel(ds))

    histg, _ = np.histogram(gs, bins=bins, density=True)
    line_g.set_data(bins[1:], histg)

    frame_text.set_text('Timestep = %.1d/%.1d' % (i, len(anim_frames)))

    return (line_d, line_ds, line_g, frame_text)

# One animation frame per snapshot captured during training.
anim = animation.FuncAnimation(f, animate, init_func=init,
                               frames=len(anim_frames), blit=True)

# Render the animation as an HTML5 video for inline display in the notebook.
HTML(anim.to_html5_video())


Out[11]:

Now let's visualize the training loss as well:


In [12]:
# Separate figure and axes for the training-loss animation.
f_loss, ax_loss = plt.subplots(figsize=(6,3))
f_loss.suptitle('Training Loss', fontsize=15)
plt.xlabel('Epochs')
plt.ylabel('Loss')
ax_loss.set_xlim(-100, num_epochs)
ax_loss.set_ylim(0.5, 2)
# Empty line artists; animate_loss() fills in their data for every frame.
line_loss_d, = ax_loss.plot([], [], label='loss_d')
line_loss_g, = ax_loss.plot([], [], label='loss_g')
ax_loss.legend()


Out[12]:
<matplotlib.legend.Legend at 0x13a68b390>

In [13]:
def init_loss():
    """Clear both loss curves before the loss animation starts.

    Used as the ``init_func`` of the loss animation; returns the artists that
    were reset. It deliberately leaves ``frame_text`` alone, because that text
    artist belongs to the distribution figure and not to this one.
    """
    line_loss_d.set_data([], [])
    line_loss_g.set_data([], [])
    return (line_loss_d, line_loss_g)

def animate_loss(i):
    """Draw frame ``i`` of the loss animation.

    Plots the discriminator and generator losses stored in the first ``i``
    entries of ``anim_frames`` (the entry at index ``i`` itself is not
    included).

    Args:
        i: Number of captured frames to plot.

    Returns:
        The tuple of artists that were updated.
    """
    frames = anim_frames[:i]
    # Frames are captured every `anim_frame_every` training steps, so that
    # interval converts a frame index back into an epoch number.
    step = model.anim_frame_every
    xs = [index * step for index in range(len(frames))]
    ys_d = [frame[3] for frame in frames]
    ys_g = [frame[4] for frame in frames]

    line_loss_d.set_data(xs, ys_d)
    line_loss_g.set_data(xs, ys_g)

    return (line_loss_d, line_loss_g)

# One animation frame per snapshot captured during training.
anim_loss = animation.FuncAnimation(f_loss, animate_loss, init_func=init_loss,
                               frames=len(anim_frames), blit=True)

# Render the animation as an HTML5 video for inline display in the notebook.
HTML(anim_loss.to_html5_video())


Out[13]:

Open challenges

Generative models are an active area of research for us here at AYLIEN. Here are some open challenges that we've encountered, and encourage you to work on if you're interested in this line of work:

  • Run the 1D GAN with different probability distributions
  • Create a 2D version of the GAN
  • Integrate changes so that the GAN learns to spread over the distribution better (instead of narrowing down to the mean)
  • Implement encoder/decoder networks so that it's possible to perform inference (see Adversarially Learned Inference by Dumoulin et al.)

If you have interesting solutions for any of these, we would love to hear from you, so please feel free to submit a pull request, or reach out to us directly at hello@aylien.com.