In [ ]:
%load_ext autoreload
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data
from torch.autograd import Variable
from sklearn.utils import shuffle
from torchsample.initializers import Uniform, XavierUniform
from torchsample.modules import ModuleTrainer
from torchsample.metrics import CategoricalAccuracy
%aimport torchsample.modules
%matplotlib inline
In [ ]:
use_cuda = False
batch_size = 64
We're going to look at the IMDB dataset, which contains movie reviews from IMDB, along with their sentiment. Keras comes with some helpers for this dataset.
In [ ]:
from keras.datasets import imdb
idx = imdb.get_word_index()
This is the word list:
In [ ]:
idx_arr = sorted(idx, key=idx.get)
idx_arr[:10]
...and this is the mapping from id to word
In [ ]:
idx2word = {v: k for k, v in idx.items()}
We download the reviews using code copied from keras.datasets:
In [ ]:
from keras.utils.data_utils import get_file
import pickle
path = get_file('imdb_full.pkl',
origin='https://s3.amazonaws.com/text-datasets/imdb_full.pkl',
md5_hash='d091312047c43cf9e4e38fef92437263')
with open(path, 'rb') as f:
    (x_train, labels_train), (x_test, labels_test) = pickle.load(f)
In [ ]:
len(x_train)
Here's the first review. As you can see, the words have been replaced by ids, which can be looked up in idx2word.
In [ ]:
', '.join(map(str, x_train[0]))
The first word of the first review is 23022. Let's see what that is.
In [ ]:
idx2word[23022]
Here's the whole review, mapped from ids to words.
In [ ]:
' '.join([idx2word[o] for o in x_train[0]])
The labels are 1 for positive, 0 for negative.
In [ ]:
labels_train_tensor = torch.from_numpy(np.array(labels_train))
labels_test_tensor = torch.from_numpy(np.array(labels_test))
labels_train[:10]
Reduce the vocab size by mapping rare words to the maximum index (vocab_size - 1).
In [ ]:
vocab_size = 5000
trn = [np.array([i if i < vocab_size - 1 else vocab_size - 1 for i in s]) for s in x_train]
test = [np.array([i if i < vocab_size - 1 else vocab_size - 1 for i in s]) for s in x_test]
Look at the distribution of review lengths.
In [ ]:
lens = np.array(list(map(len, trn)))
(lens.max(), lens.min(), lens.mean())
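To get a feel for how much a 500-word cutoff will truncate, here's a quick look at the length percentiles and the fraction of long reviews (a small sketch using the lens array computed above):
In [ ]:
# Median / 90th / 95th percentile lengths, and the fraction of reviews longer than 500 words.
print(np.percentile(lens, [50, 90, 95]))
print((lens > 500).mean())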
Pad (with zero) or truncate each review to a consistent length.
In [ ]:
seq_len = 500
from keras.preprocessing import sequence
trn = sequence.pad_sequences(trn, maxlen=seq_len, value=0)
test = sequence.pad_sequences(test, maxlen=seq_len, value=0)
trn_tensor = torch.from_numpy(trn).long()
test_tensor = torch.from_numpy(test).long()
This results in nice rectangular matrices that can be passed to ML algorithms. Reviews shorter than 500 words are pre-padded with zeros; longer reviews are truncated.
In [ ]:
trn_tensor.size()
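As a quick sanity check (a sketch, assuming the arrays above), the shortest review should now start with a long run of zeros followed by its actual word ids:
In [ ]:
# Inspect the most heavily pre-padded review: its original length, then the first few padded entries.
shortest = int(np.argmin(lens))
print(lens[shortest], trn[shortest][:20])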
The simplest model that tends to give reasonable results is a single hidden layer net. So let's try that. Note that we can't expect to get any useful results by feeding word ids directly into a neural net - so instead we use an embedding to replace them with a vector of 32 (initially random) floats for each word in the vocab.
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class SingleHiddenLayerModule(nn.Module):
def __init__(self):
super().__init__()
num_dimensions = 32
self.embedding = nn.Embedding(vocab_size, num_dimensions)
self.fc1 = nn.Linear(seq_len * num_dimensions, 100)
self.dropout = nn.Dropout(0.7)
self.fc2 = nn.Linear(100, 2)
self.init()
def forward(self, words_ids):
x = self.embedding(words_ids) # x => torch.Size([64, 500, 32])
x = x.view(x.size(0), -1) # x => torch.Size([64, 16000])
x = self.fc1(x)
x = F.relu(x, True)
x = self.dropout(x)
x = self.fc2(x)
# result = F.sigmoid(x)
result = x
return result
def init(self):
torch.nn.init.constant(self.fc1.bias, val=0.0)
torch.nn.init.constant(self.fc2.bias, val=0.0)
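To illustrate the embedding idea from above in isolation (a minimal standalone sketch, not part of the model), an nn.Embedding simply maps each word id to a learned 32-dimensional vector, so a batch of padded reviews becomes a 3D tensor:
In [ ]:
# A standalone nn.Embedding lookup: [2, 500] word ids -> [2, 500, 32] floats.
demo_emb = nn.Embedding(vocab_size, 32)
demo_out = demo_emb(Variable(trn_tensor[:2]))
print(demo_out.size())  # torch.Size([2, 500, 32])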
In [ ]:
%autoreload 2
# criterion = nn.BCELoss()
criterion = nn.CrossEntropyLoss()
model = SingleHiddenLayerModule()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
trainer.set_initializers([Uniform(module_filter="embedding*", a=-0.05, b=0.05), XavierUniform(module_filter="fc*")])
trainer.set_metrics([CategoricalAccuracy()])
# trainer.summary((trn_tensor.size(0), labels_train_tensor.size(0)))
model
In [ ]:
trainer.fit(trn_tensor, labels_train_tensor, validation_data=(test_tensor, labels_test_tensor),
nb_epoch=2, batch_size=batch_size, shuffle=True)
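If you want to compute the validation accuracy without going through ModuleTrainer, a manual pass over the test set looks roughly like this (a sketch in the Variable-era PyTorch style used here; it assumes the CPU model and tensors defined above):
In [ ]:
# Manual evaluation: argmax over the two logits, compared against the labels.
model.eval()
preds = []
for i in range(0, test_tensor.size(0), batch_size):
    batch = Variable(test_tensor[i:i + batch_size], volatile=True)
    preds.append(model(batch).data.max(1)[1].view(-1))
model.train()
print((torch.cat(preds) == labels_test_tensor).float().mean())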
The Stanford paper that this dataset is from cites a state-of-the-art accuracy (without unlabelled data) of 0.883. So we're short of that, but on the right track, and already in the neighbourhood of the 2011 state of the art with a simple neural net.
A CNN is likely to work better, since it's designed to take advantage of ordered data. We'll need to use a 1D CNN, since a sequence of words is 1D.
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class CnnMaxPoolingModule(nn.Module):
def __init__(self):
super().__init__()
num_dimensions = 32
self.embedding = nn.Embedding(vocab_size, num_dimensions)
self.drop1 = nn.Dropout(0.2)
self.conv1 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=5, padding=2, groups=1)
self.fc1 = nn.Linear(seq_len * num_dimensions, 100)
self.dropout = nn.Dropout(0.7)
self.fc2 = nn.Linear(100, 2)
self.init()
def forward(self, words_ids):
x = self.embedding(words_ids) # x => torch.Size([B, 500, 32])
x = x.permute(0, 2, 1)
# print('emb', x.size())
        x = self.drop1(x) # x => torch.Size([B, 32, 500])
        x = self.conv1(x) # x => torch.Size([B, 64, 500])
        x = F.relu(x, True)
        # print('conv1', x.size())
        x = self.drop1(x) # x => torch.Size([B, 64, 500])
x = F.max_pool1d(x, kernel_size=2)
# print('max', x.size())
x = x.view(x.size(0), -1)
# print(x.size())
x = self.fc1(x)
x = F.relu(x, True)
x = self.dropout(x)
x = self.fc2(x)
# result = F.sigmoid(x)
result = x
#raise 'Error'
return result
def init(self):
torch.nn.init.constant(self.conv1.bias, val=0.0)
torch.nn.init.constant(self.fc1.bias, val=0.0)
torch.nn.init.constant(self.fc2.bias, val=0.0)
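One thing worth noting about the shapes above: after the conv layer (64 channels) and the stride-2 max pool, the flattened size is 64 × 250 = 16,000, which happens to equal seq_len × 32, so the same fc1 size as the previous model still works. A quick check (sketch):
In [ ]:
# 64 channels x 250 pooled positions == 500 timesteps x 32 embedding dims.
assert 64 * (seq_len // 2) == seq_len * 32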
In [ ]:
%autoreload 2
# criterion = nn.BCELoss()
criterion = nn.CrossEntropyLoss()
model = CnnMaxPoolingModule()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
trainer.set_initializers([Uniform(module_filter="embedding*", a=-0.05, b=0.05), XavierUniform(module_filter="fc*"), XavierUniform(module_filter="conv*")])
trainer.set_metrics([CategoricalAccuracy()])
# trainer.summary((trn_tensor.size(0), labels_train_tensor.size(0)))
model
In [ ]:
trainer.fit(trn_tensor, labels_train_tensor, validation_data=(test_tensor, labels_test_tensor),
nb_epoch=2, batch_size=batch_size, shuffle=True)
In [ ]:
trainer.fit(trn_tensor, labels_train_tensor, validation_data=(test_tensor, labels_test_tensor),
nb_epoch=4, batch_size=batch_size, shuffle=True)
You may want to look at wordvectors.ipynb before moving on.
In this section, we replicate the previous CNN, but using pre-trained embeddings.
In [ ]:
import torch
import re
from torchtext.vocab import load_word_vectors
wv_dict, wv_arr, wv_size = load_word_vectors('.', 'glove.6B', 50)
print('Loaded', len(wv_arr), 'words')
The GloVe word ids and IMDB word ids use different indexes, so we create a simple function that builds an embedding matrix using the indexes from IMDB and the embeddings from GloVe (where they exist).
In [ ]:
def get_word(word):
return wv_arr[wv_dict[word]]
def create_emb():
num_dimensions_glove = wv_arr.size()[1]
embedding = nn.Embedding(vocab_size, num_dimensions_glove)
# If we can't find the word in glove, randomly initialize
torch.nn.init.uniform(embedding.weight, a=-0.05, b=0.05)
num_found, num_not_found = 0, 0
for i in range(1,len(embedding.weight)):
word = idx2word[i]
        if word and re.match(r"^[a-zA-Z0-9\-]*$", word) and word in wv_dict:
embedding.weight.data[i] = get_word(word)
num_found += 1
else:
num_not_found +=1
# This is our "rare word" id - we want to randomly initialize
torch.nn.init.uniform(embedding.weight.data[-1], a=-0.05, b=0.05)
embedding.weight.requires_grad = False
# This speeds up training. Can it be replaced by BatchNorm1d?
embedding.weight.data /= 3
print("Words found: {}, not found: {}".format(num_found, num_not_found))
return embedding
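As a quick check (a sketch; it assumes 'the' is common enough to fall under the vocab cutoff and to appear in GloVe), the embedding row for a word should match its GloVe vector divided by 3, because of the scaling at the end of create_emb:
In [ ]:
# Compare one row of the built embedding against the raw GloVe vector (scaled by 1/3).
emb_check = create_emb()
print((emb_check.weight.data[idx['the']] - get_word('the') / 3).abs().max())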
We pass our pre-built embedding layer to the model's constructor; its weights were already frozen (requires_grad = False) inside create_emb, so only the rest of the model is trained at first.
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class CnnMaxPoolingModuleWithEmbedding(nn.Module):
def __init__(self, embedding):
super().__init__()
num_dimensions = 32
self.embedding = embedding
self.drop1 = nn.Dropout(0.25)
self.batchnorm = nn.BatchNorm1d(500)
self.conv1 = nn.Conv1d(in_channels=embedding.weight.size()[1], out_channels=64, kernel_size=5, padding=2, groups=1)
self.fc1 = nn.Linear(seq_len * num_dimensions, 100)
self.dropout = nn.Dropout(0.7)
self.fc2 = nn.Linear(100, 2)
self.init()
def forward(self, words_ids):
x = self.embedding(words_ids)
# x = self.batchnorm(x)
x = x.permute(0, 2, 1)
x = self.drop1(x)
x = self.conv1(x)
x = F.relu(x, True)
x = self.drop1(x)
x = F.max_pool1d(x, kernel_size=2)
x = x.view(x.size(0), -1)
x = self.fc1(x)
x = F.relu(x, True)
x = self.dropout(x)
x = self.fc2(x)
result = x
return result
def init(self):
torch.nn.init.constant(self.conv1.bias, val=0.0)
torch.nn.init.constant(self.fc1.bias, val=0.0)
torch.nn.init.constant(self.fc2.bias, val=0.0)
def parameters(self):
p = filter(lambda p: p.requires_grad, nn.Module.parameters(self))
return p
In [ ]:
%autoreload 2
emb = create_emb()
# criterion = nn.BCELoss()
criterion = nn.CrossEntropyLoss()
model = CnnMaxPoolingModuleWithEmbedding(emb)
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
trainer.set_initializers([XavierUniform(module_filter="fc*"), XavierUniform(module_filter="conv*")])
trainer.set_metrics([CategoricalAccuracy()])
# trainer.summary((trn_tensor.size(0), labels_train_tensor.size(0)))
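Because the model overrides parameters() to filter out weights with requires_grad=False, the frozen embedding is excluded from what the optimizer sees. A quick way to confirm this (sketch, using the model just built):
In [ ]:
# Trainable parameter count vs. total parameter count; the gap is the frozen embedding matrix.
n_trainable = sum(p.data.numel() for p in model.parameters())
n_total = sum(p.data.numel() for p in nn.Module.parameters(model))
print(n_trainable, n_total, n_total - n_trainable)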
In [ ]:
trainer.fit(trn_tensor, labels_train_tensor, validation_data=(test_tensor, labels_test_tensor),
nb_epoch=10, batch_size=batch_size, shuffle=True)
We've already beaten our previous model! But let's fine-tune the embedding weights, especially since the words we couldn't find in GloVe just have random embeddings.
In [ ]:
model.embedding.weight.requires_grad = True
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-4)
trainer.set_loss(criterion)
trainer.set_metrics([CategoricalAccuracy()])
In [ ]:
trainer.fit(trn_tensor, labels_train_tensor, validation_data=(test_tensor, labels_test_tensor),
nb_epoch=1, batch_size=batch_size, shuffle=True)
This is an implementation of a multi-size CNN as shown in Ben Bowles' excellent blog post.
We create multiple conv layers with different kernel sizes, and then concatenate their outputs.
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class CnnMaxPoolingModuleMultiSizeWithEmbedding(nn.Module):
def __init__(self, embedding):
super().__init__()
num_dimensions = 32
self.embedding = embedding
self.drop1 = nn.Dropout(0.25)
self.batchnorm = nn.BatchNorm1d(500)
        self.convs = nn.ModuleList([self.create_conv(embedding, fsz) for fsz in range(3, 6)])
        self.fc1 = nn.Linear(3 * 64 * (seq_len // 2), 100)  # 3 branches x 64 channels x 250 pooled positions
self.dropout = nn.Dropout(0.7)
self.fc2 = nn.Linear(100, 2)
self.init()
def create_conv(self, embedding, fsz):
        return nn.Conv1d(in_channels=embedding.weight.size()[1], out_channels=64, kernel_size=fsz, padding=fsz // 2, groups=1)
def conv(self, c, x):
x = c(x)
x = F.relu(x, True)
x = self.drop1(x)
x = F.max_pool1d(x, kernel_size=2)
return x
def forward(self, words_ids):
x = self.embedding(words_ids)
x = x.permute(0, 2, 1)
x = self.drop1(x)
convs = [self.conv(conv, x) for conv in self.convs]
        x = torch.cat(convs, dim=1)  # concatenate the branch outputs along the channel dimension
x = x.view(x.size(0), -1)
x = self.fc1(x)
x = F.relu(x, True)
x = self.dropout(x)
x = self.fc2(x)
result = x
return result
def init(self):
torch.nn.init.constant(self.fc1.bias, val=0.0)
torch.nn.init.constant(self.fc2.bias, val=0.0)
for conv in self.convs:
torch.nn.init.xavier_uniform(conv.weight.data, gain=1.0)
torch.nn.init.constant(conv.bias, val=0.0)
def parameters(self):
p = filter(lambda p: p.requires_grad, nn.Module.parameters(self))
return p
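With kernel sizes 3 to 5 and padding of fsz // 2, each branch ends up as [B, 64, 250] after pooling, so the concatenated tensor is [B, 192, 250]. A quick shape check on a tiny batch (a sketch; it reuses create_emb and the padded tensors from above):
In [ ]:
# Push two reviews through the conv branches only, to confirm the shapes fc1 expects.
check_model = CnnMaxPoolingModuleMultiSizeWithEmbedding(create_emb())
check_x = check_model.embedding(Variable(trn_tensor[:2])).permute(0, 2, 1)
print([check_model.conv(c, check_x).size() for c in check_model.convs])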
In [ ]:
%autoreload 2
emb = create_emb()
criterion = nn.CrossEntropyLoss()
model = CnnMaxPoolingModuleMultiSizeWithEmbedding(emb)
model.embedding.weight.requires_grad = True
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
trainer.set_initializers([XavierUniform(module_filter="fc*")])
trainer.set_metrics([CategoricalAccuracy()])
In [ ]:
trainer.fit(trn_tensor, labels_train_tensor, validation_data=(test_tensor, labels_test_tensor),
nb_epoch=10, batch_size=batch_size, shuffle=True)
This is clearly over-fitting, but it does get the highest accuracy on the validation set.
We haven't covered this bit yet!
In [ ]:
import torch.nn as nn
import torch.nn.functional as F
class LstmEmbeddingModule(nn.Module):
def __init__(self):
super().__init__()
num_dimensions = 32
self.num_hidden = 100
self.embedding = nn.Embedding(vocab_size, num_dimensions)
self.drop1 = nn.Dropout(0.2)
self.lstm1 = nn.LSTM(input_size=32, hidden_size=self.num_hidden, num_layers=1, batch_first=True)
self.fc1 = nn.Linear(50000, 2)
self.hidden = self.init_hidden(batch_size)
self.init()
def forward(self, words_ids):
# We detach the hidden state from how it was previously produced.
# If we didn't, the model would try backpropagating all the way to start of the dataset.
# self.hidden = self.repackage_hidden(self.hidden)
x = self.embedding(words_ids)
x = self.drop1(x)
#print('embd', x.size())
self.hidden = self.init_hidden(x.size(0))
        #lengths = [seq_len for _ in range(x.size(0))]
        #x = torch.nn.utils.rnn.pack_padded_sequence(x, lengths, batch_first=True)
#print('pack', x.data.size())
x, self.hidden = self.lstm1(x, self.hidden)
#print('lstm', x.data.size())
#x, _ = torch.nn.utils.rnn.pad_packed_sequence(x, batch_first=True)
#print('unpk', x.size())
# print(self.hidden)
# TODO can we get rid of contiguous?
x = x.contiguous().view(x.size(0), -1)
#print('view', x.size())
x = self.fc1(x)
x = F.relu(x, True)
return x
def init(self):
torch.nn.init.constant(self.fc1.bias, val=0.0)
def init_hidden(self, batch_size):
num_layers = 1
weight = next(self.parameters()).data
return (Variable(weight.new(num_layers, batch_size, self.num_hidden).zero_()),
Variable(weight.new(num_layers, batch_size, self.num_hidden).zero_()))
def repackage_hidden(self, h):
"""Wraps hidden states in new Variables, to detach them from their history."""
if type(h) == Variable:
return Variable(h.data)
else:
return tuple(self.repackage_hidden(v) for v in h)
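The fc1 size of 50,000 comes from flattening the full LSTM output sequence: with batch_first=True the LSTM maps [B, 500, 32] to [B, 500, 100], and 500 × 100 = 50,000. A standalone shape sketch (assumed sizes, Variable-era style):
In [ ]:
# LSTM shape check: the output keeps the 500 timesteps, each now a 100-dim hidden state.
lstm_demo = nn.LSTM(input_size=32, hidden_size=100, num_layers=1, batch_first=True)
h0 = (Variable(torch.zeros(1, 2, 100)), Variable(torch.zeros(1, 2, 100)))
out, hn = lstm_demo(Variable(torch.randn(2, seq_len, 32)), h0)
print(out.size())  # torch.Size([2, 500, 100])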
In [ ]:
%autoreload 2
criterion = nn.CrossEntropyLoss()
model = LstmEmbeddingModule()
if(use_cuda):
model.cuda()
criterion.cuda()
trainer = ModuleTrainer(model)
trainer.set_optimizer(optim.Adam, lr=1e-3)
trainer.set_loss(criterion)
# TODO init LSTM
trainer.set_initializers([Uniform(module_filter="embedding*", a=-0.05, b=0.05), XavierUniform(module_filter="fc*")])
trainer.set_metrics([CategoricalAccuracy()])
In [ ]:
# TODO figure out how to do this in PyTorch
trainer.fit(trn_tensor, labels_train_tensor, validation_data=(test_tensor, labels_test_tensor),
nb_epoch=5, batch_size=batch_size, shuffle=True)