Training a Keras model on Cloud AI Platform

Learning Objectives

  1. Create a BigQuery Dataset and Google Cloud Storage Bucket
  2. Export from BigQuery to CSVs in GCS
  3. Training on Cloud AI Platform

Note: This notebook requires TensorFlow 2.1 as we are creating a model using Keras.

TODO: Complete the #TODO sections of the lab notebook. You can refer to the solutions/ notebook if you need help.

This notebook illustrates distributed training and hyperparameter tuning on Cloud AI Platform (formerly known as Cloud ML Engine). It uses Keras and requires TensorFlow 2.1.


In [ ]:
# Ensure the right version of TensorFlow is installed.
!pip freeze | grep tensorflow==2.1

Set up environment variables and load necessary libraries

Set environment variables so that we can use them throughout the entire notebook. We will be using our project name for our bucket, so you only need to change your project and region.


In [2]:
# change these to try this notebook out
BUCKET = 'cloud-training-demos-ml' # Replace with your bucket name
PROJECT = 'cloud-training-demos' # Replace with your project-id
REGION = 'us-central1'

In [ ]:
import os
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = "2.1"
os.environ["PYTHONVERSION"] = "3.7"

Check that the Google BigQuery library is installed and if not, install it.


In [1]:
%%bash
sudo pip freeze | grep google-cloud-bigquery==1.6.1 || \
sudo pip install google-cloud-bigquery==1.6.1


google-cloud-bigquery==1.6.1

In [2]:
import os
from google.cloud import bigquery

In [ ]:
%%bash
export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: "$PROJECT

The source dataset

Our dataset is hosted in BigQuery. The CDC's Natality data has details on US births from 1969 to 2008 and is a publicly available dataset, meaning anyone with a GCP account has access. It is available as one of BigQuery's public sample datasets.

The natality dataset is relatively large at almost 138 million rows and 31 columns, but simple to understand. weight_pounds is the target, the continuous value we’ll train a model to predict.
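
If you would like to preview the raw data before preprocessing, an optional query along the lines of the sketch below pulls a few rows of the columns we will use. It assumes the %%bigquery magic available later in this notebook; note that LIMIT does not reduce the bytes BigQuery scans.


In [ ]:
%%bigquery
-- Optional: preview a few rows of the raw natality data.
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    year,
    month
FROM
    publicdata.samples.natality
LIMIT 5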

Create a BigQuery Dataset and Google Cloud Storage Bucket

A BigQuery dataset is a container for tables, views, and models built with BigQuery ML. Let's create one called babyweight. We'll also create a GCS bucket for our project.


In [ ]:
%%bash

# Create a BigQuery dataset for babyweight if it doesn't exist
datasetexists=$(bq ls -d | grep -w babyweight)

if [ -n "$datasetexists" ]; then
    echo -e "BigQuery dataset already exists, let's not recreate it."

else
    echo "Creating BigQuery dataset titled: babyweight"
    
    bq --location=US mk --dataset \
        --description "Babyweight" \
        $PROJECT:babyweight
    echo "Here are your current datasets:"
    bq ls
fi
    
## Create GCS bucket if it doesn't exist already...
exists=$(gsutil ls -d | grep -w gs://${BUCKET}/)

if [ -n "$exists" ]; then
    echo -e "Bucket exists, let's not recreate it."
    
else
    echo "Creating a new GCS bucket."
    gsutil mb -l ${REGION} gs://${BUCKET}
    echo "Here are your current buckets:"
    gsutil ls
fi

Create the training and evaluation data tables

Since there is already a publicly available dataset, we can simply create the training and evaluation data tables using this raw input data. First we are going to create a subset of the data, limiting our columns to weight_pounds, is_male, mother_age, plurality, and gestation_weeks, applying some simple filtering, and adding a column to hash on for repeatable splitting.

  • Note: The dataset in the CREATE TABLE statement below is the one created previously, i.e. "babyweight".

Preprocess and filter dataset

We have some preprocessing and filtering we would like to do to get our data in the right format for training.

Preprocessing:

  • Cast is_male from BOOL to STRING
  • Cast plurality from INTEGER to STRING where [1, 2, 3, 4, 5] becomes ["Single(1)", "Twins(2)", "Triplets(3)", "Quadruplets(4)", "Quintuplets(5)"]
  • Add hashcolumn hashing on year and month

Filtering:

  • Only want data for years later than 2000
  • Only want baby weights greater than 0
  • Only want mothers whose age is greater than 0
  • Only want plurality to be greater than 0
  • Only want the number of weeks of gestation to be greater than 0

In [6]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_data AS
SELECT
    weight_pounds,
    CAST(is_male AS STRING) AS is_male,
    mother_age,
    CASE
        WHEN plurality = 1 THEN "Single(1)"
        WHEN plurality = 2 THEN "Twins(2)"
        WHEN plurality = 3 THEN "Triplets(3)"
        WHEN plurality = 4 THEN "Quadruplets(4)"
        WHEN plurality = 5 THEN "Quintuplets(5)"
    END AS plurality,
    gestation_weeks,
    FARM_FINGERPRINT(
        CONCAT(
            CAST(year AS STRING),
            CAST(month AS STRING)
        )
    ) AS hashmonth
FROM
    publicdata.samples.natality
WHERE
    year > 2000
    AND weight_pounds > 0
    AND mother_age > 0
    AND plurality > 0
    AND gestation_weeks > 0

Augment dataset to simulate missing data

Now we want to augment our dataset with our simulated babyweight data by setting all gender information to Unknown and setting plurality of all non-single births to Multiple(2+).


In [7]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_augmented_data AS
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks,
    hashmonth
FROM
    babyweight.babyweight_data
UNION ALL
SELECT
    weight_pounds,
    "Unknown" AS is_male,
    mother_age,
    CASE
        WHEN plurality = "Single(1)" THEN plurality
        ELSE "Multiple(2+)"
    END AS plurality,
    gestation_weeks,
    hashmonth
FROM
    babyweight.babyweight_data

Split augmented dataset into train and eval sets

Using hashmonth, apply a modulo to get approximately a 75/25 train/eval split.

Split augmented dataset into train dataset


In [8]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_data_train AS
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks
FROM
    babyweight.babyweight_augmented_data
WHERE
    ABS(MOD(hashmonth, 4)) < 3

Split augmented dataset into eval dataset


In [9]:
%%bigquery
CREATE OR REPLACE TABLE
    babyweight.babyweight_data_eval AS
SELECT
    weight_pounds,
    is_male,
    mother_age,
    plurality,
    gestation_weeks
FROM
    babyweight.babyweight_augmented_data
WHERE
    ABS(MOD(hashmonth, 4)) = 3

Verify table creation

Verify that you created the dataset and training data table.


In [10]:
%%bigquery
-- LIMIT 0 is a free query; this allows us to check that the table exists.
SELECT * FROM babyweight.babyweight_data_train
LIMIT 0

In [11]:
%%bigquery
-- LIMIT 0 is a free query; this allows us to check that the table exists.
SELECT * FROM babyweight.babyweight_data_eval
LIMIT 0
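
As an optional extra check, a query like the sketch below compares the row counts of the two tables; the split should come out at roughly 75/25.


In [ ]:
%%bigquery
-- Optional: confirm the approximate 75/25 train/eval split.
SELECT "train" AS dataset, COUNT(*) AS num_rows
FROM babyweight.babyweight_data_train
UNION ALL
SELECT "eval" AS dataset, COUNT(*) AS num_rows
FROM babyweight.babyweight_data_eval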

Export from BigQuery to CSVs in GCS

Use the BigQuery Python API to export our train and eval tables to Google Cloud Storage in CSV format, to be used later for TensorFlow/Keras training. We'll use the dataset created above and repeat the process for both the training and evaluation data.


In [14]:
# Construct a BigQuery client object.
client = bigquery.Client()

dataset_name = "babyweight"

# Create dataset reference object
dataset_ref = client.dataset(
    dataset_id=dataset_name, project=client.project)

# Export both train and eval tables
for step in ["train", "eval"]:
    destination_uri = os.path.join(
        "gs://", BUCKET, dataset_name, "data", "{}*.csv".format(step))
    table_name = "babyweight_data_{}".format(step)
    table_ref = dataset_ref.table(table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print("Exported {}:{}.{} to {}".format(
        client.project, dataset_name, table_name, destination_uri))

Verify CSV creation

Verify that we correctly created the CSV files in our bucket.


In [15]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/data/*.csv

Check data exists

Verify that you previously created the CSV files we'll be using for training and evaluation.


In [ ]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/data/*000000000000.csv

In [ ]:
%%bash
# Create the trainer package directory and make it an importable Python package.
mkdir -p babyweight/trainer
touch babyweight/trainer/__init__.py

We then use the %%writefile magic to write the contents of the cell below to a file called task.py in the babyweight/trainer folder.

Create trainer module's task.py to hold hyperparameter argparsing code.

The cell below writes the file babyweight/trainer/task.py, which sets up our training job. This is where we determine which parameters of our model to pass as flags during training, using argparse. Look at how batch_size is passed to the model in the code below. Use this as an example to parse arguments for the following variables:

  • nnsize which represents the hidden layer sizes to use for DNN feature columns
  • nembeds which represents the embedding size of a cross of n key real-valued parameters
  • train_examples which represents the number of examples (in thousands) to run the training job over
  • eval_steps which represents the positive number of steps for which to evaluate the model

Be sure to include a default value for each parsed argument above and specify the type if necessary.


In [ ]:
%%writefile babyweight/trainer/task.py
import argparse
import json
import os

from trainer import model

import tensorflow as tf

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--job-dir",
        help="this model ignores this field, but it is required by gcloud",
        default="junk"
    )
    parser.add_argument(
        "--train_data_path",
        help="GCS location of training data",
        required=True
    )
    parser.add_argument(
        "--eval_data_path",
        help="GCS location of evaluation data",
        required=True
    )
    parser.add_argument(
        "--output_dir",
        help="GCS location to write checkpoints and export models",
        required=True
    )
    parser.add_argument(
        "--batch_size",
        help="Number of examples to compute gradient over.",
        type=int,
        default=512
    )
    parser.add_argument(
        "--nnsize",
        help="Hidden layer sizes for DNN -- provide space-separated layers",
        nargs="+",
        type=int,
        default=[128, 32, 4]
    )
    parser.add_argument(
        "--nembeds",
        help="Embedding size of a cross of n key real-valued parameters",
        type=int,
        default=3
    )
    parser.add_argument(
        "--num_epochs",
        help="Number of epochs to train the model.",
        type=int,
        default=10
    )
    parser.add_argument(
        "--train_examples",
        help="""Number of examples (in thousands) to run the training job over.
        If this is more than actual # of examples available, it cycles through
        them. So specifying 1000 here when you have only 100k examples makes
        this 10 epochs.""",
        type=int,
        default=5000
    )
    parser.add_argument(
        "--eval_steps",
        help="""Positive number of steps for which to evaluate model. Default
        to None, which means to evaluate until input_fn raises an end-of-input
        exception""",
        type=int,
        default=None
    )

    # Parse all arguments
    args = parser.parse_args()
    arguments = args.__dict__

    # Unused args provided by service
    arguments.pop("job_dir", None)
    arguments.pop("job-dir", None)

    # Modify some arguments
    arguments["train_examples"] *= 1000

    # Append trial_id to path if we are doing hptuning
    # This code can be removed if you are not using hyperparameter tuning
    arguments["output_dir"] = os.path.join(
        arguments["output_dir"],
        json.loads(
            os.environ.get("TF_CONFIG", "{}")
        ).get("task", {}).get("trial", "")
    )

    # Run the training job
    model.train_and_evaluate(arguments)

In the same way, we can write the model we developed in the previous notebooks to the file model.py.

Create trainer module's model.py to hold Keras model code.

To create our model.py, we'll use the code we wrote for the Wide & Deep model. Look back at your 9_keras_wide_and_deep_babyweight notebook and copy/paste the necessary code from that notebook into its place in the cell below.


In [ ]:
%%writefile babyweight/trainer/model.py
import datetime
import os
import shutil
import numpy as np
import tensorflow as tf
import hypertune

# Determine CSV, label, and key columns
CSV_COLUMNS = ["weight_pounds",
               "is_male",
               "mother_age",
               "plurality",
               "gestation_weeks"]
LABEL_COLUMN = "weight_pounds"

# Set default values for each CSV column.
# Treat is_male and plurality as strings.
DEFAULTS = [[0.0], ["null"], [0.0], ["null"], [0.0]]


def features_and_labels(row_data):
    """Splits features and labels from feature dictionary.

    Args:
        row_data: Dictionary of CSV column names and tensor values.
    Returns:
        Dictionary of feature tensors and label tensor.
    """
    label = row_data.pop(LABEL_COLUMN)

    return row_data, label  # features, label


def load_dataset(pattern, batch_size=1, mode='eval'):
    """Loads dataset using the tf.data API from CSV files.

    Args:
        pattern: str, file pattern to glob into list of files.
        batch_size: int, the number of examples per batch.
        mode: 'train' | 'eval' to determine if training or evaluating.
    Returns:
        `Dataset` object.
    """
    print("mode = {}".format(mode))
    # Make a CSV dataset
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS)

    # Map dataset to features and label
    dataset = dataset.map(map_func=features_and_labels)  # features, label

    # Shuffle and repeat for training
    if mode == 'train':
        dataset = dataset.shuffle(buffer_size=1000).repeat()

    # Prefetch 1 batch to overlap the input pipeline with training
    # (tf.data.experimental.AUTOTUNE could also be used here).
    dataset = dataset.prefetch(buffer_size=1)

    return dataset


def create_input_layers():
    """Creates dictionary of input layers for each feature.

    Returns:
        Dictionary of `tf.keras.layers.Input` layers for each feature.
    """
    deep_inputs = {
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="float32")
        for colname in ["mother_age", "gestation_weeks"]
    }

    wide_inputs = {
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="string")
        for colname in ["is_male", "plurality"]
    }

    inputs = {**wide_inputs, **deep_inputs}

    return inputs


def categorical_fc(name, values):
    """Helper function to wrap categorical feature by indicator column.

    Args:
        name: str, name of feature.
        values: list, list of strings of categorical values.
    Returns:
        Categorical and indicator column of categorical feature.
    """
    cat_column = tf.feature_column.categorical_column_with_vocabulary_list(
            key=name, vocabulary_list=values)
    ind_column = tf.feature_column.indicator_column(
        categorical_column=cat_column)

    return cat_column, ind_column


def create_feature_columns(nembeds):
    """Creates wide and deep dictionaries of feature columns from inputs.

    Args:
        nembeds: int, number of dimensions to embed categorical column down to.
    Returns:
        Wide and deep dictionaries of feature columns.
    """
    deep_fc = {
        colname: tf.feature_column.numeric_column(key=colname)
        for colname in ["mother_age", "gestation_weeks"]
    }
    wide_fc = {}
    is_male, wide_fc["is_male"] = categorical_fc(
        "is_male", ["True", "False", "Unknown"])
    plurality, wide_fc["plurality"] = categorical_fc(
        "plurality", ["Single(1)", "Twins(2)", "Triplets(3)",
                      "Quadruplets(4)", "Quintuplets(5)", "Multiple(2+)"])

    # Bucketize the float fields. This makes them wide
    age_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["mother_age"],
        boundaries=np.arange(15, 45, 1).tolist())
    wide_fc["age_buckets"] = tf.feature_column.indicator_column(
        categorical_column=age_buckets)

    gestation_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["gestation_weeks"],
        boundaries=np.arange(17, 47, 1).tolist())
    wide_fc["gestation_buckets"] = tf.feature_column.indicator_column(
        categorical_column=gestation_buckets)

    # Cross all the wide columns, have to do the crossing before we one-hot
    crossed = tf.feature_column.crossed_column(
        keys=[age_buckets, gestation_buckets],
        hash_bucket_size=1000)
    deep_fc["crossed_embeds"] = tf.feature_column.embedding_column(
        categorical_column=crossed, dimension=nembeds)

    return wide_fc, deep_fc


def get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units):
    """Creates model architecture and returns outputs.

    Args:
        wide_inputs: Dense tensor used as inputs to wide side of model.
        deep_inputs: Dense tensor used as inputs to deep side of model.
        dnn_hidden_units: List of integers where length is number of hidden
            layers and ith element is the number of neurons at ith layer.
    Returns:
        Dense tensor output from the model.
    """
    # Hidden layers for the deep side
    layers = [int(x) for x in dnn_hidden_units]
    deep = deep_inputs
    for layerno, numnodes in enumerate(layers):
        deep = tf.keras.layers.Dense(
            units=numnodes,
            activation="relu",
            name="dnn_{}".format(layerno+1))(deep)
    deep_out = deep

    # Linear model for the wide side
    wide_out = tf.keras.layers.Dense(
        units=10, activation="relu", name="linear")(wide_inputs)

    # Concatenate the two sides
    both = tf.keras.layers.concatenate(
        inputs=[deep_out, wide_out], name="both")

    # Final output is a linear activation because this is regression
    output = tf.keras.layers.Dense(
        units=1, activation="linear", name="weight")(both)

    return output


def rmse(y_true, y_pred):
    """Calculates RMSE evaluation metric.

    Args:
        y_true: tensor, true labels.
        y_pred: tensor, predicted labels.
    Returns:
        Tensor with value of RMSE between true and predicted labels.
    """
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))


def build_wide_deep_model(dnn_hidden_units=[64, 32], nembeds=3):
    """Builds wide and deep model using Keras Functional API.

    Returns:
        `tf.keras.models.Model` object.
    """
    # Create input layers
    inputs = create_input_layers()

    # Create feature columns for both wide and deep
    wide_fc, deep_fc = create_feature_columns(nembeds)

    # The constructor for DenseFeatures takes a list of numeric columns
    # The Functional API in Keras requires: LayerConstructor()(inputs)
    wide_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=wide_fc.values(), name="wide_inputs")(inputs)
    deep_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=deep_fc.values(), name="deep_inputs")(inputs)

    # Get output of model given inputs
    output = get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units)

    # Build model and compile it all together
    model = tf.keras.models.Model(inputs=inputs, outputs=output)
    model.compile(optimizer="adam", loss="mse", metrics=[rmse, "mse"])

    return model


def train_and_evaluate(args):
    model = build_wide_deep_model(args["nnsize"], args["nembeds"])
    print("Here is our Wide-and-Deep architecture so far:\n")
    print(model.summary())

    trainds = load_dataset(
        args["train_data_path"],
        args["batch_size"],
        'train')

    evalds = load_dataset(
        args["eval_data_path"], 1000, 'eval')
    if args["eval_steps"]:
        evalds = evalds.take(count=args["eval_steps"])

    # Spread the requested number of training examples across all epochs:
    # steps_per_epoch = train_examples // (batch_size * num_epochs)
    num_batches = args["batch_size"] * args["num_epochs"]
    steps_per_epoch = args["train_examples"] // num_batches

    checkpoint_path = os.path.join(args["output_dir"], "checkpoints/babyweight")
    cp_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path, verbose=1, save_weights_only=True)

    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=args["num_epochs"],
        steps_per_epoch=steps_per_epoch,
        verbose=2,  # 0=silent, 1=progress bar, 2=one line per epoch
        callbacks=[cp_callback])

    EXPORT_PATH = os.path.join(
        args["output_dir"], datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    tf.saved_model.save(
        obj=model, export_dir=EXPORT_PATH)  # with default serving function
    
    hp_metric = history.history['val_rmse'][-1]

    hpt = hypertune.HyperTune()
    hpt.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag='rmse',
        metric_value=hp_metric,
        global_step=args['num_epochs'])
    
    print("Exported trained model to {}".format(EXPORT_PATH))

Training on Cloud AI Platform

Now that we've seen everything working locally, it's time to train on the cloud!
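
If you have not already confirmed this with a local run, a quick smoke test might look like the sketch below. The tiny argument values are illustrative only, chosen to keep the run short, and the local run assumes the cloudml-hypertune package (imported as hypertune in model.py) is installed in this environment.


In [ ]:
%%bash
# Optional local smoke test of the trainer package with tiny settings.
OUTDIR=babyweight_trained_local
rm -rf ${OUTDIR}
export PYTHONPATH=${PYTHONPATH}:${PWD}/babyweight
python3 -m trainer.task \
    --job-dir=./tmp \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --batch_size=10 \
    --num_epochs=1 \
    --train_examples=1 \
    --eval_steps=1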

To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname] and simply specify some additional parameters for AI Platform Training Service:

  • jobname: A unique identifier for the Cloud job. We usually append system time to ensure uniqueness
  • job-dir: A GCS location to upload the Python package to
  • runtime-version: Version of TF to use.
  • python-version: Version of Python to use. Currently only Python 3.7 is supported for TF 2.1.
  • region: Cloud region to train in. See here for supported AI Platform Training Service regions

Below the -- \ separator we add the arguments for our task.py file.


In [ ]:
%%bash

OUTDIR=gs://${BUCKET}/babyweight/trained_model
JOBID=babyweight_$(date -u +%y%m%d_%H%M%S)

gcloud ai-platform jobs submit training ${JOBID} \
    --region=${REGION} \
    --module-name=trainer.task \
    --package-path=$(pwd)/babyweight/trainer \
    --job-dir=${OUTDIR} \
    --staging-bucket=gs://${BUCKET} \
    --master-machine-type=n1-standard-8 \
    --scale-tier=CUSTOM \
    --runtime-version=${TFVERSION} \
    --python-version=${PYTHONVERSION} \
    -- \
    --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
    --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
    --output_dir=${OUTDIR} \
    --num_epochs=10 \
    --train_examples=10000 \
    --eval_steps=100 \
    --batch_size=32 \
    --nembeds=8

The training job should complete within 15 to 20 minutes. You do not need to wait for it to finish before moving forward in the notebook, but you will need the trained model before checking its exported files below.
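
While you wait, you can check on the job from the notebook. The sketch below assumes you substitute the job name that gcloud printed when you submitted (the JOBID variable above only existed inside that bash cell); gcloud ai-platform jobs list can also help you find it.


In [ ]:
%%bash
# Replace the placeholder with the job name printed by the submission cell.
JOBID=babyweight_YYMMDD_HHMMSS
gcloud ai-platform jobs describe ${JOBID}
# To follow the training logs instead, uncomment the next line:
# gcloud ai-platform jobs stream-logs ${JOBID}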

Check our trained model files

Let's check the directory structure of the outputs of our trained model in the folder we exported to. We'll want to deploy the saved_model.pb within the timestamped directory as well as the variable values in the variables folder. Therefore, we need the path of the timestamped directory so that everything within it can be found by Cloud AI Platform's model deployment service.


In [10]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/trained_model
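
Once the job has finished, one way to capture the most recent timestamped export directory (handy when deploying) is a listing like the sketch below; it relies on the %Y%m%d%H%M%S naming used in model.py, which sorts chronologically.


In [ ]:
%%bash
# Pick the latest timestamped export directory; the 14-digit names sort chronologically.
MODEL_LOCATION=$(gsutil ls gs://${BUCKET}/babyweight/trained_model/ \
    | grep -E '/[0-9]{14}/$' | tail -1)
echo ${MODEL_LOCATION}
gsutil ls ${MODEL_LOCATION}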

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License