In [ ]:
%load_ext autoreload

import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
from torch.autograd import Variable
from sklearn.utils import shuffle
from torchsample.initializers import Uniform
from torchsample.modules import ModuleTrainer
from torchsample.metrics import CategoricalAccuracy

%aimport torchsample.modules

%matplotlib inline

In [ ]:
use_cuda = False
batch_size = 64

Setup

We're going to download the collected works of Nietzsche to use as our data for this class.


In [ ]:
from keras.utils.data_utils import get_file

path = get_file('nietzsche.txt', origin="https://s3.amazonaws.com/text-datasets/nietzsche.txt")
text = open(path).read()
print('corpus length:', len(text))

In [ ]:
chars = sorted(list(set(text)))
chars.insert(0, "\0")
vocab_size = len(chars)
print('total chars:', vocab_size)

Sometimes it's useful to have a zero value in the dataset, e.g. for padding
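
As a small aside (not part of the original notebook): one way to use that zero index is to left-pad short index sequences to a fixed length. pad_left below is a hypothetical helper, shown only for illustration.


In [ ]:
# Hypothetical helper (illustration only): left-pad a list of character indices
# with the padding index 0, which we mapped to "\0" above.
def pad_left(seq, length, pad_idx=0):
    return [pad_idx] * (length - len(seq)) + list(seq)

pad_left([5, 12, 7], 8)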


In [ ]:
''.join(chars)

Map from chars to indices and back again


In [ ]:
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))

idx will be the data we use from now on - it simply converts all the characters to their index (based on the mapping above).


In [ ]:
idx = [char_indices[c] for c in text]

In [ ]:
idx[:10]

In [ ]:
''.join(indices_char[i] for i in idx[:70])

3 char model

Create inputs

Create four lists of every 3rd character, starting at the 0th, 1st, 2nd, and 3rd characters respectively. The first three will be our inputs; the fourth is the character that follows each triple, which we'll predict.


In [ ]:
cs=3
c1_dat = [idx[i] for i in range(0, len(idx)-1-cs, cs)]
c2_dat = [idx[i+1] for i in range(0, len(idx)-1-cs, cs)]
c3_dat = [idx[i+2] for i in range(0, len(idx)-1-cs, cs)]
c4_dat = [idx[i+3] for i in range(0, len(idx)-1-cs, cs)]

In [ ]:
x1 = np.stack(c1_dat)
x2 = np.stack(c2_dat)
x3 = np.stack(c3_dat)
x3.shape

Our output


In [ ]:
y = np.stack(c4_dat)
y.shape

The first 4 inputs and outputs


In [ ]:
x1[:4], x2[:4], x3[:4]

In [ ]:
y[:4]

In [ ]:
x1.shape, y.shape

Create and train model

Pick the number of latent factors to create (i.e. the width of the embedding matrix) and a size for our hidden state.


In [ ]:
n_fac = 42
n_hidden = 256
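
As a small aside (not in the original): these two numbers fix the size of every weight matrix in the models below.


In [ ]:
# Number of weights each choice implies (biases omitted), for the embedding and
# the dense_in / dense_hidden / dense_out layers defined in the next cell:
vocab_size * n_fac, n_fac * n_hidden, n_hidden * n_hidden, n_hidden * vocab_size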

In [ ]:
import torch.nn as nn
import torch.nn.functional as F

seq_len = 3

def tensor(from_int):
    return torch.from_numpy(np.array(from_int)).long()

class SimpleRnn3Chars(nn.Module):

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_fac)
        self.dense_in_lin = nn.Linear(n_fac, n_hidden)
        self.dense_hidden_lin = nn.Linear(n_hidden, n_hidden)
        self.dense_out = nn.Linear(n_hidden, vocab_size)
        self.init()
        # print(self.embedding(Variable(tensor([10]))))
        # print(self.dense_in_lin.bias)
        
    def dense_in(self, x):
        x = x.view(x.size(0), -1)
        x = self.dense_in_lin(x)
        x = F.relu(x, True)
        return x
            
    def dense_hidden(self, x):
        x = self.dense_hidden_lin(x)
        x = F.tanh(x)
        return x

    def forward(self, c1, c2, c3):
        c1_in = self.embedding(c1) # x => torch.Size([B, 3, n_fac])
        c2_in = self.embedding(c2)
        c3_in = self.embedding(c3)
        
        c1_hidden = self.dense_in(c1_in)
        
        c2_dense = self.dense_in(c2_in)
        hidden_2 = self.dense_hidden(c1_hidden)
        c2_hidden = c2_dense + hidden_2
        
        c3_dense = self.dense_in(c3_in)
        hidden_3 = self.dense_hidden(c2_hidden)
        c3_hidden = c3_dense + hidden_3
        
        c4_out = self.dense_out(c3_hidden)
        
        return c4_out
    
    def init(self):
        torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
        torch.nn.init.xavier_uniform(self.dense_in_lin.weight)
        torch.nn.init.constant(self.dense_in_lin.bias, val=0.0)
        torch.nn.init.eye(self.dense_hidden_lin.weight)
        torch.nn.init.constant(self.dense_hidden_lin.bias, val=0.0)
        torch.nn.init.xavier_uniform(self.dense_out.weight)
        torch.nn.init.constant(self.dense_out.bias, val=0.0)

In [ ]:
%autoreload 2

criterion = nn.CrossEntropyLoss()
model = SimpleRnn3Chars()
if(use_cuda):
    model.cuda()
    criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)

model

In [ ]:
trainer.fit([tensor(x1), tensor(x2), tensor(x3)], tensor(y), nb_epoch=4, batch_size=batch_size, shuffle=True)

Test model


In [ ]:
def get_next(inp):
    idxs = [char_indices[c] for c in inp]
    arrs = [tensor([i]) for i in idxs]
    p = trainer.predict(arrs)
    # torch doesn't have an argmax function. See https://discuss.pytorch.org/t/argmax-with-pytorch/1528
    v, i = torch.max(p, 1) # i is the result Tensor with the index locations of the maximum values
    i = torch.max(i.data) # find any index (they are all max)
    return chars[i]

In [ ]:
get_next('phi')

In [ ]:
get_next(' th')

In [ ]:
get_next(' an')

Our first RNN!

Create inputs

This is the size of our unrolled RNN.


In [ ]:
cs=8

For each of 0 through 7, create a list of every 8th character with that starting point. These will be the 8 inputs to our model.


In [ ]:
c_in_dat = [[idx[i+n] for i in range(0, len(idx)-1-cs, cs)]
            for n in range(cs)]
len(c_in_dat), len(c_in_dat[0])

Then create a list of the next character in each of these series. These will be the labels for our model.


In [ ]:
c_out_dat = [idx[i+cs] for i in range(0, len(idx)-1-cs, cs)]
xs = [np.stack(c) for c in c_in_dat]
len(xs), xs[0].shape

In [ ]:
y = np.stack(c_out_dat)

So each column below is one series of 8 characters from the text.


In [ ]:
[xs[n][:cs] for n in range(cs)]

...and this is the next character after each sequence.


In [ ]:
y[:cs]

Create and train model


In [ ]:
import torch.nn as nn
import torch.nn.functional as F

def each_tensor(items):
    return [tensor(item) for item in items] 

class RnnMultiChar(nn.Module):

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_fac)
        self.dense_in_lin = nn.Linear(n_fac, n_hidden)
        self.dense_hidden_lin = nn.Linear(n_hidden, n_hidden)
        self.dense_out = nn.Linear(n_hidden, vocab_size)
        self.init()
        
    def dense_in(self, x):
        x = x.view(x.size(0), -1)
        x = self.dense_in_lin(x)
        x = F.relu(x, True)
        return x
            
    def dense_hidden(self, x):
        x = self.dense_hidden_lin(x)
        x = F.relu(x)
        return x

    def forward(self, *c):
        c_in = self.embedding(c[0])
        hidden = self.dense_in(c_in)
        
        for i in range(1,cs):
            c_in = self.embedding(c[i]) # x => torch.Size([B, 1, n_fac])
            c_dense = self.dense_in(c_in)
            hidden = self.dense_hidden(hidden)
            hidden.add_(c_dense)
        
        c_out = self.dense_out(hidden)
        
        return c_out
    
    def init(self):
        torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
        torch.nn.init.xavier_uniform(self.dense_in_lin.weight)
        torch.nn.init.constant(self.dense_in_lin.bias, val=0.0)
        torch.nn.init.eye(self.dense_hidden_lin.weight)
        torch.nn.init.constant(self.dense_hidden_lin.bias, val=0.0)
        torch.nn.init.xavier_uniform(self.dense_out.weight)
        torch.nn.init.constant(self.dense_out.bias, val=0.0)

In [ ]:
%autoreload 2

criterion = nn.CrossEntropyLoss()
model = RnnMultiChar()
if(use_cuda):
    model.cuda()
    criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)

model

In [ ]:
trainer.fit(each_tensor(xs), tensor(y), nb_epoch=4, batch_size=batch_size, shuffle=True)

Test model


In [ ]:
get_next('for ther')

In [ ]:
get_next('part of ')

In [ ]:
get_next('queens a')

Our first RNN with PyTorch!

PyTorch doesn't have Keras's SimpleRNN layer under that name; the closest building block is nn.RNNCell, which applies a single timestep at a time, so we drive it ourselves in a loop.


In [ ]:
n_hidden, n_fac, cs, vocab_size
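
A minimal sketch (not from the original notebook) of what nn.RNNCell computes at each timestep, checked against the same update written out by hand with F.linear. This assumes the relu nonlinearity we use below.


In [ ]:
# One RNNCell step is: h' = relu(x @ W_ih^T + b_ih + h @ W_hh^T + b_hh),
# which is close to (but not identical to) the dense_in + dense_hidden update above,
# where the nonlinearities were applied before summing.
cell = nn.RNNCell(input_size=n_fac, hidden_size=n_hidden, nonlinearity='relu')
x = Variable(torch.randn(batch_size, n_fac))
h = Variable(torch.zeros(batch_size, n_hidden))
h_cell = cell(x, h)
h_manual = F.relu(F.linear(x, cell.weight_ih, cell.bias_ih)
                  + F.linear(h, cell.weight_hh, cell.bias_hh))
(h_cell - h_manual).abs().max()  # should be (numerically) zero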

This is nearly exactly equivalent to the RNN we built ourselves in the previous section.


In [ ]:
import torch.nn as nn
import torch.nn.functional as F

class RnnMultiCharPytorch(nn.Module):

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_fac)
        self.rnn = nn.RNNCell(input_size=n_fac, hidden_size=n_hidden, nonlinearity='relu')
        self.dense_out = nn.Linear(n_hidden, vocab_size)
        self.init()

    def forward(self, *c):
        batch_size = c[0].size(0)
        hidden = Variable(torch.zeros(batch_size, n_hidden))
        # each step computes: relu(linear(c_in, w_ih, b_ih) + linear(hidden, w_hh, b_hh))
        for ci in c:
            c_in = self.embedding(ci)
            c_in = c_in.view(c_in.size(0), -1) # torch.Size([64, 42])
            hidden = self.rnn(c_in, hidden)
        
        c_out = self.dense_out(hidden)
        return c_out
    
    def init(self):
        torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
        torch.nn.init.xavier_uniform(self.rnn.weight_ih)
        torch.nn.init.constant(self.rnn.bias_ih, val=0.0)
        torch.nn.init.eye(self.rnn.weight_hh)
        torch.nn.init.constant(self.rnn.bias_hh, val=0.0)
        torch.nn.init.xavier_uniform(self.dense_out.weight)
        torch.nn.init.constant(self.dense_out.bias, val=0.0)

In [ ]:
%autoreload 2

criterion = nn.CrossEntropyLoss()
model = RnnMultiCharPytorch()
if(use_cuda):
    model.cuda()
    criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)

model

In [ ]:
trainer.fit(each_tensor(xs), tensor(y), nb_epoch=4, batch_size=batch_size, shuffle=True)

In [ ]:
get_next('for ther')

In [ ]:
get_next('part of ')

In [ ]:
get_next('queens a')

Returning sequences

Create inputs

To use a sequence model, we can leave our input unchanged - but we have to change our output to a sequence (of course!)

Here, c_out_dat is identical to c_in_dat, but moved across 1 character.


In [ ]:
#c_in_dat = [[idx[i+n] for i in range(0, len(idx)-1-cs, cs)]
#            for n in range(cs)]
c_out_dat = [[idx[i+n] for i in range(1, len(idx)-cs, cs)]
            for n in range(cs)]

In [ ]:
ys = [np.stack(c) for c in c_out_dat]
len(ys), ys[0].shape
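
A quick sanity check (an aside, not in the original): because the step is cs, each output column is just the corresponding input column shifted one character along in the text, so ys[n] should equal xs[n+1] for n < cs-1.


In [ ]:
# Each label column should match the next input column (both are numpy arrays).
all((ys[n] == xs[n+1]).all() for n in range(cs - 1))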

Reading down each column shows one set of inputs and outputs.


In [ ]:
[xs[n][:cs] for n in range(cs)]
len(xs), xs[0].shape

In [ ]:
[ys[n][:cs] for n in range(cs)]
len(ys), ys[0].shape

Create and train model


In [ ]:
import torch.nn as nn
import torch.nn.functional as F

class RnnMultiOutput(nn.Module):

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_fac)
        self.dense_in_lin = nn.Linear(n_fac, n_hidden)
        self.dense_hidden_lin = nn.Linear(n_hidden, n_hidden)
        self.dense_out = nn.Linear(n_hidden, vocab_size)
        self.init()
        
    def dense_in(self, x):
        x = x.view(x.size(0), -1)
        x = self.dense_in_lin(x)
        x = F.relu(x, True)
        return x
            
    def dense_hidden(self, x):
        x = self.dense_hidden_lin(x)
        x = F.relu(x)
        return x

    def forward(self, *c):
        c_in = self.embedding(c[0])
        hidden = self.dense_in(c_in)
        
        out = [self.dense_out(hidden)]
        
        for i in range(1,cs):
            c_in = self.embedding(c[i]) # x => torch.Size([B, 1, n_fac])
            c_dense = self.dense_in(c_in)
            hidden = self.dense_hidden(hidden)
            hidden.add_(c_dense)
            out.append(self.dense_out(hidden))
        
        return out
    
    def init(self):
        torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
        torch.nn.init.xavier_uniform(self.dense_in_lin.weight)
        torch.nn.init.constant(self.dense_in_lin.bias, val=0.0)
        torch.nn.init.eye(self.dense_hidden_lin.weight)
        torch.nn.init.constant(self.dense_hidden_lin.bias, val=0.0)
        torch.nn.init.xavier_uniform(self.dense_out.weight)
        torch.nn.init.constant(self.dense_out.bias, val=0.0)

In [ ]:
%autoreload 2

criterion = nn.CrossEntropyLoss()
model = RnnMultiOutput()
if(use_cuda):
    model.cuda()
    criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)

# Bug in torchsample?
trainer._has_multiple_loss_fns = False

model

In [ ]:
# TODO print each loss separately
trainer.fit(each_tensor(xs), each_tensor(ys), nb_epoch=4, batch_size=batch_size, shuffle=True)
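
As a hedged sketch (not the original author's code) of what that TODO might look like: run one small batch through the trained model directly and apply the criterion to each output/target pair, giving the per-position losses that the trainer presumably combines.


In [ ]:
# Per-position cross-entropy on one batch (assumes use_cuda=False, as set above).
xb = [Variable(tensor(x[:batch_size])) for x in xs]
yb = [Variable(tensor(t[:batch_size])) for t in ys]
outs = model(*xb)
losses = [criterion(o, t) for o, t in zip(outs, yb)]
[l.data[0] for l in losses]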

Test model


In [ ]:
%autoreload 2

def char_argmax(p):
    # print(p.size())
    v, i = torch.max(p, 0) # i is the result Tensor with the index locations of the maximum values
    i = torch.max(i.data) # find any index (they are all max)
    return chars[i]

def get_nexts_multiple(inp):
    idxs = [char_indices[c] for c in inp]
    arrs = [tensor([i]) for i in idxs]
    ps = trainer.predict(arrs)
    print(list(inp))
    return [char_argmax(p[0]) for p in ps]

In [ ]:
get_nexts_multiple(' this is')

In [ ]:
get_nexts_multiple(' part of')

Sequence model with PyTorch


In [ ]:
n_hidden, n_fac, cs, vocab_size

To convert our previous PyTorch model into a sequence model, we simply return an output after every timestep instead of only after the last one.


In [ ]:
import torch.nn as nn
import torch.nn.functional as F

class RnnCellMultiOutput(nn.Module):

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_fac)
        self.rnn = nn.RNNCell(input_size=n_fac, hidden_size=n_hidden, nonlinearity='relu')
        self.dense_out = nn.Linear(n_hidden, vocab_size)
        self.init()

    def forward(self, *c):
        batch_size = c[0].size(0)
        hidden = Variable(torch.zeros(batch_size, n_hidden))
        
        out = []
        
        for ci in c:
            c_in = self.embedding(ci)
            c_in = c_in.view(c_in.size(0), -1)
            hidden = self.rnn(c_in, hidden)
            out.append(self.dense_out(hidden))
            
        return out
    
    def init(self):
        torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
        torch.nn.init.xavier_uniform(self.rnn.weight_ih)
        torch.nn.init.constant(self.rnn.bias_ih, val=0.0)
        torch.nn.init.eye(self.rnn.weight_hh)
        torch.nn.init.constant(self.rnn.bias_hh, val=0.0)
        torch.nn.init.xavier_uniform(self.dense_out.weight)
        torch.nn.init.constant(self.dense_out.bias, val=0.0)

In [ ]:
%autoreload 2

criterion = nn.CrossEntropyLoss()
model = RnnCellMultiOutput()
if(use_cuda):
    model.cuda()
    criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)

# Bug in torchsample?
trainer._has_multiple_loss_fns = False

model

In [ ]:
# TODO print each loss separately
trainer.fit(each_tensor(xs), each_tensor(ys), nb_epoch=4, batch_size=batch_size, shuffle=True)

In [ ]:
get_nexts_multiple(' this is')

Stateful model with PyTorch


In [ ]:
# TODO
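
Since the notebook stops here, the following is only a guess at where this section was headed (an assumption, not the author's code): keep the hidden state on the module between batches and detach it on each call, so state carries across batches while backpropagation stays truncated at batch boundaries. This only makes sense if batches are fed in text order (i.e. without shuffle=True).


In [ ]:
# Hedged sketch of a stateful multi-output RNN (assumption, not the original).
class RnnCellStateful(nn.Module):

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, n_fac)
        self.rnn = nn.RNNCell(input_size=n_fac, hidden_size=n_hidden, nonlinearity='relu')
        self.dense_out = nn.Linear(n_hidden, vocab_size)
        self.hidden = None  # carried across calls to forward()

    def forward(self, *c):
        batch_size = c[0].size(0)
        if self.hidden is None or self.hidden.size(0) != batch_size:
            self.hidden = torch.zeros(batch_size, n_hidden)
        # re-wrap the stored state so gradients don't flow across batch boundaries
        hidden = Variable(self.hidden)
        out = []
        for ci in c:
            c_in = self.embedding(ci).view(ci.size(0), -1)
            hidden = self.rnn(c_in, hidden)
            out.append(self.dense_out(hidden))
        self.hidden = hidden.data  # store the raw tensor, detached from the graph
        return out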