LAB 5a: Training Keras model on Cloud AI Platform.

Learning Objectives

  1. Set up the environment
  2. Create a trainer module file to hold the hyperparameter argparsing code
  3. Create a trainer module file to hold the Keras model code
  4. Run trainer module package locally
  5. Submit training job to Cloud AI Platform
  6. Submit hyperparameter tuning job to Cloud AI Platform


After testing our training pipeline both locally and in the cloud on a subset of the data, we can submit another (much larger) training job to the cloud. It is also a good idea to run a hyperparameter tuning job to make sure we have optimized the hyperparameters of our model.

In this notebook, we'll be training our Keras model at scale using Cloud AI Platform.

In this lab, we will set up the environment, create the trainer module files that hold the hyperparameter argparsing code and the Keras model code, run the trainer module package locally, submit a training job to Cloud AI Platform, and submit a hyperparameter tuning job to Cloud AI Platform.

Set up environment variables and load necessary libraries

First we will install the cloudml-hypertune package on our local machine. This is the package which we will use to report hyperparameter tuning metrics to Cloud AI Platform. Installing the package will allow us to test our trainer package locally.

!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst

!pip3 install cloudml-hypertune

Import necessary libraries.

import os

Set environment variables.

Set environment variables so that we can use them throughout the entire lab. We will be using our project name for our bucket, so you only need to change your project and region.

export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: "${PROJECT}

# TODO: Change these to try this notebook out
PROJECT = "your-project-name-here"  # Replace with your PROJECT
BUCKET = PROJECT   # defaults to PROJECT
REGION = "us-central1"  # Replace with your REGION

os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = "2.1"
os.environ["PYTHONVERSION"] = "3.7"

gcloud config set project ${PROJECT}
gcloud config set compute/region ${REGION}

# Create the bucket only if it does not exist yet
if ! gsutil ls | grep -q gs://${BUCKET}; then
    gsutil mb -l ${REGION} gs://${BUCKET}
fi

Check data exists

Verify that you previously created the CSV files we'll be using for training and evaluation. If not, go back to lab 1b_prepare_data_babyweight to create them.

gsutil ls gs://${BUCKET}/babyweight/data/*000000000000.csv

Now that we have the Keras wide-and-deep code working on a subset of the data, we can package the TensorFlow code up as a Python module and train it on Cloud AI Platform.

Train on Cloud AI Platform

Training on Cloud AI Platform requires:

  • Making the code a Python package
  • Using gcloud to submit the training code to Cloud AI Platform

Ensure that the AI Platform API is enabled by going to this link.

Move code into a Python package

A Python package is simply a collection of one or more .py files along with a file that identifies the containing directory as a package. That file sometimes contains initialization code, but for our purposes an empty file suffices.

The bash command touch creates an empty file in the specified location; mkdir -p first ensures that the babyweight/trainer directory exists.

mkdir -p babyweight/trainer
touch babyweight/trainer/
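
As a rough Python equivalent of the two shell commands above, the sketch below builds the same layout inside a temporary directory. It assumes the marker file uses the conventional name __init__.py, which is what Python looks for when importing a regular package; the helper name make_package is hypothetical.

```python
import tempfile
from pathlib import Path


def make_package(root: Path, package: str = "trainer") -> Path:
    """Create root/package with an empty marker file (hypothetical helper)."""
    pkg_dir = root / package
    pkg_dir.mkdir(parents=True, exist_ok=True)  # like `mkdir -p`
    init_file = pkg_dir / "__init__.py"         # assumed conventional marker name
    init_file.touch()                           # like `touch`: create if missing
    return init_file


# Work in a throwaway directory so nothing in the lab folder is modified.
with tempfile.TemporaryDirectory() as tmp:
    init_file = make_package(Path(tmp) / "babyweight")
    created = init_file.exists()
    size = init_file.stat().st_size

print(created, size)
```

An empty marker file is enough here because the package needs no initialization code.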

We then use the %%writefile magic to write the contents of the cell below to a file in the babyweight/trainer folder.

Create a trainer module file to hold the hyperparameter argparsing code.

The cell below writes a file under babyweight/trainer/ that sets up our training job. Here is where we determine which parameters of our model to pass as flags during training, using the argparse module. Look at how batch_size is passed to the model in the code below. Use this as an example to parse arguments for the following variables:

  • nnsize which represents the hidden layer sizes to use for DNN feature columns
  • nembeds which represents the embedding size of a cross of n key real-valued parameters
  • train_examples which represents the number of examples (in thousands) to run the training job
  • eval_steps which represents the positive number of steps for which to evaluate model

Be sure to include a default value for each of the parsed arguments above and specify the type if necessary.
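
A minimal, self-contained sketch of how such flags could be declared with argparse follows. The help strings come from this lab, but the default values for batch_size, nembeds, and train_examples are assumptions chosen for illustration, and build_parser is a hypothetical helper name.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Sketch of a parser for the flags described above; defaults are assumptions."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--batch_size",
        help="Number of examples to compute gradient over.",
        type=int,
        default=512,  # assumed default
    )
    parser.add_argument(
        "--nnsize",
        help="Hidden layer sizes for DNN -- provide space-separated layers",
        nargs="+",  # accepts e.g. --nnsize 64 16 4
        type=int,
        default=[128, 32, 4],
    )
    parser.add_argument(
        "--nembeds",
        help="Embedding size of a cross of n key real-valued parameters",
        type=int,
        default=3,  # assumed default
    )
    parser.add_argument(
        "--train_examples",
        help="Number of examples (in thousands) to run the training job over.",
        type=int,
        default=5,  # assumed default
    )
    parser.add_argument(
        "--eval_steps",
        help="Positive number of steps for which to evaluate model.",
        type=int,
        default=None,  # no limit on evaluation steps unless one is given
    )
    return parser


# Parse an explicit list instead of sys.argv so the sketch runs anywhere.
args = build_parser().parse_args(["--nnsize", "64", "16", "--train_examples", "2"])
arguments = vars(args)
arguments["train_examples"] *= 1000  # the flag is given in thousands
print(arguments)
```

Giving every numeric flag type=int matters because the trainer later does arithmetic on these values, and argparse would otherwise hand back strings.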

%%writefile babyweight/trainer/
import argparse
import json
import os

from trainer import model

import tensorflow as tf

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
        help="this model ignores this field, but it is required by gcloud",
        help="GCS location of training data",
        help="GCS location of evaluation data",
        help="GCS location to write checkpoints and export models",
        help="Number of examples to compute gradient over.",
        help="Hidden layer sizes for DNN -- provide space-separated layers",
        default=[128, 32, 4]
        help="Embedding size of a cross of n key real-valued parameters",
        help="Number of epochs to train the model.",
        help="""Number of examples (in thousands) to run the training job over.
        If this is more than actual # of examples available, it cycles through
        them. So specifying 1000 here when you have only 100k examples makes
        this 10 epochs.""",
        help="""Positive number of steps for which to evaluate model. Default
        to None, which means to evaluate until input_fn raises an end-of-input

    # Parse all arguments
    args = parser.parse_args()
    arguments = args.__dict__

    # Unused args provided by service
    arguments.pop("job_dir", None)
    arguments.pop("job-dir", None)

    # Modify some arguments
    arguments["train_examples"] *= 1000

    # Append trial_id to path if we are doing hptuning
    # This code can be removed if you are not using hyperparameter tuning
    arguments["output_dir"] = os.path.join(
        arguments["output_dir"],
        json.loads(
            os.environ.get("TF_CONFIG", "{}")
        ).get("task", {}).get("trial", "")
    )

    # Run the training job
    model.train_and_evaluate(arguments)
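
To make the trial-id handling concrete, here is a small sketch of the same lookup applied to a JSON string instead of the real TF_CONFIG environment variable. The payload, the directory name babyweight_trained, and the helper trial_output_dir are made up for illustration.

```python
import json
import os


def trial_output_dir(output_dir: str, tf_config: str) -> str:
    """Append the trial id found in a TF_CONFIG-style JSON string, if any."""
    trial = json.loads(tf_config).get("task", {}).get("trial", "")
    return os.path.join(output_dir, trial)


# Hypothetical TF_CONFIG payload for trial 7 of a tuning job.
tuning_dir = trial_output_dir("babyweight_trained", '{"task": {"trial": "7"}}')
# Without TF_CONFIG the lookup falls back to "{}", so no trial id is appended.
plain_dir = trial_output_dir("babyweight_trained", "{}")
print(tuning_dir)
print(plain_dir)
```

Writing each trial to its own subdirectory keeps parallel trials from overwriting one another's checkpoints and exports.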

In the same way, we can write the model that we developed in the previous notebooks to a file.

Create a trainer module file to hold the Keras model code.

To create our model file, we'll use the code we wrote for the Wide & Deep model. Look back at your 9_keras_wide_and_deep_babyweight notebook and copy/paste the necessary code from that notebook into its place in the cell below.

%%writefile babyweight/trainer/
import datetime
import os
import shutil
import numpy as np
import tensorflow as tf
import hypertune

# Determine CSV, label, and key columns
CSV_COLUMNS = ["weight_pounds",
               "is_male",
               "mother_age",
               "plurality",
               "gestation_weeks"]
LABEL_COLUMN = "weight_pounds"

# Set default values for each CSV column.
# Treat is_male and plurality as strings.
DEFAULTS = [[0.0], ["null"], [0.0], ["null"], [0.0]]

def features_and_labels(row_data):
    """Splits features and labels from feature dictionary.

    Args:
        row_data: Dictionary of CSV column names and tensor values.
    Returns:
        Dictionary of feature tensors and label tensor.
    """
    label = row_data.pop(LABEL_COLUMN)

    return row_data, label  # features, label

def load_dataset(pattern, batch_size=1, mode='eval'):
    """Loads dataset using the tf.data API from CSV files.

    Args:
        pattern: str, file pattern to glob into list of files.
        batch_size: int, the number of examples per batch.
        mode: 'train' | 'eval' to determine if training or evaluating.
    Returns:
        `Dataset` object.
    """
    print("mode = {}".format(mode))
    # Make a CSV dataset
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS)

    # Map dataset to features and label
    dataset = dataset.map(map_func=features_and_labels)  # features, label

    # Shuffle and repeat for training
    if mode == 'train':
        dataset = dataset.shuffle(buffer_size=1000).repeat()

    # Take advantage of multi-threading; 1=AUTOTUNE
    dataset = dataset.prefetch(buffer_size=1)

    return dataset

def create_input_layers():
    """Creates dictionary of input layers for each feature.

    Returns:
        Dictionary of `tf.keras.layers.Input` layers for each feature.
    """
    deep_inputs = {
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="float32")
        for colname in ["mother_age", "gestation_weeks"]
    }

    wide_inputs = {
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="string")
        for colname in ["is_male", "plurality"]
    }

    inputs = {**wide_inputs, **deep_inputs}

    return inputs

def categorical_fc(name, values):
    """Helper function to wrap categorical feature by indicator column.

    Args:
        name: str, name of feature.
        values: list, list of strings of categorical values.
    Returns:
        Categorical and indicator column of categorical feature.
    """
    cat_column = tf.feature_column.categorical_column_with_vocabulary_list(
            key=name, vocabulary_list=values)
    ind_column = tf.feature_column.indicator_column(
        categorical_column=cat_column)

    return cat_column, ind_column

def create_feature_columns(nembeds):
    """Creates wide and deep dictionaries of feature columns from inputs.

    Args:
        nembeds: int, number of dimensions to embed categorical column down to.
    Returns:
        Wide and deep dictionaries of feature columns.
    """
    deep_fc = {
        colname: tf.feature_column.numeric_column(key=colname)
        for colname in ["mother_age", "gestation_weeks"]
    }
    wide_fc = {}
    is_male, wide_fc["is_male"] = categorical_fc(
        "is_male", ["True", "False", "Unknown"])
    plurality, wide_fc["plurality"] = categorical_fc(
        "plurality", ["Single(1)", "Twins(2)", "Triplets(3)",
                      "Quadruplets(4)", "Quintuplets(5)", "Multiple(2+)"])

    # Bucketize the float fields. This makes them wide
    age_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["mother_age"],
        boundaries=np.arange(15, 45, 1).tolist())
    wide_fc["age_buckets"] = tf.feature_column.indicator_column(
        categorical_column=age_buckets)

    gestation_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["gestation_weeks"],
        boundaries=np.arange(17, 47, 1).tolist())
    wide_fc["gestation_buckets"] = tf.feature_column.indicator_column(
        categorical_column=gestation_buckets)

    # Cross all the wide columns, have to do the crossing before we one-hot
    crossed = tf.feature_column.crossed_column(
        keys=[age_buckets, gestation_buckets],
    deep_fc["crossed_embeds"] = tf.feature_column.embedding_column(
        categorical_column=crossed, dimension=nembeds)

    return wide_fc, deep_fc

def get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units):
    """Creates model architecture and returns outputs.

    Args:
        wide_inputs: Dense tensor used as inputs to wide side of model.
        deep_inputs: Dense tensor used as inputs to deep side of model.
        dnn_hidden_units: List of integers where length is number of hidden
            layers and ith element is the number of neurons at ith layer.
    Returns:
        Dense tensor output from the model.
    """
    # Hidden layers for the deep side
    layers = [int(x) for x in dnn_hidden_units]
    deep = deep_inputs
    for layerno, numnodes in enumerate(layers):
        deep = tf.keras.layers.Dense(
    deep_out = deep

    # Linear model for the wide side
    wide_out = tf.keras.layers.Dense(
        units=10, activation="relu", name="linear")(wide_inputs)

    # Concatenate the two sides
    both = tf.keras.layers.concatenate(
        inputs=[deep_out, wide_out], name="both")

    # Final output is a linear activation because this is regression
    output = tf.keras.layers.Dense(
        units=1, activation="linear", name="weight")(both)

    return output

def rmse(y_true, y_pred):
    """Calculates RMSE evaluation metric.

    Args:
        y_true: tensor, true labels.
        y_pred: tensor, predicted labels.
    Returns:
        Tensor with value of RMSE between true and predicted labels.
    """
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))

def build_wide_deep_model(dnn_hidden_units=[64, 32], nembeds=3):
    """Builds wide and deep model using Keras Functional API.

    Returns:
        `tf.keras.models.Model` object.
    """
    # Create input layers
    inputs = create_input_layers()

    # Create feature columns for both wide and deep
    wide_fc, deep_fc = create_feature_columns(nembeds)

    # The constructor for DenseFeatures takes a list of numeric columns
    # The Functional API in Keras requires: LayerConstructor()(inputs)
    wide_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=wide_fc.values(), name="wide_inputs")(inputs)
    deep_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=deep_fc.values(), name="deep_inputs")(inputs)

    # Get output of model given inputs
    output = get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units)

    # Build model and compile it all together
    model = tf.keras.models.Model(inputs=inputs, outputs=output)
    model.compile(optimizer="adam", loss="mse", metrics=[rmse, "mse"])

    return model

def train_and_evaluate(args):
    model = build_wide_deep_model(args["nnsize"], args["nembeds"])
    print("Here is our Wide-and-Deep architecture so far:\n")
    model.summary()

    trainds = load_dataset(
        args["train_data_path"], args["batch_size"], 'train')

    evalds = load_dataset(
        args["eval_data_path"], 1000, 'eval')
    if args["eval_steps"]:
        evalds = evalds.take(count=args["eval_steps"])

    num_batches = args["batch_size"] * args["num_epochs"]
    steps_per_epoch = args["train_examples"] // num_batches

    checkpoint_path = os.path.join(args["output_dir"], "checkpoints/babyweight")
    cp_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path, verbose=1, save_weights_only=True)

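    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=args["num_epochs"],
        steps_per_epoch=steps_per_epoch,
        verbose=2,  # 0=silent, 1=progress bar, 2=one line per epoch
        callbacks=[cp_callback])

    # Export under a timestamped subdirectory of output_dir
    EXPORT_PATH = os.path.join(
        args["output_dir"], datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    tf.saved_model.save(
        obj=model, export_dir=EXPORT_PATH)  # with default serving function
    hp_metric = history.history['val_rmse'][-1]

    # Report the final validation RMSE to the hyperparameter tuning service
    hpt = hypertune.HyperTune()
    hpt.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag="rmse",
        metric_value=hp_metric,
        global_step=args["num_epochs"])
    print("Exported trained model to {}".format(EXPORT_PATH))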
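
As a sanity check on the rmse metric defined in the model code above, the following sketch computes the same quantity (square, mean, square root) in plain Python. The sample weights are made up for illustration and are not taken from the babyweight data.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error over two equal-length sequences."""
    squared_errors = [(p - t) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up weights in pounds, for illustration only.
y_true = [7.0, 6.5, 8.0]
y_pred = [7.5, 6.5, 7.0]
value = rmse(y_true, y_pred)
print(round(value, 4))
```

Because RMSE is in the same units as the label, a value here reads directly as a typical error in pounds.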

Train locally

After moving the code to a package, make sure it works standalone. Note that we incorporated the --train_examples flag so that we don't try to train on the entire dataset while we are developing our pipeline. Once we are sure that everything is working on a subset, we can change it so that we train on all the data. Even for this subset, training takes about 3 minutes, during which you won't see any output.

Run trainer module package locally.

We can run a very small training job over a single file with a small batch size, 1 epoch, --train_examples=1 (that is, 1,000 examples, since the flag counts in thousands), and 1 eval step.

rm -rf ${OUTDIR}
export PYTHONPATH=${PYTHONPATH}:${PWD}/babyweight
python3 -m trainer.task \
    --job-dir=./tmp \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv  \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --batch_size=10 \
    --num_epochs=1 \
    --train_examples=1 \
    --eval_steps=1
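
The sketch below reproduces the arithmetic the trainer uses to turn these flags into a number of steps per epoch. It mirrors the expressions in train_and_evaluate; the function name and the larger second set of values are only illustrative.

```python
def steps_per_epoch(train_examples_thousands: int, batch_size: int, num_epochs: int) -> int:
    """Mirror the trainer's arithmetic for the number of steps in each epoch."""
    train_examples = train_examples_thousands * 1000  # the flag is in thousands
    num_batches = batch_size * num_epochs
    return train_examples // num_batches


# The small local run: --train_examples=1 --batch_size=10 --num_epochs=1
local_steps = steps_per_epoch(1, batch_size=10, num_epochs=1)
# An illustrative larger run: 10000 thousand examples, batch size 32, 10 epochs
larger_steps = steps_per_epoch(10000, batch_size=32, num_epochs=10)
print(local_steps, larger_steps)
```

Note that train_examples is the total budget across all epochs, so raising num_epochs shortens each epoch rather than lengthening the job.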

Training on Cloud AI Platform

Now that we see everything is working locally, it's time to train on the cloud!

To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname] and simply specify some additional parameters for AI Platform Training Service:

  • jobname: A unique identifier for the Cloud job. We usually append system time to ensure uniqueness
  • job-dir: A GCS location to upload the Python package to
  • runtime-version: Version of TF to use.
  • python-version: Version of Python to use. Currently only Python 3.7 is supported for TF 2.1.
  • region: Cloud region to train in. See here for supported AI Platform Training Service regions
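
For readers who prefer to build the job name in Python, this sketch follows the same prefix-plus-UTC-timestamp pattern as the shell expression babyweight_$(date -u +%y%m%d_%H%M%S) used in this lab. The helper name make_job_id is hypothetical.

```python
from datetime import datetime, timezone


def make_job_id(prefix: str, now: datetime) -> str:
    """Build prefix_YYMMDD_HHMMSS from a timestamp (hypothetical helper)."""
    return "{}_{}".format(prefix, now.strftime("%y%m%d_%H%M%S"))


# A fixed timestamp keeps the example reproducible.
example_id = make_job_id(
    "babyweight", datetime(2020, 3, 4, 5, 6, 7, tzinfo=timezone.utc))
# In practice the current UTC time would be used instead.
current_id = make_job_id("babyweight", datetime.now(timezone.utc))
print(example_id)
print(current_id)
```

A one-second timestamp resolution is usually enough for jobs submitted by hand, but two submissions within the same second would collide.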

Below the -- \ line, we add the arguments for our trainer module.

JOBID=babyweight_$(date -u +%y%m%d_%H%M%S)

gcloud ai-platform jobs submit training ${JOBID} \
    --region=${REGION} \
    --module-name=trainer.task \
    --package-path=$(pwd)/babyweight/trainer \
    --job-dir=${OUTDIR} \
    --staging-bucket=gs://${BUCKET} \
    --master-machine-type=n1-standard-8 \
    --scale-tier=CUSTOM \
    --runtime-version=${TFVERSION} \
    --python-version=${PYTHONVERSION} \
    -- \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --train_examples=10000 \
    --eval_steps=100 \
    --batch_size=32 \

The training job should complete within 10 to 15 minutes. You do not need to wait for this training job to finish before moving forward in the notebook, but you will need a trained model to complete our next lab.

Hyperparameter tuning

All of these are command-line parameters to my program. To do hyperparameter tuning, create hyperparam.yaml and pass it as --config hyperparam.yaml. This step will take up to 2 hours -- you can increase maxParallelTrials or reduce maxTrials to get it done faster. Since maxParallelTrials is the number of initial seeds to start searching from, you don't want it to be too large; otherwise, all you have is a random search.

%%writefile hyperparam.yaml
trainingInput:
    scaleTier: STANDARD_1
    hyperparameters:
        hyperparameterMetricTag: rmse
        goal: MINIMIZE
        maxTrials: 20
        maxParallelTrials: 5
        enableTrialEarlyStopping: True
        params:
        - parameterName: batch_size
          type: INTEGER
          minValue: 8
          maxValue: 512
          scaleType: UNIT_LOG_SCALE
        - parameterName: nembeds
          type: INTEGER
          minValue: 3
          maxValue: 30
          scaleType: UNIT_LINEAR_SCALE
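
To get a feel for the two scaleType values, the sketch below maps a position u in [0, 1] onto each parameter range, linearly for nembeds and logarithmically for batch_size. It only illustrates the scaling idea, under the assumption that each range is mapped onto the unit interval; it is not the tuning service's search algorithm, and the function names are hypothetical.

```python
def unit_linear(u: float, min_value: float, max_value: float) -> float:
    """Map u in [0, 1] onto [min_value, max_value] with even spacing."""
    return min_value + u * (max_value - min_value)


def unit_log(u: float, min_value: float, max_value: float) -> float:
    """Map u in [0, 1] onto [min_value, max_value] with even spacing in log space."""
    return min_value * (max_value / min_value) ** u


# Midpoints of the two ranges declared in hyperparam.yaml
midpoint_nembeds = unit_linear(0.5, 3, 30)
midpoint_batch_size = unit_log(0.5, 8, 512)
print(midpoint_nembeds, midpoint_batch_size)
```

On a log scale the midpoint of 8 to 512 is 64 rather than 260, so small batch sizes receive as much attention as large ones.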

JOBID=babyweight_$(date -u +%y%m%d_%H%M%S)

gcloud ai-platform jobs submit training ${JOBID} \
    --region=${REGION} \
    --module-name=trainer.task \
    --package-path=$(pwd)/babyweight/trainer \
    --job-dir=${OUTDIR} \
    --staging-bucket=gs://${BUCKET} \
    --master-machine-type=n1-standard-8 \
    --scale-tier=CUSTOM \
    --config=hyperparam.yaml \
    --runtime-version=${TFVERSION} \
    --python-version=${PYTHONVERSION} \
    -- \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --train_examples=5000 \
    --eval_steps=100 \
    --batch_size=32 \

Repeat training

This time with tuned parameters for batch_size and nembeds. Note that your best results may differ from below. So be sure to fill yours in!

JOBID=babyweight_$(date -u +%y%m%d_%H%M%S)
gsutil -m rm -rf ${OUTDIR}

gcloud ai-platform jobs submit training ${JOBID} \
    --region=${REGION} \
    --module-name=trainer.task \
    --package-path=$(pwd)/babyweight/trainer \
    --job-dir=${OUTDIR} \
    --staging-bucket=gs://${BUCKET} \
    --master-machine-type=n1-standard-8 \
    --scale-tier=CUSTOM \
    --runtime-version=${TFVERSION} \
    --python-version=${PYTHONVERSION} \
    -- \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --train_examples=20000 \
    --eval_steps=100 \
    --batch_size=32 \

Lab Summary:

In this lab, we set up the environment, created the trainer module files that hold the hyperparameter argparsing code and the Keras model code, ran the trainer module package locally, submitted a training job to Cloud AI Platform, and submitted a hyperparameter tuning job to Cloud AI Platform.

Extra: Training on Cloud AI Platform using containers

Though we can directly submit TensorFlow 2.1 models using the gcloud ai-platform jobs submit training command, we can also submit containerized models for training. One advantage of using this approach is that we can use frameworks not natively supported by Cloud AI Platform for training and have more control over the environment in which the training loop is running.

The rest of this notebook is dedicated to using the containerized model approach.

Create Dockerfile

We need to create a container with everything required to run our model. This includes our trainer module package, python3, and the libraries we use, such as TensorFlow 2.1 (the version pinned in the Dockerfile below).

%%writefile babyweight/Dockerfile
COPY trainer /babyweight/trainer
RUN apt update && \
    apt install --yes python3-pip && \
    pip3 install --upgrade --quiet tensorflow==2.1 && \
    pip3 install --upgrade --quiet cloudml-hypertune

ENTRYPOINT ["python3", "babyweight/trainer/"]

Build and push container image to repo

Now that we have created our Dockerfile, we need to build and push our container image to our project's container repo. To do this, we'll create a small shell script that we can call from bash.

%%writefile babyweight/
export PROJECT_ID=$(gcloud config list project --format "value(core.project)")
export IMAGE_REPO_NAME=babyweight_training_container

echo "Building  $IMAGE_URI"
docker build -f Dockerfile -t ${IMAGE_URI} ./
echo "Pushing $IMAGE_URI"
docker push ${IMAGE_URI}

Note: If you get a permissions/stat error when running from Notebooks, do it from CloudShell:

Open CloudShell on the GCP Console

This step takes 5-10 minutes to run.

cd babyweight

Test container locally

Before we submit our training job to Cloud AI Platform, let's make sure the container that we just built and pushed to our project's container repo works as expected. We can do that by running the container from bash and passing the necessary user_args to our trainer's argument parser.

export PROJECT_ID=$(gcloud config list project --format "value(core.project)")
export IMAGE_REPO_NAME=babyweight_training_container
echo "Running  $IMAGE_URI"
docker run ${IMAGE_URI} \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=gs://${BUCKET}/babyweight/trained_model \
    --batch_size=10 \
    --num_epochs=10 \
    --train_examples=1 \

Train on Cloud AI Platform

Once the code works in standalone mode, you can run it on Cloud AI Platform. Because this is on the entire dataset, it will take a while. The training run took about two hours for me. You can monitor the job from the GCP console in the Cloud AI Platform section.

JOBID=babyweight_$(date -u +%y%m%d_%H%M%S)
gsutil -m rm -rf ${OUTDIR}${PROJECT}/babyweight_training_container

gcloud ai-platform jobs submit training ${JOBID} \
    --staging-bucket=gs://${BUCKET} \
    --region=${REGION} \
    --master-image-uri=${IMAGE} \
    --master-machine-type=n1-standard-4 \
    --scale-tier=CUSTOM \
    -- \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --train_examples=20000 \
    --eval_steps=100 \
    --batch_size=32 \

When I ran it, I used train_examples=2000000. When training finished, I filtered in the Stackdriver log on the word "dict" and saw that the last line was:

Saving dict for global step 5714290: average_loss = 1.06473, global_step = 5714290, loss = 34882.4, rmse = 1.03186
The final RMSE was 1.03 pounds.
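
The reported numbers can be cross-checked: assuming average_loss is the mean squared error, its square root should match the logged rmse. The sketch below parses the log line quoted above and performs that check.

```python
import math
import re

# Log line copied from the text above.
log_line = (
    "Saving dict for global step 5714290: average_loss = 1.06473, "
    "global_step = 5714290, loss = 34882.4, rmse = 1.03186"
)

# Pull out every "name = number" pair.
metrics = {
    name: float(value)
    for name, value in re.findall(r"(\w+) = ([0-9.]+)", log_line)
}

# Assumption: average_loss is the mean squared error, so sqrt gives RMSE.
rmse_from_loss = math.sqrt(metrics["average_loss"])
print(metrics["rmse"], round(rmse_from_loss, 5))
```

The two values agree to the precision shown in the log, which supports reading average_loss as the mean squared error.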

Hyperparameter tuning

All of these are command-line parameters to my program. To do hyperparameter tuning, create hyperparam.yaml and pass it as --config hyperparam.yaml. This step will take up to 2 hours -- you can increase maxParallelTrials or reduce maxTrials to get it done faster. Since maxParallelTrials is the number of initial seeds to start searching from, you don't want it to be too large; otherwise, all you have is a random search.

Note that this is the same hyperparam.yaml file as above, but included here for convenience.

%%writefile hyperparam.yaml
trainingInput:
    scaleTier: STANDARD_1
    hyperparameters:
        hyperparameterMetricTag: rmse
        goal: MINIMIZE
        maxTrials: 20
        maxParallelTrials: 5
        enableTrialEarlyStopping: True
        params:
        - parameterName: batch_size
          type: INTEGER
          minValue: 8
          maxValue: 512
          scaleType: UNIT_LOG_SCALE
        - parameterName: nembeds
          type: INTEGER
          minValue: 3
          maxValue: 30
          scaleType: UNIT_LINEAR_SCALE

JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
gsutil -m rm -rf ${OUTDIR}${PROJECT}/babyweight_training_container

gcloud ai-platform jobs submit training ${JOBNAME} \
    --staging-bucket=gs://${BUCKET} \
    --region=${REGION} \
    --master-image-uri=${IMAGE} \
    --master-machine-type=n1-standard-8 \
    --scale-tier=CUSTOM \
    --config=hyperparam.yaml \
    -- \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --train_examples=5000 \

Repeat training

This time with tuned parameters for batch_size and nembeds. Note that your best results may differ from below. So be sure to fill yours in!

JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
gsutil -m rm -rf ${OUTDIR}${PROJECT}/babyweight_training_container

gcloud ai-platform jobs submit training ${JOBNAME} \
    --staging-bucket=gs://${BUCKET} \
    --region=${REGION} \
    --master-image-uri=${IMAGE} \
    --master-machine-type=n1-standard-4 \
    --scale-tier=CUSTOM \
    -- \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --train_examples=20000 \
    --eval_steps=100 \
    --batch_size=32 \

Extra Summary:

In this lab, we set up the environment, created the trainer module files that hold the hyperparameter argparsing code and the Keras model code, built a container to run the trainer package, ran the trainer module package locally, submitted a training job to Cloud AI Platform, and submitted a hyperparameter tuning job to Cloud AI Platform.

