In [1]:
import sys
import numpy as np
import impl.RNN as rnn
import impl.solver as solver
In [2]:
with open('data/text_data/japan.txt', 'r') as f:
    txt = f.read()
X = []
y = []
char_to_idx = {char: i for i, char in enumerate(set(txt))}
idx_to_char = {i: char for i, char in enumerate(set(txt))}
X = np.array([char_to_idx[x] for x in txt])
y = [char_to_idx[x] for x in txt[1:]]
y.append(char_to_idx['.'])
y = np.array(y)
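A quick sanity check on the encoding (a minimal sketch using only the arrays built above): the target at position t is the input character at position t + 1, with '.' appended as the final target (so '.' must occur in the corpus for the char_to_idx['.'] lookup above to succeed).

assert idx_to_char[y[0]] == txt[1]                # first target is the second character
assert (X[1:] == y[:-1]).all()                    # y is X shifted left by one position
print(''.join(idx_to_char[i] for i in X[:50]))    # decode the first 50 characters back to text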
In [3]:
vocab_size = len(char_to_idx)
# hyperparameters
time_step = 30      # truncated-BPTT length: characters per minibatch
n_iter = 13000      # number of training iterations (minibatch updates)
alpha = 1e-3        # learning rate
print_after = 1000  # log the smoothed loss every 1000 iterations
H = 64              # hidden state size
In [8]:
import numpy as np
import impl.loss as loss_fun
import impl.layer as l
import impl.regularization as reg
import impl.utils as util
import impl.NN as nn


class GRU3(nn.NN):

    def __init__(self, D, H, char2idx, idx2char):
        self.D = D
        self.H = H
        self.char2idx = char2idx
        self.idx2char = idx2char
        self.vocab_size = len(char2idx)
        super().__init__(D, D, H, None, None, loss='cross_ent', nonlin='relu')

    def initial_state(self):
        return np.zeros((1, self.H))

    def forward(self, X, h):
        m = self.model
        Wh, Wy = m['Wh'], m['Wy']
        bh, by = m['bh'], m['by']

        X_one_hot = X.copy()
        h_old = h.copy()

        # input: concat [h, x]
        X = np.column_stack((h_old, X_one_hot))
        hh, hh_cache = l.fc_forward(X, Wh, bh)

        # gate: h_prob
        hz, hz_sigm_cache = l.sigmoid_forward(hh)

        # signal: h_pred
        hh, hh_tanh_cache = l.tanh_forward(hh)

        # output: h_next and y_pred
        h = h_old + hz * (hh - h_old)
        y, y_cache = l.fc_forward(h, Wy, by)

        cache = h_old, X, hh_cache, hz, hz_sigm_cache, hh, hh_tanh_cache, h, y_cache
        return y, h, cache

    def backward(self, dy, dh_next, cache):
        h_old, X, hh_cache, hz, hz_sigm_cache, hh, hh_tanh_cache, h, y_cache = cache

        # output: h_next and y_pred
        dh, dWy, dby = l.fc_backward(dy, y_cache)
        dh += dh_next
        dh_old1 = (1. - hz) * dh

        # signal: h_pred
        dhh = hz * dh
        dhh = l.tanh_backward(dhh, hh_tanh_cache)

        # gate: h_prob
        dhz = (hh - h_old) * dh
        dhz = l.sigmoid_backward(dhz, hz_sigm_cache)

        # input
        dhh += dhz
        dX, dWh, dbh = l.fc_backward(dhh, hh_cache)
        dh_old2 = dX[:, :self.H]
        dX = dX[:, self.H:]

        # concat: [h, x]
        dh_next = dh_old1 + dh_old2

        grad = dict(Wh=dWh, Wy=dWy, bh=dbh, by=dby)
        return dX, dh_next, grad

    def _init_model(self, D, C, H):
        Z = H + D
        self.model = dict(
            Wh=np.random.randn(Z, H) / np.sqrt(Z / 2.),
            Wy=np.random.randn(H, D) / np.sqrt(D / 2.),
            bh=np.zeros((1, H)),
            by=np.zeros((1, D))
        )

    def train_step_fwd(self, X_train, h):
        ys, caches = [], []

        for X in X_train:
            X_one_hot = np.zeros(self.D)
            X_one_hot[X] = 1.
            x = X_one_hot.reshape(1, -1)

            y, h, cache = self.forward(x, h)
            ys.append(y)
            caches.append(cache)

        return ys, caches

    def train_step_bwd(self, y_train, ys, caches):
        # Loss, averaged over the time steps of the minibatch
        loss, dys = 0.0, []
        for y_pred, y in zip(ys, y_train):
            loss += loss_fun.cross_entropy(self.model, y_pred, y, lam=0) / y_train.shape[0]
            dy = loss_fun.dcross_entropy(y_pred, y)
            dys.append(dy)

        # Grads, accumulated by backpropagation through time
        dh_next = np.zeros((1, self.H))
        grads = {key: np.zeros_like(val) for key, val in self.model.items()}

        for t in reversed(range(len(dys))):
            dX, dh_next, grad = self.backward(dys[t], dh_next, caches[t])
            for k in grad.keys():
                grads[k] += grad[k]

        # # Clip grads to guard against exploding gradients
        # for key, val in grads.items():
        #     grads[key] = np.clip(val, -5., 5.)

        return grads, loss
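The cell above implements a simplified, single-gate GRU variant: the update gate and the candidate state share one affine map of the concatenated [h, x], and the next state interpolates between the old state and the candidate. For reference, a standalone NumPy sketch of one step equivalent to forward (assuming l.fc_forward computes X @ W + b; it does not use the impl.* layers):

def gru3_step(x, h, Wh, bh, Wy, by):
    a = np.column_stack((h, x)) @ Wh + bh  # shared pre-activation of [h, x]
    z = 1. / (1. + np.exp(-a))             # update gate (sigmoid of a)
    h_tilde = np.tanh(a)                   # candidate state (tanh of the same a)
    h_new = h + z * (h_tilde - h)          # interpolate between old state and candidate
    y = h_new @ Wy + by                    # unnormalized scores over the vocabulary
    return y, h_new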
In [10]:
net = GRU3(D=vocab_size, H=H, char2idx=char_to_idx, idx2char=idx_to_char)
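A quick shape check on the freshly constructed net (a sketch; it assumes the nn.NN constructor has already built self.model via _init_model): one forward step on a one-hot character should give logits of shape (1, vocab_size) and a hidden state of shape (1, H).

x0 = np.zeros((1, vocab_size))
x0[0, char_to_idx[txt[0]]] = 1.
logits, h1, _ = net.forward(x0, net.initial_state())
assert logits.shape == (1, vocab_size)
assert h1.shape == (1, H)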
In [11]:
import numpy as np
import impl.utils as util
import impl.constant as c
import copy
from sklearn.utils import shuffle as skshuffle


def get_minibatch(X, y, minibatch_size, shuffle=True):
    minibatches = []

    if shuffle:
        X, y = skshuffle(X, y)

    for i in range(0, X.shape[0], minibatch_size):
        X_mini = X[i:i + minibatch_size]
        y_mini = y[i:i + minibatch_size]
        minibatches.append((X_mini, y_mini))

    return minibatches


def adam_rnn(nn, X_train, y_train, alpha=0.001, mb_size=256, n_iter=2000, print_after=100):
    # shuffle=False: the text is cut into consecutive chunks of mb_size characters
    minibatches = get_minibatch(X_train, y_train, mb_size, shuffle=False)

    idx = 0
    state = nn.initial_state()
    smooth_loss = -np.log(1.0 / len(set(X_train)))  # loss of a uniform prediction

    # Adam moment estimates
    M = {k: np.zeros_like(v) for k, v in nn.model.items()}
    R = {k: np.zeros_like(v) for k, v in nn.model.items()}
    beta1 = .9
    beta2 = .999

    for iter in range(1, n_iter + 1):
        t = iter

        if idx >= len(minibatches):
            idx = 0
            state = nn.initial_state()

        X_mini, y_mini = minibatches[idx]
        idx += 1

        if iter % print_after == 0:
            print('Iter-{} loss: {:.4f}'.format(iter, smooth_loss))
            # # Sampling/testing can be added here once training is validated, e.g.:
            # sample = nn.test_step_fwd(X_mini[0], state)
            # print(sample)

        # NOTE: train_step_fwd does not return its final hidden state, so every
        # minibatch starts from the same (zero) initial state.
        ys, caches = nn.train_step_fwd(X_mini, state)
        grads, loss = nn.train_step_bwd(y_mini, ys, caches)
        smooth_loss = 0.999 * smooth_loss + 0.001 * loss

        # Adam update with bias correction
        for k in grads.keys():
            M[k] = util.exp_running_avg(M[k], grads[k], beta1)
            R[k] = util.exp_running_avg(R[k], grads[k]**2, beta2)

            m_k_hat = M[k] / (1. - beta1**t)
            r_k_hat = R[k] / (1. - beta2**t)

            nn.model[k] -= alpha * m_k_hat / (np.sqrt(r_k_hat) + c.eps)

    return nn
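The parameter update above is Adam with bias correction; util.exp_running_avg is presumably the usual exponential moving average, along the lines of the sketch below (an assumption about impl.utils, not its actual source). Each parameter then moves by alpha * m_hat / (sqrt(r_hat) + eps), where m_hat and r_hat are the bias-corrected first and second moments of the gradient.

def exp_running_avg(running, new, gamma=.9):
    # gamma * previous estimate + (1 - gamma) * new observation
    return gamma * running + (1. - gamma) * new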
In [12]:
adam_rnn(nn=net, X_train=X, y_train=y, alpha=alpha, mb_size=time_step, n_iter=n_iter,
         print_after=print_after)
Out[12]:
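Once training looks reasonable, text can be generated by rolling the net forward one character at a time and feeding each sampled character back in. A minimal sketch of such a routine (a hypothetical helper in the spirit of the test_step_fwd call commented out in adam_rnn; it is not defined in impl or in GRU3 above):

def sample(net, seed_char, n_chars=200):
    h = net.initial_state()
    idx = net.char2idx[seed_char]
    out = [seed_char]
    for _ in range(n_chars):
        x = np.zeros((1, net.vocab_size))
        x[0, idx] = 1.
        y, h, _ = net.forward(x, h)
        p = np.exp(y - np.max(y))   # softmax over the logits
        p /= p.sum()
        idx = np.random.choice(net.vocab_size, p=p.ravel())
        out.append(net.idx2char[idx])
    return ''.join(out)

print(sample(net, txt[0]))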
In [ ]: