In this notebook, we're going to train a simple RNN to do time-series prediction. Given some set of input data, it should be able to generate a prediction for the next time step!
- First, we'll create our data
- Then, define an RNN in PyTorch
- Finally, we'll train our network and see how it performs
In [1]:
import torch
from torch import nn
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
In [2]:
plt.figure(figsize=(8,5))
# how many time steps/data pts are in one batch of data
seq_length = 20
# generate evenly spaced data pts
time_steps = np.linspace(start=0, stop=np.pi, num=seq_length + 1)
data = np.sin(time_steps)
# Size becomes (seq_length+1, 1), adds an input_size dimension
data.resize((seq_length + 1, 1))
# Input X becomes all but the last piece of data
x = data[:-1]
# Target Y becomes all but the first piece of data
y = data[1:]
# display the data
plt.plot(time_steps[1:], x, 'r.', label='input, x')
plt.plot(time_steps[1:], y, 'b.', label='target, y')
plt.legend(loc='best')
plt.show()
Next, we define an RNN in PyTorch. We'll use nn.RNN to create an RNN layer, then we'll add a last, fully-connected layer to get the output size that we want. An RNN takes in a number of parameters:
- input_size: the size of each input at a single time step (here, 1, since each point is a single value)
- hidden_dim: the number of features in the RNN output and in the hidden state
- n_layers: the number of layers that make up the RNN, typically 1-3; greater than 1 means a stacked RNN
- batch_first: whether the input and output Tensors have batch_size as their first dimension
Take a look at the RNN documentation to read more about recurrent layers.
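As a quick aside (not part of the original notebook), here is a minimal shape check for nn.RNN with batch_first=True, using the same sizes as the test cell further below; the demo_* names are just for illustration:
# Aside: with batch_first=True, input is (batch_size, seq_length, input_size)
demo_rnn = nn.RNN(input_size=1, hidden_size=10, num_layers=2, batch_first=True)
demo_input = torch.randn(1, 20, 1)            # (batch_size=1, seq_length=20, input_size=1)
demo_out, demo_hidden = demo_rnn(demo_input)  # hidden state defaults to zeros when omitted
print(demo_out.shape)     # torch.Size([1, 20, 10]) -> (batch_size, seq_length, hidden_size)
print(demo_hidden.shape)  # torch.Size([2, 1, 10])  -> (n_layers, batch_size, hidden_size)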
In [3]:
class RNN(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers):
        super(RNN, self).__init__()
        self.hidden_dim = hidden_dim
        # Define an RNN with specified parameters
        # batch_first means that the first dim of the input and output will be the batch_size
        self.rnn = nn.RNN(input_size=input_size,
                          hidden_size=hidden_dim,
                          num_layers=n_layers,
                          batch_first=True)
        # last, fully-connected layer
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        # x (batch_size, seq_length, input_size)
        # hidden (n_layers, batch_size, hidden_dim)
        # r_out (batch_size, seq_length, hidden_dim)
        batch_size = x.size(0)
        # Get RNN outputs
        r_out, hidden = self.rnn(x, hidden)
        # Shape output to be (batch_size*seq_length, hidden_dim)
        r_out = r_out.view(-1, self.hidden_dim)
        # Get final output
        output = self.fc(r_out)
        return output, hidden
In [4]:
# Test that dimensions are as expected
test_rnn = RNN(input_size=1,
               output_size=1,
               hidden_dim=10,
               n_layers=2)
# Generate evenly spaced, test data pts
time_steps = np.linspace(0, np.pi, seq_length)
data = np.sin(time_steps)
data.resize((seq_length, 1))
# Give it a batch_size of 1 as first dimension
print('Input size before: ', torch.Tensor(data).size())
test_input = torch.Tensor(data).unsqueeze(0)
print('Input size after: ', test_input.size())
# Test out rnn sizes
test_out, test_h = test_rnn(test_input, None)
print('Output size: ', test_out.size())
print('Hidden state size: ', test_h.size())
In [5]:
# Decide on hyperparameters
input_size=1
output_size=1
hidden_dim=32
n_layers=1
# Instantiate an RNN
rnn = RNN(input_size, output_size, hidden_dim, n_layers)
print(rnn)
This is a regression problem: can we train an RNN to accurately predict the next data point, given a current data point?
- The data points are coordinate values, so to compare a predicted point and a ground-truth point, we'll use a regression loss: the mean squared error (MSE); see the short aside after this list.
- It's typical to use an Adam optimizer for recurrent models.
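As a small aside (not from the original notebook), MSE is just the mean of the squared differences between prediction and target; a toy check with made-up values:
# Aside: compare nn.MSELoss against the same computation done by hand
pred_demo = torch.tensor([0.5, 0.9, 1.2])
target_demo = torch.tensor([0.6, 1.0, 1.0])
print(nn.MSELoss()(pred_demo, target_demo))     # tensor(0.0200)
print(((pred_demo - target_demo) ** 2).mean())  # same value, computed directly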
In [6]:
# MSE loss and Adam optimizer with a learning rate of 0.01
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(params=rnn.parameters(),
                             lr=0.01)
This function takes in an rnn, a number of steps to train for, and how often to print results; it returns a trained rnn. It is also responsible for displaying the loss and the predictions every so often.
Pay close attention to the hidden state, here:
In [7]:
# Train the RNN
def train(rnn, n_steps, print_every):
    # Initialize the hidden state
    hidden = None
    for batch_i, step in enumerate(range(n_steps)):
        # Defining the training data
        time_steps = np.linspace(step * np.pi, (step + 1) * np.pi, seq_length + 1)
        data = np.sin(time_steps)
        data.resize((seq_length + 1, 1))  # input_size=1
        x = data[:-1]
        y = data[1:]
        # Convert data into Tensors
        # unsqueeze adds a batch_size dimension of 1 as the first dimension
        x_tensor = torch.Tensor(x).unsqueeze(0)
        y_tensor = torch.Tensor(y)
        # Outputs from the rnn
        prediction, hidden = rnn(x_tensor, hidden)
        ## Representing Memory ##
        # Make a new variable for hidden and detach the hidden state from its history
        # this way, we don't backpropagate through the entire history
        hidden = hidden.data
        # Calculate the loss
        loss = criterion(prediction, y_tensor)
        # Zero gradients
        optimizer.zero_grad()
        # Perform backprop and update weights
        loss.backward()
        optimizer.step()
        # Display loss and predictions
        if batch_i % print_every == 0:
            print('Loss: ', loss.item())
            # Plot input
            plt.plot(time_steps[1:], x, 'r.')
            # Plot predictions
            plt.plot(time_steps[1:], prediction.data.numpy().flatten(), 'b.')
            plt.show()
    return rnn
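The line hidden = hidden.data above is the part worth pausing on: it carries the hidden state forward into the next sequence while cutting it off from the computation graph, so backpropagation is truncated to one sequence at a time rather than running through the entire training history. A minimal sketch of the same idea (an aside, not part of the training code), using the more explicit detach():
# Aside: a detached tensor has no gradient history, so nothing upstream of it gets gradients
h = torch.zeros(1, 1, 32, requires_grad=True)
print(h.requires_grad)           # True
print(h.detach().requires_grad)  # False -- same values, no graph attached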
In [8]:
# Train the rnn and monitor results
n_steps = 75
print_every = 15
trained_rnn = train(rnn, n_steps, print_every)
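As a final aside (not part of the original notebook), one way to sanity-check the trained network is to feed it a stretch of the sine wave from some later interval and compare its one-step-ahead predictions to the true values; the interval below is just an example:
# Aside: evaluate the trained RNN on unseen data with a fresh (None) hidden state
test_steps = np.linspace(100 * np.pi, 101 * np.pi, seq_length + 1)
test_data = np.sin(test_steps)
test_data.resize((seq_length + 1, 1))
test_x = torch.Tensor(test_data[:-1]).unsqueeze(0)  # (1, seq_length, 1)
pred, _ = trained_rnn(test_x, None)
plt.plot(test_steps[1:], test_data[1:], 'r.', label='target')
plt.plot(test_steps[1:], pred.data.numpy().flatten(), 'b.', label='prediction')
plt.legend(loc='best')
plt.show()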