This notebook picks up where the simple_models notebook left off. Having tried a range of classical classification algorithms there, we'll now work through some of the neural network models in [1]: first fully-connected models of varying layer sizes, then convolutional models, including the famous LeNet-5.
Along the way we'll use Keras, a library that sits on top of Theano or TensorFlow and makes it easy to construct, train and evaluate neural nets. Before we get started, here's a recap of the models from the simple_models notebook.
[1] - LeCun, Bottou, Bengio and Haffner, "Gradient-Based Learning Applied to Document Recognition", Proceedings of the IEEE, November 1998
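To give a flavour of the Keras workflow before we use it in earnest, here is a minimal sketch of building, compiling, training and evaluating a model. The 784-64-10 layer sizes are arbitrary placeholders, not one of the architectures from [1], and the fit/evaluate calls assume the data loaded later in this notebook.

from keras.models import Sequential
from keras.layers import Dense, Activation

# Minimal sketch of the Keras workflow: build, compile, fit, evaluate.
# Layer sizes here are placeholders, not one of the models evaluated below.
sketch = Sequential()
sketch.add(Dense(64, input_dim=784))
sketch.add(Activation('relu'))
sketch.add(Dense(10))
sketch.add(Activation('softmax'))
sketch.compile(loss='categorical_crossentropy', optimizer='sgd', metrics=['accuracy'])
# Once (X_train, y_train, X_test, y_test) have been loaded and preprocessed below:
# sketch.fit(X_train, y_train, epochs=5, batch_size=128)
# loss, acc = sketch.evaluate(X_test, y_test)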
In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
plt.style.use('fivethirtyeight')
# plt.rcParams['font.family'] = 'serif'
plt.rcParams['font.serif'] = 'Helvetica'
plt.rcParams['font.monospace'] = 'Consolas'
plt.rcParams['font.size'] = 16
plt.rcParams['axes.labelsize'] = 16
plt.rcParams['axes.labelweight'] = 'bold'
plt.rcParams['xtick.labelsize'] = 14
plt.rcParams['ytick.labelsize'] = 14
plt.rcParams['legend.fontsize'] = 16
plt.rcParams['axes.titlesize'] = 20
plt.rcParams['lines.linewidth'] = 2
%matplotlib inline
# for auto-reloading external modules
%load_ext autoreload
%autoreload 2
In [4]:
# Set up the file directory and names
DIR = '../input/'
X_TRAIN = DIR + 'train-images-idx3-ubyte.pkl'
Y_TRAIN = DIR + 'train-labels-idx1-ubyte.pkl'
X_TEST = DIR + 't10k-images-idx3-ubyte.pkl'
Y_TEST = DIR + 't10k-labels-idx1-ubyte.pkl'
def load_data():
    '''Loads pickled ubyte files with MNIST data
    Uses the module-level constants X_TRAIN, Y_TRAIN, X_TEST and Y_TEST as filenames
    RETURNS: Tuple with (X_train, y_train, X_test, y_test), or None on failure
    '''
print('Loading pickle files')
try:
X_train = pickle.load( open( X_TRAIN, "rb" ) )
y_train = pickle.load( open( Y_TRAIN, "rb" ) )
X_test = pickle.load( open( X_TEST, "rb" ) )
y_test = pickle.load( open( Y_TEST, "rb" ) )
    except (OSError, pickle.UnpicklingError) as e:
        print('Error loading pickle file: {}'.format(e))
        return None
return (X_train, y_train, X_test, y_test)
X_train, y_train, X_test, y_test = load_data()
In [5]:
def flatten_images(X):
''' Converts images to 1-d vectors
INPUT: X - Input array of shape [n, w, h]
RETURNS: Numpy array of shape [n, w*h]
'''
n, w, h = X.shape
X_flat = X.reshape((n, w * h))
return X_flat
def square_images(X, w=None, h=None):
'''Converts single-vector images into square images
INPUT: X - numpy array of images in single-vector form
w - width of images to convert to
h - height of images to convert to
RETURNS: Numpy array of shape [n, w, h]
'''
assert X.shape[1] == w * h, "Error - Can't square array of shape {} to {}".format(X.shape, (w, h))
n = X.shape[0]
X_square = X.reshape((n, w, h))
return X_square
N_TRAIN, W, H = X_train.shape
N_TEST, w_test, h_test = X_test.shape
# Flatten the images
X_train = flatten_images(X_train)
X_test = flatten_images(X_test)
# Do some checks on the data
assert N_TRAIN == 60000, 'Error - expected 60000 training images, got {}'.format(N_TRAIN)
assert N_TEST == 10000, 'Error - expected 10000 test images, got {}'.format(N_TEST)
assert W == w_test, 'Error - width mismatch. Train {}, Test {}'.format(W, w_test)
assert H == h_test, 'Error - height mismatch. Train {}, Test {}'.format(H, h_test)
assert np.array_equal(X_train, flatten_images(square_images(X_train, W, H)))
assert X_train.shape[0] == y_train.shape[0]
assert X_test.shape[0] == y_test.shape[0]
print('Loaded train images shape {}, labels shape {}'.format(X_train.shape, y_train.shape))
print('Loaded test images shape {}, labels shape {}'.format(X_test.shape, y_test.shape))
In [6]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer, StandardScaler
# Keras Common configuration
SEED = 1234 # Fix the seed for repeatability
N_JOBS=-2 # Leave 1 core free for UI updates
VERBOSE=2 # 2 is the most verbose level used here (Keras itself supports 0, 1, 2)
EPOCHS = 20 # todo ! Check how many epochs in the paper
BATCH = 256 # todo ! Check this in the paper too
In [7]:
# Useful helper functions
def stratified_subsample(X, y, num_rows, verbose=False):
'''Creates a stratified subsample of X and y
INPUT: X and y, numpy arrays
RETURNS: subset of X and y, maintaining class balances
'''
# Create a stratified, shuffled subset of the training data if needed
N = X.shape[0]
new_X, new_y = X, y
if num_rows < N:
if verbose:
print('Reducing size from {} to {} examples'.format(N, num_rows))
        new_X, _, new_y, _ = train_test_split(X, y,  # Undersample by dropping the "test" split
                                              train_size=num_rows, random_state=SEED,
                                              stratify=y)
return new_X, new_y
def onehot_encode_y(y_train, y_test):
'''Convert y_train and y_test to a one-hot encoding version
INPUT: y_train - np.array of size (n_train,)
y_test - np.array of size (n_test,)
RETURNS: y_train - np.array of size (n_train, n_classes)
             y_test - np.array of size (n_test, n_classes)
'''
print('Converting y variables to one-hot encoding..')
lbe = LabelBinarizer()
lbe.fit(y_train)
y_train = lbe.transform(y_train)
y_test = lbe.transform(y_test)
return y_train, y_test
def z_norm_X(X_train, X_test):
'''Z-normalizes X_train and X_test with 0 mean and 1 std. dev.
INPUT: X_train - training set
X_test - test set
RETURNS: X_train - normalized version of same size
X_test - normalized version (using X_train parameters)
'''
print('Z-normalizing X data..')
std = StandardScaler()
X_train = X_train.astype(np.float32)
X_test = X_test.astype(np.float32)
std.fit(X_train)
X_train = std.transform(X_train)
X_test = std.transform(X_test)
return X_train, X_test
In [8]:
y_train, y_test = onehot_encode_y(y_train, y_test)
X_train, X_test = z_norm_X(X_train, X_test)
scores = dict()
print('Train images shape {}, labels shape {}'.format(X_train.shape, y_train.shape))
print('Test images shape {}, labels shape {}'.format(X_test.shape, y_test.shape))
In [9]:
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras.optimizers import SGD
# Create a dictionary to store model training and test info
models = dict()
class KerasFCModel(object):
def __init__(self, model_name, model_type, input_dim, layers,
activation, output_activation, verbose=2):
'''Initializes a new keras model'''
self.model_name = model_name
self.verbose = verbose
model = model_type
for idx, size in enumerate(layers):
# First layer has to take input from image files
if idx == 0:
if self.verbose == 2:
print('Adding input dense layer, input dim {}, dim {}'.format(input_dim, size))
model.add(Dense(size, input_dim=input_dim))
model.add(Activation(activation))
# Last layer has to include the output activation
elif idx == len(layers) - 1:
if self.verbose == 2:
                    print('Adding output layer {}, size {}, activation {}'.format(idx, size, output_activation))
model.add(Dense(size))
model.add(Activation(output_activation))
# Layers other than first and last have standard activation
else:
if self.verbose == 2:
                    print('Adding dense layer {}, size {}, activation {}'.format(idx, size, activation))
model.add(Dense(size))
model.add(Activation(activation))
if self.verbose > 0:
print('Model summary:\n')
model.summary()
self.model = model
def compile_model(self, loss, optimizer, metrics):
'''Compile the model'''
self.metrics = metrics
self.loss = loss
self.optimizer = optimizer
# Need to flip error vs accuracy
        metrics = ['acc' if metric == 'error' else metric for metric in metrics]
self.model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
def fit(self, X, y, epochs, batch_size):
'''Fit model to training data'''
self.history = self.model.fit(X, y, epochs=epochs, batch_size=batch_size, verbose=self.verbose)
def evaluate(self, X, y, batch_size):
'''Evaluates the model on test data'''
output = self.model.evaluate(X, y, batch_size=batch_size)
results = dict()
for idx, metric in enumerate(self.model.metrics_names):
if metric == 'acc':
results['error'] = 1.0 - output[idx]
else:
results[metric] = output[idx]
self.results = results
def report(self):
'''Prints a recap of the model, how it was trained, and performance'''
report = dict()
if self.verbose > 0:
report['model_info'] = self.model.summary()
report['loss'] = self.loss
report['optimizer'] = self.optimizer.get_config()
report['metrics'] = self.metrics
report['history'] = self.history
report['results'] = self.results
return report
In [10]:
# Helper function to evaluate fully-connected models
def evaluate_fc_model(name, layers, activation, optimizer,
X_tr, y_tr, X_te, y_te,
epochs, batch_size,
verbose=2):
"""Creates, trains, and evaluates neural network on provided data"""
print('Creating Keras model {}'.format(name))
model = KerasFCModel(model_name=name, model_type=Sequential(),
input_dim=784, layers=layers,
activation=activation, output_activation='softmax',
verbose=verbose)
print('Compiling model')
model.compile_model(loss='categorical_crossentropy',
optimizer=optimizer,
metrics=['error'])
print('Training model')
model.fit(X_tr, y_tr, epochs=epochs, batch_size=batch_size)
print('Evaluating model')
model.evaluate(X_te, y_te, batch_size=batch_size)
print('\nTest results: {:.2f}% error'.format(100.0 * model.report()['results']['error']))
return model
In [11]:
%%time
fc_results = dict()
fc_results['fc-300-10'] = evaluate_fc_model('fc-300-10', layers=(300,10), activation='tanh',
optimizer=SGD(lr=0.1, decay=1e-6, momentum=0.9, nesterov=True),
X_tr=X_train, y_tr=y_train, X_te=X_test, y_te=y_test,
epochs=EPOCHS, batch_size=BATCH,
verbose=0)
In [12]:
%%time
# FC 1000-10
fc_results['fc-1000-10'] = evaluate_fc_model('fc-1000-10', layers=(1000,10), activation='tanh',
optimizer=SGD(lr=0.1, decay=1e-6, momentum=0.9, nesterov=True),
X_tr=X_train, y_tr=y_train, X_te=X_test, y_te=y_test,
epochs=EPOCHS, batch_size=BATCH,
verbose=0)
In [13]:
%%time
# FC 300-100-10
fc_results['fc-300-100-10'] = evaluate_fc_model('fc-300-100-10', layers=(300,100,10), activation='tanh',
optimizer=SGD(lr=0.1, decay=1e-5, momentum=0.9, nesterov=True),
X_tr=X_train, y_tr=y_train, X_te=X_test, y_te=y_test,
epochs=EPOCHS, batch_size=BATCH,
verbose=0)
In [14]:
%%time
# FC 500-150-10
fc_results['fc-500-150-10'] = evaluate_fc_model('fc-500-150-10', layers=(500,150,10), activation='tanh',
optimizer=SGD(lr=0.1, decay=1e-5, momentum=0.9, nesterov=True),
X_tr=X_train, y_tr=y_train, X_te=X_test, y_te=y_test,
epochs=EPOCHS, batch_size=BATCH,
verbose=0)
In [15]:
# Compile the FC results so far into a dataframe for easy plotting
fc_scores = {result: value.results['error'] for result, value in fc_results.items()}
fc_scores_df = pd.DataFrame.from_dict(fc_scores, orient='index')
fc_scores_df.columns = ['error']
fc_scores_df['error'] *= 100.0
fc_scores_df = fc_scores_df.sort_values('error', ascending=True)
fc_scores_df.to_pickle('fc_scores.pkl')
fc_scores_df
Out[15]:
In [16]:
fig, ax = plt.subplots(1,1, figsize=(6,6))
fc_scores_df.plot.barh(width=0.4, ax=ax, legend=None)
ax.set(title="Fully-connected neural net test-set accuracy", ylabel="Network", xlabel="%age error");
plt.savefig('fc_scores.png', bbox_inches='tight', dpi=150)
Now let's see how much further we can improve performance with convolutional neural networks. The convolutional networks need 2-d images rather than the flattened 1-d vectors the fully-connected networks used, and each image needs to be padded out to 32x32, as in [1].
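For reference, numpy's np.pad can do this padding in one call. The sketch below works on a stand-in 28x28 array (hypothetical data, just for illustration); the image_border helper defined further down builds the same 2-pixel zero border by hand.

import numpy as np

# Stand-in for a single 28x28 MNIST digit
digit = np.random.randint(0, 256, size=(28, 28)).astype(np.uint8)
# Add a 2-pixel zero border on every side -> 32x32
padded = np.pad(digit, pad_width=2, mode='constant', constant_values=0)
assert padded.shape == (32, 32)
assert np.array_equal(padded[2:30, 2:30], digit)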
In [17]:
# Load image pickle files
X_train, y_train, X_test, y_test = load_data()
X_train.shape
Out[17]:
In [18]:
# Plot a few random digits to sanity-check their size and that they look correct
N = 3
indexes = np.random.choice(X_train.shape[0], N)
fig, ax = plt.subplots(1, N)
for num, idx in enumerate(indexes):
ax[num].imshow(X_train[idx])
ax[num].set(title="Label={}\n{}x{}".format(y_train[idx], *X_train[idx].shape))
In [19]:
def image_border(image, size, fill):
    """
    Adds a border of the given size and fill value around the numpy array
    """
    im_h, im_w = image.shape  # numpy arrays are (rows, cols) = (height, width)
    im_dtype = image.dtype
    new_image = np.full((im_h + (2 * size), im_w + (2 * size)),
                        fill_value=fill, dtype=im_dtype)
    new_image[size:im_h + size, size:im_w + size] = image
    assert new_image.dtype == image.dtype
    assert new_image.shape[0] == image.shape[0] + (2 * size)
    assert new_image.shape[1] == image.shape[1] + (2 * size)
    assert np.array_equal(image, new_image[size:size + im_h, size:size + im_w])
    return new_image
In [20]:
N = 3
indexes = np.random.choice(X_train.shape[0], N)
fig, ax = plt.subplots(2, N, figsize=(10,6))
for num, idx in enumerate(indexes):
ax[0, num].imshow(X_train[idx])
ax[0, num].set(title="Label={}\n{}x{}".format(y_train[idx], *X_train[idx].shape))
X_resize = image_border(X_train[idx], 2, 0)
ax[1, num].imshow(X_resize)
ax[1, num].set(title="Label={}\n{}x{}".format(y_train[idx], *X_resize.shape))
plt.tight_layout()
In [21]:
from tqdm import tqdm
# resize all the training and test images
n_train = X_train.shape[0]
n_test = X_test.shape[0]
def resize_images(images, description):
"""
    Iterates over the first axis and pads each image out to 32x32
"""
new_images = np.zeros((images.shape[0], 32, 32))
for index in tqdm(range(images.shape[0]), desc=description):
new_images[index] = image_border(images[index], 2, 0)
return new_images
X_resize_train = resize_images(X_train, "Resizing train images")
X_resize_test = resize_images(X_test, "Resizing test images")
X_train, X_test = X_resize_train, X_resize_test
print('New X_train shape: {}, new x_test shape: {}'.format(X_train.shape, X_test.shape))
print('y_train shape: {}, y_test shape: {}'.format(y_train.shape, y_test.shape))
In [22]:
from keras import backend as K
# Input images need to be Z-normalized, and need to be flattened to 1-d vector and re-squared afterwards
X_train, X_test = z_norm_X(flatten_images(X_train), flatten_images(X_test))
X_train, X_test = square_images(X_train, 32, 32), square_images(X_test, 32, 32)
# y values need to be converted to one-hot
y_train, y_test = onehot_encode_y(y_train, y_test)
# Need to add explicit shape of 1 as we have 1 channel for B&W images
X_train, X_test = X_train[:,:,:, np.newaxis], X_test[:,:,:, np.newaxis] # Need explicit single channel
print('New X_train shape: {}, new x_test shape: {}'.format(X_train.shape, X_test.shape))
print('y_train shape: {}, y_test shape: {}'.format(y_train.shape, y_test.shape))
In [23]:
# Our channels are in the least significant order of the np array (32, 32, 1).
# Make sure the current backend matches this ordering, and doesn't expect (1, 32, 32).
assert K.image_data_format() == 'channels_last'
In [24]:
from keras.models import Sequential
from keras.layers import Conv2D, Dense, Activation, AveragePooling2D, Flatten
def lenet5_model(verbose=False):
"""
Creates and returns a lenet5 model
"""
# Create the model
model = Sequential()
model.add(Conv2D(filters=6, kernel_size=(5, 5), strides=(1, 1), input_shape=(32, 32, 1))) # C1
model.add(AveragePooling2D(pool_size=(2, 2))) # S2
model.add(Activation('tanh'))
model.add(Conv2D(filters=16, kernel_size=(5, 5), strides=(1, 1))) # C3
model.add(AveragePooling2D(pool_size=(2, 2))) # S4
model.add(Activation('tanh'))
model.add(Conv2D(filters=120, kernel_size=(5, 5), strides=(1, 1))) # C5
model.add(Activation('tanh'))
model.add(Flatten())
model.add(Dense(120)) # F6
model.add(Activation('tanh'))
model.add(Dense(10))
model.add(Activation('softmax'))
if verbose:
print(model.summary())
return model
lenet5 = lenet5_model(verbose=True)
In [33]:
# Create a new model every time
def evaluate_model(model, optimizer, cv_split=None, verbose=False):
"""
Wrapper method to create, train and optionally CV, and check performance on test set
"""
if verbose:
print('\nCompiling model')
model.summary()
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
if verbose:
print('\nTraining model')
history = model.fit(X_train, y_train, validation_split=cv_split,
epochs=20, batch_size=256, verbose=1 if verbose else 0)
if verbose:
print('\nEvaluating model')
score = model.evaluate(X_test, y_test, batch_size=256)
if verbose:
print('\nTest results: Loss = {:.4f}, Error = {:.4f}'.format(score[0], 1.0 - score[1]))
results = {'model': model, 'history': history.history, 'loss': score[0], 'acc': score[1], 'err': 1.0 - score[1]}
return results
results = evaluate_model(model=lenet5_model(),
optimizer=SGD(lr=0.1, decay=1e-6, momentum=0.9, nesterov=True),
cv_split=0.2, verbose=True)
In [34]:
def plot_history(hist):
"""
Plots the history object returned by the .fit() call
"""
for metric in ('acc', 'loss', 'val_acc', 'val_loss'):
assert metric in hist.keys()
hist_df = pd.DataFrame(hist)
fig, axes = plt.subplots(1,2,figsize=(14, 6))
hist_df['err'] = 1 - hist_df['acc']
hist_df['val_err'] = 1 - hist_df['val_acc']
hist_df[['val_err', 'err']].plot.line(ax=axes[0])
hist_df[['val_loss', 'loss']].plot.line(ax=axes[1])
axes[0].set(title="Error during training")
axes[0].legend(labels=["Test", "Training"])
axes[1].set(title="Loss during training")
axes[1].legend(labels=["Test", "Training"])
for ax in axes:
ax.set_xticks(range(hist_df.shape[0]))
ax.set(xlabel="epoch", ylabel="Accuracy / loss")
# return fig, axes
plot_history(results['history'])
In [36]:
all_scores_df = pd.DataFrame.from_dict({'lenet-5': results['err'] * 100}, orient='index')
all_scores_df.columns = ['error']
all_scores_df = fc_scores_df.append(all_scores_df)
all_scores_df = all_scores_df.sort_values('error')
# all_scores_df.sort_values('error').plot.barh()
# results['err']
fig, ax = plt.subplots(1,1, figsize=(6,6))
all_scores_df.plot.barh(width=0.4, ax=ax, legend=None)
ax.set(title="Neural networks test set accuracy", ylabel="Network", xlabel="%age error");
plt.savefig('lenet_scores.png', bbox_inches='tight', dpi=150)
all_scores_df
Out[36]:
In [37]:
def random_sgd(verbose=False):
"""
Generates an SGD optimizer with random values
"""
lr = 10 ** np.random.randint(-6, -3)
momentum = 0.1 * np.random.randint(8, 10)
decay = 10 ** np.random.randint(-5, -3)
nesterov = np.random.uniform() < 0.5
sgd = SGD(lr=lr, momentum=momentum, decay=decay, nesterov=nesterov)
if verbose:
print('sgd: lr={}, momentum={}, decay={}, nesterov={}'.format(lr, momentum, decay, nesterov))
return sgd
# random_sgd(verbose=True)
In [38]:
# Randomize optimizer and run for 100 samples
def best_model(N=100):
"""
Returns the best model after random search for N SGD values
"""
best_result = None
best_acc = 0
for n in range(N):
print('\nIteration {}'.format(n))
sgd_opt = random_sgd()
result = evaluate_model(lenet5_model(), sgd_opt)
current_acc = result['acc']
if current_acc > best_acc:
            print('\n-> Updating best model at iteration {}. Current acc: {}, old acc: {}'.format(n, current_acc, best_acc))
best_result = result
best_acc = current_acc
    return best_result
# best_lenet5 = best_model(N=5)
Since [1] was published, new layer types have been invented that make networks easier to train and reduce overfitting. Let's retrofit the original network with these improvements and see how the performance changes.
When adding Dropout layers between the activations and the following convolutional layers, it's not clear how many to use or what the dropout rate should be. Let's pass these as parameters to the model-creation method so we can try different combinations later.
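As a minimal sketch of what "one layer of dropout" means here (the full, parameterised model follows in the next cell), a single convolutional block with a Dropout layer after its activation looks like this; dropout_val is the fraction of activations randomly zeroed during training, and conv_block_sketch is just an illustrative name.

from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Activation, Dropout

def conv_block_sketch(dropout_val=0.5):
    # One convolutional block with a Dropout layer placed after the activation.
    block = Sequential()
    block.add(Conv2D(filters=6, kernel_size=(5, 5), input_shape=(32, 32, 1)))
    block.add(MaxPooling2D(pool_size=(2, 2)))
    block.add(Activation('relu'))
    block.add(Dropout(dropout_val))  # omitted entirely when dropout_cnt is 0 in the full model
    return block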
In [39]:
from keras.models import Sequential
from keras.layers import MaxPooling2D, Dropout
def lenet5_modern_model(dropout_cnt=0, dropout_val=0.5, bias_init=None, verbose=False):
"""
Creates and returns a lenet5 model with retrofitted modern layers:
- ReLU activations
- Max pooling
- Dropout
"""
# Create the model
model = Sequential()
if bias_init:
bias = bias_init
else:
bias='zeros'
model.add(Conv2D(filters=6, kernel_size=(5, 5), strides=(1, 1),
input_shape=(32, 32, 1), bias_initializer=bias)) # C1
model.add(MaxPooling2D(pool_size=(2, 2))) # S2
model.add(Activation('relu'))
if dropout_cnt >= 1:
model.add(Dropout(dropout_val))
model.add(Conv2D(filters=16, kernel_size=(5, 5), strides=(1, 1),
bias_initializer=bias)) # C3
model.add(MaxPooling2D(pool_size=(2, 2))) # S4
model.add(Activation('relu'))
if dropout_cnt >= 2:
model.add(Dropout(dropout_val))
model.add(Conv2D(filters=120, kernel_size=(5, 5), strides=(1, 1),
bias_initializer=bias)) # C5
model.add(Activation('relu'))
if dropout_cnt >= 3:
model.add(Dropout(dropout_val))
model.add(Flatten())
model.add(Dense(120)) # F6
model.add(Activation('relu'))
if dropout_cnt >= 4:
model.add(Dropout(dropout_val))
model.add(Dense(10))
model.add(Activation('softmax'))
if verbose:
print(model.summary())
return model
lenet5 = lenet5_modern_model(verbose=True)
Let's do a grid search to find out how many dropout layers give the best performance and what their rate should be. To restrict the search space we add dropout layers from the first layer onwards and use the same rate for each layer.
This is going to take a long time to run!!
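For reference, the grid evaluated in the next cell is just the Cartesian product of dropout counts and rates; a small sketch of how many combinations that implies (the real loop, with training, follows below).

from itertools import product

dropout_counts = range(1, 4)                 # 1, 2 or 3 dropout layers
dropout_values = (0.1, 0.2, 0.3, 0.4, 0.5)   # dropout rate shared by every dropout layer
grid = list(product(dropout_counts, dropout_values))
print('{} (count, rate) combinations to evaluate'.format(len(grid)))  # 3 * 5 = 15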
In [41]:
%%time
results = dict()
best_dropout = None
min_error = 1.00
RUNS = 1
# Exhaustive grid search of dropout configs
for dropout_cnt in range(1, 4):
for dropout_val in (0.1, 0.2, 0.3, 0.4, 0.5):
print('\nTesting {} runs with {} layer(s) of dropout, {} value .. '.format(RUNS, dropout_cnt, dropout_val))
errors = np.zeros((RUNS,))
for index in range(RUNS): # Run each combination multiple times
model = lenet5_modern_model(dropout_cnt=dropout_cnt, dropout_val=dropout_val)
result = evaluate_model(model,
optimizer=SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
cv_split=None, verbose=False)
errors[index] = 1.0 - result['acc']
result_key = (dropout_cnt, dropout_val)
mean, std = errors.mean(), errors.std()
        # Update the best settings based on the mean error across runs
if mean < min_error:
print("\nUpdating best settings:")
best_dropout = result_key
min_error = mean
results[result_key] = {'mean': mean, 'std': std}
print('\nDropout: {} @ {}, error: {:.4f} ({:.4f} std dev)'.format(*result_key, mean, std))
print(results)
In [42]:
print('Best dropout settings: {}, giving error of: {}'.format(best_dropout, min_error))
In [43]:
new_score_df = pd.DataFrame.from_dict({'lenet-5-dropout-relu': min_error * 100}, orient='index')
new_score_df.columns = ['error']
all_scores_df = all_scores_df.append(new_score_df)
all_scores_df = all_scores_df.sort_values('error')
# all_scores_df.sort_values('error').plot.barh()
# results['err']
In [44]:
fig, ax = plt.subplots(1,1, figsize=(6,6))
all_scores_df.plot.barh(width=0.4, ax=ax, legend=None)
ax.set(title="Neural networks test set accuracy", ylabel="Network", xlabel="%age error");
plt.savefig('lenet_modern_scores.png', bbox_inches='tight', dpi=150)
all_scores_df
Out[44]:
In [45]:
# Shortcut the cross validation above as it takes ages
best_dropout = 3, 0.2
In [50]:
# Create a new evaluation method where the data is passed in
def evaluate_model(model, X_tr, y_tr, X_te, y_te, optimizer, epochs=20, batch=256, cv_split=None, verbose=False):
"""
Wrapper method to create, train and optionally CV, and check performance on test set
"""
if verbose:
print('\nCompiling model')
model.summary()
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
if verbose:
print('\nTraining model')
history = model.fit(X_tr, y_tr, validation_split=cv_split,
epochs=epochs, batch_size=batch, verbose=1 if verbose else 0)
if verbose:
print('\nEvaluating model')
train_score = model.evaluate(X_tr, y_tr, batch_size=batch)
test_score = model.evaluate(X_te, y_te, batch_size=batch)
if verbose:
        print('\nTest results: Loss = {:.4f}, Error = {:.2f}%'.format(test_score[0], 100.0 * (1.0 - test_score[1])))
results = {'model': model, 'history': history.history,
'train_loss': train_score[0], 'train_acc': train_score[1], 'train_err': 1.0 - train_score[1],
'test_loss': test_score[0], 'test_acc': test_score[1], 'test_err': 1.0 - test_score[1],
}
return results
In [51]:
%%time
n_rows = range(5000, 65000, 5000)
results = dict()
for n in n_rows:
print('\nValidating train and test set performance with {} training examples'.format(n))
X_train_sub, y_train_sub = stratified_subsample(X_train, y_train, n)
model = lenet5_modern_model(dropout_cnt=best_dropout[0], dropout_val=best_dropout[1])
result = evaluate_model(model,
X_train_sub, y_train_sub, X_test, y_test,
optimizer=SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
cv_split=None, verbose=False)
results[n] = result
train_sub_df = pd.DataFrame.from_dict(results, orient='index')
train_sub_df.index.name="N"
train_sub_df = train_sub_df[['train_err', 'test_err']]
train_sub_df
In [52]:
train_sub_df.plot.line()
Out[52]:
In [53]:
from keras.utils import to_categorical
NUM_CLASSES = 10 # Number of digits (0 to 9)
# Load the data
X_train, y_train, X_test, y_test = load_data()
# Convert single value label into 10-dim array of bools
y_train = to_categorical(y_train, NUM_CLASSES)
y_test = to_categorical(y_test, NUM_CLASSES)
# Resize the images to be centered in 32x32 (instead of 28x28)
X_train = resize_images(X_train, "Resizing train images")
X_test = resize_images(X_test, "Resizing test images")
# Need to add explicit shape of 1 as we have 1 channel for B&W images. This is "channels-last" ordering
X_train, X_test = X_train[:,:,:, np.newaxis], X_test[:,:,:, np.newaxis] # Need explicit single channel
print('Shapes: X_train: {}, y_train: {}, X_test: {}, y_test: {}'.format(X_train.shape, y_train.shape,
X_test.shape, y_test.shape))
In [54]:
from keras.preprocessing.image import ImageDataGenerator
def augmented_plot(images):
    """
    Plots original images (top row) and their augmented versions (bottom row)
    """
    fig, ax = plt.subplots(2, 3, figsize=(10, 6))
    for idx in range(3):
        ax[0, idx].imshow(images[idx].squeeze())
    # Plot augmented versions of the same images on the bottom row
    datagen = ImageDataGenerator(
        rotation_range=20,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=np.pi/8.0
    )
    for X_batch in datagen.flow(images[:3], batch_size=3, shuffle=False):
        for idx in range(3):
            ax[1, idx].imshow(X_batch[idx].squeeze())
        break
In [55]:
def implot(images):
"""
Plots the images on rows of 3
"""
fig, ax = plt.subplots(1, 3, figsize=(10,6))
for idx, axis in enumerate(ax):
ax[idx].imshow(images[idx].squeeze())
implot(X_train[:3])
plt.savefig('noaug_digits.png', bbox_inches='tight', dpi=150)
In [57]:
# Plot a few augmented values
datagen = ImageDataGenerator(
featurewise_center=True,
featurewise_std_normalization=True,
rotation_range=45,
# width_shift_range=0.1,
# height_shift_range=0.1,
shear_range=np.pi/8.0
)
datagen.fit(X_train)  # featurewise_center / featurewise_std_normalization need fitting on the data first
for X_batch, y_batch in datagen.flow(X_train[:3], y_train[:3], batch_size=3):
implot(X_batch)
break
plt.savefig('aug_digits.png', bbox_inches='tight', dpi=150)
In [58]:
# Train model with augmented data
def evaluate_model(model, datagen,
X_tr, y_tr, X_te, y_te,
optimizer, batch=256, epochs=20, verbose=False):
"""
Wrapper method to create, train and optionally CV, and check performance on test set
"""
if verbose:
print('\nCompiling model')
model.summary()
model.compile(optimizer=optimizer,
loss='categorical_crossentropy',
metrics=['accuracy'])
if verbose:
        print('\nTraining model with the image data generator')
# Only difference is now the generator provides flow of images for minibatches
history = model.fit_generator(datagen.flow(X_tr, y_tr, batch_size=batch),
steps_per_epoch=int(X_tr.shape[0] / batch), epochs=epochs,
verbose=1 if verbose else 0)
if verbose:
print('\nEvaluating model')
train_score = model.evaluate(X_tr, y_tr, batch_size=batch)
test_score = model.evaluate(X_te, y_te, batch_size=batch)
if verbose:
        print('\nTest results: Loss = {:.4f}, Error = {:.2f}%'.format(test_score[0], 100.0 * (1.0 - test_score[1])))
results = {'model': model, 'history': history.history,
'train_loss': train_score[0], 'train_acc': train_score[1], 'train_err': 1.0 - train_score[1],
'test_loss': test_score[0], 'test_acc': test_score[1], 'test_err': 1.0 - test_score[1],
}
return results
In [62]:
# Do a run with no augmentation for a baseline
N = 1
datagen_std = ImageDataGenerator(
featurewise_center=True,
featurewise_std_normalization=True
)
datagen_std.fit(X_train)
results = np.zeros((N,))
for n in range(N):
print('\nEvaluating model {} of {}'.format(n+1, N))
result = evaluate_model(lenet5_modern_model(dropout_cnt=best_dropout[0],
dropout_val=best_dropout[1]),
datagen_std,
X_train, y_train, X_test, y_test,
optimizer=SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
batch=256, epochs=60,
verbose=False)
results[n] = result['test_err']
error = results.mean()
std = results.std()
print('\n{} runs, mean error: {:.6f}, std dev: {:.6f}'.format(N, error, std))
In [63]:
plot_history(result['history'])
In [65]:
result['history']
Out[65]:
In [ ]:
result['train_err'], result['test_err']
In [67]:
# Now do a run with image augmentation
N = 1
print('Standardizing images')
datagen_aug = ImageDataGenerator(
featurewise_center=True,
featurewise_std_normalization=True,
rotation_range=45,
# width_shift_range=0.1,
# height_shift_range=0.1,
shear_range=np.pi/8.0
)
datagen_aug.fit(X_train)
print('Training and evaluating model')
result = evaluate_model(lenet5_modern_model(dropout_cnt=best_dropout[0],
dropout_val=best_dropout[1]),
datagen_aug,
X_train, y_train, X_test, y_test,
optimizer=SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
batch=256, epochs=20,
verbose=False)
print('\nTrain error: {}, test error: {}'.format(result['train_err'], result['test_err']))
In [ ]:
%%time
N = 1
all_results = dict()
image_gen_desc = 'no_aug'
for image_gen in (ImageDataGenerator(),  # plain generator: baseline with no augmentation
                  ImageDataGenerator(rotation_range=20,
                                     width_shift_range=0.2,
                                     height_shift_range=0.2,
                                     shear_range=np.pi/4.0)):
# print('Using Image generator: {}'.format(image_gen_desc))
for epoch in range(20, 120, 20):
print('Training for {} epochs'.format(epoch))
results = np.zeros((N,))
for n in range(N):
print('\nEvaluating model {} of {}'.format(n+1, N))
            result = evaluate_model(lenet5_modern_model(dropout_cnt=best_dropout[0],
                                                        dropout_val=best_dropout[1]),
                                    image_gen,
                                    X_train, y_train, X_test, y_test,
                                    optimizer=SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True),
                                    batch=256, epochs=epoch,
                                    verbose=True)
results[n] = result['test_err']
error = results.mean()
std = results.std()
print('\n{} runs, {} epochs, mean error: {:.2f}, std dev: {:.2f}'.format(N, epoch, error, std))
all_results[(image_gen_desc, epoch)] = (error, std)
image_gen_desc = 'aug'
print(all_results)
In [ ]:
print(all_results)
In [ ]:
# Read back the weights of the best performing network and visualize them