Learning Objectives:
In this notebook we'll make the jump from training and predicting locally to doing both in the cloud. We'll take advantage of Google Cloud's AI Platform Training Service.
AI Platform Training Service is a managed service for training and deploying ML models without having to provision or maintain servers; the underlying infrastructure is handled for us.
In [ ]:
# Uncomment and run if you need to update your Google SDK
# !sudo apt-get update && sudo apt-get --only-upgrade install google-cloud-sdk
In [ ]:
PROJECT = "cloud-training-demos" # Replace with your PROJECT
BUCKET = "cloud-training-bucket" # Replace with your BUCKET
REGION = "us-central1" # Choose an available region for AI Platform Training Service
TFVERSION = "1.14" # TF version for AI Platform Training Service to use
Jupyter allows the substitution of Python variables into bash commands when using the !<cmd> format.
It is also possible with the %%bash cell magic, but that requires passing the variables in via an additional parameter.
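Under the hood this substitution behaves like ordinary Python string formatting: the {VAR} placeholders in a ! command are filled in from the notebook's Python namespace. A minimal sketch (the bucket and region values here are just examples):

```python
# Jupyter expands {BUCKET}-style placeholders in !-commands much like str.format does.
BUCKET = "cloud-training-bucket"   # example value
REGION = "us-central1"             # example value

# This is roughly the command string Jupyter assembles for !gsutil mb -l {REGION} gs://{BUCKET}
command = "gsutil mb -l {} gs://{}".format(REGION, BUCKET)
print(command)  # gsutil mb -l us-central1 gs://cloud-training-bucket
```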
In [ ]:
!gcloud config set project {PROJECT}
!gsutil mb -l {REGION} gs://{BUCKET}
!gsutil -m cp *.csv gs://{BUCKET}/taxifare/smallinput/
When you execute an AI Platform Training Service training job, the service zips up your code and ships it to the Cloud so it can be run on Cloud infrastructure. In order to do this, AI Platform Training Service requires your code to be a Python package.
A Python package is simply a collection of one or more .py files along with an __init__.py file to identify the containing directory as a package. The __init__.py sometimes contains initialization code but for our purposes an empty file suffices.
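To make this concrete, here is a small self-contained sketch that builds such a package in a temporary directory and imports it (all names are illustrative, not part of the lab):

```python
import importlib
import os
import sys
import tempfile

# Create a throwaway directory containing a minimal Python package.
pkg_root = tempfile.mkdtemp()
pkg_dir = os.path.join(pkg_root, "demopkg")
os.makedirs(pkg_dir)

# An empty __init__.py is enough to mark the directory as a package.
open(os.path.join(pkg_dir, "__init__.py"), "w").close()

# One module inside the package.
with open(os.path.join(pkg_dir, "model.py"), "w") as f:
    f.write("ANSWER = 42\n")

# The package is now importable like any other.
sys.path.insert(0, pkg_root)
model = importlib.import_module("demopkg.model")
print(model.ANSWER)  # 42
```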
In [ ]:
%%bash
mkdir taxifaremodel
touch taxifaremodel/__init__.py
In the cell below, write the content of model.py, packaging the model we
developed in the previous labs so that we can deploy it to AI Platform Training Service.
You'll need to reuse the input functions, the EvalSpec, TrainSpec, RunConfig, etc.
we implemented in the previous labs.
Run the two cells below this one to test your code (the one that creates task.py and the following one that launches a local training).
Once your code runs locally, execute the next cells to train and deploy your packaged model to AI Platform Training Service.
In [ ]:
%%writefile taxifaremodel/model.py
import tensorflow as tf
import shutil
CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]
FEATURE_NAMES = CSV_COLUMN_NAMES[1:]
def parse_row(row):
fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)
features = dict(zip(CSV_COLUMN_NAMES, fields))
label = features.pop("fare_amount")
return features, label
def read_dataset(csv_path):
dataset = tf.data.Dataset.list_files(file_pattern = csv_path)
dataset = dataset.flat_map(lambda filename: tf.data.TextLineDataset(filenames = filename).skip(count = 1))
dataset = dataset.map(map_func = parse_row)
return dataset
def train_input_fn(csv_path, batch_size = 128):
dataset = read_dataset(csv_path)
dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)
return dataset
def eval_input_fn(csv_path, batch_size = 128):
dataset = read_dataset(csv_path)
dataset = dataset.batch(batch_size = batch_size)
return dataset
def serving_input_receiver_fn():
receiver_tensors = {
"dayofweek" : tf.placeholder(dtype = tf.int32, shape = [None]),
"hourofday" : tf.placeholder(dtype = tf.int32, shape = [None]),
"pickuplon" : tf.placeholder(dtype = tf.float32, shape = [None]),
"pickuplat" : tf.placeholder(dtype = tf.float32, shape = [None]),
"dropofflat" : tf.placeholder(dtype = tf.float32, shape = [None]),
"dropofflon" : tf.placeholder(dtype = tf.float32, shape = [None])
}
features = receiver_tensors
return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)
def my_rmse(labels, predictions):
pred_values = tf.squeeze(input = predictions["predictions"], axis = -1)
return {"rmse": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)}
def create_model(model_dir, train_steps):
config = tf.estimator.RunConfig(
tf_random_seed = 1,
save_checkpoints_steps = max(10, train_steps // 10),
model_dir = model_dir
)
feature_cols = [tf.feature_column.numeric_column(key = k) for k in FEATURE_NAMES]
model = tf.estimator.DNNRegressor(
hidden_units = [10,10],
feature_columns = feature_cols,
config = config
)
model = tf.contrib.estimator.add_metrics(model, my_rmse)
return model
def train_and_evaluate(params):
OUTDIR = params["output_dir"]
TRAIN_DATA_PATH = params["train_data_path"]
EVAL_DATA_PATH = params["eval_data_path"]
TRAIN_STEPS = params["train_steps"]
model = create_model(OUTDIR, TRAIN_STEPS)
train_spec = tf.estimator.TrainSpec(
input_fn = lambda: train_input_fn(TRAIN_DATA_PATH),
max_steps = TRAIN_STEPS
)
exporter = tf.estimator.FinalExporter(name = "exporter", serving_input_receiver_fn = serving_input_receiver_fn)
eval_spec = tf.estimator.EvalSpec(
input_fn = lambda: eval_input_fn(EVAL_DATA_PATH),
steps = None,
start_delay_secs = 1,
throttle_secs = 1,
exporters = exporter
)
tf.logging.set_verbosity(tf.logging.INFO)
shutil.rmtree(path = OUTDIR, ignore_errors = True)
tf.estimator.train_and_evaluate(estimator = model, train_spec = train_spec, eval_spec = eval_spec)
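As a quick sanity check of the CSV parsing logic above, the zip-then-pop dance in parse_row can be mirrored in plain Python, with no TensorFlow needed (the row values below are made up for illustration):

```python
# Mirror of parse_row: pair column names with parsed values, then pop the label.
CSV_COLUMN_NAMES = ["fare_amount", "dayofweek", "hourofday",
                    "pickuplon", "pickuplat", "dropofflon", "dropofflat"]

row_values = [12.5, 1, 8, -73.99, 40.75, -73.87, 40.77]  # example CSV row
features = dict(zip(CSV_COLUMN_NAMES, row_values))
label = features.pop("fare_amount")  # the label is removed from the feature dict

print(label)          # 12.5
print(len(features))  # 6 remaining feature columns
```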
If you look closely above, you'll notice a change to the code: train_and_evaluate() now reads its settings (data paths, output directory, training steps) from a params dictionary instead of hard-coded values.
We specify these parameters at run time via the command line, which means we need code that parses the command-line parameters and invokes train_and_evaluate() with them. This is the job of the task.py file.
Exposing parameters to the command line also allows us to use AI Platform Training Service's automatic hyperparameter tuning feature, which we'll cover in a future lesson.
In [ ]:
%%writefile taxifaremodel/task.py
import argparse
import json
import os
from . import model
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--train_data_path",
help = "GCS or local path to training data",
required = True
)
parser.add_argument(
"--train_steps",
help = "Steps to run the training job for (default: 1000)",
type = int,
default = 1000
)
parser.add_argument(
"--eval_data_path",
help = "GCS or local path to evaluation data",
required = True
)
parser.add_argument(
"--output_dir",
help = "GCS location to write checkpoints and export models",
required = True
)
parser.add_argument(
"--job-dir",
help = "This is not used by our model, but it is required by gcloud",
)
args = parser.parse_args().__dict__
model.train_and_evaluate(args)
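You can exercise task.py's argument handling without launching a training job, since argparse accepts an explicit argument list. A sketch mirroring the parser above (the paths are example values):

```python
import argparse

# Same parser shape as task.py above.
parser = argparse.ArgumentParser()
parser.add_argument("--train_data_path", required=True)
parser.add_argument("--eval_data_path", required=True)
parser.add_argument("--output_dir", required=True)
parser.add_argument("--train_steps", type=int, default=1000)
parser.add_argument("--job-dir")  # dashes become underscores: args.job_dir

args = parser.parse_args([
    "--train_data_path", "taxi-train.csv",
    "--eval_data_path", "taxi-valid.csv",
    "--output_dir", "taxi_trained",
])
params = args.__dict__  # the dict handed to train_and_evaluate()
print(params["train_steps"])  # 1000, the default
```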
AI Platform Training Service comes with a local test tool (gcloud ai-platform local train) to ensure we've packaged our code correctly. It's best to first run that for a few steps before trying a Cloud job.
The arguments before the -- \ are for AI Platform Training Service: they tell it which package to use and which .py file within the package should be run. task.py is our entry point, so we specify that.
The arguments after the -- \ are sent to our task.py.
In [ ]:
%%time
!gcloud ai-platform local train \
--package-path=taxifaremodel \
--module-name=taxifaremodel.task \
-- \
--train_data_path=taxi-train.csv \
--eval_data_path=taxi-valid.csv \
--train_steps=1 \
--output_dir=taxi_trained
To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname] and simply specify some additional parameters for AI Platform Training Service.
Below the -- \, note how we've changed the task.py arguments to be GCS locations.
In [ ]:
OUTDIR = "gs://{}/taxifare/trained_small".format(BUCKET)
In [ ]:
!gsutil -m rm -rf {OUTDIR} # start fresh each time
!gcloud ai-platform jobs submit training taxifare_$(date -u +%y%m%d_%H%M%S) \
--package-path=taxifaremodel \
--module-name=taxifaremodel.task \
--job-dir=gs://{BUCKET}/taxifare \
--python-version=3.5 \
--runtime-version={TFVERSION} \
--region={REGION} \
-- \
--train_data_path=gs://{BUCKET}/taxifare/smallinput/taxi-train.csv \
--eval_data_path=gs://{BUCKET}/taxifare/smallinput/taxi-valid.csv \
--train_steps=1000 \
--output_dir={OUTDIR}
You can track your job and view logs using the Cloud Console. The job will take 5-10 minutes to complete. Wait until it finishes before moving on.
In [ ]:
!gsutil ls gs://{BUCKET}/taxifare/trained_small/export/exporter
AI Platform uses a model versioning system: first you create a model resource, and within it you create versions of the model.
Note: You will see an error below if the model already exists; it is safe to ignore.
In [ ]:
VERSION='v1'
!gcloud ai-platform models create taxifare --regions us-central1
!gcloud ai-platform versions delete {VERSION} --model taxifare --quiet
!gcloud ai-platform versions create {VERSION} --model taxifare \
--origin $(gsutil ls gs://{BUCKET}/taxifare/trained_small/export/exporter | tail -1) \
--python-version=3.5 \
--runtime-version {TFVERSION}
Now that we have deployed our model behind a production-grade REST API, we can invoke it remotely.
We could invoke it directly by issuing an HTTP POST request against the REST API (see the reference docs); however, AI Platform also provides an easy way to invoke it via the command line.
In [ ]:
%%writefile ./test.json
{"dayofweek": 1, "hourofday": 0, "pickuplon": -73.885262, "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403}
Then we use gcloud ai-platform predict, specifying the model name and the location of the JSON file. Since we don't explicitly specify --version, the default model version is used.
Since we only have one version, it is already the default; if we had multiple model versions, we could designate the default using gcloud ai-platform versions set-default or the Cloud Console.
In [ ]:
!gcloud ai-platform predict --model=taxifare --json-instances=./test.json
In [ ]:
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json
credentials = GoogleCredentials.get_application_default()
api = discovery.build("ml", "v1", credentials = credentials,
discoveryServiceUrl = "https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json")
request_data = {"instances":
[
{
"dayofweek": 1,
"hourofday": 8,
"pickuplon": -73.885,
"pickuplat": 40.773,
"dropofflon": -73.987,
"dropofflat": 40.732,
}
]
}
parent = "projects/{}/models/taxifare".format(PROJECT) # use default version
#parent = "projects/{}/models/taxifare/versions/{}".format(PROJECT,VERSION) # specify a specific version
response = api.projects().predict(body = request_data, name = parent).execute()
print("response = {0}".format(response))
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License