The example, some of the code, and a lot of the inspiration are taken from: https://machinelearningmastery.com/how-to-develop-lstm-models-for-time-series-forecasting/
In [0]:
!pip install -q tf-nightly-gpu-2.0-preview
In [0]:
import tensorflow as tf
print(tf.__version__)
In [0]:
# univariate data preparation
import numpy as np
# split a univariate sequence into samples
def split_sequence(sequence, n_steps):
    X, y = list(), list()
    for i in range(len(sequence)):
        # find the end of this pattern
        end_ix = i + n_steps
        # check if we are beyond the sequence
        if end_ix > len(sequence) - 1:
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
In [0]:
# define input sequence
raw_seq = [10, 20, 30, 40, 50, 60, 70, 80, 90]
# choose a number of time steps
n_steps = 3
# split into samples
X, y = split_sequence(raw_seq, n_steps)
# summarize the data
list(zip(X, y))
Out[0]:
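Each three-step window maps to the value that follows it: [10 20 30] → 40, [20 30 40] → 50, …, [60 70 80] → 90, giving six samples in total.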
In [0]:
n_features = 1
X = X.reshape((X.shape[0], X.shape[1], n_features))
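# Keras RNN layers expect 3D input of shape (samples, timesteps, features);
# with the 9-value sequence and n_steps = 3 this yields 6 samples
print(X.shape)  # (6, 3, 1)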
In [0]:
# one output for each input timestep
# ideal for feeding into something that *expects* timesteps
rnn_units = 1
from tensorflow.keras.layers import Dense, LSTM, GRU, SimpleRNN, Bidirectional
from tensorflow.keras.models import Sequential, Model
model = Sequential([
    SimpleRNN(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), return_sequences=True)
])
# https://keras.io/layers/recurrent/
# input: (samples, timesteps, input_dim)
# output with return_sequences: (samples, timesteps, units)
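# NOTE: the `predict` helper used below is not defined in this excerpt;
# this is a minimal sketch reconstructed from how it is called
# (a batch of univariate sequences, reshaped to (samples, timesteps, features))
def predict(model, samples):
    X_in = np.array(samples)
    X_in = X_in.reshape((X_in.shape[0], X_in.shape[1], n_features))
    return model.predict(X_in)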
predict(model, [[10, 20, 30]])
Out[0]:
In [0]:
rnn_units = 50
model = Sequential([
    SimpleRNN(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), return_sequences=True, name="RNN_Input"),
    SimpleRNN(units=rnn_units, activation='relu', name="RNN_Latent"),
    Dense(units=1, name="Linear_Output")
])
model.compile(optimizer='adam', loss='mse')
model.summary()
In [0]:
%time history = model.fit(X, y, epochs=500, verbose=0)
import matplotlib.pyplot as plt
plt.yscale('log')
plt.ylabel("loss")
plt.xlabel("epochs")
plt.plot(history.history['loss'])
Out[0]:
In [0]:
predict(model, [[10, 20, 30], [70, 80, 90], [100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[0]:
In [0]:
rnn_units = 50
model = Sequential([
    LSTM(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"),
    Dense(units=1, name="Linear_Output")
])
model.compile(optimizer='adam', loss='mse')
model.summary()
In [0]:
# https://arxiv.org/ftp/arxiv/papers/1701/1701.05923.pdf
# n = output dimension
# m = input dimension
# Total number of parameters for
# Simple RNN = n**2 + nm + n
# GRU = 3 × (n**2 + nm + n)
# LSTM = 4 × (n**2 + nm + n)
output_dimension = rnn_units
input_dimension = n_features
parameters = 4 * (output_dimension ** 2 + output_dimension * input_dimension + output_dimension)
parameters
Out[0]:
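In [0]:
# sanity check: Keras's own per-layer parameter count for the LSTM layer
# above should match the formula (count_params() is a standard Layer method)
model.get_layer("RNN_Input").count_params()
Out[0]: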
In [0]:
%time history = model.fit(X, y, epochs=500, verbose=0)
plt.yscale('log')
plt.ylabel("loss")
plt.xlabel("epochs")
plt.plot(history.history['loss'])
Out[0]:
In [0]:
predict(model, [[10, 20, 30], [70, 80, 90], [100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[0]:
In [0]:
rnn_units = 50
model = Sequential([
    GRU(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"),
    Dense(units=1, name="Linear_Output")
])
model.compile(optimizer='adam', loss='mse')
model.summary()
In [0]:
output_dimension = rnn_units
input_dimension = n_features
parameters = 3 * (output_dimension ** 2 + output_dimension * input_dimension + output_dimension)
parameters
Out[0]:
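In [0]:
# caveat (version-dependent): TF 2.x Keras GRUs default to reset_after=True,
# which adds a second bias vector per gate; Keras may therefore report
# 3 * (n**2 + nm + 2n) instead of the paper's 3 * (n**2 + nm + n)
3 * (output_dimension ** 2 + output_dimension * input_dimension + 2 * output_dimension)
Out[0]: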
In [0]:
%time history = model.fit(X, y, epochs=500, verbose=0)
plt.yscale('log')
plt.ylabel("loss")
plt.xlabel("epochs")
plt.plot(history.history['loss'])
Out[0]:
In [0]:
predict(model, [[10, 20, 30], [70, 80, 90], [100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[0]:
In [0]:
in_seq1 = [10, 20, 30, 40, 50, 60, 70, 80, 90]
in_seq2 = [15, 25, 35, 45, 55, 65, 75, 85, 95]
out_seq = [in1 + in2 for in1, in2 in zip(in_seq1, in_seq2)]
out_seq
Out[0]:
In [0]:
# convert to [rows, columns] structure
in_seq1 = np.array(in_seq1).reshape((len(in_seq1), 1))
in_seq2 = np.array(in_seq2).reshape((len(in_seq2), 1))
out_seq = np.array(out_seq).reshape((len(out_seq), 1))
out_seq
Out[0]:
In [0]:
# horizontally stack columns
dataset = np.hstack((in_seq1, in_seq2, out_seq))
dataset
Out[0]:
In [0]:
# split a multivariate sequence into samples
def split_sequences(sequences, n_steps):
    X, y = list(), list()
    for i in range(len(sequences)):
        # find the end of this pattern
        end_ix = i + n_steps
        # check if we are beyond the dataset
        if end_ix > len(sequences):
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequences[i:end_ix, :-1], sequences[end_ix - 1, -1]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
In [0]:
# choose a number of time steps
n_steps = 3
# convert into input/output
X, y = split_sequences(dataset, n_steps)
# summarize the data
list(zip(X, y))
Out[0]:
In [0]:
# the dataset knows the number of features, e.g. 2
n_features = X.shape[2]
# define model
model = Sequential()
model.add(GRU(units=50, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"))
model.add(Dense(units=1, name="Linear_Output"))
model.compile(optimizer='adam', loss='mse')
In [0]:
# fit model
%time history = model.fit(X, y, epochs=500, verbose=0)
import matplotlib.pyplot as plt
plt.yscale('log')
plt.plot(history.history['loss'])
Out[0]:
In [0]:
def predict_multi(model, samples):
    # a single multivariate sample: reshape to (1, timesteps, features)
    X_in = np.array(samples)
    X_in = X_in.reshape(1, X_in.shape[0], X_in.shape[1])
    y_pred = model.predict(X_in)
    return y_pred
In [0]:
predict_multi(model, [[80, 85], [90, 95], [100, 105]])
Out[0]:
In [0]:
predict_multi(model, [[10, 15], [20, 25], [30, 35]])
Out[0]:
In [0]:
predict_multi(model, [[180, 185], [190, 195], [200, 205]])
Out[0]:
In [0]:
# shift every target by a constant offset; the mapping is no longer a plain sum
y += 20
list(zip(X, y))
Out[0]:
In [0]:
model = Sequential()
model.add(GRU(units=50, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"))
model.add(Dense(units=1, name="Linear_Output"))
model.compile(optimizer='adam', loss='mse')
# train a little bit longer, as this should be harder now
%time history = model.fit(X, y, epochs=2000, verbose=0)
import matplotlib.pyplot as plt
plt.yscale('log')
plt.plot(history.history['loss'])
Out[0]:
In [0]:
predict_multi(model, [[80, 85], [90, 95], [100, 105]])
Out[0]:
In [0]:
predict_multi(model, [[10, 15], [20, 25], [30, 35]])
Out[0]:
In [0]:
predict_multi(model, [[180, 185], [190, 195], [200, 205]])
Out[0]:
In [0]:
# split a univariate sequence into samples
def split_sequence(sequence, n_steps_in, n_steps_out):
    X, y = list(), list()
    for i in range(len(sequence)):
        # find the end of this pattern
        end_ix = i + n_steps_in
        out_end_ix = end_ix + n_steps_out
        # check if we are beyond the sequence
        if out_end_ix > len(sequence):
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequence[i:end_ix], sequence[end_ix:out_end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
# define input sequence
raw_seq = [10, 20, 30, 40, 50, 60, 70, 80, 90]
# choose a number of time steps
n_steps_in, n_steps_out = 3, 2
# split into samples
X, y = split_sequence(raw_seq, n_steps_in, n_steps_out)
# summarize the data
for x_in, y_out in zip(X, y):
    print(x_in, y_out)
In [0]:
# reshape from [samples, timesteps] into [samples, timesteps, features]
n_features = 1
X = X.reshape((X.shape[0], X.shape[1], n_features))
# define model
model = Sequential()
model.add(GRU(100, activation='relu', input_shape=(n_steps_in, n_features)))
# alternative: a stacked variant with two GRU layers
# model.add(GRU(100, activation='relu', return_sequences=True, input_shape=(n_steps_in, n_features)))
# model.add(GRU(100, activation='relu'))
model.add(Dense(n_steps_out))
model.compile(optimizer='adam', loss='mse')
In [0]:
# fit model
%time history = model.fit(X, y, epochs=500, verbose=0)
import matplotlib.pyplot as plt
plt.yscale('log')
plt.plot(history.history['loss'])
Out[0]:
In [0]:
X_sample = np.array([70, 80, 90]).reshape((1, n_steps_in, n_features))
y_pred = model.predict(X_sample)
print(y_pred)
In [0]:
X_sample = np.array([10, 20, 30]).reshape((1, n_steps_in, n_features))
y_pred = model.predict(X_sample)
print(y_pred)