According to Wikipedia:
Part-of-speech tagging (POS tagging or PoS tagging or POST) is the process of marking up a word in a text (corpus) as corresponding to a particular part of speech, based on both its definition and its context—i.e., its relationship with adjacent and related words in a phrase, sentence, or paragraph.
Formally, given a sequence of words $\mathbf{x} = \left< x_1, x_2, \ldots, x_t \right>$, the goal is to learn a model $P(y_i \,|\, \mathbf{x})$ where $y_i$ is the POS tag associated with word $x_i$. Note that the model is conditioned on all of $\mathbf{x}$, not just the words that occur earlier in the sentence - this is because we can assume that the entire sentence is known at the time of tagging.
We will train our model on the English portion of the Universal Dependencies treebank. You can download this dataset by running the following lines:
In [ ]:
!wget https://raw.githubusercontent.com/UniversalDependencies/UD_English/master/en-ud-dev.conllu
!wget https://raw.githubusercontent.com/UniversalDependencies/UD_English/master/en-ud-test.conllu
!wget https://raw.githubusercontent.com/UniversalDependencies/UD_English/master/en-ud-train.conllu
The individual data instances come in chunks separated by blank lines. Each chunk consists of a few starting comments, followed by lines of tab-separated fields. The fields we are interested in are the 2nd and 4th (indices 1 and 3 when zero-indexed), which contain the tokenized word and the universal POS tag respectively. An example chunk is shown below:
# sent_id = answers-20111107193044AAvUYBv_ans-0023
# text = Hope you have a crapload of fun!
1 Hope hope VERB VBP Mood=Ind|Tense=Pres|VerbForm=Fin 0 root 0:root _
2 you you PRON PRP Case=Nom|Person=2|PronType=Prs 3 nsubj 3:nsubj _
3 have have VERB VBP Mood=Ind|Tense=Pres|VerbForm=Fin 1 ccomp 1:ccomp _
4 a a DET DT Definite=Ind|PronType=Art 5 det 5:det _
5 crapload crapload NOUN NN Number=Sing 3 obj 3:obj _
6 of of ADP IN _ 7 case 7:case _
7 fun fun NOUN NN Number=Sing 5 nmod 5:nmod SpaceAfter=No
8 ! ! PUNCT . _ 1 punct 1:punct _
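To make the field numbering concrete, here is a small sketch that pulls the (token, POS tag) pair out of one of the word lines above by splitting on tabs:
In [ ]:
# One word line copied from the chunk above (fields are tab-separated).
line = '5\tcrapload\tcrapload\tNOUN\tNN\tNumber=Sing\t3\tobj\t3:obj\t_'
fields = line.split('\t')
token = fields[1]    # the tokenized word: 'crapload'
pos_tag = fields[3]  # the universal POS tag: 'NOUN'
print(token, pos_tag)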
As with most real-world data, we are going to need to do some preprocessing before we can use it. The first thing we are going to need is a Vocab object to map words/POS tags to integer ids. Here is a more full-featured implementation than the one we used in the first tutorial:
In [3]:
from collections import Counter
class Vocab(object):
def __init__(self, iter, max_size=None, sos_token=None, eos_token=None, unk_token=None):
"""Initialize the vocabulary.
Args:
iter: An iterable which produces sequences of tokens used to update
the vocabulary.
max_size: (Optional) Maximum number of tokens in the vocabulary.
sos_token: (Optional) Token denoting the start of a sequence.
eos_token: (Optional) Token denoting the end of a sequence.
unk_token: (Optional) Token denoting an unknown element in a
sequence.
"""
self.max_size = max_size
self.pad_token = '<pad>'
self.sos_token = sos_token
self.eos_token = eos_token
self.unk_token = unk_token
# Add special tokens.
id2word = [self.pad_token]
if sos_token is not None:
id2word.append(self.sos_token)
if eos_token is not None:
id2word.append(self.eos_token)
if unk_token is not None:
id2word.append(self.unk_token)
# Update counter with token counts.
counter = Counter()
for x in iter:
counter.update(x)
# Extract lookup tables.
if max_size is not None:
counts = counter.most_common(max_size)
else:
counts = counter.items()
counts = sorted(counts, key=lambda x: x[1], reverse=True)
words = [x[0] for x in counts]
id2word.extend(words)
word2id = {x: i for i, x in enumerate(id2word)}
self._id2word = id2word
self._word2id = word2id
def __len__(self):
return len(self._id2word)
def word2id(self, word):
"""Map a word in the vocabulary to its unique integer id.
Args:
word: Word to lookup.
Returns:
id: The integer id of the word being looked up.
"""
if word in self._word2id:
return self._word2id[word]
elif self.unk_token is not None:
return self._word2id[self.unk_token]
else:
raise KeyError('Word "%s" not in vocabulary.' % word)
def id2word(self, id):
"""Map an integer id to its corresponding word in the vocabulary.
Args:
id: Integer id of the word being looked up.
Returns:
word: The corresponding word.
"""
return self._id2word[id]
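Before moving on, here is a quick sketch of how the Vocab class is used, with a couple of made-up token sequences:
In [ ]:
# Build a vocabulary from two toy token sequences.
toy_vocab = Vocab([['the', 'cat', 'sat'], ['the', 'dog', 'sat']], unk_token='<unk>')
print(len(toy_vocab))  # 2 special tokens (<pad>, <unk>) + 4 distinct words = 6
print(toy_vocab.word2id('the'))  # integer id of a known word
print(toy_vocab.id2word(toy_vocab.word2id('cat')))  # round-trips back to 'cat'
print(toy_vocab.word2id('aardvark'))  # unknown words map to the <unk> id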
Now we need to parse the .conllu files and extract the data needed for our model. The good news is that the files are only a few megabytes, so we can store everything in memory. Rather than creating a generator from scratch like we did in the previous tutorial, we will instead showcase the torch.utils.data.Dataset class. There are two main things that a Dataset must have:

- A __len__ method, which lets you know how many data points are in the dataset.
- A __getitem__ method, which is used to support integer indexing.

A minimal toy example of these two methods is sketched below, followed by how to define them for the Universal Dependencies treebank data.
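The toy version uses made-up data and defines only the two required methods:
In [ ]:
from torch.utils.data import Dataset

class ToyDataset(Dataset):
    """A tiny Dataset wrapping a fixed list of (input, target) pairs."""
    def __init__(self):
        self.examples = [([1, 2, 3], [4, 5, 6]), ([7, 8], [9, 10])]
    def __len__(self):
        # Number of data points in the dataset.
        return len(self.examples)
    def __getitem__(self, idx):
        # Support integer indexing: toy[0], toy[1], ...
        return self.examples[idx]

toy = ToyDataset()
print(len(toy))  # 2
print(toy[0])    # ([1, 2, 3], [4, 5, 6])
The real CoNLLDataset below follows the same pattern, but builds its examples by parsing a .conllu file.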
In [4]:
import re
from torch.utils.data import Dataset
class Annotation(object):
def __init__(self):
"""A helper object for storing annotation data."""
self.tokens = []
self.pos_tags = []
class CoNLLDataset(Dataset):
def __init__(self, fname):
"""Initializes the CoNLLDataset.
Args:
fname: The .conllu file to load data from.
"""
self.fname = fname
self.annotations = self.process_conll_file(fname)
self.token_vocab = Vocab([x.tokens for x in self.annotations],
unk_token='<unk>')
self.pos_vocab = Vocab([x.pos_tags for x in self.annotations])
def __len__(self):
return len(self.annotations)
def __getitem__(self, idx):
annotation = self.annotations[idx]
input = [self.token_vocab.word2id(x) for x in annotation.tokens]
target = [self.pos_vocab.word2id(x) for x in annotation.pos_tags]
return input, target
def process_conll_file(self, fname):
# Read the entire file.
with open(fname, 'r') as f:
raw_text = f.read()
# Split into chunks on blank lines.
chunks = re.split(r'^\n', raw_text, flags=re.MULTILINE)
# Process each chunk into an annotation.
annotations = []
for chunk in chunks:
annotation = Annotation()
lines = chunk.split('\n')
# Iterate over all lines in the chunk.
for line in lines:
# If line is empty ignore it.
if len(line)==0:
continue
                # If the line is a comment, ignore it.
if line[0] == '#':
continue
# Otherwise split on tabs and retrieve the token and the
# POS tag fields.
fields = line.split('\t')
annotation.tokens.append(fields[1])
annotation.pos_tags.append(fields[3])
if (len(annotation.tokens) > 0) and (len(annotation.pos_tags) > 0):
annotations.append(annotation)
return annotations
And let's see how this is used in practice.
In [5]:
dataset = CoNLLDataset('en-ud-train.conllu')
In [6]:
input, target = dataset[0]
print('Example input: %s\n' % input)
print('Example target: %s\n' % target)
print('Translated input: %s\n' % ' '.join(dataset.token_vocab.id2word(x) for x in input))
print('Translated target: %s\n' % ' '.join(dataset.pos_vocab.id2word(x) for x in target))
The main upshot of using the Dataset class is that it makes accessing training/test observations very simple. This in turn makes batch generation easy, since all we need to do is randomly choose indices and then grab the corresponding observations from the dataset - PyTorch includes a torch.utils.data.DataLoader object which handles this for you. In fact, if we were not working with sequential data we could proceed straight to the modeling step from here. However, since we are working with sequential data there is one last pesky issue to handle - padding.
The issue is that the sequences in a batch of outputs from CoNLLDataset are likely to all be of different lengths. To deal with this, we define a custom collate_annotations function which adds padding to the end of the sequences in the batch so that they are all the same length. In addition, we'll have this function take care of loading the data into tensors and ensuring that the tensor dimensions are in the order expected by PyTorch.
Oh, and one last annoying thing - to deal with some of the issues caused by using padded data, we will be using a function called torch.nn.utils.rnn.pack_padded_sequence in our model later on. All you need to know for now is that this function expects the sequences in the batch to be sorted in order of descending length, and that we know the length of each sequence. So we will make sure that the collate_annotations function performs this sorting for us and returns the sequence lengths in addition to the input and target tensors.
In [7]:
import torch
from torch.autograd import Variable
def pad(sequences, max_length, pad_value=0):
"""Pads a list of sequences.
Args:
sequences: A list of sequences to be padded.
max_length: The length to pad to.
pad_value: The value used for padding.
Returns:
A list of padded sequences.
"""
out = []
for sequence in sequences:
        padded = list(sequence) + [pad_value] * (max_length - len(sequence))
out.append(padded)
return out
def collate_annotations(batch):
"""Function used to collate data returned by CoNLLDataset."""
# Get inputs, targets, and lengths.
inputs, targets = zip(*batch)
lengths = [len(x) for x in inputs]
# Sort by length.
sort = sorted(zip(inputs, targets, lengths),
key=lambda x: x[2],
reverse=True)
inputs, targets, lengths = zip(*sort)
# Pad.
max_length = max(lengths)
inputs = pad(inputs, max_length)
targets = pad(targets, max_length)
# Transpose.
inputs = list(map(list, zip(*inputs)))
targets = list(map(list, zip(*targets)))
# Convert to PyTorch variables.
inputs = Variable(torch.LongTensor(inputs))
targets = Variable(torch.LongTensor(targets))
lengths = Variable(torch.LongTensor(lengths))
if torch.cuda.is_available():
inputs = inputs.cuda()
targets = targets.cuda()
lengths = lengths.cuda()
return inputs, targets, lengths
Again let's see how this is used in practice:
In [8]:
from torch.utils.data import DataLoader
for inputs, targets, lengths in DataLoader(dataset, batch_size=16, collate_fn=collate_annotations):
print('Inputs: %s\n' % inputs.data)
print('Targets: %s\n' % targets.data)
print('Lengths: %s\n' % lengths.data)
# Usually we'd keep sampling batches, but here we'll just break
break
We will use the following architecture: an embedding layer mapping each token to a vector, an (optionally bidirectional) GRU, and a fully connected layer with a log-softmax that produces a distribution over POS tags for every token.
There is one complication which arises during the forward computation. As was noted in the dataset section, the input sequences are padded. This causes an issue since we do not want to waste computational resources feeding these pad tokens into the RNN. In PyTorch, we can deal with this issue by converting the sequence data into a torch.nn.utils.rnn.PackedSequence object before feeding it into the RNN. In essence, a PackedSequence flattens the sequence and batch dimensions of a tensor, and also contains metadata so that PyTorch knows when to re-initialize the hidden state when fed into a recurrent layer. If this seems confusing, do not worry. To use a PackedSequence in practice you will almost always perform the following steps:

1. Convert the padded tensor into a PackedSequence using the function torch.nn.utils.rnn.pack_padded_sequence().
2. Feed the PackedSequence into the recurrent layer.
3. Convert the output back into a padded tensor using torch.nn.utils.rnn.pad_packed_sequence().

A toy demonstration of this pack-RNN-unpack round trip is sketched next, and the model implementation below shows it used in practice.
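Here is that round trip on a toy tensor (hypothetical sizes, using the same [seq_len, batch_size, features] layout as the rest of this tutorial):
In [ ]:
import torch
from torch import nn
from torch.autograd import Variable
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

# Two toy sequences of lengths 3 and 2, already padded to length 3 and
# sorted by descending length. Shape is [seq_len, batch_size, features].
padded = Variable(torch.randn(3, 2, 4))
lengths = [3, 2]

# Pack: flattens away the pad positions so the RNN never sees them.
packed = pack_padded_sequence(padded, lengths)

# A PackedSequence can be fed to a recurrent layer just like a tensor.
rnn = nn.GRU(input_size=4, hidden_size=5)
packed_out, hidden = rnn(packed)

# Unpack: restores the [seq_len, batch_size, hidden_size] layout.
out, out_lengths = pad_packed_sequence(packed_out)
print(out.size())   # torch.Size([3, 2, 5])
print(out_lengths)  # the original lengths, [3, 2]
With that in hand, here is the full tagger: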
In [9]:
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
class Tagger(nn.Module):
def __init__(self,
input_vocab_size,
output_vocab_size,
embedding_dim=64,
hidden_size=64,
bidirectional=True):
"""Initializes the tagger.
Args:
input_vocab_size: Size of the input vocabulary.
output_vocab_size: Size of the output vocabulary.
embedding_dim: Dimension of the word embeddings.
            hidden_size: Number of units in the GRU hidden layer.
bidirectional: Whether or not to use a bidirectional rnn.
"""
# Always do this!!!
super(Tagger, self).__init__()
# Store parameters
self.input_vocab_size = input_vocab_size
self.output_vocab_size = output_vocab_size
self.embedding_dim = embedding_dim
self.hidden_size = hidden_size
self.bidirectional = bidirectional
# Define layers
self.word_embeddings = nn.Embedding(input_vocab_size, embedding_dim,
padding_idx=0)
self.rnn = nn.GRU(embedding_dim, hidden_size,
bidirectional=bidirectional,
dropout=0.9)
if bidirectional:
self.fc = nn.Linear(2*hidden_size, output_vocab_size)
else:
self.fc = nn.Linear(hidden_size, output_vocab_size)
self.activation = nn.LogSoftmax(dim=2)
def forward(self, x, lengths=None, hidden=None):
"""Computes a forward pass of the language model.
Args:
x: A LongTensor w/ dimension [seq_len, batch_size].
lengths: The lengths of the sequences in x.
hidden: Hidden state to be fed into the lstm.
Returns:
net: Probability of the next word in the sequence.
hidden: Hidden state of the lstm.
"""
seq_len, batch_size = x.size()
# If no hidden state is provided, then default to zeros.
if hidden is None:
if self.bidirectional:
num_directions = 2
else:
num_directions = 1
hidden = Variable(torch.zeros(num_directions, batch_size, self.hidden_size))
if torch.cuda.is_available():
hidden = hidden.cuda()
net = self.word_embeddings(x)
# Pack before feeding into the RNN.
if lengths is not None:
lengths = lengths.data.view(-1).tolist()
net = pack_padded_sequence(net, lengths)
net, hidden = self.rnn(net, hidden)
# Unpack after
if lengths is not None:
net, _ = pad_packed_sequence(net)
net = self.fc(net)
net = self.activation(net)
return net, hidden
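Before training, it can be useful to sanity-check the output shapes with a forward pass on a small fake batch (the sizes below are arbitrary, chosen just for illustration):
In [ ]:
# Fake batch: 2 sequences of lengths 5 and 3, padded to length 5 and
# sorted by descending length. Vocabulary sizes are made up.
toy_tagger = Tagger(input_vocab_size=100, output_vocab_size=17)
fake_inputs = Variable(torch.LongTensor(5, 2).random_(1, 100))  # [seq_len, batch_size]
fake_lengths = Variable(torch.LongTensor([5, 3]))
if torch.cuda.is_available():
    toy_tagger = toy_tagger.cuda()
    fake_inputs = fake_inputs.cuda()
    fake_lengths = fake_lengths.cuda()
out, hidden = toy_tagger(fake_inputs, lengths=fake_lengths)
print(out.size())  # torch.Size([5, 2, 17]) - one tag distribution per token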
In [10]:
import numpy as np
# Load datasets.
train_dataset = CoNLLDataset('en-ud-train.conllu')
dev_dataset = CoNLLDataset('en-ud-dev.conllu')
dev_dataset.token_vocab = train_dataset.token_vocab
dev_dataset.pos_vocab = train_dataset.pos_vocab
# Hyperparameters / constants.
input_vocab_size = len(train_dataset.token_vocab)
output_vocab_size = len(train_dataset.pos_vocab)
batch_size = 16
epochs = 6
# Initialize the model.
model = Tagger(input_vocab_size, output_vocab_size)
if torch.cuda.is_available():
model = model.cuda()
# Loss function weights.
weight = torch.ones(output_vocab_size)
weight[0] = 0
if torch.cuda.is_available():
weight = weight.cuda()
# Initialize loss function and optimizer.
loss_function = torch.nn.NLLLoss(weight)
optimizer = torch.optim.Adam(model.parameters())
# Main training loop.
data_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True,
collate_fn=collate_annotations)
dev_loader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False,
collate_fn=collate_annotations)
losses = []
i = 0
for epoch in range(epochs):
for inputs, targets, lengths in data_loader:
optimizer.zero_grad()
outputs, _ = model(inputs, lengths=lengths)
outputs = outputs.view(-1, output_vocab_size)
targets = targets.view(-1)
loss = loss_function(outputs, targets)
loss.backward()
optimizer.step()
losses.append(loss.data[0])
if (i % 10) == 0:
# Compute dev loss over entire dev set.
# NOTE: This is expensive. In your work you may want to only use a
# subset of the dev set.
dev_losses = []
for inputs, targets, lengths in dev_loader:
outputs, _ = model(inputs, lengths=lengths)
outputs = outputs.view(-1, output_vocab_size)
targets = targets.view(-1)
loss = loss_function(outputs, targets)
dev_losses.append(loss.data[0])
avg_train_loss = np.mean(losses)
avg_dev_loss = np.mean(dev_losses)
losses = []
print('Iteration %i - Train Loss: %0.6f - Dev Loss: %0.6f' % (i, avg_train_loss, avg_dev_loss), end='\r')
torch.save(model, 'pos_tagger.pt')
i += 1
torch.save(model, 'pos_tagger.final.pt')
In [11]:
# Collect the predictions and targets
y_true = []
y_pred = []
for inputs, targets, lengths in dev_loader:
outputs, _ = model(inputs, lengths=lengths)
_, preds = torch.max(outputs, dim=2)
targets = targets.view(-1)
preds = preds.view(-1)
if torch.cuda.is_available():
targets = targets.cpu()
preds = preds.cpu()
y_true.append(targets.data.numpy())
y_pred.append(preds.data.numpy())
# Stack into numpy arrays
y_true = np.concatenate(y_true)
y_pred = np.concatenate(y_pred)
# Compute accuracy
acc = np.mean(y_true[y_true != 0] == y_pred[y_true != 0])
print('Accuracy - %0.6f\n' % acc)
# Evaluate f1-score
from sklearn.metrics import f1_score
score = f1_score(y_true, y_pred, average=None)
print('F1-scores:\n')
for label, score in zip(dev_dataset.pos_vocab._id2word[1:], score[1:]):
print('%s - %0.6f' % (label, score))
In [12]:
model = torch.load('pos_tagger.final.pt')
def inference(sentence):
# Convert words to id tensor.
ids = [[dataset.token_vocab.word2id(x)] for x in sentence]
ids = Variable(torch.LongTensor(ids))
if torch.cuda.is_available():
ids = ids.cuda()
# Get model output.
output, _ = model(ids)
_, preds = torch.max(output, dim=2)
if torch.cuda.is_available():
preds = preds.cpu()
preds = preds.data.view(-1).numpy()
pos_tags = [dataset.pos_vocab.id2word(x) for x in preds]
for word, tag in zip(sentence, pos_tags):
print('%s - %s' % (word, tag))
In [27]:
sentence = "sdfgkj asd;glkjsdg ;lkj .".split()
inference(sentence)
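The sentence above is gibberish, so (apart from the final period) its tokens are mapped to the <unk> id by the vocabulary. You can also run the tagger on an ordinary sentence, for example one taken from the data we saw earlier:
In [ ]:
inference('Hope you have a crapload of fun !'.split())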
According to Wikipedia:
Opinion mining (sometimes known as sentiment analysis or emotion AI) refers to the use of natural language processing, text analysis, computational linguistics, and biometrics to systematically identify, extract, quantify, and study affective states and subjective information.
Formally, given a sequence of words $\mathbf{x} = \left< x_1, x_2, \ldots, x_t \right>$, the goal is to learn a model $P(y \,|\, \mathbf{x})$ where $y$ is the sentiment associated with the sentence. This is very similar to the problem above, with the exception that we only want a single output for each sentence, not one output per word. Accordingly, we will only highlight the changes that need to be made.
We will be using the Kaggle 'Sentiment Analysis on Movie Reviews' dataset [link]. You will need to agree to the Kaggle terms of service in order to download this data. The following code can be used to process this data.
In [15]:
import torch
from collections import Counter
from torch.autograd import Variable
from torch.utils.data import Dataset
class Annotation(object):
def __init__(self):
"""A helper object for storing annotation data."""
self.tokens = []
self.sentiment = None
class SentimentDataset(Dataset):
def __init__(self, fname):
"""Initializes the SentimentDataset.
Args:
fname: The .tsv file to load data from.
"""
self.fname = fname
self.annotations = self.process_tsv_file(fname)
self.token_vocab = Vocab([x.tokens for x in self.annotations],
unk_token='<unk>')
def __len__(self):
return len(self.annotations)
def __getitem__(self, idx):
annotation = self.annotations[idx]
input = [self.token_vocab.word2id(x) for x in annotation.tokens]
target = annotation.sentiment
return input, target
def process_tsv_file(self, fname):
# Read the entire file.
with open(fname, 'r') as f:
lines = f.readlines()
annotations = []
observed_ids = set()
for line in lines[1:]:
annotation = Annotation()
_, sentence_id, sentence, sentiment = line.split('\t')
if sentence_id in observed_ids:
continue
else:
observed_ids.add(sentence_id)
annotation.tokens = sentence.split()
annotation.sentiment = int(sentiment)
if len(annotation.tokens) > 0:
annotations.append(annotation)
return annotations
def pad(sequences, max_length, pad_value=0):
"""Pads a list of sequences.
Args:
sequences: A list of sequences to be padded.
max_length: The length to pad to.
pad_value: The value used for padding.
Returns:
A list of padded sequences.
"""
out = []
for sequence in sequences:
        padded = list(sequence) + [pad_value] * (max_length - len(sequence))
out.append(padded)
return out
def collate_annotations(batch):
"""Function used to collate data returned by CoNLLDataset."""
# Get inputs, targets, and lengths.
inputs, targets = zip(*batch)
lengths = [len(x) for x in inputs]
# Sort by length.
sort = sorted(zip(inputs, targets, lengths),
key=lambda x: x[2],
reverse=True)
inputs, targets, lengths = zip(*sort)
# Pad.
max_length = max(lengths)
inputs = pad(inputs, max_length)
# Transpose.
inputs = list(map(list, zip(*inputs)))
# Convert to PyTorch variables.
inputs = Variable(torch.LongTensor(inputs))
targets = Variable(torch.LongTensor(targets))
lengths = Variable(torch.LongTensor(lengths))
if torch.cuda.is_available():
inputs = inputs.cuda()
targets = targets.cuda()
lengths = lengths.cuda()
return inputs, targets, lengths
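As before, we can sanity-check the collate function by pulling one batch through a DataLoader (this assumes train.tsv from the Kaggle dataset is already in the working directory):
In [ ]:
from torch.utils.data import DataLoader

sentiment_dataset = SentimentDataset('train.tsv')
loader = DataLoader(sentiment_dataset, batch_size=4, collate_fn=collate_annotations)
for inputs, targets, lengths in loader:
    print('Inputs: %s\n' % inputs.data)
    print('Targets: %s\n' % targets.data)
    print('Lengths: %s\n' % lengths.data)
    # Usually we'd keep sampling batches, but here we'll just break.
    break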
The model architecture we will use for sentiment classification is almost exactly the same as the one we used for tagging. The only difference is that we want the model to produce a single output at the end, not a sequence of outputs. While there are many ways to do this, a simple approach is to just use the final hidden state of the recurrent layer as the input to the fully connected layer. This approach is particularly nice in PyTorch since the forward pass of the recurrent layer returns the final hidden states as its second output (see the note in the code below if this is unclear), so we do not need to do any fancy indexing tricks to get them.
Formally, the model architecture we will use is: an embedding layer, followed by a GRU whose final hidden state is passed through a fully connected layer and a log-softmax to produce a distribution over sentiment classes.
In [16]:
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
class SentimentClassifier(nn.Module):
def __init__(self,
input_vocab_size,
output_vocab_size,
embedding_dim=64,
hidden_size=64):
"""Initializes the tagger.
Args:
input_vocab_size: Size of the input vocabulary.
output_vocab_size: Size of the output vocabulary.
embedding_dim: Dimension of the word embeddings.
            hidden_size: Number of units in the GRU hidden layer.
"""
# Always do this!!!
super(SentimentClassifier, self).__init__()
# Store parameters
self.input_vocab_size = input_vocab_size
self.output_vocab_size = output_vocab_size
self.embedding_dim = embedding_dim
self.hidden_size = hidden_size
# Define layers
self.word_embeddings = nn.Embedding(input_vocab_size, embedding_dim,
padding_idx=0)
self.rnn = nn.GRU(embedding_dim, hidden_size, dropout=0.9)
self.fc = nn.Linear(hidden_size, output_vocab_size)
self.activation = nn.LogSoftmax(dim=2)
def forward(self, x, lengths=None, hidden=None):
"""Computes a forward pass of the language model.
Args:
x: A LongTensor w/ dimension [seq_len, batch_size].
lengths: The lengths of the sequences in x.
hidden: Hidden state to be fed into the lstm.
Returns:
net: Probability of the next word in the sequence.
hidden: Hidden state of the lstm.
"""
seq_len, batch_size = x.size()
# If no hidden state is provided, then default to zeros.
if hidden is None:
hidden = Variable(torch.zeros(1, batch_size, self.hidden_size))
if torch.cuda.is_available():
hidden = hidden.cuda()
net = self.word_embeddings(x)
if lengths is not None:
lengths_list = lengths.data.view(-1).tolist()
net = pack_padded_sequence(net, lengths_list)
net, hidden = self.rnn(net, hidden)
# NOTE: we are using hidden as the input to the fully-connected layer, not net!!!
net = self.fc(hidden)
net = self.activation(net)
return net, hidden
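To see why feeding hidden (rather than net) into the fully connected layer yields a single prediction per sentence, here is a quick shape check on a fake batch (arbitrary sizes, for illustration only):
In [ ]:
# Fake batch: 3 sentences of lengths 6, 4, and 2, padded and sorted.
toy_clf = SentimentClassifier(input_vocab_size=100, output_vocab_size=5)
fake_inputs = Variable(torch.LongTensor(6, 3).random_(1, 100))  # [seq_len, batch_size]
fake_lengths = Variable(torch.LongTensor([6, 4, 2]))
if torch.cuda.is_available():
    toy_clf = toy_clf.cuda()
    fake_inputs = fake_inputs.cuda()
    fake_lengths = fake_lengths.cuda()
out, hidden = toy_clf(fake_inputs, lengths=fake_lengths)
print(hidden.size())  # torch.Size([1, 3, 64]) - one final hidden state per sentence
print(out.size())     # torch.Size([1, 3, 5])  - one class distribution per sentence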
In [17]:
import numpy as np
from torch.utils.data import DataLoader
# Load dataset.
sentiment_dataset = SentimentDataset('train.tsv')
# Hyperparameters / constants.
input_vocab_size = len(sentiment_dataset.token_vocab)
output_vocab_size = 5
batch_size = 16
epochs = 7
# Initialize the model.
model = SentimentClassifier(input_vocab_size, output_vocab_size)
if torch.cuda.is_available():
model = model.cuda()
# Initialize loss function and optimizer.
loss_function = torch.nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters())
# Main training loop.
data_loader = DataLoader(sentiment_dataset, batch_size=batch_size, shuffle=True,
collate_fn=collate_annotations)
losses = []
i = 0
for epoch in range(epochs):
for inputs, targets, lengths in data_loader:
optimizer.zero_grad()
outputs, _ = model(inputs, lengths=lengths)
outputs = outputs.view(-1, output_vocab_size)
targets = targets.view(-1)
loss = loss_function(outputs, targets)
loss.backward()
optimizer.step()
losses.append(loss.data[0])
if (i % 100) == 0:
average_loss = np.mean(losses)
losses = []
print('Iteration %i - Loss: %0.6f' % (i, average_loss), end='\r')
if (i % 1000) == 0:
torch.save(model, 'sentiment_classifier.pt')
i += 1
torch.save(model, 'sentiment_classifier.final.pt')
In [18]:
model = torch.load('sentiment_classifier.final.pt')
def inference(sentence):
# Convert words to id tensor.
ids = [[sentiment_dataset.token_vocab.word2id(x)] for x in sentence]
ids = Variable(torch.LongTensor(ids))
if torch.cuda.is_available():
ids = ids.cuda()
# Get model output.
output, _ = model(ids)
_, pred = torch.max(output, dim=2)
if torch.cuda.is_available():
pred = pred.cpu()
pred = pred.data.view(-1).numpy()
print('Sentence: %s' % ' '.join(sentence))
print('Sentiment (0=negative, 4=positive): %i' % pred)
In [25]:
sentence = 'Zot zot .'.split()
inference(sentence)