This is the second model from Jason Brownlee's Long Short Term Memory Networks with Python book. The book's implementations use Keras; this notebook is an equivalent PyTorch implementation of the same problem. From section 7.2 of the book:
We can generate sequences of damped sine waves with different periods and damping decay values, withhold the last few points of each series, and have the model predict them. From the model's perspective, a sequence of n timesteps is fed in, and at the end of the sequence a single prediction is emitted (a vector of multiple features that we interpret as the withheld time steps).
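Concretely, the generator below produces series of the form

$$y_i = 0.5 + 0.5\,\sin\!\left(\frac{2\pi i}{p}\right)e^{-d\,i}, \qquad i = 0, 1, 2, \ldots$$

where $p$ is the period and $d$ the decay; each curve oscillates around 0.5 with an amplitude that shrinks exponentially over time.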
In [1]:
from __future__ import division, print_function
from sklearn.metrics import mean_absolute_error
import torch
import torch.nn as nn
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import shutil
%matplotlib inline
In [2]:
def generate_sequence(length, period, decay):
    # damped sine wave: oscillates around 0.5, amplitude decays exponentially
    return [0.5 + 0.5 * math.sin(2 * math.pi * i / period) * math.exp(-decay * i)
            for i in range(length)]

def generate_data(num_samples, input_length, output_length):
    xs, ys = [], []
    for _ in range(num_samples):
        # randomize period and decay so every sample is a different curve
        p = np.random.randint(low=10, high=20)
        d = np.random.uniform(0.01, 0.1)
        sequence = generate_sequence(input_length + output_length, p, d)
        # first input_length points are the input, last output_length the target
        xs.append(sequence[:-output_length])
        ys.append(sequence[-output_length:])
    X = np.array(xs).reshape(num_samples, input_length, 1)
    y = np.array(ys).reshape(num_samples, output_length)
    return X, y
In [3]:
X, y = generate_data(5, 20, 5)
print(X.shape, y.shape)
for i in range(X.shape[0]):
    plt.plot([x for x in X[i, :, 0]] + [x for x in y[i]], marker="o")
plt.axvline(x=20, color="k", linestyle="--")
plt.show()
The parameters for our model are a little different from those proposed in the book. Instead of (10k, 0.1k, 1k) records for training, validation, and test and a single training epoch, we use one-tenth the data volume and train for 10 epochs, so the network sees roughly the same number of weight updates overall. We end up getting slightly better results than the model trained on the original data volumes.
In [4]:
NUM_TRAIN_SAMPLES = 1000
NUM_VAL_SAMPLES = 10
NUM_TEST_SAMPLES = 100
INPUT_SEQ_LEN = 50
OUTPUT_SEQ_LEN = 5
INPUT_DIM = 1
HIDDEN_DIM = 20
NUM_LAYERS = 2
OUTPUT_DIM = 5
LEARNING_RATE = 1e-3
BATCH_SIZE = 10
NUM_EPOCHS = 10
DATA_DIR = "../../data"
MODEL_FILE = os.path.join(DATA_DIR, "torch-07-dsw-pred-{:d}.model")
In [5]:
Xtrain, ytrain = generate_data(NUM_TRAIN_SAMPLES, INPUT_SEQ_LEN, OUTPUT_SEQ_LEN)
Xval, yval = generate_data(NUM_VAL_SAMPLES, INPUT_SEQ_LEN, OUTPUT_SEQ_LEN)
Xtest, ytest = generate_data(NUM_TEST_SAMPLES, INPUT_SEQ_LEN, OUTPUT_SEQ_LEN)
print(Xtrain.shape, ytrain.shape, Xval.shape, yval.shape, Xtest.shape, ytest.shape)
In [6]:
class DampedSineWavePredictor(nn.Module):

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(DampedSineWavePredictor, self).__init__()
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # random initial hidden and cell states, one per layer and batch element
        h0 = torch.randn(self.num_layers, x.size(0), self.hidden_dim)
        c0 = torch.randn(self.num_layers, x.size(0), self.hidden_dim)
        if torch.cuda.is_available():
            h0, c0 = h0.cuda(), c0.cuda()
        out, _ = self.lstm(x, (Variable(h0), Variable(c0)))
        # project the hidden state at the last timestep to the output vector
        out = self.fc(out[:, -1, :])
        return out
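As a quick sanity check (not part of the original notebook), we can push a dummy batch through an untrained model and confirm that an input of shape (batch, INPUT_SEQ_LEN, INPUT_DIM) comes out as (batch, OUTPUT_DIM):

# illustrative shape check, assuming the constants defined above
check_model = DampedSineWavePredictor(INPUT_DIM, HIDDEN_DIM, NUM_LAYERS, OUTPUT_DIM)
dummy = Variable(torch.randn(4, INPUT_SEQ_LEN, INPUT_DIM))
if torch.cuda.is_available():
    check_model.cuda()
    dummy = dummy.cuda()
print(check_model(dummy).size())  # expected: torch.Size([4, 5])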
In [7]:
model = DampedSineWavePredictor(INPUT_DIM, HIDDEN_DIM, NUM_LAYERS, OUTPUT_DIM)
if torch.cuda.is_available():
    model.cuda()
print(model)
In [8]:
loss_fn = nn.L1Loss()   # L1 loss = mean absolute error, the metric we report
# loss_fn = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
In [9]:
history = []
for epoch in range(NUM_EPOCHS):
    num_batches = Xtrain.shape[0] // BATCH_SIZE
    shuffled_indices = np.random.permutation(np.arange(Xtrain.shape[0]))
    train_loss = 0.
    for bid in range(num_batches):
        Xbatch_data = Xtrain[shuffled_indices[bid * BATCH_SIZE : (bid + 1) * BATCH_SIZE]]
        ybatch_data = ytrain[shuffled_indices[bid * BATCH_SIZE : (bid + 1) * BATCH_SIZE]]
        Xbatch = Variable(torch.from_numpy(Xbatch_data).float())
        ybatch = Variable(torch.from_numpy(ybatch_data).float())
        if torch.cuda.is_available():
            Xbatch = Xbatch.cuda()
            ybatch = ybatch.cuda()
        # initialize gradients
        optimizer.zero_grad()
        # forward
        ybatch_ = model(Xbatch)
        loss = loss_fn(ybatch_, ybatch)
        # backward
        loss.backward()
        if torch.cuda.is_available():
            train_loss += loss.cpu().data[0]
        else:
            train_loss += loss.data[0]
        optimizer.step()
    # average training loss over the epoch
    train_loss /= num_batches
    # compute validation loss
    num_val_batches = Xval.shape[0] // BATCH_SIZE
    val_loss = 0.
    for bid in range(num_val_batches):
        Xbatch_data = Xval[bid * BATCH_SIZE : (bid + 1) * BATCH_SIZE]
        ybatch_data = yval[bid * BATCH_SIZE : (bid + 1) * BATCH_SIZE]
        Xbatch = Variable(torch.from_numpy(Xbatch_data).float())
        ybatch = Variable(torch.from_numpy(ybatch_data).float())
        if torch.cuda.is_available():
            Xbatch = Xbatch.cuda()
            ybatch = ybatch.cuda()
        ybatch_ = model(Xbatch)
        loss = loss_fn(ybatch_, ybatch)
        if torch.cuda.is_available():
            val_loss += loss.cpu().data[0]
        else:
            val_loss += loss.data[0]
    val_loss /= num_val_batches
    # checkpoint the model at the end of each epoch
    torch.save(model.state_dict(), MODEL_FILE.format(epoch + 1))
    print("Epoch {:2d}/{:d}: loss={:.3f}, val_loss={:.3f}"
          .format((epoch + 1), NUM_EPOCHS, train_loss, val_loss))
    history.append((train_loss, val_loss))
In [10]:
train_losses = [x[0] for x in history]
val_losses = [x[1] for x in history]
plt.plot(train_losses, color="b", label="train")
plt.plot(val_losses, color="r", label="val")
plt.legend(loc="best")
plt.xlabel("epochs")
plt.ylabel("loss (MAE)")
plt.show()
In [11]:
saved_model = DampedSineWavePredictor(INPUT_DIM, HIDDEN_DIM, NUM_LAYERS, OUTPUT_DIM)
# reload the checkpoint saved after the first epoch
saved_model.load_state_dict(torch.load(MODEL_FILE.format(1)))
if torch.cuda.is_available():
    saved_model.cuda()
In [12]:
ylabels, ypreds = [], []
num_test_batches = Xtest.shape[0] // BATCH_SIZE
for bid in range(num_test_batches):
    Xbatch_data = Xtest[bid * BATCH_SIZE : (bid + 1) * BATCH_SIZE]
    ybatch_data = ytest[bid * BATCH_SIZE : (bid + 1) * BATCH_SIZE]
    Xbatch = Variable(torch.from_numpy(Xbatch_data).float())
    ybatch = Variable(torch.from_numpy(ybatch_data).float())
    if torch.cuda.is_available():
        Xbatch = Xbatch.cuda()
        ybatch = ybatch.cuda()
    ybatch_ = saved_model(Xbatch)
    if torch.cuda.is_available():
        ylabels.extend(ybatch.cpu().data.numpy())
        ypreds.extend(ybatch_.cpu().data.numpy())
    else:
        ylabels.extend(ybatch.data.numpy())
        ypreds.extend(ybatch_.data.numpy())

test_mae = mean_absolute_error(ylabels, ypreds)
print("MAE on test set: {:.5f}".format(test_mae))
In [13]:
samples = np.random.randint(0, Xtest.shape[0], 3)
for i in range(len(samples)):
    plt.subplot(1, 3, (i + 1))
    label = ylabels[samples[i]]
    pred = ypreds[samples[i]]
    xs = np.arange(len(label))
    plt.ylim([0., 1.])
    plt.plot(xs, label, color="b", marker="o", label="y")
    plt.plot(xs, pred, color="orange", marker="o", label="yhat")
    plt.legend(loc="best")
plt.tight_layout()
plt.show()
In [14]:
# remove the per-epoch checkpoint files
for i in range(NUM_EPOCHS):
    os.remove(MODEL_FILE.format(i + 1))