In [ ]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
In this notebook we'll use the Iowa Liquor Sales dataset to learn how to submit a job to AI Platform Training. In the job we'll train our TensorFlow 2 model and export the SavedModel to Cloud Storage.
The Iowa Liquor Sales dataset comes from BigQuery Public Datasets and contains wholesale liquor purchases in the state of Iowa from 2012 to the present.
The following steps are required, regardless of your notebook environment.
Select or create a GCP project. When you first create an account, you get a $300 free credit towards your compute/storage costs.
In [ ]:
import datetime
import json
import os
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from google.cloud import storage
from pandas.plotting import register_matplotlib_converters
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
In [ ]:
# Check the TensorFlow version installed
tf.__version__
In [ ]:
# Enter your project, region, and bucket, then run this cell to set the
# variables (and environment variables) used by the commands in this notebook.
PROJECT = "your-project-name" # REPLACE WITH YOUR PROJECT NAME
BUCKET = "your-bucket-id" # REPLACE WITH YOUR BUCKET ID
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1
BUCKET_URI = "gs://" + BUCKET
assert PROJECT != 'your-project-name', "Don't forget to change the project variables!"
%env BUCKET_URI=$BUCKET_URI
%env REGION=$REGION
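The cell above sets the notebook and environment variables, but the gcloud commands later in this notebook also rely on the Cloud SDK's active project. If the SDK isn't already configured for this project, you can set it explicitly (an optional sketch, assuming the gcloud CLI is available in your notebook environment):
In [ ]:
# Optional: make the Cloud SDK use the same project as this notebook
!gcloud config set project $PROJECT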
In [ ]:
target_col = 'y' # What we are predicting
ts_col = 'ds' # Time series column
if os.path.exists('iowa_daily.csv'):
    input_file = 'iowa_daily.csv'  # File created in previous lab
else:
    input_file = 'data/iowa_daily.csv'
n_features = 2 # Two features: y (previous values) and whether the date is a holiday
n_input_steps = 30 # Lookback window
n_output_steps = 1 # How many steps to predict forward
train_split = 0.75 # Fraction of data used for training (train/test split)
epochs = 1000 # How many passes through the data (early-stopping will cause training to stop before this)
patience = 5 # Terminate training after the validation loss does not decrease after this many epochs
lstm_units = 64
model_name = 'retail_forecasting'
framework='TENSORFLOW'
runtime_version = '2.1'
python_version = '3.7'
predictions_file = 'predictions.json'
input_layer_name = 'lstm_input'
%env MODEL_NAME = $model_name
%env FRAMEWORK = $framework
%env RUNTIME_VERSION = $runtime_version
%env PYTHON_VERSION = $python_version
%env PREDICTIONS_FILE = $predictions_file
When you submit a training job using the Cloud SDK, you upload a Python package containing your training code to a Cloud Storage bucket. AI Platform runs the code from this package. In this tutorial, AI Platform also saves the trained model that results from your job in the same bucket. You can then create an AI Platform model version based on this output in order to serve online predictions.
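If you prefer the command line, the bucket can also be created with gsutil rather than the Python client used in the next cell (an equivalent sketch; assumes the bucket name is globally unique and doesn't exist yet):
In [ ]:
# Optional alternative: create the bucket with gsutil in your chosen region
!gsutil mb -l $REGION $BUCKET_URI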
In [ ]:
storage_client = storage.Client()
try:
    bucket = storage_client.get_bucket(BUCKET)
    print("Bucket exists, let's not recreate it.")
except Exception:
    bucket = storage_client.create_bucket(BUCKET)
    print('Created bucket: ' + BUCKET)
In [ ]:
df = pd.read_csv(input_file, index_col=ts_col, parse_dates=True)
In [ ]:
# Plot one 30-day window of sales
df[target_col][:30].plot()
In [ ]:
# Split data
size = int(len(df) * train_split)
df_train, df_test = df[0:size].copy(deep=True), df[size:len(df)].copy(deep=True)
df_train.head()
In [ ]:
df_train.plot()
In [ ]:
# Review original values
df_train.head()
In [ ]:
# For neural networks to converge quicker, it is helpful to scale the values.
# For example, each feature might be transformed to have a mean of 0 and std. dev. of 1.
#
# We are working with a mix of features, input timesteps, output horizon, etc.
# which don't work out-of-the-box with common scaling utilities.
# So, here are a couple wrappers to handle scaling and inverting the scaling.
feature_scaler = StandardScaler()
target_scaler = StandardScaler()

def scale(df,
          fit=True,
          target_col=target_col,
          feature_scaler=feature_scaler,
          target_scaler=target_scaler):
    """
    Scale the input features, using a separate scaler for the target.

    Parameters:
    df (pd.DataFrame): Input dataframe
    fit (bool): Whether to fit the scalers to the data (only do this for training data)
    target_col (str): Name of the column that is being predicted
    feature_scaler (StandardScaler): Scaler used for features
    target_scaler (StandardScaler): Scaler used for target

    Returns:
    df_scaled (pd.DataFrame): Scaled dataframe
    """
    target = df[target_col].values.reshape(-1, 1)
    if fit:
        target_scaler.fit(target)
    data_scaled = target_scaler.transform(target)
    features = df.loc[:, df.columns != target_col].values
    if features.shape[1]:
        if fit:
            feature_scaler.fit(features)
        features_scaled = feature_scaler.transform(features)
        data_scaled = np.concatenate([data_scaled, features_scaled], axis=1)
    df_scaled = pd.DataFrame(data_scaled, columns=df.columns)
    return df_scaled
def inverse_scale(data, target_scaler=target_scaler):
    """
    Transform the scaled values of the target back into their original form.
    The features are left alone, as we're assuming that the output of the model only includes the target.

    Parameters:
    data (np.array): Input array of scaled target values
    target_scaler (StandardScaler): Scaler used for target

    Returns:
    data_unscaled (np.array): Array in the original (unscaled) units
    """
    data_unscaled = np.empty([data.shape[1], data.shape[0]])
    for i in range(data.shape[1]):
        # Scalers expect 2D input, so reshape each column before inverting
        data_unscaled[i] = target_scaler.inverse_transform(
            data[:, i].reshape(-1, 1)).ravel()
    return data_unscaled.transpose()

df_train_scaled = scale(df_train)
df_test_scaled = scale(df_test, False)
In [ ]:
# Review scaled values
df_train_scaled.head()
In [ ]:
def reframe(data, n_input_steps=n_input_steps, n_output_steps=n_output_steps):
    # Iterate through the data and create sequences of features and outputs
    df = pd.DataFrame(data)
    cols = list()
    for i in range(n_input_steps, 0, -1):
        cols.append(df.shift(i))
    for i in range(0, n_output_steps):
        cols.append(df.shift(-i))
    # Concatenate values and remove any rows with missing values
    df = pd.concat(cols, axis=1)
    df.dropna(inplace=True)
    # Split the data into feature and target variables
    n_feature_cols = n_input_steps * n_features
    features = df.iloc[:, 0:n_feature_cols]
    target_cols = [i for i in range(n_feature_cols, n_feature_cols + n_output_steps * n_features, n_features)]
    targets = df.iloc[:, target_cols]
    return (features, targets)
X_train_reframed, y_train_reframed = reframe(df_train_scaled)
X_test_reframed, y_test_reframed = reframe(df_test_scaled)
As a sanity check, df_train_scaled.iloc[n_input_steps + index][target_col] should be equal to y_train_reframed.iloc[index], and the same should hold for the test dataset.
In other words, the ground truth labels should contain the sales for the next day after the training window (or whatever future projection you're making).
In [ ]:
for i in range(3):
    print(df_train_scaled.iloc[n_input_steps + i][target_col])
    print(y_train_reframed.iloc[i].values[0])
    print()
In [ ]:
# Confirm the same is true for the test set
for i in range(3):
    print(df_test_scaled.iloc[n_input_steps + i][target_col])
    print(y_test_reframed.iloc[i].values[0])
    print()
The model we're building here trains quickly, so we could train it directly in this notebook. For more computationally expensive models, though, it's useful to train in the cloud. To show how AI Platform Training works, we'll package up our training code and submit a training job to the AI Platform Training service.
In our training script, we'll also export our trained SavedModel to a Cloud Storage bucket.
In [ ]:
# Reshape the train and test data to match the model's input and output shapes
X_train = X_train_reframed.values.reshape(X_train_reframed.shape[0], n_input_steps, n_features)
X_test = X_test_reframed.values.reshape(X_test_reframed.shape[0], n_input_steps, n_features)
y_train = y_train_reframed.values.reshape(y_train_reframed.shape[0], n_output_steps)
y_test = y_test_reframed.values.reshape(y_test_reframed.shape[0], n_output_steps)
In [ ]:
trainer_dir = 'trainer'
export_dir = 'tf_export'
%env trainer_dir=$trainer_dir
%env export_dir=$export_dir
In [ ]:
!mkdir $trainer_dir
!touch $trainer_dir/__init__.py
In [ ]:
# Copy numpy arrays to npy files
np.save(trainer_dir + '/x_train.npy', X_train)
np.save(trainer_dir + '/x_test.npy', X_test)
np.save(trainer_dir + '/y_train.npy', y_train)
np.save(trainer_dir + '/y_test.npy', y_test)
In [ ]:
# Write training code out to a file that will be submitted to the training job
# Note: f-strings are supported in Python 3.6 and above
model_template = f"""import argparse
import numpy as np
import os
import tempfile
from google.cloud import storage
from tensorflow import keras
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.callbacks import EarlyStopping
n_features = {n_features} # Two features: y (previous values) and whether the date is a holiday
n_input_steps = {n_input_steps} # Lookback window
n_output_steps = {n_output_steps} # How many steps to predict forward
epochs = {epochs} # How many passes through the data (early-stopping will cause training to stop before this)
patience = {patience} # Terminate training after the validation loss does not decrease after this many epochs
def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--job-dir',
        default=None,
        help='URL to store the job output')
    args = parser.parse_args()
    print(args)
    return args

def main():
    args = get_args()
    print('args: ', args)
    model_dir = args.job_dir

    storage_client = storage.Client()
    bucket_name = model_dir.split('/')[2]
    bucket = storage_client.get_bucket(bucket_name)

    # Get the training data and convert back to np arrays
    local_data_dir = os.path.join(os.getcwd(), tempfile.gettempdir())
    data_files = ['x_train.npy', 'y_train.npy', 'x_test.npy', 'y_test.npy']
    for i in data_files:
        blob = storage.Blob('{trainer_dir}/' + i, bucket)
        destination_file = local_data_dir + '/' + i
        open(destination_file, 'a').close()
        blob.download_to_filename(destination_file)

    X_train = np.load(local_data_dir + '/x_train.npy')
    y_train = np.load(local_data_dir + '/y_train.npy')
    X_test = np.load(local_data_dir + '/x_test.npy')
    y_test = np.load(local_data_dir + '/y_test.npy')

    # Build and train the model
    model = Sequential([
        LSTM({lstm_units}, input_shape=[n_input_steps, n_features], recurrent_activation=None),
        Dense(n_output_steps)])
    model.compile(optimizer='adam', loss='mae')
    early_stopping = EarlyStopping(monitor='val_loss', patience=patience)
    _ = model.fit(x=X_train, y=y_train, validation_data=(X_test, y_test), epochs=epochs, callbacks=[early_stopping])

    # Export the model
    export_path = os.path.join(model_dir, '{export_dir}')
    model.save(export_path)

if __name__ == '__main__':
    main()
"""
# The f-string above has already substituted the parameter values
with open(os.path.join(trainer_dir, 'model.py'), 'w') as f:
    f.write(model_template)
In [ ]:
# Copy the train data files to a GCS bucket
!gsutil -m cp -r trainer $BUCKET_URI
In [ ]:
!gsutil ls $BUCKET_URI/$trainer_dir
In [ ]:
# Re-run this if you need to create a new training job
timestamp = str(datetime.datetime.now().time())
JOB_NAME = 'caip_training_' + str(int(time.time()))
%env JOB_NAME=$JOB_NAME
In [ ]:
MODULE_NAME = trainer_dir + '.model'
TRAIN_DIR = os.getcwd() + '/' + trainer_dir
JOB_DIR = BUCKET_URI
%env TRAIN_DIR=$TRAIN_DIR
%env MODULE_NAME=$MODULE_NAME
In [ ]:
# Submit the training job
!gcloud ai-platform jobs submit training $JOB_NAME \
  --scale-tier basic \
  --package-path $TRAIN_DIR \
  --module-name $MODULE_NAME \
  --job-dir $BUCKET_URI \
  --region $REGION \
  --runtime-version $RUNTIME_VERSION \
  --python-version $PYTHON_VERSION
In [ ]:
# Check the job status
!gcloud ai-platform jobs describe $JOB_NAME
Follow the instructions in the output of the gcloud command above to view the logs from your training job. You can also navigate to the Jobs section of the Cloud Console to view logs.
Once your training job completes successfully, it exports your trained model as a TensorFlow SavedModel and writes the output to a directory in your Cloud Storage bucket.
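While you wait for the job to complete, you can also stream its logs directly in the notebook (optional; the command blocks until the job finishes):
In [ ]:
# Stream the training job's logs into the notebook
!gcloud ai-platform jobs stream-logs $JOB_NAME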
In [ ]:
# Verify model was exported correctly
storage_client = storage.Client()
bucket = storage_client.get_bucket(BUCKET)
bucket_files = list(bucket.list_blobs(prefix=export_dir + '/'))
# If you see a saved_model.pb and a variables/ and assets/ directory here, it means your model was exported correctly in your training job. Yay!
for file in bucket_files:
    print(file)
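You can run the same check from the command line with gsutil, using the bucket and export directory defined earlier (an equivalent, optional check):
In [ ]:
# List the exported SavedModel files from the command line
!gsutil ls -r $BUCKET_URI/$export_dir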
In [ ]:
%%bash
# List models in project
models=$(gcloud ai-platform models list)
# Search for model in list; create if not found
if !(echo $models | grep -q $MODEL_NAME); then
    gcloud ai-platform models create $MODEL_NAME --regions $REGION
else
    echo 'Model already exists. Skipping create step.'
fi
In [ ]:
export_path = BUCKET_URI +'/' + export_dir
version = 'version_' + str(int(time.time()))
In [ ]:
# Create the model version
!gcloud ai-platform versions create $version \
--model $model_name \
--origin $export_path \
--runtime-version=$runtime_version \
--framework $framework \
--python-version=$python_version
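To confirm the new version was created and is ready to serve, you can list the versions attached to the model (an optional check):
In [ ]:
# The new version should appear in this list once it's ready
!gcloud ai-platform versions list --model $model_name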
In [ ]:
# Remove existing prediction file
if os.path.exists(predictions_file):
    !rm $PREDICTIONS_FILE
In [ ]:
prediction_json = {input_layer_name: X_test[0].tolist()}
with open(predictions_file, 'a') as outfile:
    json.dump(prediction_json, outfile)
In [ ]:
preds = !gcloud ai-platform predict --model $MODEL_NAME --json-instances=$PREDICTIONS_FILE --format="json"
In [ ]:
# Parse output
preds.pop(0) # Remove warning
preds = "\n".join(preds) # Concatenate list of strings into one string
preds = json.loads(preds) # Convert JSON string into Python dict
pred_val = preds['predictions'][0]['dense'][0] # Access prediction
pred_val
In [ ]:
# Print prediction and compare to actual value
print('Predicted sales:', int(round(inverse_scale(np.array([[pred_val]]))[0][0])))
print('Actual sales: ', int(round(inverse_scale(np.array([y_test[0]]))[0][0])))
In this section, you've learned how to:
Package TensorFlow 2 training code and submit it as a job to AI Platform Training
Export the trained model as a SavedModel to Cloud Storage
Create a model and model version on AI Platform
Request online predictions and convert them back to unscaled values