LSTM was proposed in 1997 by Sepp Hochreiter and Jürgen Schmidhuber. It is an artificial recurrent neural network (RNN) architecture, developed to deal with the exploding and vanishing gradient problems that can be encountered when training traditional RNNs.
In this lesson, we will cover several parts, introduced below.
This tutorial mainly draws on seq2seq_translation_tutorial and Building an LSTM from Scratch in PyTorch.
In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
import random
import math
import time
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from typing import *
from torch.nn import Parameter
from torch.nn import init
from torch import Tensor
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
Since data processing is not the focus of this tutorial, the data-related code lives in utils.py and is simply imported below (a short sketch of the tensor format it produces follows the import).
That said, when solving a real problem there is no way around data processing; it may be tedious, but it is very important.
In [2]:
from utils import *
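As a rough idea of the tensor format these helpers produce, here is a hypothetical sketch of tensor_from_sentence. The real implementation lives in utils.py and may differ; the lang object is assumed to expose a word2index dict, mirroring the index2word dict used later during evaluation.
In [ ]:
def tensor_from_sentence_sketch(lang, sentence):
    """Hypothetical sketch: map each word to its vocabulary index and append EOS."""
    indexes = [lang.word2index[word] for word in sentence.split(' ')]
    indexes.append(EOS_token)
    # shape [seq_len, 1]: one word index per row, as consumed by the encoder loop;
    # the training code later moves the tensor to the right device with .to(device)
    return torch.tensor(indexes, dtype=torch.long).view(-1, 1)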
In [3]:
# Use GPU if available, otherwise fall back to CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
In [4]:
def setup_seed(seed):
"""In order to reproduce the same results
Args:
seed: random seed given by you
"""
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True
# benchmark is disabled so that cuDNN does not select nondeterministic algorithms
torch.backends.cudnn.benchmark = False
In [5]:
class EncoderLSTM(nn.Module):
"""Encoder use LSTM as backbone"""
def __init__(self, input_size: int, hidden_size: int):
"""
Args:
input_size : The number of expected features in the input
hidden_size: The number of features in the hidden state
"""
super(EncoderLSTM, self).__init__()
self.hidden_size = hidden_size
# Retrieve word embeddings with dimensionality hidden_size
# using indices with dimensionality input_size; the embedding is learnable.
# After embedding, an input index is mapped to a hidden_size-dimensional vector.
self.embedding = nn.Embedding(input_size, hidden_size)
# LSTM
self.lstm = nn.LSTM(hidden_size, hidden_size)
def forward(self, inputs: Tensor, state: Tuple[Tensor]):
"""Forward
Args:
inputs: word-index tensor of shape [1]
state : ([1, 1, hidden_size], [1, 1, hidden_size])
Returns:
output:
state: (hidden, cell)
"""
(hidden, cell) = state
# Retrieve word embeddings
embedded = self.embedding(inputs).view(1, 1, -1)
# Directly output embedding
output = embedded
output, (hidden, cell) = self.lstm(output, (hidden, cell))
return output, (hidden, cell)
def init_hidden(self):
"""Init hidden
Returns:
hidden:
cell:
"""
cell = torch.zeros(1, 1, self.hidden_size, device=device)
hidden = torch.zeros(1, 1, self.hidden_size, device=device)
return hidden, cell
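As a quick shape check (a minimal sketch with a made-up vocabulary size of 10 and hidden_size of 8), the encoder consumes one word index at a time and returns both an output and the updated (hidden, cell) state, each of shape [1, 1, hidden_size]:
In [ ]:
enc = EncoderLSTM(input_size=10, hidden_size=8).to(device)
hidden, cell = enc.init_hidden()
word_index = torch.tensor([3], device=device)  # a single made-up word index
output, (hidden, cell) = enc(word_index, (hidden, cell))
print(output.shape, hidden.shape, cell.shape)  # torch.Size([1, 1, 8]) three times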
The decoder is another LSTM that takes the encoder output vector(s) and outputs a sequence of words to create the translation.
In the simplest seq2seq decoder we use only the last output of the encoder.
This last output is sometimes called the context vector as it encodes context from the entire sequence.
This context vector is used as the initial hidden state of the decoder.
At every step of decoding, the decoder is given an input token and hidden state.
The initial input token is the start-of-string <SOS> token.
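Conceptually, decoding then proceeds one token at a time. Below is a minimal greedy-decoding sketch, assuming decoder is an already-trained DecoderLSTM (defined in the next cell) and state holds the encoder's final (hidden, cell):
In [ ]:
decoder_input = torch.tensor([[SOS_token]], device=device)  # start-of-string token
decoded_words = []
for _ in range(MAX_LENGTH):
    output, state = decoder(decoder_input, state)
    topv, topi = output.topk(1)               # greedily pick the most likely next word
    if topi.item() == EOS_token:
        break
    decoded_words.append(output_lang.index2word[topi.item()])
    decoder_input = topi.squeeze().detach()   # feed the prediction back in as the next input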
In [6]:
class DecoderLSTM(nn.Module):
"""Decoder use LSTM as backbone"""
def __init__(self, hidden_size: int, output_size: int):
"""
Args:
hidden_size: The number of features in the hidden state
output_size : The number of expected features in the output
"""
super(DecoderLSTM, self).__init__()
self.hidden_size = hidden_size
# Retrieve word embeddings with dimensionality hidden_size
# using indices with dimensionality output_size; the embedding is learnable.
# After embedding, an input index is mapped to a hidden_size-dimensional vector.
self.embedding = nn.Embedding(output_size, hidden_size)
# LSTM
self.lstm = nn.LSTM(hidden_size, hidden_size)
# out
self.out = nn.Linear(hidden_size, output_size)
# log after softmax
self.log_softmax = nn.LogSoftmax(dim=1)
# activation function
self.activation_function = F.relu
def forward(self, inputs, state):
"""Forward
Args:
inputs: word-index tensor of shape [1] or [1, 1]
state : ([1, 1, hidden_size], [1, 1, hidden_size])
Returns:
output:
state: (hidden, cell)
"""
(hidden, cell) = state
# Retrieve word embeddings, [1, 1, hidden_size]
output = self.embedding(inputs).view(1, 1, -1)
# activation function, [1, 1, hidden_size]
output = self.activation_function(output)
# output: [1, 1, hidden_size]
output, (hidden, cell) = self.lstm(output, (hidden, cell))
# output: [output_size]
output = self.log_softmax(self.out(output[0]))
return output, (hidden, cell)
def init_hidden(self):
"""Init hidden
Returns:
hidden:
cell:
"""
cell = torch.zeros(1, 1, self.hidden_size, device=device)
hidden = torch.zeros(1, 1, self.hidden_size, device=device)
return hidden, cell
In [7]:
def train_by_sentence(input_tensor, target_tensor, encoder, decoder,
encoder_optimizer, decoder_optimizer, loss_fn,
use_teacher_forcing=True, reverse_source_sentence=True,
max_length=MAX_LENGTH):
"""Train by single sentence using EncoderLSTM and DecoderLSTM
including training and update model
Args:
input_tensor: [input_sequence_len, 1] tensor of source word indices
target_tensor: [target_sequence_len, 1] tensor of target word indices
encoder: EncoderLSTM
decoder: DecoderLSTM
encoder_optimizer: optimizer for encoder
decoder_optimizer: optimizer for decoder
loss_fn: loss function
use_teacher_forcing: if True, feed the target as the next input;
if False, use the model's own predictions as the next input
max_length: max length for input and output
Returns:
loss: scalar
"""
if reverse_source_sentence:
input_tensor = torch.flip(input_tensor, [0])
hidden, cell = encoder.init_hidden()
# Clears the gradients of all optimized torch.Tensors'
encoder_optimizer.zero_grad()
decoder_optimizer.zero_grad()
# Get sequence length of the input and target sentences.
input_length = input_tensor.size(0)
target_length = target_tensor.size(0)
# encoder outputs: [max_length, hidden_size]
encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
loss = 0
# Get encoder outputs
for ei in range(input_length):
encoder_output, (hidden, cell) = encoder(
input_tensor[ei], (hidden, cell))
encoder_outputs[ei] = encoder_output[0, 0]
# First input for the decoder
decoder_input = torch.tensor([[SOS_token]], device=device)
# Last state of encoder as the init state of decoder
decoder_hidden = (hidden, cell)
for di in range(target_length):
decoder_output, (hidden, cell) = decoder(
decoder_input, (hidden, cell))
if use_teacher_forcing:
# Feed the target as the next input
loss += loss_fn(decoder_output, target_tensor[di])
decoder_input = target_tensor[di] # Teacher forcing
else:
# Use its own predictions as the next input
topv, topi = decoder_output.topk(1)
decoder_input = topi.squeeze().detach()
loss += loss_fn(decoder_output, target_tensor[di])
# Stop if the decoder outputs the end-of-sentence (EOS) token
if decoder_input.item() == EOS_token:
break
loss.backward()
encoder_optimizer.step()
decoder_optimizer.step()
return loss.item() / target_length
In [8]:
def train(encoder, decoder, n_iters, reverse_source_sentence=True,
use_teacher_forcing=True,
print_every=1000, plot_every=100,
learning_rate=0.01):
"""Train of Seq2seq
Args:
encoder: EncoderLSTM
decoder: DecoderLSTM
n_iters: number of training sentence pairs to train on (sampled with replacement)
reverse_source_sentence: if True, reverse the source sentence
while keeping the target order unchanged;
if False, keep both the source and target order unchanged
use_teacher_forcing: if True, feed the target as the next input;
if False, use the model's own predictions as the next input
print_every: print log every print_every
plot_every: plot every plot_every
learning_rate:
"""
start = time.time()
plot_losses = []
print_loss_total = 0
plot_loss_total = 0
# Use SGD to optimize encoder and decoder parameters
encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
# Obtain training input
training_pairs = [tensor_from_pair(random.choice(pairs), input_lang, output_lang)
for _ in range(n_iters)]
# Negative log likelihood loss
loss_fn = nn.NLLLoss()
for i in range(1, n_iters+1):
# Get a pair of sentences and move them to the device.
# training_pair: ([input_seq_len, 1], [target_seq_len, 1]) word-index tensors
training_pair = training_pairs[i-1]
input_tensor = training_pair[0].to(device)
target_tensor = training_pair[1].to(device)
# Train by a pair of source sentence and target sentence
loss = train_by_sentence(input_tensor, target_tensor,
encoder, decoder,
encoder_optimizer, decoder_optimizer,
loss_fn, use_teacher_forcing=use_teacher_forcing,
reverse_source_sentence=reverse_source_sentence)
print_loss_total += loss
plot_loss_total += loss
if i % print_every == 0:
# Print Loss
print_loss_avg = print_loss_total / print_every
print_loss_total = 0
print("%s (%d %d%%) %.4f" % (time_since(start, i / n_iters),
i, i / n_iters * 100, print_loss_avg))
if i % plot_every == 0:
# Plot
plot_loss_avg = plot_loss_total / plot_every
plot_losses.append(plot_loss_avg)
plot_loss_total = 0
# show plot
show_plot(plot_losses)
In [9]:
def evaluate_by_sentence(encoder, decoder, sentence, reverse_source_sentence, max_length=MAX_LENGTH):
"""Evalutae on a source sentence
Args:
encoder
decoder
sentence
max_length
Return:
decoded_words: predicted sentence
"""
with torch.no_grad():
# Get tensor of sentence
input_tensor = tensor_from_sentence(input_lang, sentence).to(device)
input_length = input_tensor.size(0)
if reverse_source_sentence:
input_tensor = torch.flip(input_tensor, [0])
# init state for encoder
(hidden, cell) = encoder.init_hidden()
# encoder outputs: [max_length, hidden_size]
encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
for ei in range(input_length):
encoder_output, (hidden, cell) = encoder(input_tensor[ei],
(hidden, cell))
encoder_outputs[ei] += encoder_output[0, 0]
# Last state of encoder as the init state of decoder
decoder_input = torch.tensor([[SOS_token]], device=device)
decoder_hidden = (hidden, cell)
decoded_words = []
# When evaluate, use its own predictions as the next input
for di in range(max_length):
decoder_output, (hidden, cell) = decoder(decoder_input, (hidden, cell))
topv, topi = decoder_output.data.topk(1)
if topi.item() == EOS_token:
decoded_words.append("<EOS>")
break
else:
decoded_words.append(output_lang.index2word[topi.item()])
decoder_input = topi.squeeze().detach()
return decoded_words
In [10]:
def evaluate_randomly(encoder, decoder, n=10, reverse_source_sentence=True):
"""Random pick sentence from dataset and observe the effect of translation
Args:
encoder:
decoder:
n: numbers of sentences to evaluate
"""
for _ in range(n):
pair = random.choice(pairs)
# Source sentence
print(">", pair[0])
# Target sentence
print("=", pair[1])
output_words = evaluate_by_sentence(encoder, decoder, pair[0], reverse_source_sentence)
output_sentence = " ".join(output_words)
# Predicted sentence
print("<", output_sentence)
print("")
In [11]:
def show_plot(points):
"""Plot according to points"""
# plt.subplots() creates the figure, so a separate plt.figure() call is unnecessary
fig, ax = plt.subplots()
loc = ticker.MultipleLocator(base=0.2)
ax.yaxis.set_major_locator(loc)
plt.plot(points)
plt.show()
In [19]:
# prepare_data is defined in utils.py
# With reverse=True, the language pairs are flipped so that the source
# sentences are French and the target sentences are English
input_lang, output_lang, pairs = prepare_data('eng', 'fra', reverse=True)
print(random.choice(pairs))
In [14]:
setup_seed(45)
hidden_size = 256
# Reverse the order of source input sentence
reverse_source_sentence = True
# Feed the target as the next input
use_teacher_forcing = True
encoder = EncoderLSTM(input_lang.n_words, hidden_size).to(device)
decoder = DecoderLSTM(hidden_size, output_lang.n_words).to(device)
print(">> Model is on: {}".format(next(encoder.parameters()).is_cuda))
print(">> Model is on: {}".format(next(decoder.parameters()).is_cuda))
In [15]:
iters = 50000
train(encoder, decoder, iters, reverse_source_sentence=reverse_source_sentence,
use_teacher_forcing=use_teacher_forcing,print_every=250, plot_every=250)
In [16]:
# Randomly pick up 10 sentence and observe the performance
evaluate_randomly(encoder, decoder, 10, reverse_source_sentence)
Answer: Judging from the results, the convergence speed is roughly the same whether the source sentence is fed in its original order or reversed; there is no obvious difference.
Answer: Without use_teacher_forcing, the loss starts out lower than that of the model above, but training is less effective and the final model ends up slightly worse.
Answer: With Tanh, convergence is initially faster than with ReLU, but the final loss values of the two models are very close.
In [17]:
# Hw 1.1
setup_seed(45)
hidden_size = 256
# Reverse the order of source input sentence
reverse_source_sentence = False
# Feed the target as the next input
use_teacher_forcing = True
encoder = EncoderLSTM(input_lang.n_words, hidden_size).to(device)
decoder = DecoderLSTM(hidden_size, output_lang.n_words).to(device)
print(">> Model is on: {}".format(next(encoder.parameters()).is_cuda))
print(">> Model is on: {}".format(next(decoder.parameters()).is_cuda))
iters = 50000
train(encoder, decoder, iters, reverse_source_sentence=reverse_source_sentence,
use_teacher_forcing=use_teacher_forcing,print_every=250, plot_every=250)
In [18]:
# Hw 1.2
setup_seed(45)
hidden_size = 256
# Reverse the order of source input sentence
reverse_source_sentence = True
# Feed the target as the next input
use_teacher_forcing = False
encoder = EncoderLSTM(input_lang.n_words, hidden_size).to(device)
decoder = DecoderLSTM(hidden_size, output_lang.n_words).to(device)
print(">> Model is on: {}".format(next(encoder.parameters()).is_cuda))
print(">> Model is on: {}".format(next(decoder.parameters()).is_cuda))
iters = 50000
train(encoder, decoder, iters, reverse_source_sentence=reverse_source_sentence,
use_teacher_forcing=use_teacher_forcing,print_every=250, plot_every=250)
In [21]:
# Hw 1.3
# First, change the activation function of DecoderLSTM (ReLU -> tanh)
class DecoderLSTM_v2(nn.Module):
"""Decoder use LSTM as backbone"""
def __init__(self, hidden_size: int, output_size: int):
"""
Args:
hidden_size: The number of features in the hidden state
output_size : The number of expected features in the output
"""
super(DecoderLSTM_v2, self).__init__()
self.hidden_size = hidden_size
# Retrieve word embeddings with dimensionality hidden_size
# using indices with dimensionality output_size; the embedding is learnable.
# After embedding, an input index is mapped to a hidden_size-dimensional vector.
self.embedding = nn.Embedding(output_size, hidden_size)
# LSTM
self.lstm = nn.LSTM(hidden_size, hidden_size)
# out
self.out = nn.Linear(hidden_size, output_size)
# log after softmax
self.log_softmax = nn.LogSoftmax(dim=1)
# activation function: changed from ReLU to tanh (Hw 1.3)
self.activation_function = torch.tanh
def forward(self, inputs, state):
"""Forward
Args:
inputs: word-index tensor of shape [1] or [1, 1]
state : ([1, 1, hidden_size], [1, 1, hidden_size])
Returns:
output:
state: (hidden, cell)
"""
(hidden, cell) = state
# Retrieve word embeddings, [1, 1, hidden_size]
output = self.embedding(inputs).view(1, 1, -1)
# activation function, [1, 1, hidden_size]
output = self.activation_function(output)
# output: [1, 1, hidden_size]
output, (hidden, cell) = self.lstm(output, (hidden, cell))
# output: [output_size]
output = self.log_softmax(self.out(output[0]))
return output, (hidden, cell)
def init_hidden(self):
"""Init hidden
Returns:
hidden:
cell:
"""
cell = torch.zeros(1, 1, self.hidden_size, device=device)
hidden = torch.zeros(1, 1, self.hidden_size, device=device)
return hidden, cell
In [22]:
setup_seed(45)
hidden_size = 256
# Reverse the order of source input sentence
reverse_source_sentence = True
# Feed the target as the next input
use_teacher_forcing = True
encoder = EncoderLSTM(input_lang.n_words, hidden_size).to(device)
decoder = DecoderLSTM_v2(hidden_size, output_lang.n_words).to(device)
print(">> Model is on: {}".format(next(encoder.parameters()).is_cuda))
print(">> Model is on: {}".format(next(decoder.parameters()).is_cuda))
iters = 50000
train(encoder, decoder, iters, reverse_source_sentence=reverse_source_sentence,
use_teacher_forcing=use_teacher_forcing,print_every=250, plot_every=250)
Why do we need an attention mechanism?
The short answer: with attention, a seq2seq model achieves better performance and takes less time to train.
The longer answer: attention allows the decoder network to "focus" on a different part of the encoder's outputs for every step of the decoder's own outputs.
For simplicity, we only change DecoderLSTM into AttentionDecoderLSTM, adapt a few helper functions, and then we can train the model.
Details of AttentionDecoderLSTM
Since there are many ways to implement attention, we pick a simple one here.
First we calculate a set of attention weights.
These will be multiplied by the encoder output vectors to create a weighted combination.
The result (called attention_applied in the code) should contain information about that
specific part of the input sequence, and thus help the decoder choose the right output words.
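As a tiny numeric illustration (made-up sizes, not part of the model): with max_length = 3 and hidden_size = 4, the weighted combination is just a batched matrix product of the [1, max_length] attention weights with the [max_length, hidden_size] encoder outputs:
In [ ]:
attn_weights = torch.tensor([[0.7, 0.2, 0.1]])                          # [1, 3], already softmax-normalized
fake_encoder_outputs = torch.arange(12, dtype=torch.float).view(3, 4)   # [3, 4] made-up encoder outputs
attention_applied = torch.bmm(attn_weights.unsqueeze(0),                # [1, 1, 3]
                              fake_encoder_outputs.unsqueeze(0))        # [1, 3, 4]
print(attention_applied.shape)  # [1, 1, 4]: a convex combination of the three encoder outputs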
In [12]:
class AttentionDecoderLSTM(nn.Module):
def __init__(self, hidden_size: int, output_size: int, dropout_p=0.1, max_length=MAX_LENGTH):
"""DecoderLSTM with attention mechanism
"""
super(AttentionDecoderLSTM, self).__init__()
self.hidden_size = hidden_size
self.output_size = output_size
self.dropout_p = dropout_p
self.max_length = max_length
# Retrieve word embeddings with dimensionality hidden_size
# using indices with dimensionality output_size; the embedding is learnable.
# After embedding, an input index is mapped to a hidden_size-dimensional vector.
self.embedding = nn.Embedding(self.output_size, self.hidden_size)
# W1
self.attention = nn.Linear(self.hidden_size * 2, self.max_length)
# W2
self.attention_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
self.dropout = nn.Dropout(self.dropout_p)
self.lstm = nn.LSTM(self.hidden_size, self.hidden_size)
# prediction layer
self.out = nn.Linear(self.hidden_size, self.output_size)
# activation
self.activation_fn = F.relu
def forward(self, inputs, state, encoder_outputs):
"""Forward
Args:
inputs: word-index tensor of shape [1] or [1, 1]
state : ([1, 1, hidden_size], [1, 1, hidden_size])
encoder_outputs: [max_length, hidden_size]
Returns:
output:
state: (hidden, cell)
"""
# embedded: [1, 1, hidden_size]
embedded = self.embedding(inputs).view(1, 1, -1)
embedded = self.dropout(embedded)
(hidden, cell) = state
# attention_weights: [1, max_length]
attention_weights = F.softmax(
self.attention(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
# attention_applied: [1, 1, hidden_size]
# torch.bmm is batched matrix multiplication (@ applied batch-wise)
attention_applied = torch.bmm(attention_weights.unsqueeze(0),
encoder_outputs.unsqueeze(0))
# output: [1, hidden_size * 2]
output = torch.cat((embedded[0], attention_applied[0]), 1)
# output: [1, 1, hidden_size]
output = self.attention_combine(output).unsqueeze(0)
output = self.activation_fn(output)
# output: [1, 1, hidden_size]
output, (hidden, cell) = self.lstm(output, (hidden, cell))
# output, [1, output_size]
output = F.log_softmax(self.out(output[0]), dim=1)
return output, (hidden, cell), attention_weights
def init_hidden(self):
"""Init hidden
Returns:
hidden:
cell:
"""
cell = torch.zeros(1, 1, self.hidden_size, device=device)
hidden = torch.zeros(1, 1, self.hidden_size, device=device)
return hidden, cell
In [13]:
def train_by_sentence_attn(input_tensor, target_tensor, encoder, decoder,
encoder_optimizer, decoder_optimizer, loss_fn,
use_teacher_forcing=True, reverse_source_sentence=True,
max_length=MAX_LENGTH):
"""Train by single sentence using EncoderLSTM and DecoderLSTM
including training and update model, combining attention mechanism.
Args:
input_tensor: [input_sequence_len, 1] tensor of source word indices
target_tensor: [target_sequence_len, 1] tensor of target word indices
encoder: EncoderLSTM
decoder: DecoderLSTM
encoder_optimizer: optimizer for encoder
decoder_optimizer: optimizer for decoder
loss_fn: loss function
use_teacher_forcing: if True, feed the target as the next input;
if False, use the model's own predictions as the next input
max_length: max length for input and output
Returns:
loss: scalar
"""
if reverse_source_sentence:
input_tensor = torch.flip(input_tensor, [0])
hidden, cell = encoder.init_hidden()
# Clears the gradients of all optimized torch.Tensors'
encoder_optimizer.zero_grad()
decoder_optimizer.zero_grad()
# Get sequence length of the input and target sentences.
input_length = input_tensor.size(0)
target_length = target_tensor.size(0)
# encoder outputs: [max_length, hidden_size]
encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
loss = 0
# Get encoder outputs
for ei in range(input_length):
encoder_output, (hidden, cell) = encoder(
input_tensor[ei], (hidden, cell))
encoder_outputs[ei] = encoder_output[0, 0]
# First input for the decoder
decoder_input = torch.tensor([[SOS_token]], device=device)
# Last state of encoder as the init state of decoder
decoder_hidden = (hidden, cell)
for di in range(target_length):
# !! Most important change: apply the attention mechanism
decoder_output, (hidden, cell), _ = decoder(
decoder_input, (hidden, cell), encoder_outputs)
if use_teacher_forcing:
# Feed the target as the next input
loss += loss_fn(decoder_output, target_tensor[di])
decoder_input = target_tensor[di] # Teacher forcing
else:
# Use its own predictions as the next input
topv, topi = decoder_output.topk(1)
decoder_input = topi.squeeze().detach()
loss += loss_fn(decoder_output, target_tensor[di])
# Stop if the decoder outputs the end-of-sentence (EOS) token
if decoder_input.item() == EOS_token:
break
loss.backward()
encoder_optimizer.step()
decoder_optimizer.step()
return loss.item() / target_length
In [14]:
def train_attn(encoder, decoder, n_iters, reverse_source_sentence=True,
use_teacher_forcing=True,
print_every=1000, plot_every=100,
learning_rate=0.01):
"""Train of Seq2seq with attention
Args:
encoder: EncoderLSTM
decoder: DecoderLSTM
n_iters: number of training sentence pairs to train on (sampled with replacement)
reverse_source_sentence: if True, reverse the source sentence
while keeping the target order unchanged;
if False, keep both the source and target order unchanged
use_teacher_forcing: if True, feed the target as the next input;
if False, use the model's own predictions as the next input
print_every: print log every print_every
plot_every: plot every plot_every
learning_rate:
"""
start = time.time()
plot_losses = []
print_loss_total = 0
plot_loss_total = 0
# Use SGD to optimize encoder and decoder parameters
encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)
# Obtain training input
training_pairs = [tensor_from_pair(random.choice(pairs), input_lang, output_lang)
for _ in range(n_iters)]
# Negative log likelihood loss
loss_fn = nn.NLLLoss()
for i in range(1, n_iters+1):
# Get a pair of sentences and move them to the device.
# training_pair: ([input_seq_len, 1], [target_seq_len, 1]) word-index tensors
training_pair = training_pairs[i-1]
input_tensor = training_pair[0].to(device)
target_tensor = training_pair[1].to(device)
# Train by a pair of source sentence and target sentence
loss = train_by_sentence_attn(input_tensor, target_tensor,
encoder, decoder,
encoder_optimizer, decoder_optimizer,
loss_fn, use_teacher_forcing=use_teacher_forcing,
reverse_source_sentence=reverse_source_sentence)
print_loss_total += loss
plot_loss_total += loss
if i % print_every == 0:
# Print Loss
print_loss_avg = print_loss_total / print_every
print_loss_total = 0
print("%s (%d %d%%) %.4f" % (time_since(start, i / n_iters),
i, i / n_iters * 100, print_loss_avg))
if i % plot_every == 0:
# Plot
plot_loss_avg = plot_loss_total / plot_every
plot_losses.append(plot_loss_avg)
plot_loss_total = 0
# show plot
show_plot(plot_losses)
In [15]:
def evaluate_by_sentence_attn(encoder, decoder, sentence,
reverse_source_sentence=True, max_length=MAX_LENGTH):
"""Evalutae on a source sentence with model trained with attention mechanism
Args:
encoder
decoder
sentence
max_length
Return:
decoded_words: predicted sentence
"""
with torch.no_grad():
# Get tensor of sentence
input_tensor = tensor_from_sentence(input_lang, sentence).to(device)
input_length = input_tensor.size(0)
if reverse_source_sentence:
input_tensor = torch.flip(input_tensor, [0])
# init state for encoder
(hidden, cell) = encoder.init_hidden()
# encoder outputs: [max_length, hidden_size]
encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)
for ei in range(input_length):
encoder_output, (hidden, cell) = encoder(input_tensor[ei],
(hidden, cell))
encoder_outputs[ei] += encoder_output[0, 0]
# Last state of encoder as the init state of decoder
decoder_input = torch.tensor([[SOS_token]], device=device)
decoder_hidden = (hidden, cell)
decoded_words = []
# CHANGE!! Add decoder_attentions to collect attention map
decoder_attentions = torch.zeros(max_length, max_length)
# When evaluate, use its own predictions as the next input
for di in range(max_length):
# CHANGE!! Attention
decoder_output, (hidden, cell), decoder_attention = \
decoder(decoder_input, (hidden, cell), encoder_outputs)
topv, topi = decoder_output.data.topk(1)
# CHANGE!!
decoder_attentions[di] = decoder_attention.data
if topi.item() == EOS_token:
decoded_words.append("<EOS>")
break
else:
decoded_words.append(output_lang.index2word[topi.item()])
decoder_input = topi.squeeze().detach()
return decoded_words, decoder_attentions[:di + 1]
In [16]:
def show_attention(input_sentence, output_words, attentions):
"""Show attention between input sentence and output words
"""
# Set up figure with colorbar
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(attentions.numpy(), cmap='bone')
fig.colorbar(cax)
# Set up axes
ax.set_xticklabels([''] + input_sentence.split(' ') +
['<EOS>'], rotation=90)
ax.set_yticklabels([''] + output_words)
# Show label at every tick
ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
ax.yaxis.set_major_locator(ticker.MultipleLocator(1))
plt.show()
In [17]:
def evaluate_and_show_attention(input_sentence, encoder, decoder):
"""Evaluate and show attention for a input sentence
"""
output_words, attentions = evaluate_by_sentence_attn(
encoder, decoder, input_sentence)
print('input =', input_sentence)
print('output =', ' '.join(output_words))
show_attention(input_sentence, output_words, attentions)
In [20]:
setup_seed(45)
hidden_size = 256
# Reverse the order of source input sentence
reverse_source_sentence = True
# Feed the target as the next input
use_teacher_forcing = True
encoder = EncoderLSTM(input_lang.n_words, hidden_size).to(device)
decoder = AttentionDecoderLSTM(hidden_size, output_lang.n_words).to(device)
print(">> Model is on: {}".format(next(encoder.parameters()).is_cuda))
print(">> Model is on: {}".format(next(decoder.parameters()).is_cuda))
In [21]:
iters = 50000
train_attn(encoder, decoder, iters, reverse_source_sentence=reverse_source_sentence,
use_teacher_forcing=use_teacher_forcing,print_every=250, plot_every=250)
In [22]:
evaluate_and_show_attention("elle a cinq ans de moins que moi .", encoder, decoder)
evaluate_and_show_attention("elle est trop petit .", encoder, decoder)
evaluate_and_show_attention("je ne crains pas de mourir .", encoder, decoder)
evaluate_and_show_attention("c est un jeune directeur plein de talent .", encoder, decoder)
@ refers to matrix multiplication.
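A quick check that the two notations agree (small random tensors):
In [ ]:
a = torch.randn(3, 4)
b = torch.randn(4, 5)
print(torch.allclose(a @ b, torch.matmul(a, b)))   # True: @ is matrix multiplication
# torch.bmm is the batched variant: [batch, n, m] x [batch, m, p] -> [batch, n, p]
print(torch.bmm(a.unsqueeze(0), b.unsqueeze(0)).shape)  # torch.Size([1, 3, 5])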
In [23]:
class NaiveLSTM(nn.Module):
"""Naive LSTM like nn.LSTM"""
def __init__(self, input_size: int, hidden_size: int):
super(NaiveLSTM, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
# input gate
self.w_ii = Parameter(Tensor(hidden_size, input_size))
self.w_hi = Parameter(Tensor(hidden_size, hidden_size))
self.b_ii = Parameter(Tensor(hidden_size, 1))
self.b_hi = Parameter(Tensor(hidden_size, 1))
# forget gate
self.w_if = Parameter(Tensor(hidden_size, input_size))
self.w_hf = Parameter(Tensor(hidden_size, hidden_size))
self.b_if = Parameter(Tensor(hidden_size, 1))
self.b_hf = Parameter(Tensor(hidden_size, 1))
# output gate
self.w_io = Parameter(Tensor(hidden_size, input_size))
self.w_ho = Parameter(Tensor(hidden_size, hidden_size))
self.b_io = Parameter(Tensor(hidden_size, 1))
self.b_ho = Parameter(Tensor(hidden_size, 1))
# cell
self.w_ig = Parameter(Tensor(hidden_size, input_size))
self.w_hg = Parameter(Tensor(hidden_size, hidden_size))
self.b_ig = Parameter(Tensor(hidden_size, 1))
self.b_hg = Parameter(Tensor(hidden_size, 1))
self.reset_weigths()
def reset_weigths(self):
"""reset weights
"""
stdv = 1.0 / math.sqrt(self.hidden_size)
for weight in self.parameters():
init.uniform_(weight, -stdv, stdv)
def forward(self, inputs: Tensor, state: Tuple[Tensor]) \
-> Tuple[Tensor, Tuple[Tensor, Tensor]]:
"""Forward
Args:
inputs: [1, 1, input_size]
state: ([1, 1, hidden_size], [1, 1, hidden_size])
"""
# seq_size, batch_size, _ = inputs.size()
if state is None:
h_t = torch.zeros(1, self.hidden_size).t()
c_t = torch.zeros(1, self.hidden_size).t()
else:
(h, c) = state
h_t = h.squeeze(0).t()
c_t = c.squeeze(0).t()
hidden_seq = []
seq_size = 1
for t in range(seq_size):
x = inputs[:, t, :].t()
# input gate
i = torch.sigmoid(self.w_ii @ x + self.b_ii + self.w_hi @ h_t +
self.b_hi)
# forget gate
f = torch.sigmoid(self.w_if @ x + self.b_if + self.w_hf @ h_t +
self.b_hf)
# cell
g = torch.tanh(self.w_ig @ x + self.b_ig + self.w_hg @ h_t
+ self.b_hg)
# output gate
o = torch.sigmoid(self.w_io @ x + self.b_io + self.w_ho @ h_t +
self.b_ho)
c_next = f * c_t + i * g
h_next = o * torch.tanh(c_next)
c_next_t = c_next.t().unsqueeze(0)
h_next_t = h_next.t().unsqueeze(0)
hidden_seq.append(h_next_t)
hidden_seq = torch.cat(hidden_seq, dim=0)
return hidden_seq, (h_next_t, c_next_t)
In [24]:
def reset_weigths(model):
"""reset weights
"""
for weight in model.parameters():
init.constant_(weight, 0.5)
In [25]:
inputs = torch.ones(1, 1, 10)
h0 = torch.ones(1, 1, 20)
c0 = torch.ones(1, 1, 20)
print(h0.shape, h0)
print(c0.shape, c0)
print(inputs.shape, inputs)
In [26]:
# test naive_lstm with input_size=10, hidden_size=20
naive_lstm = NaiveLSTM(10, 20)
reset_weigths(naive_lstm)
In [27]:
output1, (hn1, cn1) = naive_lstm(inputs, (h0, c0))
In [28]:
print(hn1.shape, cn1.shape, output1.shape)
print(hn1)
print(cn1)
print(output1)
In [29]:
# Use official lstm with input_size=10, hidden_size=20
lstm = nn.LSTM(10, 20)
reset_weigths(lstm)
In [30]:
output2, (hn2, cn2) = lstm(inputs, (h0, c0))
print(hn2.shape, cn2.shape, output2.shape)
print(hn2)
print(cn2)
print(output2)
In [31]:
# Implementation of RNN for our experiment
from NaiveRNN import NaiveRNN
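NaiveRNN itself lives in NaiveRNN.py and is not reproduced here; the experiment below only touches its parameters w_ih, b_ih, w_hh and b_hh. A hypothetical sketch of such a module, assuming it mirrors NaiveLSTM's parameter layout:
In [ ]:
class NaiveRNNSketch(nn.Module):
    """Hypothetical stand-in for NaiveRNN: h_t = tanh(W_ih x + b_ih + W_hh h + b_hh)."""
    def __init__(self, input_size: int, hidden_size: int):
        super(NaiveRNNSketch, self).__init__()
        self.w_ih = Parameter(Tensor(hidden_size, input_size))
        self.b_ih = Parameter(Tensor(hidden_size, 1))
        self.w_hh = Parameter(Tensor(hidden_size, hidden_size))
        self.b_hh = Parameter(Tensor(hidden_size, 1))
        # same uniform initialization scheme as NaiveLSTM above
        stdv = 1.0 / math.sqrt(hidden_size)
        for weight in self.parameters():
            init.uniform_(weight, -stdv, stdv)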
In [32]:
hidden_size = 50
input_size = 100
sequence_len = 100
high = 1000000
In [33]:
# Generate random input with sequence_len=100
test_idx = torch.randint(high=high, size=(1, sequence_len)).to(device)
print(test_idx)
In [34]:
setup_seed(45)
embeddings = nn.Embedding(high, input_size).to(device)
test_embeddings = embeddings(test_idx).to(device)
print(test_embeddings)
h_0 = torch.zeros(1, hidden_size, requires_grad=True).to(device)
h_t = h_0
print(h_0)
print(test_embeddings)
In [35]:
def rnn_step(x, h, w_ih, b_ih, w_hh, b_hh):
"""run rnn a step
"""
h = torch.tanh(w_ih @ x.t() + b_ih + w_hh @ h.t() + b_hh)
h_t = h.t()
return h_t
In [36]:
print(test_embeddings)
rnn = NaiveRNN(input_size, hidden_size).to(device)
iters = test_embeddings.size(1)
rnn_grads = []
for t in range(iters):
h_t = rnn_step(test_embeddings[: , t, :], h_t,
rnn.w_ih, rnn.b_ih, rnn.w_hh, rnn.b_hh)
loss = h_t.abs().sum()
h_0.retain_grad()
loss.backward(retain_graph=True)
rnn_grads.append(torch.norm(h_0.grad).item())
h_0.grad.zero_()
rnn.zero_grad()
In [37]:
plt.plot(rnn_grads)
Out[37]:
In [38]:
def show_gates(i_s, o_s, f_s):
"""Show input gate, output gate, forget gate for LSTM
"""
plt.plot(i_s, "r", label="input gate")
plt.plot(o_s, "b", label="output gate")
plt.plot(f_s, "g", label="forget gate")
plt.title('Input gate, output gate and forget gate of LSTM')
plt.xlabel('t', color='#1C2833')
plt.ylabel('Mean Value', color='#1C2833')
plt.legend(loc='best')
plt.grid()
plt.show()
In [39]:
def lstm_step(x, h, c, w_ii, b_ii, w_hi, b_hi,
w_if, b_if, w_hf, b_hf,
w_ig, b_ig, w_hg, b_hg,
w_io, b_io, w_ho, b_ho, use_forget_gate=True):
"""run lstm a step
"""
x_t = x.t()
h_t = h.t()
c_t = c.t()
i = torch.sigmoid(w_ii @ x_t + b_ii + w_hi @ h_t + b_hi)
o = torch.sigmoid(w_io @ x_t + b_io + w_ho @ h_t + b_ho)
g = torch.tanh(w_ig @ x_t + b_ig + w_hg @ h_t + b_hg)
f = torch.sigmoid(w_if @ x_t + b_if + w_hf @ h_t + b_hf)
if use_forget_gate:
c_next = f * c_t + i * g
else:
c_next = c_t + i * g
h_next = o * torch.tanh(c_next)
c_next_t = c_next.t()
h_next_t = h_next.t()
i_avg = torch.mean(i).detach()
o_avg = torch.mean(o).detach()
f_avg = torch.mean(f).detach()
return h_next_t, c_next_t, f_avg, i_avg, o_avg
In [40]:
setup_seed(45)
embeddings = nn.Embedding(high, input_size).to(device)
test_embeddings = embeddings(test_idx).to(device)
h_0 = torch.zeros(1, hidden_size, requires_grad=True).to(device)
c_0 = torch.zeros(1, hidden_size, requires_grad=True).to(device)
h_t = h_0
c_t = c_0
print(test_embeddings)
print(h_0)
print(c_0)
In [41]:
lstm = NaiveLSTM(input_size, hidden_size).to(device)
iters = test_embeddings.size(1)
lstm_grads = []
i_s = []
o_s = []
f_s = []
for t in range(iters):
h_t, c_t, f, i, o = lstm_step(test_embeddings[: , t, :], h_t, c_t,
lstm.w_ii, lstm.b_ii, lstm.w_hi, lstm.b_hi,
lstm.w_if, lstm.b_if, lstm.w_hf, lstm.b_hf,
lstm.w_ig, lstm.b_ig, lstm.w_hg, lstm.b_hg,
lstm.w_io, lstm.b_io, lstm.w_ho, lstm.b_ho,
use_forget_gate=False)
loss = h_t.abs().sum()
h_0.retain_grad()
loss.backward(retain_graph=True)
lstm_grads.append(torch.norm(h_0.grad).item())
i_s.append(i)
o_s.append(o)
f_s.append(f)
h_0.grad.zero_()
lstm.zero_grad()
In [42]:
plt.plot(lstm_grads)
Out[42]:
In [43]:
show_gates(i_s, o_s, f_s)
In [44]:
setup_seed(45)
embeddings = nn.Embedding(high, input_size).to(device)
test_embeddings = embeddings(test_idx).to(device)
h_0 = torch.zeros(1, hidden_size, requires_grad=True).to(device)
c_0 = torch.zeros(1, hidden_size, requires_grad=True).to(device)
h_t = h_0
c_t = c_0
print(test_embeddings)
print(h_0)
print(c_0)
In [45]:
lstm = NaiveLSTM(input_size, hidden_size).to(device)
## BIG CHANGE!! Initialize both forget-gate bias terms to 0.5
lstm.b_hf.data = torch.ones_like(lstm.b_hf) * 1/2
lstm.b_if.data = torch.ones_like(lstm.b_if) * 1/2
iters = test_embeddings.size(1)
lstm_grads = []
i_s = []
o_s = []
f_s = []
for t in range(iters):
h_t, c_t, f, i, o = lstm_step(test_embeddings[: , t, :], h_t, c_t,
lstm.w_ii, lstm.b_ii, lstm.w_hi, lstm.b_hi,
lstm.w_if, lstm.b_if, lstm.w_hf, lstm.b_hf,
lstm.w_ig, lstm.b_ig, lstm.w_hg, lstm.b_hg,
lstm.w_io, lstm.b_io, lstm.w_ho, lstm.b_ho,
use_forget_gate=True)
loss = h_t.abs().sum()
h_0.retain_grad()
loss.backward(retain_graph=True)
lstm_grads.append(torch.norm(h_0.grad).item())
i_s.append(i)
o_s.append(o)
f_s.append(f)
h_0.grad.zero_()
lstm.zero_grad()
In [46]:
plt.plot(lstm_grads)
Out[46]:
In [47]:
show_gates(i_s, o_s, f_s)
EncoderLSTM -> EncoderGRU: replace nn.LSTM with nn.GRU (see the sketch below)
DecoderLSTM -> DecoderGRU: replace nn.LSTM with nn.GRU
train_by_sentence -> train_by_sentence_v2
train -> train_v2
evaluate -> evaluate_v2
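A minimal sketch of EncoderGRU under these instructions (DecoderGRU is analogous); note that nn.GRU carries only a hidden state, so the (hidden, cell) tuple collapses into a single tensor:
In [ ]:
class EncoderGRU(nn.Module):
    """Sketch: EncoderLSTM with nn.LSTM swapped for nn.GRU."""
    def __init__(self, input_size: int, hidden_size: int):
        super(EncoderGRU, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
    def forward(self, inputs, hidden):
        # Retrieve the word embedding and run one GRU step
        embedded = self.embedding(inputs).view(1, 1, -1)
        output, hidden = self.gru(embedded, hidden)
        return output, hidden
    def init_hidden(self):
        # single hidden state instead of a (hidden, cell) tuple
        return torch.zeros(1, 1, self.hidden_size, device=device)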
In [ ]: