Training at scale with AI Platform Training Service

Learning Objectives:

  1. Learn how to organize your training code into a Python package
  2. Train your model using cloud infrastructure via Google Cloud AI Platform Training Service
  3. (optional) Learn how to run your training package using Docker containers and push training Docker images to a Docker registry

Introduction

In this notebook, we'll make the jump from training locally to training in the cloud, taking advantage of Google Cloud's AI Platform Training Service.

AI Platform Training Service is a managed service that allows the training and deployment of ML models without having to provision or maintain servers. The infrastructure is handled seamlessly by the managed service for us.

Specify your project name and bucket name in the cell below.


In [ ]:
# change these to try this notebook out
PROJECT = "<YOUR PROJECT>"
BUCKET = "<YOUR PROJECT>"
REGION = "<YOUR REGION>"

In [ ]:
import os
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = "2.1"

In [ ]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Make code compatible with AI Platform Training Service

In order to make our code compatible with AI Platform Training Service we need to make the following changes:

  1. Upload data to Google Cloud Storage
  2. Move code into a trainer Python package
  3. Submit training job with gcloud to train on AI Platform

Upload data to Google Cloud Storage (GCS)

Cloud services don't have access to our local files, so we need to upload them to a location the Cloud servers can read from. In this case we'll use GCS.

To do this, run the notebook 0_export_data_from_bq_to_gcs.ipynb, which exports the taxifare data from BigQuery directly into a GCS bucket. If everything ran smoothly, you should be able to list the contents of the data bucket with the following command:


In [ ]:
!gsutil ls gs://$BUCKET/taxifare/data
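
The same check can be done from Python: TensorFlow's tf.io.gfile module understands gs:// paths natively, which is also why the training code later in this notebook can read data from and write checkpoints to GCS without any special handling. A minimal sketch, assuming the export notebook has already populated the bucket:

import os
import tensorflow as tf

# tf.io.gfile accepts gs:// URLs, so the same call works for local and GCS paths.
data_pattern = "gs://{}/taxifare/data/taxi-*".format(os.environ["BUCKET"])
print(tf.io.gfile.glob(data_pattern))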

Move code into a Python package

The first thing to do is to convert your training code snippets into a regular Python package that we can then install with pip, either on AI Platform's training machines or inside a Docker container.

A Python package is simply a collection of one or more .py files along with an __init__.py file to identify the containing directory as a package. The __init__.py sometimes contains initialization code but for our purposes an empty file suffices.

Create the package directory

Our package directory contains three files (__init__.py, model.py, and task.py):


In [ ]:
ls ./taxifare/trainer/

Paste existing code into model.py

A Python package requires our code to be in .py files, as opposed to notebook cells. So, we simply copy and paste our existing code from the previous notebook into a single file.

In the cell below, we write the contents of the cell into model.py, packaging the model we developed in the previous labs so that we can deploy it to AI Platform Training Service.

Exercise.

There are two places to fill in TODOs in model.py.

  • in the build_dnn_model function, add code to use an optimizer with a custom learning rate.
  • in the train_and_evaluate function, add code to define variables using the hparams dictionary.

One possible way to fill in these TODOs is sketched just after the model.py cell below.

In [ ]:
%%writefile ./taxifare/trainer/model.py
#TODO 1
import datetime
import logging
import os
import shutil

import numpy as np
import tensorflow as tf

from tensorflow.keras import activations
from tensorflow.keras import callbacks
from tensorflow.keras import layers
from tensorflow.keras import models

from tensorflow import feature_column as fc

logging.info(tf.version.VERSION)


CSV_COLUMNS = [
        'fare_amount',
        'pickup_datetime',
        'pickup_longitude',
        'pickup_latitude',
        'dropoff_longitude',
        'dropoff_latitude',
        'passenger_count',
        'key',
]
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
DAYS = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']  # ordered to match datetime.weekday(), where Monday == 0


def features_and_labels(row_data):
    for unwanted_col in ['key']:
        row_data.pop(unwanted_col)
    label = row_data.pop(LABEL_COLUMN)
    return row_data, label


def load_dataset(pattern, batch_size, num_repeat):
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        num_epochs=num_repeat,
    )
    return dataset.map(features_and_labels)


def create_train_dataset(pattern, batch_size):
    dataset = load_dataset(pattern, batch_size, num_repeat=None)
    return dataset.prefetch(1)


def create_eval_dataset(pattern, batch_size):
    dataset = load_dataset(pattern, batch_size, num_repeat=1)
    return dataset.prefetch(1)


def parse_datetime(s):
    if type(s) is not str:
        s = s.numpy().decode('utf-8')
    return datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S %Z")


def euclidean(params):
    lon1, lat1, lon2, lat2 = params
    londiff = lon2 - lon1
    latdiff = lat2 - lat1
    return tf.sqrt(londiff*londiff + latdiff*latdiff)


def get_dayofweek(s):
    ts = parse_datetime(s)
    return DAYS[ts.weekday()]


@tf.function
def dayofweek(ts_in):
    return tf.map_fn(
        lambda s: tf.py_function(get_dayofweek, inp=[s], Tout=tf.string),
        ts_in
    )


@tf.function
def fare_thresh(x):
    return 60 * activations.relu(x)


def transform(inputs, NUMERIC_COLS, STRING_COLS, nbuckets):
    # Pass-through columns
    transformed = inputs.copy()
    del transformed['pickup_datetime']

    feature_columns = {
        colname: fc.numeric_column(colname)
        for colname in NUMERIC_COLS
    }

    # Scaling longitude from range [-78, -70] to [0, 1]
    for lon_col in ['pickup_longitude', 'dropoff_longitude']:
        transformed[lon_col] = layers.Lambda(
            lambda x: (x + 78)/8.0,
            name='scale_{}'.format(lon_col)
        )(inputs[lon_col])

    # Scaling latitude from range [37, 45] to [0, 1]
    for lat_col in ['pickup_latitude', 'dropoff_latitude']:
        transformed[lat_col] = layers.Lambda(
            lambda x: (x - 37)/8.0,
            name='scale_{}'.format(lat_col)
        )(inputs[lat_col])

    # Adding Euclidean dist (no need to be accurate: NN will calibrate it)
    transformed['euclidean'] = layers.Lambda(euclidean, name='euclidean')([
        inputs['pickup_longitude'],
        inputs['pickup_latitude'],
        inputs['dropoff_longitude'],
        inputs['dropoff_latitude']
    ])
    feature_columns['euclidean'] = fc.numeric_column('euclidean')

    # hour of day from timestamp of form '2010-02-08 09:17:00+00:00'
    transformed['hourofday'] = layers.Lambda(
        lambda x: tf.strings.to_number(
            tf.strings.substr(x, 11, 2), out_type=tf.dtypes.int32),
        name='hourofday'
    )(inputs['pickup_datetime'])
    feature_columns['hourofday'] = fc.indicator_column(
        fc.categorical_column_with_identity(
            'hourofday', num_buckets=24))

    latbuckets = np.linspace(0, 1, nbuckets).tolist()
    lonbuckets = np.linspace(0, 1, nbuckets).tolist()
    b_plat = fc.bucketized_column(
        feature_columns['pickup_latitude'], latbuckets)
    b_dlat = fc.bucketized_column(
            feature_columns['dropoff_latitude'], latbuckets)
    b_plon = fc.bucketized_column(
            feature_columns['pickup_longitude'], lonbuckets)
    b_dlon = fc.bucketized_column(
            feature_columns['dropoff_longitude'], lonbuckets)
    ploc = fc.crossed_column(
            [b_plat, b_plon], nbuckets * nbuckets)
    dloc = fc.crossed_column(
            [b_dlat, b_dlon], nbuckets * nbuckets)
    pd_pair = fc.crossed_column([ploc, dloc], nbuckets ** 4)
    feature_columns['pickup_and_dropoff'] = fc.embedding_column(
            pd_pair, 100)

    return transformed, feature_columns


def rmse(y_true, y_pred):
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))


def build_dnn_model(nbuckets, nnsize, lr):
    # input layer is all float except for pickup_datetime which is a string
    STRING_COLS = ['pickup_datetime']
    NUMERIC_COLS = (
            set(CSV_COLUMNS) - set([LABEL_COLUMN, 'key']) - set(STRING_COLS)
    )
    inputs = {
        colname: layers.Input(name=colname, shape=(), dtype='float32')
        for colname in NUMERIC_COLS
    }
    inputs.update({
        colname: layers.Input(name=colname, shape=(), dtype='string')
        for colname in STRING_COLS
    })

    # transforms
    transformed, feature_columns = transform(
        inputs, NUMERIC_COLS, STRING_COLS, nbuckets=nbuckets)
    dnn_inputs = layers.DenseFeatures(feature_columns.values())(transformed)

    x = dnn_inputs
    for layer, nodes in enumerate(nnsize):
        x = layers.Dense(nodes, activation='relu', name='h{}'.format(layer))(x)
    output = layers.Dense(1, name='fare')(x)

    model = models.Model(inputs, output)
    
    lr_optimizer = # TODO: Your code goes here
    model.compile( # TODO: Your code goes here
    
    return model


def train_and_evaluate(hparams):
    batch_size = # TODO: Your code goes here
    nbuckets = # TODO: Your code goes here
    lr = # TODO: Your code goes here
    nnsize = hparams['nnsize']
    eval_data_path = hparams['eval_data_path']
    num_evals = hparams['num_evals']
    num_examples_to_train_on = hparams['num_examples_to_train_on']
    output_dir = hparams['output_dir']
    train_data_path = hparams['train_data_path']

    timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    savedmodel_dir = os.path.join(output_dir, 'export/savedmodel')
    model_export_path = os.path.join(savedmodel_dir, timestamp)
    checkpoint_path = os.path.join(output_dir, 'checkpoints')
    tensorboard_path = os.path.join(output_dir, 'tensorboard')

    if tf.io.gfile.exists(output_dir):
        tf.io.gfile.rmtree(output_dir)

    model = build_dnn_model(nbuckets, nnsize, lr)
    logging.info(model.summary())

    trainds = create_train_dataset(train_data_path, batch_size)
    evalds = create_eval_dataset(eval_data_path, batch_size)

    steps_per_epoch = num_examples_to_train_on // (batch_size * num_evals)

    checkpoint_cb = callbacks.ModelCheckpoint(
        checkpoint_path,
        save_weights_only=True,
        verbose=1
    )
    tensorboard_cb = callbacks.TensorBoard(tensorboard_path)

    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=num_evals,
        steps_per_epoch=max(1, steps_per_epoch),
        verbose=2,  # 0=silent, 1=progress bar, 2=one line per epoch
        callbacks=[checkpoint_cb, tensorboard_cb]
    )

    # Exporting the model with default serving function.
    tf.saved_model.save(model, model_export_path)
    return history
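
For reference, here is one possible way to complete the TODOs above. This is only a hedged sketch (it assumes the Adam optimizer and a mean squared error loss reported alongside the rmse metric defined in model.py); any optimizer that accepts a custom learning rate would do.

# In build_dnn_model: an optimizer with a custom learning rate, then compile.
lr_optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
model.compile(optimizer=lr_optimizer, loss='mse', metrics=[rmse, 'mse'])

# In train_and_evaluate: read the remaining settings from the hparams dict.
batch_size = hparams['batch_size']
nbuckets = hparams['nbuckets']
lr = hparams['lr']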

Modify code to read data from and write checkpoint files to GCS

If you look closely above, you'll notice a new function, train_and_evaluate that wraps the code that actually trains the model. This allows us to parametrize the training by passing a dictionary of parameters to this function (e.g, batch_size, num_examples_to_train_on, train_data_path etc.)

This is useful because the output directory, data paths and number of train steps will be different depending on whether we're training locally or in the cloud. Parametrizing allows us to use the same code for both.
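
For illustration, here is a hedged sketch of what calling train_and_evaluate directly from Python could look like once the TODOs are filled in, reusing the values from the local test run later in this notebook (it assumes ./taxifare is on the PYTHONPATH):

from trainer import model

hparams = {
    'batch_size': 5,
    'nbuckets': 10,
    'lr': 0.001,
    'nnsize': [32, 8],
    'num_evals': 1,
    'num_examples_to_train_on': 100,
    'eval_data_path': './taxifare/tests/data/taxi-valid*',
    'train_data_path': './taxifare/tests/data/taxi-train*',
    'output_dir': './taxifare-model',
}
history = model.train_and_evaluate(hparams)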

We specify these parameters at run time via the command line. Which means we need to add code to parse command line parameters and invoke train_and_evaluate() with those params. This is the job of the task.py file.

Exercise.

Fill in the TODOs in the code below to parse three additional command line arguments for the batch size, the learning rate, and the number of buckets (i.e., batch_size, lr, and nbuckets). One possible set of argument definitions is sketched just after the cell below.


In [ ]:
%%writefile taxifare/trainer/task.py
# TODO 1
import argparse

from trainer import model


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    # TODO: Your code goes here

    # TODO: Your code goes here
    
    # TODO: Your code goes here

    parser.add_argument(
        "--eval_data_path",
        help="GCS location pattern of eval files",
        required=True
    )
    parser.add_argument(
        "--nnsize",
        help="Hidden layer sizes (provide space-separated sizes)",
        nargs="+",
        type=int,
        default=[32, 8]
    )
        
    parser.add_argument(
        "--num_evals",
        help="Number of times to evaluate model on eval data training.",
        type=int,
        default=5
    )
    parser.add_argument(
        "--num_examples_to_train_on",
        help="Number of examples to train on.",
        type=int,
        default=100
    )
    parser.add_argument(
        "--output_dir",
        help="GCS location to write checkpoints and export models",
        required=True
    )
    parser.add_argument(
        "--train_data_path",
        help="GCS location pattern of train files containing eval URLs",
        required=True
    )
    parser.add_argument(
        "--job-dir",
        help="this model ignores this field, but it is required by gcloud",
        default="junk"
    )
    args = parser.parse_args()
    hparams = args.__dict__
    hparams.pop("job-dir", None)

    model.train_and_evaluate(hparams)
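
For reference, the three TODOs above could be filled in along the lines of the sketch below. The help strings and defaults here are illustrative; what matters is that the argument names (batch_size, lr, nbuckets) match the keys that model.train_and_evaluate reads from hparams.

    parser.add_argument(
        "--batch_size",
        help="Batch size for training steps",
        type=int,
        default=32
    )
    parser.add_argument(
        "--lr",
        help="Learning rate for the optimizer",
        type=float,
        default=0.001
    )
    parser.add_argument(
        "--nbuckets",
        help="Number of buckets for the lat/lon grid",
        type=int,
        default=10
    )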

Run trainer module package locally

Now we can test our training code locally using the test data in ./taxifare/tests/data. We'll run a very small training job over a single file with a small batch size and one eval step.


In [ ]:
%%bash

EVAL_DATA_PATH=./taxifare/tests/data/taxi-valid*
TRAIN_DATA_PATH=./taxifare/tests/data/taxi-train*
OUTPUT_DIR=./taxifare-model

test ${OUTPUT_DIR} && rm -rf ${OUTPUT_DIR}
export PYTHONPATH=${PYTHONPATH}:${PWD}/taxifare
    
python3 -m trainer.task \
--eval_data_path $EVAL_DATA_PATH \
--output_dir $OUTPUT_DIR \
--train_data_path $TRAIN_DATA_PATH \
--batch_size 5 \
--num_examples_to_train_on 100 \
--num_evals 1 \
--nbuckets 10 \
--lr 0.001 \
--nnsize 32 8
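
If you want to sanity-check the export from the local run, you can load the SavedModel back into Python. A minimal sketch, assuming the local run above completed and wrote to ./taxifare-model:

import glob
import tensorflow as tf

# train_and_evaluate() exports to <output_dir>/export/savedmodel/<timestamp>.
export_dirs = sorted(glob.glob('./taxifare-model/export/savedmodel/*'))
loaded = tf.saved_model.load(export_dirs[-1])
print(list(loaded.signatures.keys()))  # typically ['serving_default']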

Run your training package on Cloud AI Platform

Once the code works in standalone mode locally, you can run it on Cloud AI Platform. To submit a job to the Cloud, we use gcloud ai-platform jobs submit training [jobname] and simply specify some additional parameters for AI Platform Training Service:

  • jobid: A unique identifier for the Cloud job. We usually append the system time to ensure uniqueness
  • region: Cloud region to train in. See the AI Platform documentation for the list of supported regions

The arguments before the -- separator are for AI Platform Training Service. The arguments after it are passed through to our task.py.

Because the job runs on managed cloud infrastructure, it will take a few minutes to provision and complete. You can monitor the job from the GCP console in the Cloud AI Platform section.

Exercise.

Fill in the TODOs in the code below to submit your job for training on Cloud AI Platform.


In [ ]:
%%bash
# TODO 2

# Output directory and jobID
OUTDIR=gs://${BUCKET}/taxifare/trained_model_$(date -u +%y%m%d_%H%M%S)
JOBID=taxifare_$(date -u +%y%m%d_%H%M%S)
echo ${OUTDIR} ${REGION} ${JOBID}
gsutil -m rm -rf ${OUTDIR}

# Model and training hyperparameters
BATCH_SIZE=50
NUM_EXAMPLES_TO_TRAIN_ON=100
NUM_EVALS=100
NBUCKETS=10
LR=0.001
NNSIZE="32 8"

# GCS paths
GCS_PROJECT_PATH=gs://$BUCKET/taxifare
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/taxi-train*
EVAL_DATA_PATH=$DATA_PATH/taxi-valid*

gcloud ai-platform jobs submit training $JOBID \
    --module-name= # TODO: Your code goes here
    --package-path= # TODO: Your code goes here
    --staging-bucket= # TODO: Your code goes here
    --python-version= # TODO: Your code goes here
    --runtime-version= # TODO: Your code goes here
    --region= # TODO: Your code goes here
    -- \
    --eval_data_path # TODO: Your code goes here
    --output_dir # TODO: Your code goes here
    --train_data_path # TODO: Your code goes here
    --batch_size # TODO: Your code goes here
    --num_examples_to_train_on # TODO: Your code goes here
    --num_evals # TODO: Your code goes here
    --nbuckets # TODO: Your code goes here
    --lr # TODO: Your code goes here
    --nnsize # TODO: Your code goes here

(Optional) Run your training package using Docker container

AI Platform Training also supports training in custom containers, allowing users to bring their own Docker containers with any pre-installed ML framework or algorithm to run on AI Platform Training.

In this last section, we'll see how to submit a Cloud training job using a customized Docker image.

Containerizing our ./taxifare/trainer package involves 3 steps:

  • Writing a Dockerfile in ./taxifare
  • Building the Docker image
  • Pushing it to Google Container Registry in our GCP project

The Dockerfile specifies

  1. How the container needs to be provisioned so that all the dependencies in our code are satisfied
  2. Where to copy our trainer package into the container and how to install it (pip3 install /code)
  3. What command to run when the container is run (the ENTRYPOINT line)

In [ ]:
%%writefile ./taxifare/Dockerfile
FROM gcr.io/deeplearning-platform-release/tf2-cpu

COPY . /code

RUN apt-get update && \
    apt-get install --yes python3-pip && \
    pip3 install /code 

RUN python3 -m pip install --upgrade --quiet cloudml-hypertune

ENTRYPOINT ["python3", "/code/trainer/task.py"]
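
Note that the pip3 install /code step assumes that ./taxifare contains packaging metadata such as a setup.py describing the trainer package. If you had to write one from scratch, a minimal hypothetical version might look like this (the package name and version are placeholders):

from setuptools import find_packages, setup

setup(
    name='taxifare_trainer',   # placeholder name
    version='0.1',
    packages=find_packages(),  # picks up trainer/ via its __init__.py
    description='Taxifare model training package.',
)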

In [ ]:
!gcloud auth configure-docker

In [ ]:
%%bash 

PROJECT_DIR=$(cd ./taxifare && pwd)
PROJECT_ID=$(gcloud config list project --format "value(core.project)")
IMAGE_NAME=taxifare_training_container
DOCKERFILE=$PROJECT_DIR/Dockerfile
IMAGE_URI=gcr.io/$PROJECT_ID/$IMAGE_NAME

docker build $PROJECT_DIR -f $DOCKERFILE -t $IMAGE_URI

docker push $IMAGE_URI

Remark: If you prefer to build the container image from the command line, we have written a script for that: ./taxifare/scripts/build.sh. This script reads its configuration from ./taxifare/scripts/env.sh, which you can edit to suit your setup. You can also simply type make build from within ./taxifare to build the image (which invokes the build script). Similarly, we wrote the script ./taxifare/scripts/push.sh to push the Docker image, which you can also trigger by typing make push from within ./taxifare.

Train using a custom container on AI Platform

To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname] and simply specify some additional parameters for AI Platform Training Service:

  • jobname: A unique identifier for the Cloud job. We usually append the system time to ensure uniqueness
  • master-image-uri: The URI of the Docker image we pushed to Google Container Registry
  • region: Cloud region to train in. See the AI Platform documentation for the list of supported regions

The arguments before the -- separator are for AI Platform Training Service. The arguments after it are passed through to our task.py.

You can track your job and view its logs in the Cloud Console.


In [ ]:
%%bash

PROJECT_ID=$(gcloud config list project --format "value(core.project)")
BUCKET=$PROJECT_ID
REGION="us-central1"

# Output directory and jobID
OUTDIR=gs://${BUCKET}/taxifare/trained_model
JOBID=taxifare_container_$(date -u +%y%m%d_%H%M%S)
echo ${OUTDIR} ${REGION} ${JOBID}
gsutil -m rm -rf ${OUTDIR}

# Model and training hyperparameters
BATCH_SIZE=50
NUM_EXAMPLES_TO_TRAIN_ON=100
NUM_EVALS=100
NBUCKETS=10
NNSIZE="32 8"

# AI-Platform machines to use for training
MACHINE_TYPE=n1-standard-4
SCALE_TIER=CUSTOM

# GCS paths.
GCS_PROJECT_PATH=gs://$BUCKET/taxifare
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/taxi-train*
EVAL_DATA_PATH=$DATA_PATH/taxi-valid*

IMAGE_NAME=taxifare_training_container
IMAGE_URI=gcr.io/$PROJECT_ID/$IMAGE_NAME

gcloud beta ai-platform jobs submit training $JOBID \
   --staging-bucket=gs://$BUCKET \
   --region=$REGION \
   --master-image-uri=$IMAGE_URI \
   --master-machine-type=$MACHINE_TYPE \
   --scale-tier=$SCALE_TIER \
  -- \
  --eval_data_path $EVAL_DATA_PATH \
  --output_dir $OUTDIR \
  --train_data_path $TRAIN_DATA_PATH \
  --batch_size $BATCH_SIZE \
  --num_examples_to_train_on $NUM_EXAMPLES_TO_TRAIN_ON \
  --num_evals $NUM_EVALS \
  --nbuckets $NBUCKETS \
  --nnsize $NNSIZE

Remark: If you prefer submitting your training jobs to AI Platform from the command line, we have written ./taxifare/scripts/submit.sh for you (you can also invoke it with make submit from within ./taxifare). Like the other scripts, it reads its configuration variables from ./taxifare/scripts/env.sh.

Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License