In [ ]:
# This tutorial is meant to be done after MLP_theano_with_comments.ipynb
# We are working with MNIST again, this time no labels are necessary -
# the denoising autoencoder (DAE) is an unsupervised model that tries to reconstruct the original input from a corrupted copy of it.
# All imports up here this time
import pickle
import numpy
import numpy.random as rng
import theano
import theano.tensor as T
import theano.sandbox.rng_mrg as RNG_MRG
from utils import tile_raster_images
from PIL import Image as pil_img
from IPython.display import Image
# Load our data
# Download and unzip pickled version from here: http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz
(train_x, _), (valid_x, _), (test_x, _) = pickle.load(open('data/mnist.pkl', 'rb'))
print "Shapes:"
print train_x.shape
print valid_x.shape
print test_x.shape
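In [ ]:
# (Optional sanity check - a minimal sketch using the tile_raster_images helper and PIL imports above.
# The output filename "mnist_sample.png" is just an illustrative choice.)
# Each row of train_x is a flattened 28x28 grayscale image with values in [0, 1],
# so we can tile the first 25 digits into a 5x5 grid to make sure the data loaded correctly.
sample_image = pil_img.fromarray(
    tile_raster_images(train_x[:25], img_shape=(28, 28), tile_shape=(5, 5), tile_spacing=(1, 1))
)
sample_image.save("mnist_sample.png")
# Display inline in the notebook
Image(filename="mnist_sample.png")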
In [ ]:
# The DAE data flow looks like this: input -> input_(add noise) -> hiddens -> input
# This can be repeated by sampling from the reconstructed input. Repeating like that is
# a pseudo Gibbs sampling process. We can define how many times we want to repeat (known as walkbacks).
# We can specify any hyperparameters to play with up here:
input_size = 784 # 28x28 images
hidden_size = 1000
w_mean = 0.0
w_std = 0.05
noise = 0.3
walkbacks = 3
learning_rate = 0.1
batch_size = 100
epochs = 100
check_frequency = 10
# To make the organization better, lets define all the variables and parameters here.
# Just like with the MLP, we need a symbolic matrix to input the images
x = T.matrix('x')
# Next, we need the weights matrix W_x. This will be used to go both from input -> hidden and
# hidden -> input (by using its transpose). This is called tied weights.
# Again, initialization has a lot of literature behind it, but we are just going to stick with a Gaussian for the moment.
W_x = numpy.asarray(rng.normal(loc=w_mean, scale=w_std, size=(input_size, hidden_size)), dtype=theano.config.floatX)
# (Don't forget to make parameters into shared variables so they can be updated!)
W_x = theano.shared(W_x, "W_x")
# Because we are outputting back into the input space, we also need a bias vector for both the input
# and hidden layers.
b_x = numpy.zeros((input_size,), dtype=theano.config.floatX)
b_h = numpy.zeros((hidden_size,), dtype=theano.config.floatX)
b_x = theano.shared(b_x, "b_x")
b_h = theano.shared(b_h, "b_h")
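In [ ]:
# (A quick numpy-only sketch of why tied weights work dimensionally - nothing here is part of the model,
# and the dummy zero vectors are just for shape checking.)
# W_x maps input -> hidden, and its transpose maps hidden -> input, so one matrix serves both directions.
W_now = W_x.get_value()
print "input -> hidden shape:", numpy.dot(numpy.zeros((1, input_size)), W_now).shape    # (1, 1000)
print "hidden -> input shape:", numpy.dot(numpy.zeros((1, hidden_size)), W_now.T).shape  # (1, 784)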
In [ ]:
# Now for the most important part of a denoising autoencoder - making the input noisy!
# Noise acts as regularization so the autoencoder doesn't just memorize the training set.
# This makes it more effective for test data by reducing overfitting.
# We deal with adding noise during training but not testing in Theano with a switch variable!
# Switches can be turned on or off to direct data flow in the computation graph -
# so we turn on for train and off for test!
# You guessed it - we need a shared variable to represent the switch condition so we can change it at runtime.
# Store the switch as a floatX scalar so we can flip it later with set_value(1.) / set_value(0.)
noise_switch = theano.shared(numpy.cast[theano.config.floatX](1.), "noise_switch")
# One important thing to note - the type of noise has to correspond to the type of input.
# i.e. we can't add real-valued noise when the input is expected to be binary.
# So for these binary inputs, we will add salt-and-pepper masking noise!
# This is a function so we can keep adding it during the computation chain when we alternate sampling
# from input and reconstructing from hiddens.
# Theano random number generator
theano_rng = RNG_MRG.MRG_RandomStreams(1)
def salt_and_pepper(variable):
    # mask = 1 with probability (1 - noise); those pixels keep their original values
    mask = theano_rng.binomial(size=variable.shape, n=1, p=(1-noise), dtype=theano.config.floatX)
    # Wherever the mask is 0, randomly set the pixel to 0 or 1 with equal probability
    saltpepper = theano_rng.binomial(size=variable.shape, n=1, p=0.5, dtype=theano.config.floatX)
    ones = T.eq(mask, 0) * saltpepper
    noisy = variable*mask + ones
    return T.switch(noise_switch,
                    # switch on (training): return the corrupted input
                    noisy,
                    # switch off (testing): return the input untouched
                    variable)
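In [ ]:
# (A small usage sketch of the noise switch - the compiled function f_noise is only for illustration
# and is not used anywhere in training.)
f_noise = theano.function(inputs=[x], outputs=salt_and_pepper(x), allow_input_downcast=True)
noise_switch.set_value(1.)
print "with noise:   ", f_noise(train_x[:1])[0, 300:310]
noise_switch.set_value(0.)
print "without noise:", f_noise(train_x[:1])[0, 300:310]
# Turn the switch back on so it doesn't interfere with the cells below
noise_switch.set_value(1.)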
In [ ]:
# Now we are ready to create the computation graph!
# Remember it is noisy_x -> hiddens -> x -> noisy_x -> hiddens -> x ... (each reconstruction gets corrupted again before the next step)
inputs=[x]
for walkback in range(walkbacks):
# First, we want to corrupt the input x
noisy_x = salt_and_pepper(inputs[-1])
# Now calculate the hiddens
h = T.tanh(
T.dot(noisy_x, W_x) + b_h
)
# From the hiddens, reconstruct x.
# We have to use an output activation appropriate for the type of inputs!
# MNIST pixel values live in [0, 1] (essentially binary), so a sigmoid works here.
reconstruction = T.nnet.sigmoid(
T.dot(h, W_x.T) + b_x
)
# That is all for an autoencoder!
inputs.append(reconstruction)
# Drop the original x from the front of the list - everything after it is a reconstruction
reconstructions = inputs[1:]
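In [ ]:
# (Purely a sanity check - the chain we just unrolled should contain one reconstruction per walkback.)
print "number of reconstructions:", len(reconstructions)
print "each one is a symbolic variable of type:", reconstructions[-1].type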
In [ ]:
# The output of our computation graph is the last reconstructed input in the Gibbs chain.
output = reconstructions[-1]
# Our cost function is now the reconstruction error between all reconstructions and the original input.
# Again, because our input space is binary, using mean binary cross-entropy is a good analog for
# reconstruction error.
# For real-valued inputs, we could use mean square error.
cost = sum(T.mean(T.nnet.binary_crossentropy(recon, x)) for recon in reconstructions)
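In [ ]:
# (Not used below - just a sketch of what the real-valued alternative mentioned above could look like.
# In that case the output activation would also be linear instead of a sigmoid.)
mse_cost = sum(T.mean(T.sqr(recon - x)) for recon in reconstructions)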
In [ ]:
# Just like with the MLP, compute gradient updates for the parameters to use with training.
parameters = [W_x, b_h, b_x]
# Automagic differentiation! (Still love it)
gradients = T.grad(cost, parameters)
# Update the parameters for stochastic gradient descent!
train_updates = [(param, param - learning_rate*gradient) for param, gradient in zip(parameters, gradients)]
In [ ]:
# Compile our training and testing function like before!
# Train function updates the parameters and returns the total train cost to monitor.
f_train = theano.function(
inputs=[x],
outputs=cost,
updates=train_updates,
allow_input_downcast=True
)
# Our test function returns the final reconstruction and doesn't update any parameters.
f_test = theano.function(
inputs=[x],
outputs=output,
allow_input_downcast=True
)
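In [ ]:
# (A quick, purely illustrative check that the compiled functions run - the untrained model will produce
# reconstructions of the right shape, but they won't look like digits until we train below.)
sanity_recon = f_test(test_x[:5])
print "input batch shape:    ", test_x[:5].shape
print "reconstruction shape: ", sanity_recon.shape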
In [ ]:
# That's it! Now perform SGD like before.
# Main training loop
train_batches = len(train_x) // batch_size
try:
for epoch in range(epochs):
print epoch+1, ":",
# Don't forget to turn on our noise switch for training! Just set the shared variable to 1 (True)
noise_switch.set_value(1.)
train_costs = []
for i in range(train_batches):
batch_x = train_x[i*batch_size:(i+1)*batch_size]
costs = f_train(batch_x)
train_costs.append(costs)
print "cost:", numpy.mean(train_costs)
if (epoch+1) % check_frequency == 0:
print "Saving images..."
# For validation, let's run a few images through and see the reconstruction
# (with the noise from training still added)
valid_recons = f_test(valid_x[:25])
# Use the tile_raster_images helper function to arrange the matrix into a 5x10 grid of digit tiles
# (Two 5x5 images next to each other - the first the inputs, the second the reconstructions.)
valid_stacked = numpy.vstack(
[numpy.vstack([
valid_x[i*5:(i+1)*5],
valid_recons[i*5:(i+1)*5]
])
for i in range(5)]
)
valid_image = pil_img.fromarray(
# helper from utils.py
tile_raster_images(valid_stacked, (28, 28), (5, 10), (1, 1))
)
valid_image.save("dae_valid_%d.png"%(epoch+1))
# Now do the same for test, but don't add any noise
# This means setting the noise switch to 0 (False)
noise_switch.set_value(0.)
test_recons = f_test(test_x[:25])
test_stacked = numpy.vstack(
[numpy.vstack([
test_x[i*5:(i+1)*5],
test_recons[i*5:(i+1)*5]
])
for i in range(5)]
)
test_image = pil_img.fromarray(
tile_raster_images(test_stacked, (28, 28), (5, 10), (1, 1))
)
test_image.save("dae_test_%d.png"%(epoch+1))
except KeyboardInterrupt:
pass
# Finally, let's save an image of the filters the DAE learned - each row of W_x.T (one per hidden unit) can be reshaped into a 28x28 filter image.
weight_filters = pil_img.fromarray(
tile_raster_images(
W_x.get_value(borrow=True).T,
img_shape=(28, 28),
tile_shape=(25, 40),
tile_spacing=(1, 1)
)
)
print "Saving filters..."
weight_filters.save("dae_filters.png")
print "Done!"
In [ ]: