Introducing AI Platform Training Service

Learning Objectives:

  • Learn how to make code compatible with AI Platform Training Service
  • Train your model using cloud infrastructure via AI Platform Training Service
  • Deploy your model behind a production-grade REST API using AI Platform Training Service

Introduction

In this notebook we'll make the jump from training and predicting locally to doing both in the cloud. We'll take advantage of Google Cloud's AI Platform Training Service.

AI Platform Training Service is a managed service that allows the training and deployment of ML models without having to provision or maintain servers. The infrastructure is handled seamlessly by the managed service for us.


In [ ]:
# Uncomment and run if you need to update your Google SDK
# !sudo apt-get update && sudo apt-get --only-upgrade install google-cloud-sdk

Make code compatible with AI Platform Training Service

In order to make our code compatible with AI Platform Training Service we need to make the following changes:

  1. Upload data to Google Cloud Storage
  2. Move code into a Python package
  3. Modify code to read data from and write checkpoint files to GCS

Upload data to Google Cloud Storage (GCS)

Cloud services don't have access to our local files, so we need to upload them to a location the Cloud servers can read from. In this case we'll use GCS.

Specify your project name and bucket name in the cell below.


In [ ]:
PROJECT = "cloud-training-demos"  # Replace with your PROJECT
BUCKET = "cloud-training-bucket"  # Replace with your BUCKET
REGION = "us-central1"            # Choose an available region for AI Platform Training Service
TFVERSION = "1.14"                # TF version for AI Platform Training Service to use

Jupyter allows the substitution of Python variables into bash commands when using the !<cmd> format. It is also possible with the %%bash magic, but that requires an additional parameter (a sketch follows the next cell).


In [ ]:
!gcloud config set project {PROJECT}
!gsutil mb -l {REGION} gs://{BUCKET}
!gsutil -m cp *.csv gs://{BUCKET}/taxifare/smallinput/
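
For reference, here is a minimal sketch of the %%bash form mentioned above: the -s parameter forwards Python values into the cell, where they arrive as positional arguments ($1, $2, ...). The listing command assumes the copy above has completed.

In [ ]:
%%bash -s "$BUCKET"
# $1 holds the value of the Python variable BUCKET passed in via -s
echo "Contents of gs://$1/taxifare/smallinput/:"
gsutil ls gs://$1/taxifare/smallinput/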

Move code into a Python package

When you execute an AI Platform Training Service training job, the service zips up your code and ships it to the Cloud so it can be run on Cloud infrastructure. To do this, AI Platform Training Service requires your code to be a Python package.

A Python package is simply a collection of one or more .py files along with an __init__.py file that identifies the containing directory as a package. The __init__.py sometimes contains initialization code, but for our purposes an empty file suffices.

Create package directory and __init__.py

The bash command touch creates an empty file in the specified location.


In [ ]:
%%bash
mkdir taxifaremodel
touch taxifaremodel/__init__.py
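
To confirm the directory now qualifies as a (so far empty) Python package, we can list its contents; the only file should be the empty __init__.py.

In [ ]:
!find taxifaremodel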

Paste existing code into model.py

A Python package requires our code to be in .py files, as opposed to notebook cells. So, we simply copy and paste our existing code from the previous notebook into a single file.

The %%writefile magic writes the contents of its cell to disk with the specified name.

In the cell below, write the content of model.py, packaging the model we developed in the previous labs so that we can deploy it to AI Platform Training Service.

You'll need to reuse the input functions, the EvalSpec, TrainSpec, RunConfig, etc. we implemented in the previous labs.

Run the two cells below this one to test your code (the one that creates task.py and the following one that launches a local training run).

When your code runs locally, execute the next cells to train and deploy your packaged model to AI Platform Training Service.


In [ ]:
%%writefile taxifaremodel/model.py
import tensorflow as tf
import shutil

CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]
FEATURE_NAMES = CSV_COLUMN_NAMES[1:]

def parse_row(row):
    fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)
    features = dict(zip(CSV_COLUMN_NAMES, fields))
    label = features.pop("fare_amount")
    return features, label

def read_dataset(csv_path):
    dataset = tf.data.Dataset.list_files(file_pattern = csv_path)
    dataset = dataset.flat_map(lambda filename: tf.data.TextLineDataset(filenames = filename).skip(count = 1))
    dataset = dataset.map(map_func = parse_row)
    return dataset

def train_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)
    return dataset

def eval_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.batch(batch_size = batch_size)
    return dataset
  
def serving_input_receiver_fn():
    receiver_tensors = {
        "dayofweek" : tf.placeholder(dtype = tf.int32, shape = [None]), 
        "hourofday" : tf.placeholder(dtype = tf.int32, shape = [None]),
        "pickuplon" : tf.placeholder(dtype = tf.float32, shape = [None]), 
        "pickuplat" : tf.placeholder(dtype = tf.float32, shape = [None]),
        "dropofflat" : tf.placeholder(dtype = tf.float32, shape = [None]),
        "dropofflon" : tf.placeholder(dtype = tf.float32, shape = [None])
    }
    
    features = receiver_tensors
    
    return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)
      
def my_rmse(labels, predictions):
    pred_values = tf.squeeze(input = predictions["predictions"], axis = -1)
    return {"rmse": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)}

def create_model(model_dir, train_steps):
    config = tf.estimator.RunConfig(
        tf_random_seed = 1,
        save_checkpoints_steps = max(10, train_steps // 10),
        model_dir = model_dir
    )
    
    feature_cols = [tf.feature_column.numeric_column(key = k) for k in FEATURE_NAMES]
    
    model = tf.estimator.DNNRegressor(
        hidden_units = [10,10],
        feature_columns = feature_cols, 
        config = config
    )
    
    model = tf.contrib.estimator.add_metrics(model, my_rmse)
    
    return model

def train_and_evaluate(params):
    OUTDIR = params["output_dir"]
    TRAIN_DATA_PATH = params["train_data_path"]
    EVAL_DATA_PATH = params["eval_data_path"]
    TRAIN_STEPS = params["train_steps"]

    model = create_model(OUTDIR, TRAIN_STEPS)

    train_spec = tf.estimator.TrainSpec(
        input_fn = lambda: train_input_fn(TRAIN_DATA_PATH),
        max_steps = TRAIN_STEPS
    )    
    exporter = tf.estimator.FinalExporter(name = "exporter", serving_input_receiver_fn = serving_input_receiver_fn)
    
    eval_spec = tf.estimator.EvalSpec(
        input_fn = lambda: eval_input_fn(EVAL_DATA_PATH),
        steps = None,
        start_delay_secs = 1,
        throttle_secs = 1,
        exporters = exporter
    )
    
    tf.logging.set_verbosity(tf.logging.INFO)
    
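    # Note: shutil can only clear a local OUTDIR; when OUTDIR is a GCS path we clear it with gsutil before submitting the cloud job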
    shutil.rmtree(path = OUTDIR, ignore_errors = True)

    tf.estimator.train_and_evaluate(estimator = model, train_spec = train_spec, eval_spec = eval_spec)
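
Optionally, the parsing logic in model.py can be sanity-checked from the notebook. This is a sketch, assuming the %%writefile cell above has been run and that we are in TF 1.x graph mode; the CSV line is made up for illustration.

In [ ]:
import tensorflow as tf
from taxifaremodel.model import parse_row

# Parse a single hand-written CSV line and evaluate the resulting tensors
row = tf.constant("10.0,1,0,-73.99,40.75,-73.98,40.73")
features, label = parse_row(row)

with tf.Session() as sess:
    feats, fare = sess.run([features, label])
    print(feats)
    print("label (fare_amount):", fare)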

Modify code to read data from and write checkpoint files to GCS

If you look closely above, you'll notice two changes to the code:

  1. The input function now supports reading a list of files matching a file name pattern instead of just a single CSV
    • This is useful because large datasets tend to exist in shards.
  2. The train and evaluate portion is wrapped in a function that takes a parameter dictionary as an argument.
    • This is useful because the output directory, data paths, and number of train steps will differ depending on whether we're training locally or in the cloud. Parametrizing allows us to use the same code for both (see the sketch after this list).
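
To make the parametrization concrete, here is a minimal sketch (not part of the required lab flow) of calling train_and_evaluate directly with a params dictionary. The keys are exactly what task.py will assemble from the command line; the local file names assume the CSVs used earlier are still in the working directory.

In [ ]:
from taxifaremodel import model

params = {
    "output_dir": "taxi_trained_scratch",  # would be a gs:// path when training in the cloud
    "train_data_path": "taxi-train.csv",   # likewise a gs:// pattern in the cloud
    "eval_data_path": "taxi-valid.csv",
    "train_steps": 10
}
model.train_and_evaluate(params)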

We specify these parameters at runtime via the command line, which means we need to add code to parse command-line parameters and invoke train_and_evaluate() with them. This is the job of the task.py file.

Exposing parameters to the command line also allows us to use AI Platform Training Service's automatic hyperparameter tuning feature which we'll cover in a future lesson.


In [ ]:
%%writefile taxifaremodel/task.py
import argparse
import json
import os

from . import model


if __name__ == "__main__":
    
    parser = argparse.ArgumentParser()
    
    parser.add_argument(
        "--train_data_path",
        help = "GCS or local path to training data",
        required = True
    )
    parser.add_argument(
        "--train_steps",
        help = "Steps to run the training job for (default: 1000)",
        type = int,
        default = 1000
    )
    parser.add_argument(
        "--eval_data_path",
        help = "GCS or local path to evaluation data",
        required = True
    )
    parser.add_argument(
        "--output_dir",
        help = "GCS location to write checkpoints and export models",
        required = True
    )
    parser.add_argument(
        "--job-dir",
        help = "This is not used by our model, but it is required by gcloud",
    )
    args = parser.parse_args().__dict__

    model.train_and_evaluate(args)
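
Before involving gcloud, the package entry point can be smoke-tested by running the module directly with python3 -m. This is a sketch: the paths and single training step are illustrative, and it assumes TensorFlow is installed in the notebook's Python 3 environment.

In [ ]:
!python3 -m taxifaremodel.task \
    --train_data_path=taxi-train.csv \
    --eval_data_path=taxi-valid.csv \
    --output_dir=taxi_trained_direct \
    --train_steps=1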

Train using AI Platform Training Service (Local)

AI Platform Training Service comes with a local test tool (gcloud ai-platform local train) to ensure we've packaged our code correctly. It's best to run it locally for a few steps before trying a Cloud job.

The arguments before the -- \ are for AI Platform Training Service:

  • package-path: specifies the location of the Python package
  • module-name: specifies which .py file should be run within the package. task.py is our entry point, so we specify taxifaremodel.task

The arguments after the -- \ are passed to our task.py.


In [ ]:
%%time
!gcloud ai-platform local train \
    --package-path=taxifaremodel \
    --module-name=taxifaremodel.task \
    -- \
    --train_data_path=taxi-train.csv \
    --eval_data_path=taxi-valid.csv  \
    --train_steps=1 \
    --output_dir=taxi_trained
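
If the local run succeeded, the output directory should contain checkpoints plus an exported SavedModel under export/exporter (the exporter name comes from model.py). A quick check, assuming saved_model_cli ships with the installed TensorFlow:

In [ ]:
!ls taxi_trained
!ls -R taxi_trained/export/exporter
!saved_model_cli show --dir $(ls -d taxi_trained/export/exporter/* | tail -1) --all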

Train using AI Platform Training Service (Cloud)

To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname] and simply specify some additional parameters for AI Platform Training Service:

  • jobname: A unique identifier for the Cloud job. We usually append system time to ensure uniqueness
  • job-dir: A GCS location to upload the Python package to
  • runtime-version: Version of TF to use. Defaults to 1.0 if not specified
  • python-version: Version of Python to use. Defaults to 2.7 if not specified
  • region: Cloud region to train in. See the AI Platform documentation for the list of supported regions

Below the -- \, note how we've changed our task.py args to be GCS locations.


In [ ]:
OUTDIR = "gs://{}/taxifare/trained_small".format(BUCKET)

In [ ]:
!gsutil -m rm -rf {OUTDIR} # start fresh each time
!gcloud ai-platform jobs submit training taxifare_$(date -u +%y%m%d_%H%M%S) \
    --package-path=taxifaremodel \
    --module-name=taxifaremodel.task \
    --job-dir=gs://{BUCKET}/taxifare \
    --python-version=3.5 \
    --runtime-version={TFVERSION} \
    --region={REGION} \
    -- \
    --train_data_path=gs://{BUCKET}/taxifare/smallinput/taxi-train.csv \
    --eval_data_path=gs://{BUCKET}/taxifare/smallinput/taxi-valid.csv  \
    --train_steps=1000 \
    --output_dir={OUTDIR}

You can track your job and view logs using the Cloud Console, or from the command line as sketched below. The job will take 5-10 minutes to complete. Wait until it finishes before moving on.
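
A sketch of the equivalent gcloud commands; the full job name includes the timestamp generated above, so we list the most recent job first.

In [ ]:
!gcloud ai-platform jobs list --limit=1
# Then, with the job name printed above:
# !gcloud ai-platform jobs describe <JOB_NAME>
# !gcloud ai-platform jobs stream-logs <JOB_NAME>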

Deploy model

Now let's take our exported SavedModel and deploy it behind a REST API. To do so we'll use AI Platform Training Service's managed TF Serving feature, which auto-scales based on load.


In [ ]:
!gsutil ls gs://{BUCKET}/taxifare/trained_small/export/exporter

AI Platform Training Service uses a model versioning system: first you create a model resource, and within that model you create one or more versions.

Note: You will see an error below if the model already exists (from models create) or if the version does not exist yet (from versions delete); these are safe to ignore.


In [ ]:
VERSION='v1'
!gcloud ai-platform models create taxifare --regions us-central1
!gcloud ai-platform versions delete {VERSION} --model taxifare --quiet
!gcloud ai-platform versions create {VERSION} --model taxifare \
    --origin $(gsutil ls gs://{BUCKET}/taxifare/trained_small/export/exporter | tail -1) \
    --python-version=3.5 \
    --runtime-version {TFVERSION}
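
Once a version exists you can list the versions of a model and later switch which one is the default. A sketch for reference; set-default only matters once there is more than one version.

In [ ]:
!gcloud ai-platform versions list --model taxifare
# e.g. if a v2 existed, it could be made the default with:
# !gcloud ai-platform versions set-default v2 --model taxifare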

Online prediction

Now that we have deployed our model behind a production-grade REST API, we can invoke it remotely.

We could invoke it directly by calling the REST API with an HTTP POST request (see the reference docs); however, AI Platform Training Service provides an easier way to invoke it from the command line. A raw HTTP sketch follows the gcloud command below.

Invoke prediction REST API via command line

First we write our prediction request to a file in JSON format.


In [ ]:
%%writefile ./test.json
{"dayofweek": 1, "hourofday": 0, "pickuplon": -73.885262, "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403}

Then we use gcloud ai-platform predict and specify the model name and the location of the JSON file. Since we don't explicitly specify --version, the default model version will be used.

Since we only have one version, it is already the default; if we had multiple model versions, we could designate the default using gcloud ai-platform versions set-default (sketched above) or the Cloud Console.


In [ ]:
!gcloud ai-platform predict --model=taxifare --json-instances=./test.json
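
For reference, here is a hedged sketch of the raw HTTP POST mentioned earlier. The :predict REST method expects a JSON body with an "instances" list and a bearer token for authentication; the test_request.json file below is created just for this example.

In [ ]:
%%writefile ./test_request.json
{"instances": [{"dayofweek": 1, "hourofday": 0, "pickuplon": -73.885262, "pickuplat": 40.773008, "dropofflon": -73.987232, "dropofflat": 40.732403}]}

In [ ]:
!curl -s -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @test_request.json \
    https://ml.googleapis.com/v1/projects/{PROJECT}/models/taxifare:predict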

Invoke prediction REST API via Python

In the cell below, use the Google API Client Library for Python to query the model you just deployed on AI Platform Training Service.


In [ ]:
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json

credentials = GoogleCredentials.get_application_default()
api = discovery.build("ml", "v1", credentials = credentials,
            discoveryServiceUrl = "https://storage.googleapis.com/cloud-ml/discovery/ml_v1_discovery.json")

request_data = {"instances":
  [
      {
        "dayofweek": 1,
        "hourofday": 8,
        "pickuplon": -73.885,
        "pickuplat": 40.773,
        "dropofflon": -73.987,
        "dropofflat": 40.732,
      }
  ]
}

parent = "projects/{}/models/taxifare".format(PROJECT) # use default version
#parent = "projects/{}/models/taxifare/versions/{}".format(PROJECT,VERSION) # specify a specific version

response = api.projects().predict(body = request_data, name = parent).execute()
print("response = {0}".format(response))

Challenge exercise

Modify your solution to the challenge exercise in d_trainandevaluate.ipynb appropriately. Make sure that you implement training and deployment. Increase the size of your dataset by 10x since you are running on the cloud. Does your accuracy improve?

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License