In [3]:
# Data
import numpy as np

# if __name__ == '__main__':
# Read the whole corpus into memory; the file is closed as soon as the read
# finishes because nothing below needs the handle.
with open('data/text_data/japan.txt', 'r') as f:
    txt = f.read()

# Build the vocabulary ONCE and in sorted order. Deriving both lookup tables
# from the same sequence guarantees they are exact inverses of each other, and
# sorting makes the character <-> index assignment identical on every run
# (plain set iteration order can change between interpreter sessions).
vocab = sorted(set(txt))
char_to_idx = {char: i for i, char in enumerate(vocab)}
idx_to_char = {i: char for i, char in enumerate(vocab)}

# Inputs: the index of every character of the text, in order.
X = np.array([char_to_idx[ch] for ch in txt])

# Targets: the index of the character that follows each input character.
# The last character has no successor in the text, so '.' is used as its
# target (this requires '.' to occur in the corpus, otherwise KeyError).
y = [char_to_idx[ch] for ch in txt[1:]]
y.append(char_to_idx['.'])
y = np.array(y)

# # Data exploration
# X.shape, y.shape, X, y, txt.split()[:2], 
# # set(txt), 
# # for val, key in enumerate(set(txt)):
# #     print(val, key)
# val2char = {val: key for val, key in enumerate(set(txt))}
# # val2char

In [4]:
# Model
import impl.layer as l

