In [14]:
# Data
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
import impl.utils as utils
# Dataset preparation and pre-processing
mnist = input_data.read_data_sets('data/MNIST_data/', one_hot=False)
X_train, y_train = mnist.train.images, mnist.train.labels
X_val, y_val = mnist.validation.images, mnist.validation.labels
X_test, y_test = mnist.test.images, mnist.test.labels
# y_test.shape, y_val.shape, y_train.shape
M, D, C = X_train.shape[0], X_train.shape[1], y_train.max() + 1  # examples, input dim, classes
# M, D, C
X_train, X_val, X_test = utils.prepro(X_train, X_val, X_test)
# X_train.shape, X_val.shape, X_test.shape
# Optional (if net_type == 'cnn'): reshape the flat vectors into image tensors
# img_shape = (1, 28, 28)
# X_train = X_train.reshape(-1, *img_shape)
# X_val = X_val.reshape(-1, *img_shape)
# X_test = X_test.reshape(-1, *img_shape)
# X_train.shape, X_val.shape, X_test.shape
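`utils.prepro` comes from `impl.utils` and its body is not shown in this notebook. The cell below is a minimal sketch of what a preprocessor with this three-array signature typically does (zero-centering with training-set statistics); the name `prepro_sketch` and the mean-only normalization are assumptions, not the library's actual implementation.
In [ ]:
# Hypothetical sketch of a prepro-style helper: zero-center every split using
# statistics computed on the training split only (an assumption about impl.utils.prepro).
def prepro_sketch(X_train, X_val, X_test):
    mean = X_train.mean(axis=0)  # per-pixel mean from the training set
    return X_train - mean, X_val - mean, X_test - mean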
In [29]:
import impl.loss as loss_fun
import impl.layer as l
import impl.regularization as reg
import impl.utils as util
import impl.NN as nn
class FFNN(nn.NN):
    def __init__(self, D, C, H, lam=1e-3, p_dropout=.8, loss='cross_ent', nonlin='relu'):
        super().__init__(D, C, H, lam, p_dropout, loss, nonlin)

    def _init_model(self, D, C, H):
        # He-style initialization: scale weights by sqrt(2 / fan_in)
        self.model = dict(
            W1=np.random.randn(D, H) / np.sqrt(D / 2.),
            W2=np.random.randn(H, H) / np.sqrt(H / 2.),
            W3=np.random.randn(H, C) / np.sqrt(H / 2.),
            b1=np.zeros((1, H)),
            b2=np.zeros((1, H)),
            b3=np.zeros((1, C)),
            gamma1=np.ones((1, H)),
            gamma2=np.ones((1, H)),
            beta1=np.zeros((1, H)),
            beta2=np.zeros((1, H))
        )
        # Running mean/variance for batch norm, used at test time
        self.bn_caches = dict(
            bn1_mean=np.zeros((1, H)),
            bn2_mean=np.zeros((1, H)),
            bn1_var=np.zeros((1, H)),
            bn2_var=np.zeros((1, H))
        )

    def forward(self, X, train=False):
        gamma1, gamma2 = self.model['gamma1'], self.model['gamma2']
        beta1, beta2 = self.model['beta1'], self.model['beta2']
        u1, u2 = None, None
        bn1_cache, bn2_cache = None, None

        # First layer: affine -> batch norm -> nonlinearity -> dropout
        h1, h1_cache = l.fc_forward(X, self.model['W1'], self.model['b1'])
        bn1_cache = (self.bn_caches['bn1_mean'], self.bn_caches['bn1_var'])
        h1, bn1_cache, run_mean, run_var = l.bn_forward(h1, gamma1, beta1, bn1_cache, train=train)
        h1, nl_cache1 = self.forward_nonlin(h1)
        self.bn_caches['bn1_mean'], self.bn_caches['bn1_var'] = run_mean, run_var
        if train: h1, u1 = l.dropout_forward(h1, self.p_dropout)

        # Second layer: affine -> batch norm -> nonlinearity -> dropout
        h2, h2_cache = l.fc_forward(h1, self.model['W2'], self.model['b2'])
        bn2_cache = (self.bn_caches['bn2_mean'], self.bn_caches['bn2_var'])
        h2, bn2_cache, run_mean, run_var = l.bn_forward(h2, gamma2, beta2, bn2_cache, train=train)
        h2, nl_cache2 = self.forward_nonlin(h2)
        self.bn_caches['bn2_mean'], self.bn_caches['bn2_var'] = run_mean, run_var
        if train: h2, u2 = l.dropout_forward(h2, self.p_dropout)

        # Third layer: affine only; softmax is applied inside the loss
        score, score_cache = l.fc_forward(h2, self.model['W3'], self.model['b3'])

        cache = (X, h1_cache, h2_cache, score_cache, nl_cache1, nl_cache2, u1, u2, bn1_cache, bn2_cache)
        return score, cache

    def cross_entropy(self, y_pred, y_train):
        m = y_pred.shape[0]
        prob = util.softmax(y_pred)
        log_like = -np.log(prob[range(m), y_train])
        data_loss = np.sum(log_like) / m
        return data_loss

    def dcross_entropy(self, y_pred, y_train):
        m = y_pred.shape[0]
        grad_y = util.softmax(y_pred)
        grad_y[range(m), y_train] -= 1.0
        grad_y /= m
        return grad_y

    def loss_function(self, y, y_train):
        loss = self.cross_entropy(y, y_train)
        dy = self.dcross_entropy(y, y_train)
        return loss, dy

    def backward(self, dy, cache):
        X, h1_cache, h2_cache, score_cache, nl_cache1, nl_cache2, u1, u2, bn1_cache, bn2_cache = cache

        # Third layer
        dh2, dW3, db3 = l.fc_backward(dy, score_cache)
        dW3 += reg.dl2_reg(self.model['W3'], self.lam)
        dh2 = self.backward_nonlin(dh2, nl_cache2)
        dh2 = l.dropout_backward(dh2, u2)
        dh2, dgamma2, dbeta2 = l.bn_backward(dh2, bn2_cache)

        # Second layer
        dh1, dW2, db2 = l.fc_backward(dh2, h2_cache)
        dW2 += reg.dl2_reg(self.model['W2'], self.lam)
        dh1 = self.backward_nonlin(dh1, nl_cache1)
        dh1 = l.dropout_backward(dh1, u1)
        dh1, dgamma1, dbeta1 = l.bn_backward(dh1, bn1_cache)

        # First layer
        dX, dW1, db1 = l.fc_backward(dh1, h1_cache)
        dW1 += reg.dl2_reg(self.model['W1'], self.lam)

        grad = dict(
            W1=dW1, W2=dW2, W3=dW3, b1=db1, b2=db2, b3=db3,
            gamma1=dgamma1, gamma2=dgamma2, beta1=dbeta1, beta2=dbeta2
        )
        return dX, grad

    def test(self, X):
        y_logit, cache = self.forward(X, train=False)
        y_prob = util.softmax(y_logit)
        if self.mode == 'classification':
            return np.argmax(y_prob, axis=1)
        else:  # self.mode == 'regression'
            return np.round(y_logit)
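The class relies on `l.bn_forward` / `l.bn_backward` from `impl.layer` for batch normalization, but those helpers are not shown in this notebook. The cell below sketches a pair with the same calling convention (forward takes and returns the running statistics, backward returns the input, gamma, and beta gradients); the names `bn_forward_sketch` / `bn_backward_sketch`, the momentum, and the epsilon are assumptions, not the library's code.
In [ ]:
# Hypothetical batch-norm helpers matching the calling convention used in FFNN.forward/backward:
# forward returns (out, cache, running_mean, running_var); backward returns (dX, dgamma, dbeta).
eps_bn = 1e-8  # small constant for numerical stability (assumed value)

def bn_forward_sketch(X, gamma, beta, cache, momentum=.9, train=True):
    running_mean, running_var = cache
    if train:
        mu, var = np.mean(X, axis=0), np.var(X, axis=0)
        X_norm = (X - mu) / np.sqrt(var + eps_bn)
        out = gamma * X_norm + beta
        cache = (X, X_norm, mu, var, gamma, beta)
        # exponential running averages, consumed at test time
        running_mean = momentum * running_mean + (1. - momentum) * mu
        running_var = momentum * running_var + (1. - momentum) * var
    else:
        X_norm = (X - running_mean) / np.sqrt(running_var + eps_bn)
        out = gamma * X_norm + beta
        cache = None
    return out, cache, running_mean, running_var

def bn_backward_sketch(dout, cache):
    X, X_norm, mu, var, gamma, beta = cache
    N = X.shape[0]
    X_mu = X - mu
    std_inv = 1. / np.sqrt(var + eps_bn)
    # Backprop through the normalization, then through gamma/beta
    dX_norm = dout * gamma
    dvar = np.sum(dX_norm * X_mu, axis=0) * -.5 * std_inv**3
    dmu = np.sum(dX_norm * -std_inv, axis=0) + dvar * np.mean(-2. * X_mu, axis=0)
    dX = dX_norm * std_inv + dvar * 2. * X_mu / N + dmu / N
    dgamma = np.sum(dout * X_norm, axis=0)
    dbeta = np.sum(dout, axis=0)
    return dX, dgamma, dbeta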
In [30]:
# Optimizer: random-minibatch Adam
import impl.utils as util
import impl.constant as c
import copy
from sklearn.utils import shuffle as skshuffle


def get_minibatch(X, y, minibatch_size, shuffle=True):
    minibatches = []

    if shuffle:
        X, y = skshuffle(X, y)

    for i in range(0, X.shape[0], minibatch_size):
        X_mini = X[i:i + minibatch_size]
        y_mini = y[i:i + minibatch_size]
        minibatches.append((X_mini, y_mini))

    return minibatches


def adam(nn, X_train, y_train, val_set=None, alpha=0.001, mb_size=256, n_iter=2000, print_after=100):
    # First (M) and second (R) moment estimates, one per parameter
    M = {k: np.zeros_like(v) for k, v in nn.model.items()}
    R = {k: np.zeros_like(v) for k, v in nn.model.items()}
    beta1 = .9
    beta2 = .999

    minibatches = get_minibatch(X_train, y_train, mb_size)

    if val_set:
        X_val, y_val = val_set

    for iter in range(1, n_iter + 1):
        t = iter
        idx = np.random.randint(0, len(minibatches))
        X_mini, y_mini = minibatches[idx]

        # Single training step over the minibatch: forward, loss, backprop
        y, cache = nn.forward(X_mini, train=True)
        loss, dy = nn.loss_function(y, y_mini)
        dX, grad = nn.backward(dy, cache)

        if iter % print_after == 0:
            if val_set:
                val_acc = util.accuracy(y_val, nn.test(X_val))
                print('Iter-{} training loss: {:.4f} validation accuracy: {:.4f}'.format(iter, loss, val_acc))

        # Adam update with bias-corrected moment estimates
        for k in grad:
            M[k] = util.exp_running_avg(M[k], grad[k], beta1)
            R[k] = util.exp_running_avg(R[k], grad[k]**2, beta2)

            m_k_hat = M[k] / (1. - beta1**t)
            r_k_hat = R[k] / (1. - beta2**t)

            nn.model[k] -= alpha * m_k_hat / (np.sqrt(r_k_hat) + c.eps)

    return nn
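`util.exp_running_avg` and `c.eps` come from `impl.utils` and `impl.constant` and are not reproduced here. The sketch below shows the one-line exponential moving average that the moment estimates above presumably use; the function body, the name `exp_running_avg_sketch`, and the epsilon value are assumptions rather than the library's actual definitions.
In [ ]:
# Hypothetical stand-ins for util.exp_running_avg and c.eps used by the Adam loop above.
eps_adam = 1e-8  # assumed value for the denominator constant

def exp_running_avg_sketch(running, new, gamma=.9):
    # Keep a fraction gamma of the old estimate and blend in (1 - gamma) of the new value
    return gamma * running + (1. - gamma) * new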
In [31]:
# Hyper-parameters
n_iter = 20        # number of Adam iterations (one random minibatch per iteration)
alpha = 1e-3       # learning rate
mb_size = 64       # minibatch size
# num_layers = 5   # depth
print_after = 1    # report training loss and validation accuracy every `print_after` iterations
In [32]:
# Train, valid, and test
net = FFNN(C=C, D=D, H=8, lam=1e-3, p_dropout=0.95)
net = adam(nn=net, X_train=X_train, y_train=y_train, val_set=(X_val, y_val), mb_size=mb_size, alpha=alpha,
           n_iter=n_iter, print_after=print_after)

y_pred = net.test(X_test)
acc = np.mean(y_pred == y_test)

print()
print('Test accuracy: {:.4f}'.format(acc))
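`util.accuracy`, used for the validation score inside the Adam loop, is also not shown; it presumably matches the test-set computation above (the mean of label matches). A hedged one-liner with that assumed behavior:
In [ ]:
# Hypothetical equivalent of util.accuracy as called in the Adam loop (an assumption about impl.utils)
def accuracy_sketch(y_true, y_pred):
    return np.mean(y_true == y_pred)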
In [ ]: