MNIST Digit Classification - CNN


In [1]:
from __future__ import division, print_function
from sklearn.metrics import accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import numpy as np
import theano
import theano.tensor as T

from theano.tensor.signal import pool
from theano.tensor.nnet import conv2d

import os
%matplotlib inline

In [2]:
DATA_DIR = "../../data"
TRAIN_FILE = os.path.join(DATA_DIR, "mnist_train.csv")
TEST_FILE = os.path.join(DATA_DIR, "mnist_test.csv")

MODEL_FILE = os.path.join(DATA_DIR, "theano-mnist-cnn")

LEARNING_RATE = 0.001
REG_LAMBDA = 0.01

INPUT_SIZE = 28
BATCH_SIZE = 128
NUM_CLASSES = 10
NUM_EPOCHS = 5

Prepare Data


In [3]:
def parse_file(filename):
    """Each CSV row is: label, then 784 pixel values in [0, 255]."""
    xdata, ydata = [], []
    fin = open(filename, "r")
    i = 0
    for line in fin:
        if i % 10000 == 0:
            print("{:s}: {:d} lines read".format(
                os.path.basename(filename), i))
        cols = line.strip().split(",")
        ydata.append(int(cols[0]))
        # scale pixels to [0, 1] and reshape to (channels, height, width)
        x1d = np.array([float(x) / 255. for x in cols[1:]])
        x3d = np.reshape(x1d, (1, INPUT_SIZE, INPUT_SIZE))
        xdata.append(x3d)
        i += 1
    fin.close()
    print("{:s}: {:d} lines read".format(os.path.basename(filename), i))
    X = np.array(xdata).astype("float32")
    y = np.array(ydata).astype("int32")
    return X, y

Xtrain, ytrain = parse_file(TRAIN_FILE)
Xtest, ytest = parse_file(TEST_FILE)
print(Xtrain.shape, ytrain.shape, Xtest.shape, ytest.shape)


mnist_train.csv: 0 lines read
mnist_train.csv: 10000 lines read
mnist_train.csv: 20000 lines read
mnist_train.csv: 30000 lines read
mnist_train.csv: 40000 lines read
mnist_train.csv: 50000 lines read
mnist_train.csv: 60000 lines read
mnist_test.csv: 0 lines read
mnist_test.csv: 10000 lines read
(60000, 1, 28, 28) (60000,) (10000, 1, 28, 28) (10000,)
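
As a quick sanity check on the parsed tensors, the first training digit can be plotted directly (an illustrative cell, not part of the original run):

In [ ]:
# display the first training image together with its label
plt.imshow(Xtrain[0, 0], cmap="gray")
plt.title("label = {:d}".format(ytrain[0]))
plt.show()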

Define Network


In [4]:
X = T.ftensor4("X")
y = T.ivector("y")

In [5]:
# CONV1: 5x5 kernel, channels: 1 => 32
# filtering reduces the image size to (28-5+1 , 28-5+1) = (24, 24)
# maxpooling reduces this further to (24/2, 24/2) = (12, 12)
# output tensor of shape (batch_size, 32, 12, 12)
W1 = theano.shared(np.random.randn(32, 1, 5, 5)
                   .astype(theano.config.floatX), name="W1")
conv_1 = conv2d(input=X, filters=W1, filter_shape=(32, 1, 5, 5),
                input_shape=(BATCH_SIZE, 1, INPUT_SIZE, INPUT_SIZE))
pool_1 = pool.pool_2d(input=conv_1, ws=(2, 2), ignore_border=True)
# tanh is used despite the relu_* variable names; the ReLU variant is
# left commented out
# relu_1 = T.nnet.relu(pool_1)
relu_1 = T.tanh(pool_1)

# CONV2: 5x5 kernel, channels: 32 => 64
# filtering reduces the image size to (12-5+1, 12-5+1) = (8, 8)
# maxpooling reduces this further to (8/2, 8/2) = (4, 4)
# output tensor of shape (batch_size, 64, 4, 4)
W2 = theano.shared(np.random.randn(64, 32, 5, 5)
                   .astype(theano.config.floatX), name="W2")
conv_2 = conv2d(input=relu_1, filters=W2, filter_shape=(64, 32, 5, 5),
                input_shape=(BATCH_SIZE, 32, 12, 12))
pool_2 = pool.pool_2d(input=conv_2, ws=(2, 2), ignore_border=True)
# relu_2 = T.nnet.relu(pool_2)
relu_2 = T.tanh(pool_2)

# flatten: 64 * 4 * 4 = 1024, so the output has shape (batch_size, 1024)
flat_3 = relu_2.reshape((-1, 1024))

# FC1: 1024 => 512
W4 = theano.shared(np.random.randn(1024, 512)
                   .astype(theano.config.floatX), name="W4")
b4 = theano.shared(np.zeros(512).astype(theano.config.floatX), name="b4")
fc4 = flat_3.dot(W4) + b4
# fc4_relu = T.nnet.relu(fc4)
fc4_relu = T.tanh(fc4)

# FC2: 512 => 10
W5 = theano.shared(np.random.randn(512, 10)
                   .astype(theano.config.floatX), name="W5")
b5 = theano.shared(np.zeros(10).astype(theano.config.floatX), name="b5")
fc5 = fc4_relu.dot(W5) + b5
y_hat = T.nnet.softmax(fc5)
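
The size arithmetic in the comments above can be verified by compiling a throwaway function that returns the intermediate shapes; the check_shapes name and the dummy batch below are illustrative, not part of the original notebook:

In [ ]:
# sanity-check the layer shapes claimed in the comments
check_shapes = theano.function([X], [relu_1.shape, relu_2.shape, flat_3.shape])
dummy = np.zeros((BATCH_SIZE, 1, INPUT_SIZE, INPUT_SIZE), dtype="float32")
for s in check_shapes(dummy):
    print(s)  # expect [128 32 12 12], [128 64 4 4], [128 1024]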

In [6]:
loss = T.nnet.categorical_crossentropy(y_hat, y).mean()
prediction = T.argmax(y_hat, axis=1)
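
REG_LAMBDA is defined in the setup cell but never applied; one way to fold it in as an L2 weight penalty on the loss is sketched below (the l2 and reg_loss names are illustrative):

In [ ]:
# L2 penalty over the weight matrices (biases conventionally excluded)
l2 = sum(T.sum(W ** 2) for W in [W1, W2, W4, W5])
reg_loss = loss + REG_LAMBDA * l2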

In [7]:
forward_prop = theano.function([X], y_hat)
calculate_loss = theano.function([X, y], loss)
predict = theano.function([X], prediction)

In [8]:
params = [W5, b5, W4, b4, W2, W1]
grads = T.grad(loss, params)

In [9]:
updates = [
    (param_i, param_i - LEARNING_RATE * grad_i)
    for param_i, grad_i in zip(params, grads)
]

In [ ]:
gradient_step = theano.function(
    [X, y],
    updates=updates
)
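
The updates above implement plain SGD: each parameter moves by LEARNING_RATE against its gradient. A classical-momentum variant is a common drop-in; the MOMENTUM constant and velocity buffers below are assumptions, not part of the original setup:

In [ ]:
MOMENTUM = 0.9  # assumed value, not defined in the setup cell
velocities = [theano.shared(np.zeros_like(p.get_value())) for p in params]
momentum_updates = []
for p, g, v in zip(params, grads, velocities):
    v_new = MOMENTUM * v - LEARNING_RATE * g  # accumulate velocity
    momentum_updates.append((v, v_new))
    momentum_updates.append((p, p + v_new))   # step along the velocity
# gradient_step could then be compiled with updates=momentum_updates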

Train Network


In [ ]:
history = []
num_batches = len(Xtrain) // BATCH_SIZE
num_val_recs = len(Xtrain) // 10
Xval, yval = Xtrain[0:num_val_recs], ytrain[0:num_val_recs]
for epoch in range(NUM_EPOCHS):
    shuffled_indices = np.random.permutation(np.arange(len(Xtrain)))
    total_loss, total_acc = 0., 0.
    for bid in range(num_batches):
        bstart = bid * BATCH_SIZE
        bend = (bid + 1) * BATCH_SIZE
        Xbatch = Xtrain[shuffled_indices[bstart:bend]]
        ybatch = ytrain[shuffled_indices[bstart:bend]]
        gradient_step(Xbatch, ybatch)
        total_loss += calculate_loss(Xbatch, ybatch)
    total_loss /= num_batches
    # validate on the first 10% of the training data
    # (note: these records also appear in the training batches)
    yval_ = predict(Xval)
    total_acc = accuracy_score(yval, yval_)
    history.append((total_loss, total_acc))
    print("Epoch {:d}/{:d}: loss={:.4f}, accuracy: {:.4f}".format(
        epoch+1, NUM_EPOCHS, total_loss, total_acc))


Epoch 1/5: loss=20.3532, accuracy: 0.2393
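
The large first-epoch loss above is consistent with the unscaled randn initialization of W1/W2/W4/W5, which saturates the tanh units at these fan-ins. Glorot-style rescaling is one common remedy; the glorot helper below is a sketch, not the notebook's original scheme:

In [ ]:
def glorot(shape):
    # scale ~ sqrt(2 / (fan_in + fan_out)); an assumed scheme
    if len(shape) == 4:  # conv filters: (out_ch, in_ch, kh, kw)
        fan_in = shape[1] * shape[2] * shape[3]
        fan_out = shape[0] * shape[2] * shape[3]
    else:                # dense weights: (fan_in, fan_out)
        fan_in, fan_out = shape
    scale = np.sqrt(2. / (fan_in + fan_out))
    return (scale * np.random.randn(*shape)).astype(theano.config.floatX)

for W, shape in [(W1, (32, 1, 5, 5)), (W2, (64, 32, 5, 5)),
                 (W4, (1024, 512)), (W5, (512, 10))]:
    W.set_value(glorot(shape))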

In [ ]:
losses = [x[0] for x in history]
accs = [x[1] for x in history]

plt.subplot(211)
plt.title("Accuracy")
plt.plot(accs)

plt.subplot(212)
plt.title("Loss")
plt.plot(losses)

plt.tight_layout()
plt.show()

Evaluate Network


In [ ]:
# reuse the predict function compiled earlier
ytest_ = predict(Xtest)
acc = accuracy_score(ytest, ytest_)
cm = confusion_matrix(ytest, ytest_)
print("accuracy: {:.3f}".format(acc))
print("confusion matrix")
print(cm)
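
MODEL_FILE is defined in the setup cell but the trained weights are never written out; a minimal persistence sketch using np.savez (the .npz suffix and the name-keyed layout are assumptions):

In [ ]:
# persist learned parameters; reload later with np.load(MODEL_FILE + ".npz")
np.savez(MODEL_FILE + ".npz",
         **{p.name: p.get_value() for p in params})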