class RNN:
    """Character-level stacked recurrent network with tanh hidden units.

    The network consists of ``L`` layers. Every layer owns its own parameter
    dict with the keys ``Wxh`` (D, H), ``Whh`` (H, H), ``Why`` (H, D),
    ``bh`` (1, H) and ``by`` (1, D). Each layer therefore maps a ``(1, D)``
    input to a ``(1, D)`` output, which is what allows the output of one layer
    to be fed directly into the next one.

    The helper functions ``tanh_forward``, ``tanh_backward``, ``fc_forward``,
    ``fc_backward`` and ``softmax`` are taken from the module imported as
    ``l`` (``impl.layer``); their exact contracts are defined there.
    """

    def __init__(self, D, H, L, char2idx, idx2char):
        """Create the network and initialise its parameters.

        Args:
            D: input/output dimensionality (size of the one-hot vectors).
            H: number of hidden units per layer.
            L: number of stacked layers.
            char2idx: mapping from character to integer index.
            idx2char: mapping from integer index back to character.
        """
        self.D = D
        self.H = H
        self.L = L
        self.char2idx = char2idx
        self.idx2char = idx2char
        self.vocab_size = len(char2idx)
        # Loss history filled in by the training loop.
        self.losses = {'train': [], 'smooth train': []}

        # Model parameters: every layer gets its OWN freshly initialised
        # dict. Appending one shared dict L times would make all layers alias
        # the same arrays, so an update to one layer would silently modify
        # every other layer as well.
        self.model = [self._init_layer_params(D, H) for _ in range(self.L)]

    @staticmethod
    def _init_layer_params(D, H):
        """Return a new parameter dict for a single layer.

        Weights are drawn from a standard normal and divided by
        ``sqrt(fan_in / 2)``; biases start at zero.
        """
        return dict(
            Wxh=np.random.randn(D, H) / np.sqrt(D / 2.),
            Whh=np.random.randn(H, H) / np.sqrt(H / 2.),
            Why=np.random.randn(H, D) / np.sqrt(H / 2.),
            bh=np.zeros((1, H)),
            by=np.zeros((1, D))
            )

    def initial_state(self):
        """Return an all-zero hidden state of shape ``(1, H)``."""
        return np.zeros((1, self.H))

    def forward(self, X, h, m):
        """Run one layer for one time step.

        Args:
            X: layer input, multiplied with ``Wxh`` (so its last axis is D).
            h: previous hidden state of this layer; it is copied, not mutated.
            m: parameter dict of this layer.

        Returns:
            ``(y, h, cache)``: the layer output, the new hidden state and the
            values ``backward`` needs to compute gradients for this step.
        """
        Wxh, Whh, Why = m['Wxh'], m['Whh'], m['Why']
        bh, by = m['bh'], m['by']

        hprev = h.copy()

        # Pre-activation of the hidden state, then the tanh non-linearity.
        h = X @ Wxh + hprev @ Whh + bh
        h, h_cache = l.tanh_forward(h)

        # Hidden-to-output projection (helper from impl.layer).
        y, y_cache = l.fc_forward(h, Why, by)
        cache = X, Wxh, hprev, Whh, h_cache, y_cache

        return y, h, cache

    def backward(self, dy, dh, cache):
        """Back-propagate through one layer for one time step.

        Args:
            dy: gradient of the loss w.r.t. this layer's output.
            dh: gradient flowing into this layer's hidden state from the
                following time step.
            cache: the cache produced by ``forward`` for this step.

        Returns:
            ``(dX, dh, grad)``: gradient w.r.t. the layer input, gradient
            w.r.t. the previous hidden state, and a dict of parameter
            gradients keyed like the parameter dict.
        """
        X, Wxh, hprev, Whh, h_cache, y_cache = cache

        dh_next = dh.copy()

        # Hidden to output gradient; the hidden state receives gradient both
        # from the output of this step and from the next time step.
        dh, dWhy, dby = l.fc_backward(dy, y_cache)
        dh += dh_next
        dby = dby.reshape((1, -1))

        # tanh
        dh = l.tanh_backward(dh, h_cache)

        # Hidden gradient
        dbh = dh
        dWhh = hprev.T @ dh
        dWxh = X.T @ dh

        dX = dh @ Wxh.T
        dh = dh @ Whh.T

        grad = dict(Wxh=dWxh, Whh=dWhh, Why=dWhy, bh=dbh, by=dby)

        return dX, dh, grad

    def train_forward(self, X_train, h):
        """Run the whole stack over a sequence of character indices.

        Args:
            X_train: iterable of integer character indices.
            h: initial hidden state; every layer starts from a copy of it.

        Returns:
            ``(ys, caches)`` where ``ys[t]`` is the top layer's output at
            step ``t`` and ``caches[layer][t]`` is that layer's cache at
            step ``t``. The final hidden states are not returned.
        """
        ys = []
        caches = [[] for _ in range(self.L)]
        h = [h.copy() for _ in range(self.L)]

        for X in X_train:
            # One-hot encode the character as a (1, D) row vector.
            X_one_hot = np.zeros(self.D)
            X_one_hot[X] = 1.
            y = X_one_hot.reshape(1, -1)
            # The output of each layer is the input of the next one.
            for layer in range(self.L):
                y, h[layer], cache = self.forward(y, h[layer], self.model[layer])
                caches[layer].append(cache)

            ys.append(y)

        return ys, caches

    def cross_entropy(self, y_pred, y_train):
        """Return the mean negative log-likelihood of ``y_train``.

        ``y_pred`` holds one row of scores per sample; ``l.softmax`` is
        applied to it before the entries at the target indices are read.
        """
        m = y_pred.shape[0]

        prob = l.softmax(y_pred)
        log_like = -np.log(prob[range(m), y_train])
        data_loss = np.sum(log_like) / m

        return data_loss # + reg_loss

    def dcross_entropy(self, y_pred, y_train):
        """Return the gradient of ``cross_entropy`` w.r.t. ``y_pred``.

        Computed as ``softmax(y_pred)`` with 1 subtracted at the target
        indices, divided by the number of rows.
        """
        m = y_pred.shape[0]

        grad_y = l.softmax(y_pred)
        grad_y[range(m), y_train] -= 1.0
        grad_y /= m

        return grad_y

    def loss_function(self, y_train, ys):
        """Sum the cross-entropy over all time steps.

        Args:
            y_train: target index for every time step.
            ys: network outputs as returned by ``train_forward``.

        Returns:
            ``(loss, dys)``: the summed loss and the list of per-step
            gradients w.r.t. the outputs.
        """
        loss, dys = 0.0, []

        for y_pred, y in zip(ys, y_train):
            loss += self.cross_entropy(y_pred, y)
            dy = self.dcross_entropy(y_pred, y)
            dys.append(dy)

        return loss, dys

    def train_backward(self, dys, caches):
        """Back-propagate through time and through the layer stack.

        Args:
            dys: per-step output gradients from ``loss_function``.
            caches: ``caches[layer][t]`` as returned by ``train_forward``.

        Returns:
            ``(dX, grads)``: the input gradient produced by the last
            ``backward`` call (bottom layer, first time step; ``None`` when
            ``dys`` is empty) and, per layer, the parameter gradients summed
            over all time steps.
        """
        dh, grad, grads = [], [], []
        for layer in range(self.L):
            dh.append(np.zeros((1, self.H)))
            grad.append({key: np.zeros_like(val) for key, val in self.model[layer].items()})
            grads.append({key: np.zeros_like(val) for key, val in self.model[layer].items()})

        # Defined up front so an empty sequence returns cleanly instead of
        # failing on an unbound local.
        dX = None
        for t in reversed(range(len(dys))):
            dX = dys[t]
            # Walk the stack top-down: the input gradient of a layer is the
            # output gradient of the layer below it.
            for layer in reversed(range(self.L)):
                dX, dh[layer], grad[layer] = self.backward(dX, dh[layer], caches[layer][t])
                for key in grad[layer].keys():
                    grads[layer][key] += grad[layer][key]

        return dX, grads

    def test(self, X_seed, h, size):
        """Sample a string of ``size + 1`` characters from the model.

        Args:
            X_seed: index of the first character.
            h: initial hidden state; every layer starts from a copy of it.
            size: number of characters to generate after the seed.

        Returns:
            The seed character followed by the sampled characters.
        """
        chars = [self.idx2char[X_seed]]
        idx_list = list(range(self.vocab_size))
        X = X_seed

        h = [h.copy() for _ in range(self.L)]

        for _ in range(size):
            X_one_hot = np.zeros(self.D)
            X_one_hot[X] = 1.
            y = X_one_hot.reshape(1, -1)
            for layer in range(self.L):
                y, h[layer], _ = self.forward(y, h[layer], self.model[layer])

            # Draw the next character from the predicted distribution and
            # feed it back in as the next input.
            prob = l.softmax(y)
            idx = np.random.choice(idx_list, p=prob.ravel())
            chars.append(self.idx2char[idx])
            X = idx

        return ''.join(chars)

In [8]:
from sklearn.utils import shuffle as skshuffle

def get_minibatch(X, y, minibatch_size, shuffle):
    """Split ``X`` and ``y`` into consecutive minibatches.

    Args:
        X: array of inputs; its first axis is the sample axis.
        y: array of targets, sliced with the same indices as ``X``.
        minibatch_size: maximum number of samples per minibatch.
        shuffle: when truthy, ``X`` and ``y`` are shuffled together (via
            ``skshuffle``) before being split.

    Returns:
        A list of ``(X_mini, y_mini)`` tuples. The last minibatch is shorter
        when the number of samples is not a multiple of ``minibatch_size``.
    """
    if shuffle:
        X, y = skshuffle(X, y)

    n_samples = X.shape[0]
    return [
        (X[start:start + minibatch_size], y[start:start + minibatch_size])
        for start in range(0, n_samples, minibatch_size)
    ]

def adam_rnn(nn, X_train, y_train, alpha, mb_size, n_iter, print_after):
    """Train ``nn`` in place with Adam on consecutive, unshuffled minibatches.

    Args:
        nn: the network; must provide ``L``, ``model``, ``losses``,
            ``initial_state``, ``train_forward``, ``loss_function``,
            ``train_backward`` and ``test``.
        X_train: array of input character indices.
        y_train: array of target character indices.
        alpha: learning rate.
        mb_size: number of characters per minibatch.
        n_iter: number of parameter updates; each one consumes one minibatch
            and the minibatch list is cycled through when exhausted.
        print_after: print the loss and a generated sample every
            ``print_after`` iterations.

    Returns:
        The same ``nn`` object, with updated parameters and with the loss
        history appended to ``nn.losses``.
    """
    minibatches = get_minibatch(X_train, y_train, mb_size, shuffle=False)

    # First (M) and second (R) moment estimates: one dict per layer, shaped
    # like that layer's own parameters.
    M, R = [], []
    for layer in range(nn.L):
        M.append({k: np.zeros_like(v) for k, v in nn.model[layer].items()})
        R.append({k: np.zeros_like(v) for k, v in nn.model[layer].items()})

    beta1 = .9
    beta2 = .999
    eps = 1e-8
    idx = 0
    # NOTE: train_forward does not return the final hidden state, so `state`
    # is never advanced here; every minibatch starts from the initial state.
    state = nn.initial_state()
    smooth_loss = 1.0 #-np.log(1.0 / len(set(X_train)))

    for step in range(1, n_iter + 1):

        # Wrap around to the first minibatch once the data is exhausted.
        if idx >= len(minibatches):
            idx = 0
            state = nn.initial_state()

        X_mini, y_mini = minibatches[idx]
        idx += 1

        ys, caches = nn.train_forward(X_mini, state)
        loss, dys = nn.loss_function(y_mini, ys)
        dX, grads = nn.train_backward(dys, caches)
        nn.losses['train'].append(loss)
        smooth_loss = 0.999 * smooth_loss + 0.001 * loss
        nn.losses['smooth train'].append(smooth_loss)

        for layer in range(nn.L):
            for k in grads[layer].keys():
                # l.exp_running_avg comes from impl.layer; presumably
                # beta * old + (1 - beta) * new -- confirm there.
                M[layer][k] = l.exp_running_avg(M[layer][k], grads[layer][k], beta1)
                R[layer][k] = l.exp_running_avg(R[layer][k], grads[layer][k]**2, beta2)

                # Bias-corrected moment estimates.
                m_k_hat = M[layer][k] / (1. - beta1**(step))
                r_k_hat = R[layer][k] / (1. - beta2**(step))

                nn.model[layer][k] -= alpha * m_k_hat / (np.sqrt(r_k_hat) + eps)

        # Print loss and test sample
        if step % print_after == 0:
            print('Iter-{} loss: {:.4f}'.format(step, loss))
            sample = nn.test(X_mini[0], state, size=mb_size*10)
            print(sample)

    return nn

