In [ ]:
%load_ext autoreload
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
from torch.autograd import Variable
from sklearn.utils import shuffle
from torchsample.initializers import Uniform
from torchsample.modules import ModuleTrainer
from torchsample.metrics import CategoricalAccuracy
%aimport torchsample.modules
%matplotlib inline
In [ ]:
use_cuda = False
batch_size = 64
We're going to download the collected works of Nietzsche to use as our data for this class.
In [ ]:
from keras.utils.data_utils import get_file
path = get_file('nietzsche.txt', origin="https://s3.amazonaws.com/text-datasets/nietzsche.txt")
text = open(path).read()
print('corpus length:', len(text))
In [ ]:
chars = sorted(list(set(text)))
chars.insert(0, "\0")
vocab_size = len(chars)
print('total chars:', vocab_size)
Sometimes it's useful to have a zero value in the dataset, e.g. for padding
In [ ]:
''.join(chars)
Map from chars to indices and back again
In [ ]:
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))
idx will be the data we use from now on: it's simply the text with every character converted to its index (using the mapping above)
In [ ]:
idx = [char_indices[c] for c in text]
In [ ]:
idx[:10]
In [ ]:
''.join(indices_char[i] for i in idx[:70])
Split the text into non-overlapping groups of three characters (stride cs=3): c1, c2 and c3 hold the 1st, 2nd and 3rd character of each group, and c4 holds the character that follows
In [ ]:
cs=3
c1_dat = [idx[i] for i in range(0, len(idx)-1-cs, cs)]
c2_dat = [idx[i+1] for i in range(0, len(idx)-1-cs, cs)]
c3_dat = [idx[i+2] for i in range(0, len(idx)-1-cs, cs)]
c4_dat = [idx[i+3] for i in range(0, len(idx)-1-cs, cs)]
In [ ]:
x1 = np.stack(c1_dat)
x2 = np.stack(c2_dat)
x3 = np.stack(c3_dat)
x3.shape
Our output: the character that follows each group of three
In [ ]:
y = np.stack(c4_dat)
y.shape
The first 4 inputs and outputs
In [ ]:
x1[:4], x2[:4], x3[:4]
In [ ]:
y[:4]
In [ ]:
x1.shape, y.shape
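As a quick sanity check, we can decode the first training example back into text: the first three characters of the corpus, followed by the character the model should predict.
In [ ]:
''.join(indices_char[o] for o in [x1[0], x2[0], x3[0]]), indices_char[y[0]]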
n_fac is the number of latent factors to create (i.e. the width of the embedding matrix), and n_hidden is the size of our hidden state
In [ ]:
n_fac = 42
n_hidden = 256
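nn.Embedding(vocab_size, n_fac) holds a vocab_size x n_fac matrix of latent factors; looking up a batch of character indices of shape [B] returns the corresponding rows, shape [B, n_fac]. A quick illustration (the indices 1 and 2 are arbitrary):
In [ ]:
emb = nn.Embedding(vocab_size, n_fac)
emb(Variable(torch.LongTensor([1, 2]))).size()  # => torch.Size([2, 42])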
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
seq_len = 3
def tensor(from_int):
return torch.from_numpy(np.array(from_int)).long()
class SimpleRnn3Chars(nn.Module):
def __init__(self):
super().__init__()
self.embedding = nn.Embedding(vocab_size, n_fac)
self.dense_in_lin = nn.Linear(n_fac, n_hidden)
self.dense_hidden_lin = nn.Linear(n_hidden, n_hidden)
self.dense_out = nn.Linear(n_hidden, vocab_size)
self.init()
# print(self.embedding(Variable(tensor([10]))))
# print(self.dense_in_lin.bias)
def dense_in(self, x):
x = x.view(x.size(0), -1)
x = self.dense_in_lin(x)
x = F.relu(x, True)
return x
def dense_hidden(self, x):
x = self.dense_hidden_lin(x)
x = F.tanh(x)
return x
def forward(self, c1, c2, c3):
c1_in = self.embedding(c1) # => torch.Size([B, n_fac])
c2_in = self.embedding(c2)
c3_in = self.embedding(c3)
c1_hidden = self.dense_in(c1_in)
c2_dense = self.dense_in(c2_in)
hidden_2 = self.dense_hidden(c1_hidden)
c2_hidden = c2_dense + hidden_2
c3_dense = self.dense_in(c3_in)
hidden_3 = self.dense_hidden(c2_hidden)
c3_hidden = c3_dense + hidden_3
c4_out = self.dense_out(c3_hidden)
return c4_out
def init(self):
torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
torch.nn.init.xavier_uniform(self.dense_in_lin.weight)
torch.nn.init.constant(self.dense_in_lin.bias, val=0.0)
torch.nn.init.eye(self.dense_hidden_lin.weight)
torch.nn.init.constant(self.dense_hidden_lin.bias, val=0.0)
torch.nn.init.xavier_uniform(self.dense_out.weight)
torch.nn.init.constant(self.dense_out.bias, val=0.0)
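Note the initialization: the input and output layers use Xavier-initialized weights, while the hidden-to-hidden weights start out as the identity matrix with zero bias, so at first the hidden state is carried through unchanged. This is a common trick for keeping simple RNNs stable early in training.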
In [ ]:
%autoreload 2
criterion = nn.CrossEntropyLoss()
model = SimpleRnn3Chars()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
model
In [ ]:
trainer.fit([tensor(x1), tensor(x2), tensor(x3)], tensor(y), nb_epoch=4, batch_size=batch_size, shuffle=True)
In [ ]:
def get_next(inp):
idxs = [char_indices[c] for c in inp]
arrs = [tensor([i]) for i in idxs]
p = trainer.predict(arrs)
# torch doesn't have an argmax function. See https://discuss.pytorch.org/t/argmax-with-pytorch/1528
v, i = torch.max(p, 1) # i is the result Tensor with the index locations of the maximum values
i = torch.max(i.data) # extract the index as a plain Python number
return chars[i]
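(In newer PyTorch versions there is a torch.argmax function, so the workaround above becomes a one-liner: chars[int(torch.argmax(p, dim=1))].)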
In [ ]:
get_next('phi')
In [ ]:
get_next(' th')
In [ ]:
get_next(' an')
This is the size of our unrolled RNN.
In [ ]:
cs=8
For each of 0 through 7, create a list of every 8th character with that starting point. These will be the 8 inputs to our model.
In [ ]:
c_in_dat = [[idx[i+n] for i in range(0, len(idx)-1-cs, cs)]
for n in range(cs)]
len(c_in_dat), len(c_in_dat[0])
Then create a list of the next character in each of these series. These will be the labels for our model.
In [ ]:
c_out_dat = [idx[i+cs] for i in range(0, len(idx)-1-cs, cs)]
xs = [np.stack(c) for c in c_in_dat]
len(xs), xs[0].shape
In [ ]:
y = np.stack(c_out_dat)
So each column below is one series of 8 characters from the text.
In [ ]:
[xs[n][:cs] for n in range(cs)]
...and this is the next character after each sequence.
In [ ]:
y[:cs]
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
def each_tensor(items):
return [tensor(item) for item in items]
class RnnMultiChar(nn.Module):
def __init__(self):
super().__init__()
self.embedding = nn.Embedding(vocab_size, n_fac)
self.dense_in_lin = nn.Linear(n_fac, n_hidden)
self.dense_hidden_lin = nn.Linear(n_hidden, n_hidden)
self.dense_out = nn.Linear(n_hidden, vocab_size)
self.init()
def dense_in(self, x):
x = x.view(x.size(0), -1)
x = self.dense_in_lin(x)
x = F.relu(x, True)
return x
def dense_hidden(self, x):
x = self.dense_hidden_lin(x)
x = F.relu(x)
return x
def forward(self, *c):
c_in = self.embedding(c[0])
hidden = self.dense_in(c_in)
for i in range(1,cs):
c_in = self.embedding(c[i]) # => torch.Size([B, n_fac])
c_dense = self.dense_in(c_in)
hidden = self.dense_hidden(hidden)
hidden.add_(c_dense)
c_out = self.dense_out(hidden)
return c_out
def init(self):
torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
torch.nn.init.xavier_uniform(self.dense_in_lin.weight)
torch.nn.init.constant(self.dense_in_lin.bias, val=0.0)
torch.nn.init.eye(self.dense_hidden_lin.weight)
torch.nn.init.constant(self.dense_hidden_lin.bias, val=0.0)
torch.nn.init.xavier_uniform(self.dense_out.weight)
torch.nn.init.constant(self.dense_out.bias, val=0.0)
In [ ]:
%autoreload 2
criterion = nn.CrossEntropyLoss()
model = RnnMultiChar()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
model
In [ ]:
trainer.fit(each_tensor(xs), tensor(y), nb_epoch=4, batch_size=batch_size, shuffle=True)
In [ ]:
get_next('for ther')
In [ ]:
get_next('part of ')
In [ ]:
get_next('queens a')
Keras's SimpleRNN layer does not exist in PyTorch (yet?), so below we build the equivalent from nn.RNNCell.
In [ ]:
n_hidden, n_fac, cs, vocab_size
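nn.RNNCell processes a single time step, so we still write the loop over characters ourselves. PyTorch also has a full-sequence nn.RNN module that runs that loop internally; here is a minimal sketch of how it would be called (illustrative only, not used in this notebook).
In [ ]:
# Illustrative only: nn.RNN consumes a whole sequence at once. With batch_first=True
# it expects input of shape [B, seq_len, n_fac] and returns the hidden state at every
# time step plus the final hidden state.
rnn = nn.RNN(input_size=n_fac, hidden_size=n_hidden, nonlinearity='relu', batch_first=True)
inp = Variable(torch.randn(batch_size, cs, n_fac))
outputs, h_n = rnn(inp)
outputs.size(), h_n.size()  # ([B, cs, n_hidden], [1, B, n_hidden])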
This is nearly exactly equivalent to the RNN we built ourselves in the previous section.
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class RnnMultiCharPytorch(nn.Module):
def __init__(self):
super().__init__()
self.embedding = nn.Embedding(vocab_size, n_fac)
self.rnn = nn.RNNCell(input_size=n_fac, hidden_size=n_hidden, nonlinearity='relu')
self.dense_out = nn.Linear(n_hidden, vocab_size)
self.init()
def forward(self, *c):
batch_size = c[0].size(0)
hidden = Variable(torch.zeros(batch_size, n_hidden))
# With nonlinearity='relu', RNNCell computes: hidden = relu(W_ih @ input + b_ih + W_hh @ hidden + b_hh)
for ci in c:
c_in = self.embedding(ci)
c_in = c_in.view(c_in.size(0), -1) # torch.Size([64, 42])
hidden = self.rnn(c_in, hidden)
c_out = self.dense_out(hidden)
return c_out
def init(self):
torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
torch.nn.init.xavier_uniform(self.rnn.weight_ih)
torch.nn.init.constant(self.rnn.bias_ih, val=0.0)
torch.nn.init.eye(self.rnn.weight_hh)
torch.nn.init.constant(self.rnn.bias_hh, val=0.0)
torch.nn.init.xavier_uniform(self.dense_out.weight)
torch.nn.init.constant(self.dense_out.bias, val=0.0)
In [ ]:
%autoreload 2
criterion = nn.CrossEntropyLoss()
model = RnnMultiCharPytorch()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
model
In [ ]:
trainer.fit(each_tensor(xs), tensor(y), nb_epoch=4, batch_size=batch_size, shuffle=True)
In [ ]:
get_next('for ther')
In [ ]:
get_next('part of ')
In [ ]:
get_next('queens a')
To use a sequence model, we can leave our input unchanged - but we have to change our output to a sequence (of course!)
Here, c_out_dat is identical to c_in_dat, but moved across 1 character.
In [ ]:
#c_in_dat = [[idx[i+n] for i in range(0, len(idx)-1-cs, cs)]
# for n in range(cs)]
c_out_dat = [[idx[i+n] for i in range(1, len(idx)-cs, cs)]
for n in range(cs)]
In [ ]:
ys = [np.stack(c) for c in c_out_dat]
len(ys), ys[0].shape
Reading down each column shows one set of inputs and outputs.
In [ ]:
[xs[n][:cs] for n in range(cs)]
len(xs), xs[0].shape
In [ ]:
[ys[n][:cs] for n in range(cs)]
len(ys), ys[0].shape
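As a quick consistency check, each output column should simply be the next input column, since the labels are the inputs shifted one character along:
In [ ]:
# Should print True: ys[n] is the same series as xs[n+1] for every n but the last.
all(np.array_equal(xs[n + 1], ys[n]) for n in range(cs - 1))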
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class RnnMultiOutput(nn.Module):
def __init__(self):
super().__init__()
self.embedding = nn.Embedding(vocab_size, n_fac)
self.dense_in_lin = nn.Linear(n_fac, n_hidden)
self.dense_hidden_lin = nn.Linear(n_hidden, n_hidden)
self.dense_out = nn.Linear(n_hidden, vocab_size)
self.init()
def dense_in(self, x):
x = x.view(x.size(0), -1)
x = self.dense_in_lin(x)
x = F.relu(x, True)
return x
def dense_hidden(self, x):
x = self.dense_hidden_lin(x)
x = F.relu(x)
return x
def forward(self, *c):
c_in = self.embedding(c[0])
hidden = self.dense_in(c_in)
out = [self.dense_out(hidden)]
for i in range(1,cs):
c_in = self.embedding(c[i]) # => torch.Size([B, n_fac])
c_dense = self.dense_in(c_in)
hidden = self.dense_hidden(hidden)
hidden.add_(c_dense)
out.append(self.dense_out(hidden))
return out
def init(self):
torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
torch.nn.init.xavier_uniform(self.dense_in_lin.weight)
torch.nn.init.constant(self.dense_in_lin.bias, val=0.0)
torch.nn.init.eye(self.dense_hidden_lin.weight)
torch.nn.init.constant(self.dense_hidden_lin.bias, val=0.0)
torch.nn.init.xavier_uniform(self.dense_out.weight)
torch.nn.init.constant(self.dense_out.bias, val=0.0)
In [ ]:
%autoreload 2
criterion = nn.CrossEntropyLoss()
model = RnnMultiOutput()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
# Bug in torchsample?
trainer._has_multiple_loss_fns = False
model
In [ ]:
# TODO print each loss separately
trainer.fit(each_tensor(xs), each_tensor(ys), nb_epoch=4, batch_size=batch_size, shuffle=True)
In [ ]:
%autoreload 2
def char_argmax(p):
# print(p.size())
v, i = torch.max(p, 0) # i is the result Tensor with the index locations of the maximum values
i = torch.max(i.data) # extract the index as a plain Python number
return chars[i]
def get_nexts_multiple(inp):
idxs = [char_indices[c] for c in inp]
arrs = [tensor([i]) for i in idxs]
ps = trainer.predict(arrs)
print(list(inp))
return [char_argmax(p[0]) for p in ps]
In [ ]:
get_nexts_multiple(' this is')
In [ ]:
get_nexts_multiple(' part of')
In [ ]:
n_hidden, n_fac, cs, vocab_size
To convert our previous PyTorch model into a sequence model, simply return multiple outputs instead of a single one
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class RnnCellMultiOutput(nn.Module):
def __init__(self):
super().__init__()
self.embedding = nn.Embedding(vocab_size, n_fac)
self.rnn = nn.RNNCell(input_size=n_fac, hidden_size=n_hidden, nonlinearity='relu')
self.dense_out = nn.Linear(n_hidden, vocab_size)
self.init()
def forward(self, *c):
batch_size = c[0].size(0)
hidden = Variable(torch.zeros(batch_size, n_hidden))
out = []
for ci in c:
c_in = self.embedding(ci)
c_in = c_in.view(c_in.size(0), -1)
hidden = self.rnn(c_in, hidden)
out.append(self.dense_out(hidden))
return out
def init(self):
torch.nn.init.uniform(self.embedding.weight, a=-0.05, b=0.05)
torch.nn.init.xavier_uniform(self.rnn.weight_ih)
torch.nn.init.constant(self.rnn.bias_ih, val=0.0)
torch.nn.init.eye(self.rnn.weight_hh)
torch.nn.init.constant(self.rnn.bias_hh, val=0.0)
torch.nn.init.xavier_uniform(self.dense_out.weight)
torch.nn.init.constant(self.dense_out.bias, val=0.0)
In [ ]:
%autoreload 2
criterion = nn.CrossEntropyLoss()
model = RnnCellMultiOutput()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
# Bug in torchsample?
trainer._has_multiple_loss_fns = False
model
In [ ]:
# TODO print each loss separately
trainer.fit(each_tensor(xs), each_tensor(ys), nb_epoch=4, batch_size=batch_size, shuffle=True)
In [ ]:
get_nexts_multiple(' this is')
In [ ]:
# TODO