In [ ]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Overview

In this notebook, we'll use the Iowa Liquor Sales dataset to learn how to submit a job to AI Platform Training. In the job, we'll train our TensorFlow 2 model and export the SavedModel to Cloud Storage.

Dataset

The Iowa Liquor Sales dataset from BigQuery Public Datasets is used in this example. The dataset contains wholesale liquor purchases in the state of Iowa from 2012 to the present.

Set up your GCP project

The following steps are required, regardless of your notebook environment.

  1. Select or create a GCP project. When you first create an account, you get a $300 free credit towards your compute and storage costs.

  2. Make sure that billing is enabled for your project.

  3. Enable the AI Platform APIs and Compute Engine APIs.
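
If you prefer the command line, steps 2 and 3 can also be done with the Cloud SDK. A minimal sketch (assumes gcloud is installed and you're authenticated, and that you replace the placeholder project ID):

In [ ]:
# Optional: configure the project and enable the required APIs from the shell.
# Replace your-project-id before running.
# !gcloud config set project your-project-id
# !gcloud services enable ml.googleapis.com compute.googleapis.com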

Install packages and dependencies

Import libraries and define constants


In [ ]:
import datetime
import json
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf

from google.cloud import storage
from pandas.plotting import register_matplotlib_converters
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler

In [ ]:
# Check the TensorFlow version installed
tf.__version__

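The training job later in this notebook uses a TensorFlow 2.x runtime, so it's worth confirming the local version matches (a minimal check):

In [ ]:
# Guard against accidentally running on a TF 1.x kernel
assert tf.__version__.startswith('2'), 'This notebook expects TensorFlow 2.x'
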
In [ ]:
# Enter your project, region, and bucket. Then run the cell to make sure the
# Cloud SDK uses the right project for all the commands in this notebook.

PROJECT = "your-project-name" # REPLACE WITH YOUR PROJECT NAME
BUCKET = "your-bucket-id" # REPLACE WITH YOUR BUCKET ID
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1
BUCKET_URI = "gs://" + BUCKET

assert PROJECT != 'your-project-name', "Don't forget to change the project variables!"

%env BUCKET_URI=$BUCKET_URI
%env REGION=$REGION

In [ ]:
target_col = 'y' # What we are predicting
ts_col = 'ds' # Time series column
if os.path.exists('iowa_daily.csv'):
    input_file = 'iowa_daily.csv' # File created in previous lab
else:
    input_file = 'data/iowa_daily.csv'

n_features = 2 # Two features: y (previous values) and whether the date is a holiday
n_input_steps = 30 # Lookback window
n_output_steps = 1 # How many steps to predict forward

train_split = 0.75 # % Split between train/test data
epochs = 1000 # How many passes through the data (early-stopping will cause training to stop before this)
patience = 5 # Stop training if the validation loss does not improve for this many epochs

lstm_units = 64

model_name = 'retail_forecasting'
framework='TENSORFLOW'
runtime_version = '2.1'
python_version = '3.7'
predictions_file = 'predictions.json'
input_layer_name = 'lstm_input'

%env MODEL_NAME = $model_name
%env FRAMEWORK = $framework
%env RUNTIME_VERSION = $runtime_version
%env PYTHON_VERSION = $python_version
%env PREDICTIONS_FILE = $predictions_file

Create a Cloud Storage bucket

The following steps are required, regardless of your notebook environment.

When you submit a training job using the Cloud SDK, you upload a Python package containing your training code to a Cloud Storage bucket. AI Platform runs the code from this package. In this tutorial, AI Platform also saves the trained model that results from your job in the same bucket. You can then create an AI Platform model version based on this output in order to serve online predictions.

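The next cell creates the bucket with the Cloud Storage Python client. An equivalent shell one-liner (a sketch; note that gsutil mb fails if the bucket already exists) is:

In [ ]:
# Optional alternative: create the bucket from the shell
# !gsutil mb -l $REGION $BUCKET_URI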

In [ ]:
storage_client = storage.Client()
try:
    bucket = storage_client.get_bucket(BUCKET)
    print("Bucket exists, let's not recreate it.")
except Exception:
    bucket = storage_client.create_bucket(BUCKET)
    print('Created bucket: ' + BUCKET)

Download and preview the data

We've done some pre-processing on the original dataset and made the pre-processed one available on Cloud Storage.


In [ ]:
df = pd.read_csv(input_file, index_col=ts_col, parse_dates=True)

In [ ]:
# Plot one 30-day window of sales 
df[target_col][:30].plot()

Process data and remove outliers


In [ ]:
# Split data
size = int(len(df) * train_split)
df_train, df_test = df[0:size].copy(deep=True), df[size:len(df)].copy(deep=True)

df_train.head()

In [ ]:
df_train.plot()

Scale sales values


In [ ]:
# Review original values

df_train.head()

In [ ]:
# For neural networks to converge quicker, it is helpful to scale the values.
# For example, each feature might be transformed to have a mean of 0 and std. dev. of 1.
#
# We are working with a mix of features, input timesteps, output horizon, etc.
# which don't work out-of-the-box with common scaling utilities.
# So, here are a couple wrappers to handle scaling and inverting the scaling.

feature_scaler = StandardScaler()
target_scaler = StandardScaler()

def scale(df, 
          fit=True, 
          target_col=target_col,
          feature_scaler=feature_scaler,
          target_scaler=target_scaler):
    """
    Scale the input features, using a separate scaler for the target.
    
    Parameters: 
    df (pd.DataFrame): Input dataframe
    fit (bool): Whether to fit the scaler to the data (only apply to training data)
    target_col (pd.Series): The column that is being predicted
    feature_scaler (StandardScaler): Scaler used for features
    target_scaler (StandardScaler): Scaler used for target
      
    Returns: 
    df_scaled (pd.DataFrame): Scaled dataframe   
    """    
    target = df[target_col].values.reshape(-1, 1)
    if fit:
        target_scaler.fit(target)
    data_scaled = target_scaler.transform(target)
    
    features = df.loc[:, df.columns != target_col].values
    if features.shape[1]:
        if fit:
            feature_scaler.fit(features)
        features_scaled = feature_scaler.transform(features)
        data_scaled = np.concatenate([data_scaled, features_scaled], axis=1)

    df_scaled = pd.DataFrame(data_scaled, columns=df.columns)
    
    return df_scaled

def inverse_scale(data, target_scaler=target_scaler):
    """
    Transform the scaled values of the target back into their original form.
    The features are left alone, as we're assuming that the output of the model only includes the target.
    
    Parameters: 
    data (np.array): Input array of scaled target values
    target_scaler (StandardScaler): Scaler used for target
      
    Returns: 
    data_unscaled (np.array): Array of values in the original scale
    """    
    
    data_unscaled = np.empty([data.shape[1], data.shape[0]])
    for i in range(data.shape[1]):
        # StandardScaler expects 2D input, so reshape each column before inverting
        data_unscaled[i] = target_scaler.inverse_transform(data[:, i].reshape(-1, 1)).flatten()
    return data_unscaled.transpose()

df_train_scaled = scale(df_train)
df_test_scaled = scale(df_test, fit=False)

In [ ]:
# Review scaled values

df_train_scaled.head()

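As a quick sanity check, the standardized training target should now have a mean near 0 and a standard deviation near 1 (a minimal sketch):

In [ ]:
# Summarize the scaled training data; 'mean' should be ~0 and 'std' ~1 for the target
df_train_scaled.describe().loc[['mean', 'std']]
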
Create sequences of time series data


In [ ]:
def reframe(data, n_input_steps = n_input_steps, n_output_steps = n_output_steps):

    # Iterate through data and create sequences of features and outputs
    df = pd.DataFrame(data)
    cols=list()
    for i in range(n_input_steps, 0, -1):
        cols.append(df.shift(i))
    for i in range(0, n_output_steps):
        cols.append(df.shift(-i))
        
    # Concatenate values and remove any missing values
    df = pd.concat(cols, axis=1)
    df.dropna(inplace=True)
    
    # Split the data into feature and target variables
    n_feature_cols = n_input_steps * n_features
    features = df.iloc[:,0:n_feature_cols]
    target_cols = [i for i in range(n_feature_cols, n_feature_cols + n_output_steps * n_features, n_features)]
    targets = df.iloc[:,target_cols]

    return (features, targets)

X_train_reframed, y_train_reframed = reframe(df_train_scaled)
X_test_reframed, y_test_reframed = reframe(df_test_scaled)

Verify that X + y values are aligned

df_train_scaled.iloc[n_input_steps + index]['y'] should equal y_train_reframed.iloc[index], and the same should hold for the test dataset.

In other words, the ground truth labels should contain the sales for the next day after the training window (or whatever future projection you're making).


In [ ]:
for i in range(3):
    print(df_train_scaled.iloc[n_input_steps + i][target_col])
    print(y_train_reframed.iloc[i].values[0])
    print()

In [ ]:
# Confirm the same is true for the test set

for i in range(3):
    print(df_test_scaled.iloc[n_input_steps + i][target_col])
    print(y_test_reframed.iloc[i].values[0])
    print()
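
For a programmatic version of the same check, here's a minimal sketch using np.isclose:

In [ ]:
# Assert alignment between the scaled series and the reframed targets for a few rows
for i in range(3):
    assert np.isclose(df_train_scaled.iloc[n_input_steps + i][target_col],
                      y_train_reframed.iloc[i].values[0])
    assert np.isclose(df_test_scaled.iloc[n_input_steps + i][target_col],
                      y_test_reframed.iloc[i].values[0])
print('Alignment check passed')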

Build a model and submit your training job to AI Platform

The model we're building here trains quickly, so we could train it directly in this notebook. For more computationally expensive models, though, it's useful to train in the Cloud. To show you how AI Platform Training works, we'll package up our training code and submit a training job to the AI Platform Training service.

In our training script, we'll also export our trained SavedModel to a Cloud Storage bucket.


In [ ]:
# Reshape train and test data to match the model's inputs and outputs

X_train = X_train_reframed.values.reshape(X_train_reframed.shape[0], n_input_steps, n_features)
X_test = X_test_reframed.values.reshape(X_test_reframed.shape[0], n_input_steps, n_features)
y_train = y_train_reframed.values.reshape(y_train_reframed.shape[0], n_output_steps)
y_test = y_test_reframed.values.reshape(y_test_reframed.shape[0], n_output_steps)

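A quick shape check confirms the arrays match the LSTM's expected (samples, timesteps, features) layout:

In [ ]:
# Expect (samples, n_input_steps, n_features) for X and (samples, n_output_steps) for y
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)
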
In [ ]:
trainer_dir = 'trainer'
export_dir = 'tf_export'

%env trainer_dir=$trainer_dir
%env export_dir=$export_dir

In [ ]:
!mkdir -p $trainer_dir
!touch $trainer_dir/__init__.py

In [ ]:
# Copy numpy arrays to npy files

np.save(trainer_dir + '/x_train.npy', X_train)
np.save(trainer_dir + '/x_test.npy', X_test)
np.save(trainer_dir + '/y_train.npy', y_train)
np.save(trainer_dir + '/y_test.npy', y_test)

In [ ]:
# Write training code out to a file that will be submitted to the training job
# Note: f-strings are supported in Python 3.6 and above

model_template = f"""import argparse
import numpy as np
import os
import tempfile

from google.cloud import storage
from tensorflow import keras
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.callbacks import EarlyStopping

n_features = {n_features} # Two features: y (previous values) and whether the date is a holiday
n_input_steps = {n_input_steps} # Lookback window
n_output_steps = {n_output_steps} # How many steps to predict forward

epochs = {epochs} # How many passes through the data (early-stopping will cause training to stop before this)
patience = {patience} # Terminate training after the validation loss does not decrease after this many epochs

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--job-dir',
        default=None,
        help='URL to store the job output')
    args = parser.parse_args()
    print(args)
    return args

def main():
    args = get_args()
    print('args: ', args)
    model_dir = args.job_dir
    
    storage_client = storage.Client()
    bucket_name = model_dir.split('/')[2]
    
    bucket = storage_client.get_bucket(bucket_name)

    # Get the training data and convert back to np arrays
    local_data_dir = os.path.join(os.getcwd(), tempfile.gettempdir())
    
    data_files = ['x_train.npy', 'y_train.npy', 'x_test.npy', 'y_test.npy']
 
    for i in data_files:
        blob = storage.Blob('{trainer_dir}/' + i, bucket)
        destination_file = local_data_dir + '/' + i
        open(destination_file, 'a').close()
        blob.download_to_filename(destination_file)

    X_train = np.load(local_data_dir + '/x_train.npy')
    y_train = np.load(local_data_dir + '/y_train.npy')
    X_test = np.load(local_data_dir + '/x_test.npy')
    y_test = np.load(local_data_dir + '/y_test.npy')
    
    # Build and train the model
    model = Sequential([
        LSTM({lstm_units}, input_shape=[n_input_steps, n_features], recurrent_activation=None),
        Dense(n_output_steps)])

    model.compile(optimizer='adam', loss='mae')

    early_stopping = EarlyStopping(monitor='val_loss', patience=patience)
    _ = model.fit(x=X_train, y=y_train, validation_data=(X_test, y_test), epochs=epochs, callbacks=[early_stopping])
    
    # Export the model
    export_path = os.path.join(model_dir, '{export_dir}')
    model.save(export_path)
    
if __name__ == '__main__':
    main()
"""

with open(os.path.join(trainer_dir, 'model.py'), 'w') as f:
    f.write(model_template)  # Values were already interpolated by the f-string above

In [ ]:
# Copy the train data files to a GCS bucket

!gsutil -m cp -r trainer $BUCKET_URI

In [ ]:
!gsutil ls $BUCKET_URI/$trainer_dir

In [ ]:
# Re-run this if you need to create a new training job

timestamp = str(datetime.datetime.now().time())
JOB_NAME = 'caip_training_' + str(int(time.time()))
%env JOB_NAME=$JOB_NAME

In [ ]:
MODULE_NAME = trainer_dir + '.model'
TRAIN_DIR = os.getcwd() + '/' + trainer_dir
JOB_DIR = BUCKET_URI
%env TRAIN_DIR=$TRAIN_DIR
%env MODULE_NAME=$MODULE_NAME

In [ ]:
# Submit the training job

!gcloud ai-platform jobs submit training $JOB_NAME \
        --scale-tier basic \
        --package-path $TRAIN_DIR \
        --module-name $MODULE_NAME \
        --job-dir $BUCKET_URI \
        --region $REGION \
        --runtime-version $RUNTIME_VERSION \
        --python-version $PYTHON_VERSION

In [ ]:
# Check the job status

!gcloud ai-platform jobs describe $JOB_NAME

Monitor output of your training job

Follow the instructions in the output of the gcloud command above to view the logs from your training job. You can also navigate to the Jobs section of the Cloud Console to view the logs.

Once your training job completes successfully, it'll export your trained model as a TensorFlow SavedModel and write the output to a directory in your Cloud Storage bucket.

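You can also stream the job's logs directly into the notebook while it runs (a sketch; this blocks until the job finishes or you interrupt the cell):

In [ ]:
# Optional: stream training logs for the job submitted above
# !gcloud ai-platform jobs stream-logs $JOB_NAME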

In [ ]:
# Verify model was exported correctly

storage_client = storage.Client()
bucket = storage_client.get_bucket(BUCKET)
bucket_files = list(bucket.list_blobs(prefix=export_dir + '/'))

# If you see a saved_model.pb and a variables/ and assets/ directory here, it means your model was exported correctly in your training job. Yay!

for file in bucket_files:
    print(file)

Deploy a model version


In [ ]:
%%bash
# List models in project
models=$(gcloud ai-platform models list)

# Search for model in list; create if not found
if ! (echo "$models" | grep -q "$MODEL_NAME"); then
    gcloud ai-platform models create $MODEL_NAME --regions $REGION
else
    echo 'Model already exists. Skipping create step.'
fi

In [ ]:
export_path = BUCKET_URI +'/' + export_dir
version = 'version_' + str(int(time.time()))

In [ ]:
# Create the model version

!gcloud ai-platform versions create $version \
  --model $model_name \
  --origin $export_path \
  --runtime-version=$runtime_version \
  --framework $framework \
  --python-version=$python_version

Get predictions on deployed model


In [ ]:
# Remove existing prediction file

if os.path.exists(predictions_file):
    !rm $PREDICTIONS_FILE

In [ ]:
prediction_json = {input_layer_name: X_test[0].tolist()}

with open(predictions_file, 'a') as outfile:
    json.dump(prediction_json, outfile)

In [ ]:
preds = !gcloud ai-platform predict --model $MODEL_NAME --json-instances=$PREDICTIONS_FILE --format="json"

In [ ]:
# Parse output

preds.pop(0) # Remove warning
preds = "\n".join(preds) # Concatenate list of strings into one string
preds = json.loads(preds) # Convert JSON string into Python dict

pred_val = preds['predictions'][0]['dense'][0] # Access prediction
pred_val
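
As an alternative to shelling out to gcloud and parsing its text output, the same online prediction can be requested with the Google API Python client (a sketch; assumes the google-api-python-client package is installed and the model version above has finished deploying):

In [ ]:
# Optional: call the AI Platform Prediction REST API via the Python client library
from googleapiclient import discovery

service = discovery.build('ml', 'v1')
name = 'projects/{}/models/{}'.format(PROJECT, model_name)
response = service.projects().predict(name=name, body={'instances': [prediction_json]}).execute()
print(response['predictions'][0]['dense'][0])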

In [ ]:
# Print prediction and compare to actual value

print('Predicted sales:', int(round(inverse_scale(np.array([[pred_val]]))[0][0])))
print('Actual sales:   ', int(round(inverse_scale(np.array([y_test[0]]))[0][0])))

Conclusion

In this section, you've learned how to:

  • Prepare data and models for training in the cloud
  • Train your model and monitor the progress of the job with AI Platform Training
  • Predict using the model with AI Platform Prediction