In [25]:
# Number of distinct characters; used as the input/output width (D) of the network.
vocab_size = len(char_to_idx)

# hyper parameters
time_step = 10 # width: characters per minibatch (passed to adam_rnn as mb_size)
num_layers = 1 # depth: number of stacked recurrent layers
n_iter = 13000 # number of parameter updates (one minibatch each), not passes over the data
alpha = 1e-3 # learning_rate
print_after = n_iter//10 # print the loss and a generated sample every print_after iterations
H = 64 # num_hidden_units in hidden layer

In [26]:
# Build the character-level RNN; its input/output width is the vocabulary size.
net = RNN(D=vocab_size, H=H, L=num_layers, char2idx=char_to_idx, idx2char=idx_to_char)

# Train in place with Adam. adam_rnn returns the network itself, so as the last
# expression of the cell its repr is shown as the cell output.
adam_rnn(nn=net, X_train=X, y_train=y, alpha=alpha, mb_size=time_step, n_iter=n_iter, print_after=print_after)


Iter-1300 loss: 23.6086
0 ttpss red  cts ia enre ,hrnas ertlin pen .epran ilomatn oe TWef fiwemhentantth tshy rs hurnr-   tad
Iter-2600 loss: 24.3746
ingsof asttof79;"pis uaOkoJ aha ahan mal the theapanc Shand fouarit,ningD. au lbh, wivnrat eJspllocan
Iter-3900 loss: 21.7627
gy deld Wn kha Ueatod Gicug8tti cta 1eth  Srtmica dasists ap Japan ythi geanduro o.gObes arm artesthe
Iter-5200 loss: 6.3551
 wht ro, Aapwe bmulith ropin"kyob aumeoncuxgEate tiogwe. L5il Jar se the Roulorpoujoshintot ihd Steep
Iter-6500 loss: 9.7314
 ald Wan pmpelorea an ast ah s uht maletake ch wse -bivar d pe, el Connt  Japan is ind a an len th tN
Iter-7800 loss: 10.8498
ris lad pouteda mpon on ing Sh ther tem ted on me Wouthe the 180tar o, toiomisn, Japan resemic then t
Iter-9100 loss: 6.0690
, 9ake imins ins anpfxtse th san, Nail Is bin or an bom onegD pftcobex as the nios cithe touteresin ,
Iter-10400 loss: 6.4991
47 torbandy malonatg rnsthetSea Eist of ton mpiroresica ones ares fimpeon mpinolas iourhala oesturfot
Iter-11700 loss: 13.6825
th ium en.ise a- logsoti. ceass rnhumpronsty-cokeale Gn, u.ath ofori. Hha'8 a, the Paile 1852 ine pas
Iter-13000 loss: 17.2030
 thitwortot. Jacan mi ponsi ca inicit m end J4p J pandest.mThe eiper theth-shd-nabeapan'se 19unkton m
Out[26]:
<__main__.RNN at 0x113d6b710>

In [24]:
# # Display the learning curve and losses for training, validation, and testing
# %matplotlib inline
# %config InlineBackend.figure_format = 'retina'

import matplotlib.pyplot as plt

# Learning curve: the raw per-iteration training loss together with its
# exponentially smoothed version, both recorded in net.losses during training.
fig, ax = plt.subplots()
ax.plot(net.losses['train'], label='Train loss')
ax.plot(net.losses['smooth train'], label='Smooth train loss')
# The recorded loss is the cross-entropy summed over the characters of one minibatch.
ax.set_xlabel('Iteration')
ax.set_ylabel('Loss (summed over minibatch)')
ax.set_title('RNN training loss')
ax.legend()
plt.show()



In [ ]: