Wayne Nixalo - 2 Jul 2017
Theano RNN / Python RNN / Theano GRU
(from L6 & L7 JNBs)
In [3]:
from __future__ import division, print_function
import theano
import os, sys
sys.path.insert(1, os.path.join('../utils'))
from utils import *
# %matplotlib inline
In [4]:
path = get_file('nietzsche.txt', origin="https://s3.amazonaws.com/text-datasets/nietzsche.txt")
text = open(path).read()
print('corpus length:', len(text))
chars = sorted(list(set(text)))
vocab_size = len(chars) + 1
print('total chars:', vocab_size)
chars.insert(0, "\0")
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))
idx = [char_indices[c] for c in text]
In [13]:
cs = 8
c_in_dat = [[idx[i+n] for i in xrange(0, len(idx)-1-cs, cs)] for n in xrange(cs)]
xs = [np.stack(c[:-2]) for c in c_in_dat]
c_out_dat = [[idx[i+n] for i in xrange(1, len(idx)-cs, cs)] for n in xrange(cs)]
ys = [np.stack(c[:-2]) for c in c_out_dat]
oh_ys = [to_categorical(o, vocab_size) for o in ys]
oh_y_rnn = np.stack(oh_ys, axis=1)
oh_xs = [to_categorical(o, vocab_size) for o in xs]
oh_x_rnn = np.stack(oh_xs, axis=1)
oh_x_rnn.shape, oh_y_rnn.shape
Out[13]:
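As a sanity check (a sketch, not in the original notebook), we can decode the first one-hot-encoded training sequence and its target back into characters; the target should simply be the input shifted one character to the right.
# Sanity check (sketch): decode the first 8-character sequence and its target.
print([indices_char[np.argmax(o)] for o in oh_x_rnn[0]])
print([indices_char[np.argmax(o)] for o in oh_y_rnn[0]])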
In [51]:
n_hidden = 256; n_Fac = 42; cs = 8
n_input = vocab_size; n_output = vocab_size
def init_wgts(rows, cols):
    # He-style initialization: std = sqrt(2 / fan_in)
    scale = math.sqrt(2 / rows)
    return shared(normal(scale=scale, size=(rows, cols)).astype(np.float32))
def init_bias(rows):
    return shared(np.zeros(rows, dtype=np.float32))
def wgts_and_bias(n_in, n_out):
    return init_wgts(n_in, n_out), init_bias(n_out)
def id_and_bias(n):
    return shared(np.eye(n, dtype=np.float32)), init_bias(n)
# Theano Variables
t_inp = T.matrix('inp')
t_outp = T.matrix('outp')
t_h0 = T.vector('h0')
lr = T.scalar('lr')
all_args = [t_h0, t_inp, t_outp, lr]
W_h = id_and_bias(n_hidden)
W_x = wgts_and_bias(n_input, n_hidden)
W_y = wgts_and_bias(n_hidden, n_output)
w_all = list(chain.from_iterable([W_h, W_x, W_y]))
def step(x, h, W_h, b_h, W_x, b_x, W_y, b_y):
    # Calc hidden activations
    h = nnet.relu(T.dot(x, W_x) + b_x + T.dot(h, W_h) + b_h)
    # Calc output activations
    y = nnet.softmax(T.dot(h, W_y) + b_y)
    # Return both -- `flatten()` is workaround: Theano bug
    return h, T.flatten(y, 1)
[v_h, v_y], _ = theano.scan(step, sequences=t_inp,
outputs_info=[t_h0, None], non_sequences=w_all)
error = nnet.categorical_crossentropy(v_y, t_outp).sum()
g_all = T.grad(error, w_all)
def upd_dict(wgts, grads, lr):
    return OrderedDict({w: w - g*lr for (w,g) in zip(wgts, grads)})
upd = upd_dict(w_all, g_all, lr)
# ready to compile function
fn = theano.function(all_args, error, updates=upd, allow_input_downcast=True)
X = oh_x_rnn
Y = oh_y_rnn
# semi-auto SGD loop:
err = 0.0; l_rate = 0.01
for i in xrange(len(X)):
    err += fn(np.zeros(n_hidden), X[i], Y[i], l_rate)
    if i % 1000 == 999:
        print("Error: {:.3f}".format(err/1000))
        err = 0.0
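To see what the trained Theano RNN actually predicts, we can compile a second function that returns the outputs v_y instead of the summed error. This is a sketch, not in the original RNN section; it mirrors the prediction cell used for the GRU near the end of these notes.
# Sketch: compile a prediction function and compare predicted next-characters
# against the actual characters of one training sequence.
f_y = theano.function([t_h0, t_inp], v_y, allow_input_downcast=True)
pred = np.argmax(f_y(np.zeros(n_hidden), X[6]), axis=1)
print([indices_char[o] for o in np.argmax(X[6], axis=1)])  # input characters
print([indices_char[o] for o in pred])                     # predicted next characters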
In [5]:
def sigmoid(x): return 1 / (1 + np.exp(-x))
def sigmoid_d(x):
    output = sigmoid(x)
    return output * (1 - output)
def relu(x): return np.maximum(0., x)
def relu_d(x): return (x > 0.) * 1.
In [6]:
# relu fn test
relu(np.array([3.,-3.])), relu_d(np.array([3.,-3.]))
Out[6]:
In [20]:
# Squared (Euclidean) distance and its derivative
def dist(a, b): return pow(a - b, 2)
def dist_d(a, b): return 2 * (a - b)
import pdb
# CrossEntropy
eps = 1e-7
def x_entropy(pred, actual):
    return -np.sum(actual * np.log(np.clip(pred, eps, 1 - eps)))  # clip to avoid log(0) -> inf
def x_entropy_d(pred, actual): return -actual / pred
def softmax(x): return np.exp(x) / np.exp(x).sum()
def softmax_d(x):
    sm = softmax(x)
    res = np.expand_dims(-sm, -1) * sm
    res[np.diag_indices_from(res)] = sm * (1 - sm)
    return res
Some tests: check that the manual answers match Theano's:
In [8]:
test_preds = np.array([0.2, 0.7, 0.1])
test_actuals = np.array([0.,1.,0.])
# Theano
nnet.categorical_crossentropy(test_preds, test_actuals).eval()
Out[8]:
In [9]:
# Manual
x_entropy(test_preds, test_actuals)
Out[9]:
In [10]:
test_inp = T.dvector()
test_out = nnet.categorical_crossentropy(test_inp, test_actuals)
test_grad = theano.function([test_inp], T.grad(test_out, test_inp))
# Theano
test_grad(test_preds)
Out[10]:
In [11]:
# Manual
x_entropy_d(test_preds, test_actuals)
Out[11]:
In [22]:
loss = x_entropy
loss_d = x_entropy_d
act = relu
act_d = relu_d
pre_pred = random(oh_x_rnn[0][0].shape)
preds = softmax(pre_pred)
actual = oh_x_rnn[0][0]
np.allclose(softmax_d(pre_pred).dot(loss_d(preds, actual)), preds - actual)
Out[22]:
In [28]:
print (softmax(test_preds))
print (nnet.softmax(test_preds).eval())
print()
test_out = T.flatten(nnet.softmax(test_inp))
test_grad = theano.function([test_inp], theano.gradient.jacobian(test_out, test_inp))
print(test_grad(test_preds))
print()
print(softmax_d(test_preds))
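The softmax Jacobian can also be sanity-checked numerically with a centered finite-difference approximation (a sketch, not in the original notebooks):
# Sketch: finite-difference check of softmax_d.
def softmax_jacobian_fd(x, eps=1e-5):
    J = np.zeros((len(x), len(x)))
    for j in range(len(x)):
        d = np.zeros(len(x)); d[j] = eps
        J[:, j] = (softmax(x + d) - softmax(x - d)) / (2 * eps)
    return J
np.allclose(softmax_d(test_preds), softmax_jacobian_fd(test_preds))  # -> True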
We also need to define a scan function. We're not concerned with running things in parallel here, so it's simple to implement:
scan(..)
: go through a sequence one step at a time, calling a function on each element. Each time, the function gets two things: the next element of the sequence and the previous result of the call.
NOTE: Theano's scan doesn't use a for-loop; it creates a tree that simultaneously & gradually combines everything together.
In [29]:
# using relu as activation
act = relu
act_d = relu_d
# using crossentropy as loss function
loss = x_entropy
loss_d = x_entropy_d
def scan(fn, start, seq):
    res = []
    prev = start
    for s in seq:
        app = fn(prev, s)
        res.append(app)
        prev = app
    return res
# test: scan on add 2 things together, on integers 0..4
scan(lambda prev, curr: prev + curr, 0, range(5))
Out[29]:
In [34]:
# 8-char sequences, one-hot encoded
inp = oh_x_rnn
# 8-char sequences, one-hot encoded, shifted across by 1 character
outp = oh_y_rnn
n_input = vocab_size; n_output = vocab_size; n_hidden = 256
inp.shape, outp.shape
Out[34]:
The function to do a single forward pass of an RNN, for a single character:
In [31]:
# Forward Pass
def one_char(prev, item):
    # Previous state:
    tot_loss, pre_hidden, pre_pred, hidden, ypred = prev
    # Current inputs and output
    x, y = item
    pre_hidden = np.dot(x, w_x) + np.dot(hidden, w_h)
    hidden = act(pre_hidden)
    pre_pred = np.dot(hidden, w_y)
    ypred = softmax(pre_pred)
    return (
        # Keep track of loss so we can report it
        tot_loss + loss(ypred, y),
        # Used in backprop
        pre_hidden, pre_pred,
        # Used in next iteration
        hidden,
        # To provide predictions
        ypred)
# We use scan to apply the above to a whole sequence of characters:
def get_chars(n): return zip(inp[n], outp[n])
# Scan through all chars in the nth phrase (input & output), calling one_char on each
def one_fwd(n): return scan(one_char, (0, 0, 0, np.zeros(n_hidden), 0), get_chars(n))
Now we can define the backward step. We use a loop to go through every element of the sequence. The derivatives apply the chain rule to each step, accumulating the gradients across the sequence. The cell below contains all the steps necessary to do backprop through an RNN.
In [40]:
# "Columnify" a vector
def col(x): return x[:, np.newaxis]
def one_bkwd(args, n):
    global w_x, w_y, w_h
    i = inp[n]  # 8x86 # grab one of our inputs
    o = outp[n] # 8x86 # grab one of our outputs
    d_pre_hidden = np.zeros(n_hidden) # 256
    for p in reversed(range(len(i))): # go backwards thru each of the 8 chars, end->start
        totloss, pre_hidden, pre_pred, hidden, ypred = args[p]
        x = i[p] # 86
        y = o[p] # 86
        d_pre_pred = softmax_d(pre_pred).dot(loss_d(ypred, y)) # 86
        d_pre_hidden = (np.dot(d_pre_hidden, w_h.T)
                        + np.dot(d_pre_pred, w_y.T)) * act_d(pre_hidden) # 256
        # <Now we can update our weights>
        # d(loss)/d(w_y) = d(loss)/d(pre_pred) * d(pre_pred)/d(w_y)
        w_y -= col(hidden) * d_pre_pred * alpha # alpha = l_rate
        # d(loss)/d(w_h) = d(loss)/d(pre_hidden[p-1]) * d(pre_hidden[p-1])/d(w_h)
        if (p > 0): w_h -= args[p-1][3].dot(d_pre_hidden) * alpha
        w_x -= col(x) * d_pre_hidden * alpha
    return d_pre_hidden
The derivative of the loss gets us from the loss to the output triangle. The derivative of the softmax gets us from the triangle to the other side of the activation function (the blue arrow in Lecture 7, ~1:38:59). That's where
d_pre_pred
gets us to. We want to get to the other side of the hidden matrix. The derivative of a matrix multiplication is multiplication by the transpose of that matrix. With respect to the hidden weights:
(np.dot(d_pre_hidden, w_h.T)
The hidden state (circle) has two arrows going in & out, so we have to add those derivatives together. With respect to the output weights:
+ np.dot(d_pre_pred, w_y.T))
Finally we need to undo the activation, by multiplying by the derivative of the activation function:
* act_d(pre_hidden)
That's the chain rule that gets us all the way back to the input side of the green arrow.
To get the derivative with respect to the weights, we undo the multiplication by the hidden state; that's what the "columnify" function col() does: it turns a vector into a column so the product gives a full weight-sized gradient.
w_y -= col(hidden) * d_pre_pred * alpha
which updates the output weights. The hidden weights are updated in essentially the same way: the learning rate times the derivative we just calculated, undoing the multiplication by the previous hidden state:
w_h -= args[p-1][3].dot(d_pre_hidden) * alpha
And the new input weights:
w_x -= col(x) * d_pre_hidden * alpha
Now we can set up our initial weight matrices. Note that we're not using biases at all in this example, in order to keep things simpler.
In [41]:
scale = math.sqrt(2 / n_input)
w_x = normal(scale=scale, size=(n_input, n_hidden))
w_y = normal(scale=scale, size=(n_hidden, n_output))
w_h = np.eye(n_hidden, dtype=np.float32) # identity matrix to initialize the hidden weights
The loop looks much like the Theano loop in the previous section, except that we have to call the backward step manually:
In [43]:
overallError = 0
alpha = 0.0001
for n in range(10000):
    # run 1 forward step
    res = one_fwd(n)
    overallError += res[-1][0]
    # run 1 backward step
    deriv = one_bkwd(res, n)
    # print update
    if (n % 1000 == 999):
        print("Error:{:.4f}; Gradient:{:.5f}".format(
            overallError/1000, np.linalg.norm(deriv)))
        overallError = 0
# Currently, the forward step passes scan an initial state of zeros on every
# update, so it resets the state for each sequence. To run statefully, just edit
# `one_fwd(n)` to return its final hidden state and feed that state back in the
# next time through the loop (see the sketch below). This, however, risks
# exploding gradients & activations unless implemented as an LSTM or GRU.
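A minimal sketch of that stateful variant, threading the final hidden state of each phrase into the next one. The helper one_fwd_stateful is hypothetical, not from the original notebook.
# Hypothetical stateful variant (sketch): start scan from a supplied hidden
# state instead of zeros, and carry the last hidden state between phrases.
def one_fwd_stateful(n, h0):
    return scan(one_char, (0, 0, 0, h0, 0), get_chars(n))
h_state = np.zeros(n_hidden)
for n in range(1000):
    res = one_fwd_stateful(n, h_state)
    h_state = res[-1][3]   # final hidden state of this phrase
    one_bkwd(res, n)       # backward step unchanged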
In [45]:
# you can implement a GRU in Keras by just replacing `SimpleRNN` with `GRU`:
model = Sequential([
    GRU(n_hidden, return_sequences=True, input_shape=(cs, vocab_size),
        activation='relu', inner_init='identity'),
    TimeDistributed(Dense(vocab_size, activation='softmax')),
])
model.compile(loss='categorical_crossentropy', optimizer=Adam())
model.fit(oh_x_rnn, oh_y_rnn, batch_size = 64, nb_epoch=8)
Out[45]:
In [46]:
def get_nexts_oh(inp):
    idxs = np.array([char_indices[c] for c in inp])
    arr = to_categorical(idxs, vocab_size)
    p = model.predict(arr[np.newaxis,:])[0]
    print(list(inp))
    return [chars[np.argmax(o)] for o in p]
get_nexts_oh(' this is')
Out[46]:
In [57]:
W_h = id_and_bias(n_hidden)
W_x = init_wgts(n_input, n_hidden)
W_y = wgts_and_bias(n_hidden, n_output)
# reset gate NN weights
rW_h = init_wgts(n_hidden, n_hidden)
rW_x = wgts_and_bias(n_input, n_hidden)
# update gate NN weights
uW_h = init_wgts(n_hidden, n_hidden)
uW_x = wgts_and_bias(n_input, n_hidden)
# NOTE: the order of w_all must match the parameter order of step() below
w_all = list(chain.from_iterable([W_h, W_y, uW_x, rW_x]))
w_all.extend([W_x, uW_h, rW_h])
Definition of a gate: it's just a sigmoid applied to the sum of the dot products of its inputs with their weight matrices.
Inputs: the input, the hidden state, the hidden-state weights, the input weights, and the bias.
In [58]:
def gate(x, h, W_h, W_x, b_x):
    return nnet.sigmoid(T.dot(x, W_x) + b_x + T.dot(h, W_h))
The Step function is nearly identical to before, except that it multiplies the hidden state by the reset gate, and updates the hidden state based on the update gate.
In [59]:
def step(x, h, W_h, b_h, W_y, b_y,
         uW_x, ub_x, rW_x, rb_x, W_x, uW_h, rW_h):
    reset = gate(x, h, rW_h, rW_x, rb_x)      # reset gate is a little NN
    update = gate(x, h, uW_h, uW_x, ub_x)     # so is the update gate
    h_new = gate(x, h * reset, W_h, W_x, b_h) # candidate state: hidden state multiplied by the reset gate
    h = update * h + (1 - update) * h_new     # actual new hidden state
    y = nnet.softmax(T.dot(h, W_y) + b_y)     # output = softmax of hidden state times the output weight matrix, plus bias
    return h, T.flatten(y, 1)
# The update gate decides how much of h_new will replace h
Because of the reset & update gates, the GRU can learn these special sets of weights (rW_h, uW_h, ...) to make sure it throws away or keeps more state when that's a good idea.
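A tiny NumPy illustration of the update-gate interpolation (a sketch, not from the original notebook): when an element of the update gate is close to 1 the old state is kept, and when it is close to 0 the new candidate state takes over.
# Sketch: the update gate interpolates element-wise between the previous
# hidden state and the candidate hidden state.
h      = np.array([1.0, 1.0, 1.0])   # previous hidden state (toy values)
h_new  = np.array([0.0, 0.0, 0.0])   # candidate hidden state
update = np.array([0.9, 0.5, 0.1])   # per-unit update gate activations
print(update * h + (1 - update) * h_new)   # -> [ 0.9  0.5  0.1]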
Everything here onwards is identical to the simple Theano RNN.
In [60]:
[v_h, v_y], _ = theano.scan(step, sequences=t_inp,
outputs_info=[t_h0, None], non_sequences=w_all)
error = nnet.categorical_crossentropy(v_y, t_outp).sum()
g_all = T.grad(error, w_all)
upd = upd_dict(w_all, g_all, lr)
fn = theano.function(all_args, error, updates=upd, allow_input_downcast=True)
err = 0.0; l_rate = 0.1
for i in xrange(len(X)):
    err += fn(np.zeros(n_hidden), X[i], Y[i], l_rate)
    if i % 1000 == 999:
        l_rate *= 0.95
        print("Error:{:.2f}".format(err/1000))
        err = 0.0
A more compact formulation of the same GRU: concatenate the hidden state and the input into a single vector, and use one combined weight matrix per gate (identity-initialized for the hidden part, normally-initialized for the input part).
In [62]:
W = (shared(np.concatenate([np.eye(n_hidden), normal(size=(n_input, n_hidden))])
            .astype(np.float32)), init_bias(n_hidden))
rW = wgts_and_bias(n_input + n_hidden, n_hidden)
uW = wgts_and_bias(n_input + n_hidden, n_hidden)
W_y = wgts_and_bias(n_hidden, n_output)
w_all = list(chain.from_iterable([W, W_y, uW, rW]))
In [63]:
def gate(m, W, b): return nnet.sigmoid(T.dot(m, W) + b)
def step(x, h, W, b, W_y, b_y, uW, ub, rW, rb):
    m = T.concatenate([h, x])           # combined hidden state + input
    reset = gate(m, rW, rb)
    update = gate(m, uW, ub)
    m = T.concatenate([h * reset, x])   # recombine, with the reset gate applied to the hidden state
    h_new = gate(m, W, b)
    h = update * h + (1 - update) * h_new
    y = nnet.softmax(T.dot(h, W_y) + b_y)
    return h, T.flatten(y, 1)
In [64]:
[v_h, v_y], _ = theano.scan(step, sequences=t_inp,
outputs_info=[t_h0, None], non_sequences=w_all)
In [65]:
def upd_dict(wgts, grads, lr):
    return OrderedDict({w: w - g*lr for (w,g) in zip(wgts, grads)})
In [66]:
error = nnet.categorical_crossentropy(v_y, t_outp).sum()
g_all = T.grad(error, w_all)
upd = upd_dict(w_all, g_all, lr)
fn = theano.function(all_args, error, updates=upd, allow_input_downcast=True)
In [67]:
err = 0.0; l_rate = 0.01
for i in xrange(len(X)):
    err += fn(np.zeros(n_hidden), X[i], Y[i], l_rate)
    if i % 1000 == 999:
        print("Error:{:.2f}".format(err/1000))
        err = 0.0
In [68]:
f_y = theano.function([t_h0, t_inp], v_y, allow_input_downcast=True)
pred = np.argmax(f_y(np.zeros(n_hidden), X[6]), axis=1)
act = np.argmax(X[6], axis=1)
actual = [indices_char[o] for o in act]
prediction = [indices_char[o] for o in pred]
print(actual)
print(prediction)
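As a final sketch (an assumption, not in the original notebook), the same compiled function can be used to predict the character that follows an arbitrary 8-character seed:
# Sketch: predict the next character after an 8-character seed string.
def predict_next(seed):
    x = to_categorical([char_indices[c] for c in seed], vocab_size)
    p = f_y(np.zeros(n_hidden), x)         # one distribution per time step
    return indices_char[np.argmax(p[-1])]  # prediction after the last character
predict_next(' this is')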