Automated ML for time series prediction

We use one of the datasets in the Numenta Anomaly Benchmark (NAB) for this demo, the NYC taxi passengers dataset. It contains 10320 records, each giving the total number of taxi passengers in NYC for a corresponding half-hour interval.


In [1]:
%load_ext autoreload
%autoreload 2

0. Helper function definitions


In [2]:
# plot the predicted values and actual values (for the test data)
def plot_result(test_df, pred_df, dt_col="timestamp", value_col="value", past_seq_len=1):
    # the target column defaults to "value"; the default past sequence length is 1
    # predictions start at index past_seq_len, since the first past_seq_len values are input history
    pred_value = pred_df[value_col].values
    true_value = test_df[value_col].values[past_seq_len:]
    fig, axs = plt.subplots(figsize=(12, 5))

    axs.plot(pred_df[dt_col], pred_value, color='red', label='predicted values')
    axs.plot(test_df[dt_col][past_seq_len:], true_value, color='blue', label='actual values')
    axs.set_title('the predicted values and actual values (for the test data)')

    plt.xlabel(dt_col)
    plt.xticks(rotation=45)
    plt.ylabel('number of taxi passengers')
    plt.legend(loc='upper left')
    plt.show()

In [3]:
# plot the results of multi-step forecasting against the actual values (for the test data)
# at most five forecast steps are plotted for better readability
def plot_less_five_step_result(test_df, pred_df, dt_col="timestamp", value_col="value", past_seq_len=1):
    fig, axs = plt.subplots(figsize=(12, 5))
    target_value = test_df[value_col].values[past_seq_len:]
    axs.plot(test_df[dt_col][past_seq_len:], target_value, color='blue', label='actual values')

    value_cols = ["{}_{}".format(value_col, i) for i in range(min(pred_df.shape[1] - 1, 5))]
    time_delta = pred_df[dt_col][1] - pred_df[dt_col][0]
    plot_color = ["g", "r", "c", "m", "y"]
    for i in range(len(value_cols)):
        pred_value = pred_df[value_cols[i]].values
        pred_dt = pred_df[dt_col].values + time_delta * i
        axs.plot(pred_dt, pred_value, color=plot_color[i], label='predicted values' + str(i))

    axs.set_title('the predicted values and actual values (for the test data)')

    plt.xlabel(dt_col)
    plt.xticks(rotation=45)
    plt.ylabel('number of taxi passengers')
    plt.legend(loc='upper left')
    plt.show()

In [4]:
# plot the results of multi-step forecasting against the actual values (for the test data)
# only the first and the last forecast steps are plotted
def plot_first_last_step_result(test_df, pred_df, dt_col="timestamp", value_col="value", past_seq_len=1):
    fig, axs = plt.subplots(figsize=(12, 5))
    target_value = test_df[value_col].values[past_seq_len:]
    axs.plot(test_df[dt_col][past_seq_len:], target_value, color='blue', label='actual values')

    value_cols = ["{}_{}".format(value_col, i) for i in range(pred_df.shape[1] - 1)]
    time_delta = pred_df[dt_col][1] - pred_df[dt_col][0]

    pred_value_first = pred_df[value_cols[0]].values
    pred_dt_first = pred_df[dt_col].values
    axs.plot(pred_dt_first, pred_value_first, color="g", label='first predicted values')
 
    pred_value_last = pred_df[value_cols[-1]].values
    pred_dt_last = pred_df[dt_col].values + time_delta * (len(value_cols)-1)
    axs.plot(pred_dt_last, pred_value_last, color="r", label='last predicted values')

    axs.set_title('the predicted values and actual values (for the test data)')

    plt.xlabel(dt_col)
    plt.xticks(rotation=45)
    plt.ylabel('number of taxi passengers')
    plt.legend(loc='upper left')
    plt.show()

1. Load data


In [5]:
import os
import pandas as pd
import numpy as np

import matplotlib
matplotlib.use('Agg')
%pylab inline
import matplotlib.dates as md
from matplotlib import pyplot as plt


Populating the interactive namespace from numpy and matplotlib

Now we download the dataset and load it into a pandas dataframe. Run the script $ANALYTICS_ZOO_HOME/bin/data/NAB/nyc_taxi/get_nyc_taxi.sh to download the raw data.
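
If the helper script is unavailable, the raw CSV can also be fetched directly from the Numenta NAB GitHub repository. The URL below is an assumption based on the NAB repository layout:

# fallback: download nyc_taxi.csv directly from the NAB repository (URL assumed from the repo layout)
raw_df = pd.read_csv(
    "https://raw.githubusercontent.com/numenta/NAB/master/data/realKnownCause/nyc_taxi.csv")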


In [6]:
# load the nyc taxi data
try:
    dataset_path = os.getenv("ANALYTICS_ZOO_HOME") + "/bin/data/NAB/nyc_taxi/nyc_taxi.csv"
    raw_df = pd.read_csv(dataset_path)
except Exception:
    print("nyc_taxi.csv doesn't exist")
    print("you can run $ANALYTICS_ZOO_HOME/bin/data/NAB/nyc_taxi/get_nyc_taxi.sh to download nyc_taxi.csv")

Below are some example records of the data:


In [7]:
raw_df.head(5)


Out[7]:
timestamp value
0 2014-07-01 00:00:00 10844
1 2014-07-01 00:30:00 8127
2 2014-07-01 01:00:00 6210
3 2014-07-01 01:30:00 4656
4 2014-07-01 02:00:00 3820

Convert the string timestamps to pandas Timestamp objects.


In [8]:
df = pd.DataFrame(pd.to_datetime(raw_df.timestamp))
df["value"] = raw_df["value"]
df.head()


Out[8]:
timestamp value
0 2014-07-01 00:00:00 10844
1 2014-07-01 00:30:00 8127
2 2014-07-01 01:00:00 6210
3 2014-07-01 01:30:00 4656
4 2014-07-01 02:00:00 3820

You can use train_val_test_split to split the whole dataset into train/val/test sets. Each output dataframe has two columns, "timestamp" and "value", where the "timestamp" column has dtype datetime64.


In [9]:
from zoo.automl.common.util import train_val_test_split
train_df, val_df, test_df = train_val_test_split(df, val_ratio=0.1, test_ratio=0.1)
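
For reference, the split is chronological (the test set below starts in January 2015 while the training data starts in July 2014). A rough manual equivalent is sketched here; this is an assumption about train_val_test_split's behavior, not its actual implementation:

# chronological 80/10/10 split by row position (sketch)
n = len(df)
n_val, n_test = int(n * 0.1), int(n * 0.1)
train_manual = df.iloc[:n - n_val - n_test]
val_manual = df.iloc[n - n_val - n_test:n - n_test]
test_manual = df.iloc[n - n_test:]
# with 10320 rows this yields 8256/1032/1032, matching the shapes printed below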

In [10]:
train_df.describe()


Out[10]:
value
count 8256.000000
mean 15421.585514
std 6871.989592
min 1431.000000
25% 11040.000000
50% 17014.500000
75% 20032.250000
max 39197.000000

In [11]:
train_df.head()


Out[11]:
timestamp value
0 2014-07-01 00:00:00 10844
1 2014-07-01 00:30:00 8127
2 2014-07-01 01:00:00 6210
3 2014-07-01 01:30:00 4656
4 2014-07-01 02:00:00 3820

In [12]:
# shape of the dataframe
print("The shape of train_df is", train_df.shape)
print("The shape of val_df is", val_df.shape)
print("The shape of test_df is", test_df.shape)


The shape of train_df is (8256, 2)
The shape of val_df is (1032, 2)
The shape of test_df is (1032, 2)

In [13]:
# visualize the number of NYC taxi passengers over time in train_df
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()

fig, ax = plt.subplots(figsize=(12, 5))

ax.plot(train_df['timestamp'], train_df['value'], color='blue', linewidth=0.6)
ax.set_title('NYC taxi passengers throughout time')

plt.xlabel('timestamp')
plt.xticks(rotation=45) 
plt.ylabel('The Number of NYC taxi passengers')
plt.show()

[figure: NYC taxi passengers throughout time]

2. Train and validate

You can use Analytics Zoo AutoML to predict time series data by simply defining a TimeSequencePredictor.

We use featuretools to generate features from the given datetime. The generated features include ['HOUR', 'DAY', 'MONTH', 'WEEKDAY', 'IS_WEEKEND', 'IS_AWAKE', 'IS_BUSY_HOURS']. Our feature space comprises these generated features as well as the original inputs such as ['datetime', 'value', 'extra_features'].
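
For intuition, the generated datetime features could be derived with plain pandas as in the sketch below. The IS_AWAKE and IS_BUSY_HOURS definitions here are illustrative assumptions, not the exact rules used internally:

# hypothetical pandas equivalent of the generated datetime features
dt = df["timestamp"].dt
generated = pd.DataFrame({
    "HOUR": dt.hour,
    "DAY": dt.day,
    "MONTH": dt.month,
    "WEEKDAY": dt.weekday,
    "IS_WEEKEND": (dt.weekday >= 5).astype(int),
    "IS_AWAKE": ((dt.hour >= 6) & (dt.hour <= 23)).astype(int),       # assumed waking hours
    "IS_BUSY_HOURS": dt.hour.isin([7, 8, 9, 17, 18, 19]).astype(int)  # assumed rush hours
})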

Currently, we use an RNN (LSTM) model that learns from a window of previous values (50 by default; set to 5 via look_back below) and predicts the next value. You can specify the sequence length to predict when creating the TimeSequencePredictor with the future_seq_len argument.
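
To make the look back concrete, here is a minimal sketch (not the Analytics Zoo internals) of how a look-back window turns a univariate series into supervised (X, y) samples:

import numpy as np

def make_windows(values, past_seq_len=5, future_seq_len=1):
    # each sample uses past_seq_len values as input and the following future_seq_len values as target
    X, y = [], []
    for i in range(len(values) - past_seq_len - future_seq_len + 1):
        X.append(values[i:i + past_seq_len])
        y.append(values[i + past_seq_len:i + past_seq_len + future_seq_len])
    return np.array(X), np.array(y)

X, y = make_windows(np.arange(10), past_seq_len=5, future_seq_len=1)
# X.shape == (5, 5), y.shape == (5, 1)

This is also why the predictions in section 3 start at index past_seq_len of test_df: the first past_seq_len values are consumed as input history.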


In [14]:
# build time sequence predictor
from zoo.automl.regression.time_sequence_predictor import *

# you need to specify the name of datetime column and target column
# The default names are "timestamp" and "value" respectively.
tsp = TimeSequencePredictor(dt_col="timestamp",
                            target_col="value",
                            extra_features_col=None)

In [15]:
from zoo import init_spark_on_local
from zoo.ray import RayContext
sc = init_spark_on_local(cores=4)
ray_ctx = RayContext(sc=sc, object_store_memory="1g")
ray_ctx.init()


Current pyspark location is : /Users/liuruolan/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/__init__.py
Start to getOrCreate SparkContext
2020-06-11 14:17:58,562	INFO resource_spec.py:212 -- Starting Ray with 2.1 GiB memory available for workers and up to 0.93 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
Successfully got a SparkContext
2020-06-11 14:17:59,021	INFO services.py:1148 -- View the Ray dashboard at localhost:8265
Out[15]:
{'node_ip_address': '192.168.101.6',
 'redis_address': '192.168.101.6:44879',
 'object_store_address': '/tmp/ray/session_2020-06-11_14-17-58_547548_12576/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2020-06-11_14-17-58_547548_12576/sockets/raylet',
 'webui_url': 'localhost:8265',
 'session_dir': '/tmp/ray/session_2020-06-11_14-17-58_547548_12576'}

In [16]:
%%time
# fit train_df and validate with val_df; the best trial is returned as a pipeline.
# the default recipe is SmokeRecipe, which runs one epoch and one iteration with only 1 random sample.
# you can change the recipe by passing `recipe` to `fit`. Available recipes are SmokeRecipe, RandomRecipe, LSTMGridRandomRecipe, GridRandomRecipe and BayesRecipe.
look_back_single = 5
pipeline = tsp.fit(train_df,
                   validation_df=val_df,
                   metric="mse",
                   recipe=LSTMGridRandomRecipe(
                              num_rand_samples=1,
                              epochs=2,
                              look_back=look_back_single, 
                              batch_size=[64]))
print("Training completed.")


== Status ==
Memory usage on this node: 5.2/8.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/4 CPUs, 0/0 GPUs, 0.0/2.1 GiB heap, 0.0/0.63 GiB objects
Result logdir: /Users/liuruolan/ray_results/automl
Number of trials: 3 (3 TERMINATED)
Trial name | status | batch_size | dropout_2 | lr | lstm_1_units | lstm_2_units | selected_features | iter | total time (s)
train_func_00000 | TERMINATED | 64 | 0.485535 | 0.00733797 | 64 | 16 | ["IS_AWAKE(timestamp)", "IS_BUSY_HOURS(timestamp)", "MONTH(timestamp)", "DAY(timestamp)", "IS_WEEKEND(timestamp)", "WEEKDAY(timestamp)"] | 10 | 43.5348
train_func_00001 | TERMINATED | 64 | 0.431602 | 0.00337888 | 16 | 32 | ["IS_BUSY_HOURS(timestamp)", "IS_AWAKE(timestamp)", "WEEKDAY(timestamp)", "HOUR(timestamp)"] | 10 | 41.3426
train_func_00002 | TERMINATED | 64 | 0.37464 | 0.00753538 | 128 | 64 | ["IS_BUSY_HOURS(timestamp)", "HOUR(timestamp)", "WEEKDAY(timestamp)", "MONTH(timestamp)"] | 10 | 40.7521


best log dir is  /Users/liuruolan/ray_results/automl/train_func_1_batch_size=64,dropout_2=0.4316,lr=0.0033789,lstm_1_units=16,lstm_2_units=32,selected_features=["IS_BUSY_HOURS(timesta_2020-06-11_14-21-05_fecmp7f
The best configurations are:
selected_features : ["IS_BUSY_HOURS(timestamp)", "IS_AWAKE(timestamp)", "WEEKDAY(timestamp)", "HOUR(timestamp)"]
model : LSTM
lstm_1_units : 16
dropout_1 : 0.2
lstm_2_units : 32
dropout_2 : 0.43160224795875773
lr : 0.003378878016568929
batch_size : 64
epochs : 2
past_seq_len : 5
(TensorFlow deprecation warnings omitted)
Training completed.
CPU times: user 4.85 s, sys: 1.24 s, total: 6.09 s
Wall time: 1min 38s

3. Test


In [17]:
# predict test_df with the best trial
pred_df = pipeline.predict(test_df)

In [18]:
pred_df.head(5)


Out[18]:
timestamp value
0 2015-01-10 14:30:00 20910.943359
1 2015-01-10 15:00:00 22121.316406
2 2015-01-10 15:30:00 22864.675781
3 2015-01-10 16:00:00 23598.687500
4 2015-01-10 16:30:00 21863.769531

In [19]:
# predicted values start at index look_back_single of test_df,
# since the first look_back_single values are consumed as input history
test_df[look_back_single:look_back_single+5]


Out[19]:
timestamp value
5 2015-01-10 14:30:00 22371
6 2015-01-10 15:00:00 23119
7 2015-01-10 15:30:00 23941
8 2015-01-10 16:00:00 22728
9 2015-01-10 16:30:00 20126

In [21]:
# plot the predicted values and actual values
plot_result(test_df, pred_df, past_seq_len=look_back_single)

[figure: predicted vs. actual values on the test data]

In [22]:
# evaluate test_df
mse, smape = pipeline.evaluate(test_df, metrics=["mse", "smape"])
print("Evaluate: the mean square error is", mse)
print("Evaluate: the smape value is", smape)


Evaluate: the mean square error is 1630779.4204776406
Evaluate: the smape value is 7.461118313058935
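
For reference, a common definition of sMAPE (in percent) is sketched below; the exact variant computed by pipeline.evaluate may differ slightly:

import numpy as np

def smape(y_true, y_pred):
    # symmetric mean absolute percentage error, in percent
    return 100.0 * np.mean(np.abs(y_pred - y_true) /
                           ((np.abs(y_true) + np.abs(y_pred)) / 2))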

4. Save and restore

We provide save and restore interfaces so that you can save the pipeline with the best trial and easily rebuild it later.


In [23]:
# save the pipeline with best trial
pipeline.save("/tmp/saved_pipeline/my.ppl")


Pipeline is saved in /tmp/saved_pipeline/my.ppl
Out[23]:
'/tmp/saved_pipeline/my.ppl'

In [24]:
from zoo.automl.pipeline.time_sequence import load_ts_pipeline
new_pipeline = load_ts_pipeline("/tmp/saved_pipeline/my.ppl")


Restore pipeline from /tmp/saved_pipeline/my.ppl

In [25]:
# you can do predict and evaluate again
# we use test_df as input in order to compare results before and after restoration 
new_pred = new_pipeline.predict(test_df)

In [26]:
new_pred.head(5)


Out[26]:
timestamp value
0 2015-01-10 14:30:00 20910.943359
1 2015-01-10 15:00:00 22121.316406
2 2015-01-10 15:30:00 22864.675781
3 2015-01-10 16:00:00 23598.687500
4 2015-01-10 16:30:00 21863.769531

In [27]:
# evaluate test_df
mse, smape = new_pipeline.evaluate(test_df, metrics=["mse", "smape"])
print("Evaluate: the mean square error is", mse)
print("Evaluate: the smape value is", smape)


Evaluate: the mean square error is 1630779.4204776406
Evaluate: the smape value is 7.461118313058935

5. Continue training

We support continuing training on incremental data, reusing the trained model and the best configuration found during the search.


In [28]:
# review the initialization information if needed
new_pipeline.describe()


**** Initialization info ****
future_seq_len: 1
dt_col: timestamp
target_col: value
extra_features_col: None
drop_missing: True


In [29]:
# Use val_df as incremental data
new_pipeline.fit(val_df, epoch_num=5)


Train on 1027 samples
Epoch 1/5
1027/1027 [==============================] - 2s 2ms/sample - loss: 0.0518 - mean_squared_error: 0.0518
Epoch 2/5
1027/1027 [==============================] - 0s 217us/sample - loss: 0.0545 - mean_squared_error: 0.0545
Epoch 3/5
1027/1027 [==============================] - 0s 205us/sample - loss: 0.0568 - mean_squared_error: 0.0568
Epoch 4/5
1027/1027 [==============================] - 0s 216us/sample - loss: 0.0523 - mean_squared_error: 0.0523
Epoch 5/5
1027/1027 [==============================] - 0s 229us/sample - loss: 0.0546 - mean_squared_error: 0.0546
Fit done!

In [30]:
# predict on test_df and plot the results
new_pred_df = new_pipeline.predict(test_df)
plot_result(test_df, new_pred_df, past_seq_len=look_back_single)

[figure: predicted vs. actual values after incremental training]

In [31]:
# evaluate test_df
mse, smape = new_pipeline.evaluate(test_df, metrics=["mse", "smape"])
print("Evaluate: the mean square error is", mse)
print("Evaluate: the smape value is", smape)


Evaluate: the mean square error is 1286511.2008888382
Evaluate: the smape value is 5.916551012985478

6. Multi-step forecasting

You can do multi-step forecasting by simply changing the future_seq_len option when creating a new TimeSequencePredictor object.


In [79]:
# build time sequence predictor
from zoo.automl.regression.time_sequence_predictor import *

# set future_seq_len to the number of steps you want to forecast.
tsp = TimeSequencePredictor(future_seq_len=5,
                            dt_col="timestamp",
                            target_col="value",
                            extra_features_col=None)

In [80]:
%%time
# you can specify the look-back sequence length with a single number or a (min_len, max_len) range in the recipe.
look_back_multi = 10
pipeline = tsp.fit(train_df,
                   validation_df=val_df,
                   metric="mse",
                   recipe=LSTMGridRandomRecipe(
                              num_rand_samples=3,
                              epochs=2,
                              look_back=look_back_multi,
                              training_iteration=10,
                              batch_size=[64]))
print("Training completed.")


== Status ==
Memory usage on this node: 5.4/8.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/4 CPUs, 0/0 GPUs, 0.0/2.1 GiB heap, 0.0/0.63 GiB objects
Result logdir: /Users/liuruolan/ray_results/automl
Number of trials: 9 (9 TERMINATED)
Trial name | status | batch_size | dropout_2 | lr | lstm_1_units | lstm_2_units | selected_features | iter | total time (s)
train_func_00000 | TERMINATED | 64 | 0.43932 | 0.00710839 | 128 | 16 | ["HOUR(timestamp)", "WEEKDAY(timestamp)", "IS_BUSY_HOURS(timestamp)", "DAY(timestamp)", "IS_AWAKE(timestamp)"] | 10 | 95.5503
train_func_00001 | TERMINATED | 64 | 0.456072 | 0.00728148 | 64 | 32 | ["IS_WEEKEND(timestamp)", "IS_AWAKE(timestamp)", "IS_BUSY_HOURS(timestamp)"] | 10 | 77.6193
train_func_00002 | TERMINATED | 64 | 0.454974 | 0.00968517 | 32 | 64 | ["DAY(timestamp)", "IS_WEEKEND(timestamp)", "HOUR(timestamp)", "WEEKDAY(timestamp)", "IS_AWAKE(timestamp)"] | 10 | 70.2947
train_func_00003 | TERMINATED | 64 | 0.336936 | 0.0089047 | 16 | 16 | ["WEEKDAY(timestamp)", "HOUR(timestamp)", "IS_BUSY_HOURS(timestamp)"] | 10 | 61.669
train_func_00004 | TERMINATED | 64 | 0.379253 | 0.00644577 | 128 | 32 | ["MONTH(timestamp)", "IS_BUSY_HOURS(timestamp)", "DAY(timestamp)", "HOUR(timestamp)", "IS_AWAKE(timestamp)"] | 10 | 100.657
train_func_00005 | TERMINATED | 64 | 0.334418 | 0.00975357 | 32 | 64 | ["WEEKDAY(timestamp)", "MONTH(timestamp)", "DAY(timestamp)"] | 10 | 80.5264
train_func_00006 | TERMINATED | 64 | 0.243985 | 0.00529687 | 32 | 16 | ["WEEKDAY(timestamp)", "DAY(timestamp)", "HOUR(timestamp)", "IS_AWAKE(timestamp)", "IS_WEEKEND(timestamp)"] | 10 | 62.412
train_func_00007 | TERMINATED | 64 | 0.226871 | 0.0012887 | 128 | 32 | ["DAY(timestamp)", "IS_BUSY_HOURS(timestamp)", "IS_WEEKEND(timestamp)", "WEEKDAY(timestamp)", "IS_AWAKE(timestamp)"] | 10 | 93.907
train_func_00008 | TERMINATED | 64 | 0.452725 | 0.00825144 | 64 | 64 | ["WEEKDAY(timestamp)", "IS_BUSY_HOURS(timestamp)", "MONTH(timestamp)", "IS_AWAKE(timestamp)", "DAY(timestamp)"] | 10 | 70.9098


best log dir is  /Users/liuruolan/ray_results/automl/train_func_7_batch_size=64,dropout_2=0.22687,lr=0.0012887,lstm_1_units=128,lstm_2_units=32,selected_features=["DAY(timestamp)", "I_2020-06-11_15-51-279965a071
The best configurations are:
selected_features : ["DAY(timestamp)", "IS_BUSY_HOURS(timestamp)", "IS_WEEKEND(timestamp)", "WEEKDAY(timestamp)", "IS_AWAKE(timestamp)"]
model : LSTM
lstm_1_units : 128
dropout_1 : 0.2
lstm_2_units : 32
dropout_2 : 0.22687111910909824
lr : 0.0012887000310535537
batch_size : 64
epochs : 2
past_seq_len : 10
Training completed.
CPU times: user 20.8 s, sys: 5.84 s, total: 26.7 s
Wall time: 7min 6s

In [89]:
# test
# predict test_df with the best trial
pred_df = pipeline.predict(test_df)

In [90]:
pred_df.head(5)


Out[90]:
timestamp value_0 value_1 value_2 value_3 value_4
0 2015-01-10 17:00:00 21104.164062 22934.585938 24529.179688 25550.427734 25832.960938
1 2015-01-10 17:30:00 23720.382812 25002.316406 25913.042969 26169.644531 25732.523438
2 2015-01-10 18:00:00 25945.230469 26386.738281 26540.681641 26222.671875 25465.779297
3 2015-01-10 18:30:00 27144.835938 26856.648438 26507.175781 26015.634766 25392.861328
4 2015-01-10 19:00:00 27349.445312 26599.843750 25986.837891 25524.503906 25157.457031
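
Each row of pred_df holds one forecast issued at that row's timestamp: value_i is the prediction i steps ahead, i.e. for timestamp + i * 30 minutes at this data frequency, which is exactly how the plotting helpers above shift each step's curve. A sketch for extracting one aligned horizon with plain pandas:

# pull out the forecasts made "step" intervals ahead, stamped at their target time
step = 2
horizon_df = pd.DataFrame({
    "timestamp": pred_df["timestamp"] + pd.Timedelta(minutes=30 * step),  # 30-minute data frequency
    "value": pred_df["value_{}".format(step)],
})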

In [91]:
# plot multi-step predicted values against the actual values
# at most five forecast steps are plotted for better readability
plot_less_five_step_result(test_df, pred_df, past_seq_len=look_back_multi)

[figure: multi-step forecast, first five steps vs. actual values]

In [92]:
# plot only the first and the last forecast steps against the actual values
plot_first_last_step_result(test_df, pred_df, past_seq_len=look_back_multi)

[figure: multi-step forecast, first and last steps vs. actual values]

In [93]:
# evaluate test_df; for multi-step forecasting, one metric value is reported per forecast step
mse, smape = pipeline.evaluate(test_df, metrics=["mse", "smape"])
print("Evaluate: the mean square error is", mse)
print("Evaluate: the smape value is", smape)


Evaluate: the mean square error is [1427645.74220997 2255070.23276274 3517921.35515067 5142232.07454771
 6883082.34413457]
Evaluate: the smape value is [6.49668565 7.09795792 7.87466491 8.52209701 8.98700605]

In [94]:
ray_ctx.stop()


This instance has been stopped.
