In [1]:
# Various neural networks and associated techniques, implemented in a modular fashion using matrix notation.
# Nothing groundbreaking; just for practice.
In [2]:
import numpy as np
In [14]:
SIGMA_INIT = 0.001
class Activation(object):
def __init__(self, f, f_deriv):
self.f = f
self.f_deriv = f_deriv
class Layer(object):
def __init__(self, n, activation, bias):
self.n = n
self.W = None
self.act = activation
self.bias = bias
class Loss(object):
def __init__(self, loss_function, loss_function_deriv):
self.f = loss_function
self.f_deriv = loss_function_deriv
class Trainer(object):
def __init__(self, learning_rate):
self.lr = learning_rate
def update(self):
        raise NotImplementedError("Use an implementing subclass.")
class SGD(Trainer):
def __init__(self, learning_rate, momentum):
super(SGD, self).__init__(learning_rate)
self.momentum = momentum
    def update(self):
        for i, layer in enumerate(self.model.layers):
            # momentum: re-apply a fraction of the previous gradient step
            if self.deltas[i] is not None:
                layer.W -= self.momentum * self.deltas[i]
            # consider bias: layer.deltas may carry an extra row for the bias
            # input of the next layer; keep only the first imax rows
            imax = layer.W.shape[0]
            delta = self.lr * np.dot(layer.deltas[:imax], layer._in.T)
            delta /= layer.deltas.shape[1]  # average over the minibatch
            layer.W -= delta
            self.deltas[i] = delta
# TODO: Change to in place operations for optimization where possible.
class Model(object):
def __init__(self, n_in):
self.n_in = n_in
self.layers = []
self.loss = None
self.trainer = None
def add(self, layer):
self.layers.append(layer)
def prepare(self, loss, trainer):
self.nl = len(self.layers)
assert self.nl > 0
n_in = self.n_in
for i in xrange(self.nl):
layer = self.layers[i]
if layer.bias:
n_in += 1
layer.W = SIGMA_INIT * np.random.randn(layer.n, n_in)
n_in = layer.n
self.loss = loss
self.trainer = trainer
trainer.model = self
trainer.deltas = [None for i in xrange(len(self.layers))]
def forward(self, X):
buf = X.T
for i in xrange(self.nl):
layer = self.layers[i]
if layer.bias:
buf = np.append(buf, np.ones((1, buf.shape[1])), axis=0)
            layer._in = buf.copy()   # input to this layer (incl. the bias row, if any)
            buf = np.dot(layer.W, buf)
            layer._out = buf.copy()  # pre-activation; used by the backward pass
            buf = layer.act.f(buf)
return buf.T
def backward(self, Y_pred, Y_target):
assert self.loss
assert self.trainer
prev = None
for i in xrange(self.nl - 1, -1, -1):
layer = self.layers[i]
            if prev is None:
                # output layer: start from the loss derivative
                err = self.loss.f_deriv(Y_pred, Y_target).T
            else:
                # backpropagate through the next layer's weights
                err = np.dot(prev.W.T, prev.deltas)
            # consider bias: pad the pre-activation so its shape matches the
            # extra row in err that belongs to the bias input of the next layer
            if layer._out.shape[0] < err.shape[0]:
                out = np.append(layer._out, np.ones((1, err.shape[1])), axis=0)
            else:
                out = layer._out
            err *= layer.act.f_deriv(out)
            err = np.clip(err, -500, 500)  # crude gradient clipping
layer.deltas = err.copy()
prev = layer
def _forward_backward(self, X, Y_target):
Y_pred = self.forward(X)
self.backward(Y_pred, Y_target)
self.trainer.update()
def train_epoch(self, X, Y, batch_size=1, shuffle=True):
if shuffle:
p = np.random.permutation(X.shape[0])
X = X[p]
Y = Y[p]
if batch_size < 1:
batch_size = X.shape[0]
for i in xrange(0, X.shape[0], batch_size):
            self._forward_backward(X[i:i+batch_size], Y[i:i+batch_size])
In [21]:
# MLP - Regression
N_IN = 2
N_HIDDEN = 4
N_OUT = 1
N_SAMPLES = 2000
LEARNING_RATE = 0.05
MOMENTUM = 0.9
N_EPOCHS = 300
BATCH_SIZE = 1
X = np.random.rand(N_SAMPLES, N_IN) * 0.5
Y = np.sum(X, axis=1)**2
Y = Y[:, np.newaxis]
print Y.shape
print X[0],Y[0],Y[0]-(X[0,0]+X[0,1])**2
thresh = int(0.8 * N_SAMPLES)
X_train = X[:thresh]
Y_train = Y[:thresh]
X_test = X[thresh:]
Y_test = Y[thresh:]
def sigmoid(x):
    # clip the argument to avoid overflow in exp; the output is already in (0, 1)
    return 1. / (1. + np.exp(-np.clip(x, -500, 500)))
# f_deriv receives the pre-activation (layer._out)
logistic = Activation(sigmoid, lambda X: sigmoid(X) * (1. - sigmoid(X)))
identity = Activation(lambda X: X, lambda X: np.ones_like(X))  # derivative of the identity is 1
mse = Loss(
    lambda Y_pred, Y_target: np.sum((Y_pred - Y_target)**2, axis=1),
    # derivative w.r.t. Y_pred, with the constant factor 2 dropped
    lambda Y_pred, Y_target: Y_pred - Y_target
)
sgd = SGD(LEARNING_RATE, MOMENTUM)
hidden = Layer(N_HIDDEN, logistic, bias=True)
hidden2 = Layer(N_HIDDEN/2, logistic, bias=True)
output = Layer(N_OUT, identity, bias=False)
model = Model(N_IN)
model.add(hidden)
model.add(hidden2)
model.add(output)
model.prepare(loss=mse, trainer=sgd)
for i in range(N_EPOCHS):
model.train_epoch(X_train, Y_train, batch_size=BATCH_SIZE, shuffle=True)
Y_train_pred = model.forward(X_train)
Y_test_pred = model.forward(X_test)
if not (i+1) % 10:
print "Epoch %5i" % (i+1), "-", "Train Loss: %5.3f" % np.sum(mse.f(Y_train_pred, Y_train), axis=0), \
"Test Loss: %5.3f" % np.sum(mse.f(Y_test_pred, Y_test), axis=0)
print X_test[:5]
print Y_test[:5]
Y_pred = model.forward(X_test[:5])
print Y_pred
print X_train[:5]
print Y_train[:5]
Y_pred = model.forward(X_train[:5])
print Y_pred
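# Quick sanity check of the backward pass (a sketch, not part of the original
# notebook): compare the gradient used by SGD.update for a single weight of the
# first layer against a central finite difference. The choice of eps and of the
# probed index (0, 0) is arbitrary. Since mse's f_deriv drops the constant
# factor 2, the analytic value is expected to be roughly half the numeric one.
eps = 1e-5
x_chk, y_chk = X_train[:1], Y_train[:1]
layer0 = model.layers[0]
model.backward(model.forward(x_chk), y_chk)
grad_analytic = np.dot(layer0.deltas[:layer0.W.shape[0]], layer0._in.T)[0, 0]
w_orig = layer0.W[0, 0]
layer0.W[0, 0] = w_orig + eps
loss_plus = np.sum(mse.f(model.forward(x_chk), y_chk))
layer0.W[0, 0] = w_orig - eps
loss_minus = np.sum(mse.f(model.forward(x_chk), y_chk))
layer0.W[0, 0] = w_orig
grad_numeric = (loss_plus - loss_minus) / (2. * eps)
print "analytic: %.6g, numeric/2: %.6g" % (grad_analytic, grad_numeric / 2.)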
In [5]:
# MLP - Classification
#TODO: logistic layer, softmax and classification examples
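# A possible sketch for this TODO (not part of the original notebook): a softmax
# activation and cross-entropy loss that plug into the framework above. The names
# softmax_act and cross_entropy are illustrative. The usual shortcut is used: the
# softmax Jacobian is folded into the loss derivative, so the combined gradient
# w.r.t. the pre-activation is simply Y_pred - Y_target and the activation's
# f_deriv returns ones.
def softmax(x):
    # x has shape (n_classes, n_samples); subtract the column max for stability
    e = np.exp(x - np.max(x, axis=0, keepdims=True))
    return e / np.sum(e, axis=0, keepdims=True)
softmax_act = Activation(softmax, lambda Y: np.ones_like(Y))
cross_entropy = Loss(
    lambda Y_pred, Y_target: -np.sum(Y_target * np.log(Y_pred + 1e-12), axis=1),
    lambda Y_pred, Y_target: Y_pred - Y_target
)
# Usage would mirror the regression example, e.g.
#   output = Layer(n_classes, softmax_act, bias=False)
#   model.prepare(loss=cross_entropy, trainer=sgd)
# with Y_target one-hot encoded, shape (n_samples, n_classes).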
In [6]:
#TODO: RNN and BPTT
In [7]:
#TODO: LSTM
In [8]:
#TODO: CNN
In [9]:
# some other things: dropout, regularization, batch normalization, weight decay, momentum, stopping criteria for the trainer,
# automatic minibatch sizing, ...
# out of scope: computational graph