In [ ]:
# This tutorial is meant to be done after MLP_theano_with_comments.ipynb

# We are working with MNIST again, this time no labels are necessary - 
# the denoising autoencoder (DAE) is an unsupervised model that tries to reconstruct the original input.

# All imports up here this time
import pickle
import numpy
import numpy.random as rng
import theano
import theano.tensor as T
import theano.sandbox.rng_mrg as RNG_MRG
from utils import tile_raster_images
from PIL import Image as pil_img
from IPython.display import Image

# Load our data 
# Download and unzip pickled version from here: http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz
(train_x, _), (valid_x, _), (test_x, _) = pickle.load(open('data/mnist.pkl', 'r'))
print "Shapes:"
print train_x.shape
print valid_x.shape
print test_x.shape

In [ ]:
# Hyperparameters to experiment with, all in one place:
input_size = 784  # flattened 28x28 MNIST images
hidden_size = 1000
w_mean = 0.0
w_std = 0.05
# Glorot-style uniform-init half-interval: sqrt(6 / (fan_in + fan_out))
w_interval = numpy.sqrt(6. / (input_size + hidden_size))
noise = 0.3
walkbacks = 3
learning_rate = 0.25
lr_decay = .985
batch_size = 100
epochs = 200
check_frequency = 10

# Symbolic input and the model parameters, defined up front for organization.
x = T.matrix('x')

# Alternative Gaussian initialization (kept for reference, unused):
# W_x = numpy.asarray(rng.normal(loc=w_mean, scale=w_std, size=(input_size, hidden_size)), dtype=theano.config.floatX)
W_x = theano.shared(
    numpy.asarray(
        rng.uniform(low=-w_interval, high=w_interval, size=(input_size, hidden_size)),
        dtype=theano.config.floatX
    ),
    "W_x"
)

# Visible and hidden biases both start at zero.
b_x = theano.shared(numpy.zeros((input_size,), dtype=theano.config.floatX), "b_x")
b_h = theano.shared(numpy.zeros((hidden_size,), dtype=theano.config.floatX), "b_h")

In [ ]:
# The key ingredient of a denoising autoencoder: corrupt the input!
# Noise acts as regularization so the autoencoder can't simply memorize the
# training set, which reduces overfitting and helps it generalize to test data.
noise_switch = theano.shared(1, "noise_switch")  # 1 = corrupt inputs, 0 = pass through clean

theano_rng = RNG_MRG.MRG_RandomStreams(1)

def salt_and_pepper(variable):
    """Corrupt a symbolic tensor with salt-and-pepper noise.

    A random `noise` fraction of elements is replaced by 0 or 1 (50/50 each);
    the rest are kept. The whole thing is gated by `noise_switch`, so the same
    graph can run with or without corruption at runtime.
    """
    keep = theano_rng.binomial(size=variable.shape, n=1, p=(1-noise),
                               dtype=theano.config.floatX)
    coin = theano_rng.binomial(size=variable.shape, n=1, p=0.5,
                               dtype=theano.config.floatX)
    # Dropped elements (keep == 0) become the coin flip's 0 or 1.
    corrupted = variable * keep + T.eq(keep, 0) * coin
    return T.switch(noise_switch, corrupted, variable)

In [ ]:
# Build the computation graph!
# Each "walkback" step re-corrupts the latest reconstruction and pushes it back
# through the autoencoder, forming a short Gibbs chain:
#   noisy_x -> hiddens -> x -> hiddens -> x ...
inputs = [x]
for _ in range(walkbacks):
    corrupted = salt_and_pepper(inputs[-1])
    hiddens = T.tanh(T.dot(corrupted, W_x) + b_h)
    # Tied weights: decode with the transpose of the encoding matrix.
    recon = T.nnet.sigmoid(T.dot(hiddens, W_x.T) + b_x)
    inputs.append(recon)

# Drop the original input; keep the reconstruction produced at every step.
reconstructions = inputs[1:]

In [ ]:
# The output of our computation graph is the last reconstructed input in the Gibbs chain.
output = reconstructions[-1]

# Cost: mean binary cross-entropy between EVERY reconstruction in the chain and
# the original input, summed over the chain. Use the builtin sum() to add the
# symbolic scalars -- numpy.sum on a list of Theano variables only works via an
# object-dtype fallback and is fragile; plain addition builds the graph directly.
cost = sum(T.mean(T.nnet.binary_crossentropy(recon, x)) for recon in reconstructions)

In [ ]:
# Plain SGD: each parameter takes a step down its gradient.
parameters = [W_x, b_h, b_x]
gradients = T.grad(cost, parameters)

# The learning rate lives in a shared variable so we can decay it between epochs.
lr = theano.shared(numpy.asarray(learning_rate, dtype='float32'), 'lr')
train_updates = []
for param, grad in zip(parameters, gradients):
    train_updates.append((param, param - lr * grad))

In [ ]:
# Compile our training and testing function like before!
# Train function updates the parameters and returns the total train cost to monitor.
f_train = theano.function(
    inputs=[x], 
    outputs=cost, 
    updates=train_updates, 
    allow_input_downcast=True
)

# The test function returns the final reconstruction of the walkback chain.
# No parameter updates here -- it only runs the forward graph. (There is no
# scan in this notebook; the MRG random-stream state presumably updates itself
# automatically -- NOTE(review): confirm against the Theano RandomStreams docs.)
# allow_input_downcast lets float64 numpy data feed a float32 graph.
f_test = theano.function(
    inputs=[x], 
    outputs=output,
    allow_input_downcast=True
)

In [ ]:
# That's it! Now perform SGD like before.
# Main training loop

train_batches = len(train_x) / batch_size

try:
    for epoch in range(epochs):
        print epoch+1,":",

        # Don't forget to turn on our noise switch for training! Just set the shared variable to 1 (True)
        noise_switch.set_value(1.)

        train_costs = []
        for i in range(train_batches):
            batch_x = train_x[i*batch_size:(i+1)*batch_size]

            costs = f_train(batch_x)

            train_costs.append(costs)
        print "cost:", numpy.mean(train_costs),
        
        old_lr = lr.get_value()
        print "\tlearning rate:", old_lr
        new_lr = numpy.asarray(old_lr * lr_decay, dtype='float32')
        lr.set_value(new_lr)

        if (epoch+1) % check_frequency == 0:
            print "Saving images..."
            train_recons = f_test(train_x[:25])
            train_stacked = numpy.vstack(
                [numpy.vstack([
                        train_x[i*5:(i+1)*5],
                        train_recons[i*5:(i+1)*5]
                    ])
                 for i in range(5)]
            )
            train_image = pil_img.fromarray(
                tile_raster_images(train_stacked, (28, 28), (5, 10), (1, 1))
            )
            train_image.save("dae_train_%d.png"%(epoch+1))
            
            # For validation, let's run a few images through and see the reconstruction 
            # (with the noise from training still added)
            valid_recons = f_test(valid_x[:25])
            # Use the tile_raster_image helper function to rearrange the matrix into a 5x10 image of digits
            # (Two 5x5 images next to each other - the first the inputs, the second the reconstructions.)
            valid_stacked = numpy.vstack(
                [numpy.vstack([
                        valid_x[i*5:(i+1)*5],
                        valid_recons[i*5:(i+1)*5]
                    ])
                 for i in range(5)]
            )
            valid_image = pil_img.fromarray(
                # helper from utils.py
                tile_raster_images(valid_stacked, (28, 28), (5, 10), (1, 1))
            )
            valid_image.save("dae_valid_%d.png"%(epoch+1))

            # Now do the same for test, but don't add any noise
            # This means set the noise switches to 0. (False)
            noise_switch.set_value(0.)

            test_recons = f_test(test_x[:25])
            test_stacked = numpy.vstack(
                [numpy.vstack([
                        test_x[i*5:(i+1)*5],
                        test_recons[i*5:(i+1)*5]
                    ])
                 for i in range(5)]
            )
            test_image = pil_img.fromarray(
                tile_raster_images(test_stacked, (28, 28), (5, 10), (1, 1))
            )
            test_image.save("dae_test_%d.png"%(epoch+1))
            
            weight_filters = pil_img.fromarray(
                tile_raster_images(
                    W_x.get_value(borrow=True).T,
                    img_shape=(28, 28),
                    tile_shape=(25, 40),
                    tile_spacing=(1, 1)
                )
            )
            weight_filters.save("dae_filters_%d.png"%(epoch+1))
except KeyboardInterrupt:
    pass

In [ ]: