The example, some code, and a lot of the inspiration are taken from: https://machinelearningmastery.com/how-to-develop-lstm-models-for-time-series-forecasting/
In [0]:
# univariate data preparation
import numpy as np
# split a univariate sequence into samples
def split_sequence(sequence, n_steps):
    X, y = list(), list()
    for i in range(len(sequence)):
        # find the end of this pattern
        end_ix = i + n_steps
        # check if we are beyond the sequence
        if end_ix > len(sequence) - 1:
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
In [2]:
# define input sequence
raw_seq = [10, 20, 30, 40, 50, 60, 70, 80, 90]
# choose a number of time steps
n_steps = 3
# split into samples
X, y = split_sequence(raw_seq, n_steps)
# summarize the data
list(zip(X, y))
Out[2]:
In [3]:
X
Out[3]:
In [4]:
# reshape from [samples, timesteps] into [samples, timesteps, features]
n_features = 1
X = X.reshape((X.shape[0], X.shape[1], n_features))
X
Out[4]:
In [0]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Dense, LSTM, GRU, SimpleRNN, Bidirectional
from tensorflow.keras.models import Sequential, Model
model = Sequential()
model.add(SimpleRNN(units=50, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"))
model.add(Dense(units=1, name="Linear_Output"))
model.compile(optimizer='adam', loss='mse')
In [6]:
%time history = model.fit(X, y, epochs=500, verbose=0)
In [7]:
import matplotlib.pyplot as plt
plt.plot(history.history['loss'])
Out[7]:
In [8]:
# this does not look too bad
X_sample = np.array([[10, 20, 30], [70, 80, 90]])
X_sample = X_sample.reshape((X_sample.shape[0], X_sample.shape[1], n_features))
X_sample
Out[8]:
In [9]:
y_pred = model.predict(X_sample)
y_pred
Out[9]:
In [0]:
def predict(model, samples, n_features=1):
    # reshape into (samples, timesteps, features) as the RNN layers expect
    x = np.array(samples)
    x = x.reshape((x.shape[0], x.shape[1], n_features))
    return model.predict(x)
In [11]:
# do not look too closely, though: outside the training range (10-90) the predictions degrade
predict(model, [[100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[11]:
In [12]:
# https://keras.io/layers/recurrent/
# input: (samples, timesteps, input_dim)
# output: (samples, units)
# let's have a look at the actual output for an example
rnn_layer = model.get_layer("RNN_Input")
model_stub = Model(inputs = model.input, outputs = rnn_layer.output)
hidden = predict(model_stub, [[10, 20, 30]])
hidden
Out[12]:
http://colah.github.io/posts/2015-08-Understanding-LSTMs/
From Deep Learning with Python, Chapter 6, François Chollet, Manning: https://livebook.manning.com/#!/book/deep-learning-with-python/chapter-6/129
Sigmoid, compressing values to between 0 and 1
Hyperbolic tangent, like the sigmoid, but compressing to between -1 and 1, thus allowing for negative values as well
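A quick numerical sanity check of these two squashing functions (a minimal sketch; the sample inputs are arbitrary):

import numpy as np

def sigmoid(z):
    # compresses any real input into (0, 1)
    return 1.0 / (1.0 + np.exp(-z))

z = np.array([-5.0, -1.0, 0.0, 1.0, 5.0])
print(sigmoid(z))   # approx [0.007 0.269 0.5 0.731 0.993]
print(np.tanh(z))   # approx [-1.000 -0.762 0.000 0.762 1.000]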
In [13]:
# https://arxiv.org/ftp/arxiv/papers/1701/1701.05923.pdf
# n = output dimension
# m = input dimension
# Total number of parameters for
# Simple RNN = n**2 + nm + n
# GRU = 3 × (n**2 + nm + n)
# LSTM = 4 × (n**2 + nm + n)
rnn_units = 1
model = Sequential()
model.add(SimpleRNN(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"))
# model.add(GRU(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"))
model.summary()
In [14]:
output_dimension = rnn_units
input_dimension = n_features
parameters = 1 * (output_dimension ** 2 + output_dimension * input_dimension + output_dimension)
parameters
Out[14]:
In [15]:
# default: only a single output, for the final timestep
# ideal for feeding into something that *does not* handle timesteps
rnn_units = 1
model = Sequential([
    SimpleRNN(units=rnn_units, activation='relu', input_shape=(n_steps, n_features))
])
predict(model, [[10, 20, 30]])
Out[15]:
In [16]:
# with return_sequences=True: one output for each timestep
# ideal for feeding into something that *expects* timesteps
rnn_units = 1
model = Sequential([
    SimpleRNN(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), return_sequences=True)
])
# https://keras.io/layers/recurrent/
# input: (samples, timesteps, input_dim)
# output with return_sequences: (samples, timesteps, units)
predict(model, [[10, 20, 30]])
Out[16]:
In [17]:
rnn_units = 50
model = Sequential([
    SimpleRNN(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), return_sequences=True, name="RNN_Input"),
    SimpleRNN(units=rnn_units, activation='relu', name="RNN_Latent"),
    Dense(units=1, name="Linear_Output")
])
model.compile(optimizer='adam', loss='mse')
model.summary()
In [18]:
%time history = model.fit(X, y, epochs=500, verbose=0)
plt.plot(history.history['loss'])
Out[18]:
In [19]:
predict(model, [[10, 20, 30], [70, 80, 90], [100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[19]:
In [0]:
rnn_units = 50
model = Sequential([
    Bidirectional(SimpleRNN(units=rnn_units, activation='relu'), input_shape=(n_steps, n_features), name="RNN_Input"),
    Dense(units=1, name="Linear_Output")
])
model.compile(optimizer='adam', loss='mse')
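By default, Bidirectional runs the wrapped RNN forwards and backwards over the input and concatenates both results, so the layer emits 2 * units values per sample. A minimal sketch to confirm the shape (probe is just a throwaway model, never trained):

probe = Sequential([
    Bidirectional(SimpleRNN(units=3, activation='relu'), input_shape=(n_steps, n_features))
])
print(probe.output_shape)  # (None, 6): 3 forward units + 3 backward units, concatenated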
In [21]:
%time history = model.fit(X, y, epochs=500, verbose=0)
plt.plot(history.history['loss'])
Out[21]:
In [22]:
predict(model, [[10, 20, 30], [70, 80, 90], [100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[22]:
In [23]:
rnn_units = 50
model = Sequential([
    LSTM(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"),
    Dense(units=1, name="Linear_Output")
])
model.compile(optimizer='adam', loss='mse')
model.summary()
In [24]:
output_dimension = rnn_units
input_dimension = n_features
parameters = 4 * (output_dimension ** 2 + output_dimension * input_dimension + output_dimension)
parameters
Out[24]:
In [25]:
%time history = model.fit(X, y, epochs=500, verbose=0)
plt.plot(history.history['loss'])
Out[25]:
In [26]:
predict(model, [[10, 20, 30], [70, 80, 90], [100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[26]:
In [27]:
rnn_units = 50
model = Sequential([
    GRU(units=rnn_units, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"),
    Dense(units=1, name="Linear_Output")
])
model.compile(optimizer='adam', loss='mse')
model.summary()
In [28]:
output_dimension = rnn_units
input_dimension = n_features
parameters = 3 * (output_dimension ** 2 + output_dimension * input_dimension + output_dimension)
parameters
Out[28]:
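One caveat on the GRU count: newer tf.keras versions build the GRU with reset_after=True by default, which adds a second bias vector per gate, so model.summary() may report 3 × (n**2 + nm + 2n) instead. A minimal sketch of both counts, reusing the variables from above:

# formula from the paper above (reset_after=False)
print(3 * (output_dimension ** 2 + output_dimension * input_dimension + output_dimension))      # 7800
# tf.keras 2.x default (reset_after=True): one extra bias term per gate
print(3 * (output_dimension ** 2 + output_dimension * input_dimension + 2 * output_dimension))  # 7950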
In [29]:
%time history = model.fit(X, y, epochs=500, verbose=0)
plt.plot(history.history['loss'])
Out[29]:
In [30]:
predict(model, [[10, 20, 30], [70, 80, 90], [100, 110, 120], [200, 210, 220], [200, 300, 400]])
Out[30]:
In [31]:
in_seq1 = [10, 20, 30, 40, 50, 60, 70, 80, 90]
in_seq2 = [15, 25, 35, 45, 55, 65, 75, 85, 95]
out_seq = [in1 + in2 for in1, in2 in zip(in_seq1, in_seq2)]
out_seq
Out[31]:
In [32]:
# convert to [rows, columns] structure
in_seq1 = np.array(in_seq1).reshape((len(in_seq1), 1))
in_seq2 = np.array(in_seq2).reshape((len(in_seq2), 1))
out_seq = np.array(out_seq).reshape((len(out_seq), 1))
out_seq
Out[32]:
In [33]:
# horizontally stack columns
dataset = np.hstack((in_seq1, in_seq2, out_seq))
dataset
Out[33]:
In [0]:
# split a multivariate sequence into samples
def split_sequences(sequences, n_steps):
    X, y = list(), list()
    for i in range(len(sequences)):
        # find the end of this pattern
        end_ix = i + n_steps
        # check if we are beyond the dataset
        # (the target is read at end_ix-1, so end_ix == len(sequences) is still valid)
        if end_ix > len(sequences):
            break
        # gather the input columns of the window and the target column at its last step
        seq_x, seq_y = sequences[i:end_ix, :-1], sequences[end_ix-1, -1]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
In [35]:
# choose a number of time steps
n_steps = 3
# convert into input/output
X, y = split_sequences(dataset, n_steps)
# summarize the data
list(zip(X, y))
Out[35]:
In [0]:
# the dataset knows the number of features, here: 2
n_features = X.shape[2]
# define model
model = Sequential()
model.add(GRU(units=50, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"))
model.add(Dense(units=1, name="Linear_Output"))
model.compile(optimizer='adam', loss='mse')
In [37]:
# fit model
%time history = model.fit(X, y, epochs=500, verbose=0)
import matplotlib.pyplot as plt
plt.yscale('log')
plt.plot(history.history['loss'])
Out[37]:
In [0]:
def predict_multi(model, samples):
    # reshape a single multivariate window into (1, timesteps, features)
    x = np.array(samples)
    x = x.reshape((1, x.shape[0], x.shape[1]))
    return model.predict(x)
In [39]:
predict_multi(model, [[80, 85], [90, 95], [100, 105]])
Out[39]:
In [40]:
predict_multi(model, [[10, 15], [20, 25], [30, 35]])
Out[40]:
In [41]:
predict_multi(model, [[180, 185], [190, 195], [200, 205]])
Out[41]:
In [42]:
y += 20
list(zip(X, y))
Out[42]:
In [43]:
model = Sequential()
model.add(GRU(units=50, activation='relu', input_shape=(n_steps, n_features), name="RNN_Input"))
model.add(Dense(units=1, name="Linear_Output"))
model.compile(optimizer='adam', loss='mse')
# train a little bit longer, as this should be harder now
%time history = model.fit(X, y, epochs=2000, verbose=0)
import matplotlib.pyplot as plt
plt.yscale('log')
plt.plot(history.history['loss'])
Out[43]:
In [44]:
predict_multi(model, [[80, 85], [90, 95], [100, 105]])
Out[44]:
In [45]:
predict_multi(model, [[10, 15], [20, 25], [30, 35]])
Out[45]:
In [46]:
predict_multi(model, [[180, 185], [190, 195], [200, 205]])
Out[46]:
In [47]:
# split a univariate sequence into samples
def split_sequence(sequence, n_steps_in, n_steps_out):
    X, y = list(), list()
    for i in range(len(sequence)):
        # find the end of this pattern
        end_ix = i + n_steps_in
        out_end_ix = end_ix + n_steps_out
        # check if we are beyond the sequence
        if out_end_ix > len(sequence):
            break
        # gather input and output parts of the pattern
        seq_x, seq_y = sequence[i:end_ix], sequence[end_ix:out_end_ix]
        X.append(seq_x)
        y.append(seq_y)
    return np.array(X), np.array(y)
# define input sequence
raw_seq = [10, 20, 30, 40, 50, 60, 70, 80, 90]
# choose a number of time steps
n_steps_in, n_steps_out = 3, 2
# split into samples
X, y = split_sequence(raw_seq, n_steps_in, n_steps_out)
# summarize the data
for sample_in, sample_out in zip(X, y):
    print(sample_in, sample_out)
In [0]:
# reshape from [samples, timesteps] into [samples, timesteps, features]
n_features = 1
X = X.reshape((X.shape[0], X.shape[1], n_features))
# define model
model = Sequential()
model.add(GRU(100, activation='relu', input_shape=(n_steps_in, n_features)))
# alternative: stack two GRU layers instead (see the sketch after this cell)
# model.add(GRU(100, activation='relu', return_sequences=True, input_shape=(n_steps_in, n_features)))
# model.add(GRU(100, activation='relu'))
model.add(Dense(n_steps_out))
model.compile(optimizer='adam', loss='mse')
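For reference, a minimal sketch of the stacked variant hinted at in the comments above (model_stacked is just an illustrative name): the first GRU must return the full sequence so that the second GRU still receives timesteps.

model_stacked = Sequential()
model_stacked.add(GRU(100, activation='relu', return_sequences=True, input_shape=(n_steps_in, n_features)))
model_stacked.add(GRU(100, activation='relu'))
model_stacked.add(Dense(n_steps_out))
model_stacked.compile(optimizer='adam', loss='mse')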
In [49]:
# fit model
%time history = model.fit(X, y, epochs=500, verbose=0)
import matplotlib.pyplot as plt
plt.yscale('log')
plt.plot(history.history['loss'])
Out[49]:
In [50]:
X_sample = np.array([70, 80, 90]).reshape((1, n_steps_in, n_features))
y_pred = model.predict(X_sample)
print(y_pred)
In [51]:
X_sample = np.array([10, 20, 30]).reshape((1, n_steps_in, n_features))
y_pred = model.predict(X_sample)
print(y_pred)
In [0]: