Learning Objectives:
In this notebook we'll make the jump from training locally, to do training in the cloud. We'll take advantage of Google Cloud's AI Platform Training Service.
AI Platform Training Service is a managed service that allows the training and deployment of ML models without having to provision or maintain servers. The infrastructure is handled seamlessly by the managed service for us.
Specify your project name and bucket name in the cell below.
In [ ]:
# change these to try this notebook out
PROJECT = "<YOUR PROJECT>"
BUCKET = "<YOUR PROJECT>"
REGION = "<YOUR REGION>"
In [ ]:
import os
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = "2.1"
In [ ]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION
Cloud services don't have access to our local files, so we need to upload them to a location the Cloud servers can read from. In this case we'll use GCS.
To do this run the notebook 0_export_data_from_bq_to_gcs.ipynb, which will export the taxifare data from BigQuery directly into a GCS bucket. If all ran smoothly, you should be able to list the data bucket by running the following command:
In [ ]:
!gsutil ls gs://$BUCKET/taxifare/data
The first thing to do is to convert you training code snippets into a regular Python package that we will then pip install
into the Docker container.
A Python package is simply a collection of one or more .py
files along with an __init__.py
file to identify the containing directory as a package. The __init__.py
sometimes contains initialization code but for our purposes an empty file suffices.
Our package directory contains 3 files:
In [ ]:
ls ./taxifare/trainer/
In the cell below, we write the contents of the cell into model.py
packaging the model we
developed in the previous labs so that we can deploy it to AI Platform Training Service.
In [ ]:
%%writefile ./taxifare/trainer/model.py
#TODO 1
import datetime
import logging
import os
import shutil
import numpy as np
import tensorflow as tf
from tensorflow.keras import activations
from tensorflow.keras import callbacks
from tensorflow.keras import layers
from tensorflow.keras import models
from tensorflow import feature_column as fc
logging.info(tf.version.VERSION)
CSV_COLUMNS = [
'fare_amount',
'pickup_datetime',
'pickup_longitude',
'pickup_latitude',
'dropoff_longitude',
'dropoff_latitude',
'passenger_count',
'key',
]
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
DAYS = ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']
def features_and_labels(row_data):
for unwanted_col in ['key']:
row_data.pop(unwanted_col)
label = row_data.pop(LABEL_COLUMN)
return row_data, label
def load_dataset(pattern, batch_size, num_repeat):
dataset = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
num_epochs=num_repeat,
)
return dataset.map(features_and_labels)
def create_train_dataset(pattern, batch_size):
dataset = load_dataset(pattern, batch_size, num_repeat=None)
return dataset.prefetch(1)
def create_eval_dataset(pattern, batch_size):
dataset = load_dataset(pattern, batch_size, num_repeat=1)
return dataset.prefetch(1)
def parse_datetime(s):
if type(s) is not str:
s = s.numpy().decode('utf-8')
return datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S %Z")
def euclidean(params):
lon1, lat1, lon2, lat2 = params
londiff = lon2 - lon1
latdiff = lat2 - lat1
return tf.sqrt(londiff*londiff + latdiff*latdiff)
def get_dayofweek(s):
ts = parse_datetime(s)
return DAYS[ts.weekday()]
@tf.function
def dayofweek(ts_in):
return tf.map_fn(
lambda s: tf.py_function(get_dayofweek, inp=[s], Tout=tf.string),
ts_in
)
@tf.function
def fare_thresh(x):
return 60 * activations.relu(x)
def transform(inputs, NUMERIC_COLS, STRING_COLS, nbuckets):
# Pass-through columns
transformed = inputs.copy()
del transformed['pickup_datetime']
feature_columns = {
colname: fc.numeric_column(colname)
for colname in NUMERIC_COLS
}
# Scaling longitude from range [-70, -78] to [0, 1]
for lon_col in ['pickup_longitude', 'dropoff_longitude']:
transformed[lon_col] = layers.Lambda(
lambda x: (x + 78)/8.0,
name='scale_{}'.format(lon_col)
)(inputs[lon_col])
# Scaling latitude from range [37, 45] to [0, 1]
for lat_col in ['pickup_latitude', 'dropoff_latitude']:
transformed[lat_col] = layers.Lambda(
lambda x: (x - 37)/8.0,
name='scale_{}'.format(lat_col)
)(inputs[lat_col])
# Adding Euclidean dist (no need to be accurate: NN will calibrate it)
transformed['euclidean'] = layers.Lambda(euclidean, name='euclidean')([
inputs['pickup_longitude'],
inputs['pickup_latitude'],
inputs['dropoff_longitude'],
inputs['dropoff_latitude']
])
feature_columns['euclidean'] = fc.numeric_column('euclidean')
# hour of day from timestamp of form '2010-02-08 09:17:00+00:00'
transformed['hourofday'] = layers.Lambda(
lambda x: tf.strings.to_number(
tf.strings.substr(x, 11, 2), out_type=tf.dtypes.int32),
name='hourofday'
)(inputs['pickup_datetime'])
feature_columns['hourofday'] = fc.indicator_column(
fc.categorical_column_with_identity(
'hourofday', num_buckets=24))
latbuckets = np.linspace(0, 1, nbuckets).tolist()
lonbuckets = np.linspace(0, 1, nbuckets).tolist()
b_plat = fc.bucketized_column(
feature_columns['pickup_latitude'], latbuckets)
b_dlat = fc.bucketized_column(
feature_columns['dropoff_latitude'], latbuckets)
b_plon = fc.bucketized_column(
feature_columns['pickup_longitude'], lonbuckets)
b_dlon = fc.bucketized_column(
feature_columns['dropoff_longitude'], lonbuckets)
ploc = fc.crossed_column(
[b_plat, b_plon], nbuckets * nbuckets)
dloc = fc.crossed_column(
[b_dlat, b_dlon], nbuckets * nbuckets)
pd_pair = fc.crossed_column([ploc, dloc], nbuckets ** 4)
feature_columns['pickup_and_dropoff'] = fc.embedding_column(
pd_pair, 100)
return transformed, feature_columns
def rmse(y_true, y_pred):
return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))
def build_dnn_model(nbuckets, nnsize, lr):
# input layer is all float except for pickup_datetime which is a string
STRING_COLS = ['pickup_datetime']
NUMERIC_COLS = (
set(CSV_COLUMNS) - set([LABEL_COLUMN, 'key']) - set(STRING_COLS)
)
inputs = {
colname: layers.Input(name=colname, shape=(), dtype='float32')
for colname in NUMERIC_COLS
}
inputs.update({
colname: layers.Input(name=colname, shape=(), dtype='string')
for colname in STRING_COLS
})
# transforms
transformed, feature_columns = transform(
inputs, NUMERIC_COLS, STRING_COLS, nbuckets=nbuckets)
dnn_inputs = layers.DenseFeatures(feature_columns.values())(transformed)
x = dnn_inputs
for layer, nodes in enumerate(nnsize):
x = layers.Dense(nodes, activation='relu', name='h{}'.format(layer))(x)
output = layers.Dense(1, name='fare')(x)
model = models.Model(inputs, output)
lr_optimizer = # TODO: Your code goes here
model.compile( # TODO: Your code goes here
return model
def train_and_evaluate(hparams):
batch_size = # TODO: Your code goes here
nbuckets = # TODO: Your code goes here
lr = # TODO: Your code goes here
nnsize = hparams['nnsize']
eval_data_path = hparams['eval_data_path']
num_evals = hparams['num_evals']
num_examples_to_train_on = hparams['num_examples_to_train_on']
output_dir = hparams['output_dir']
train_data_path = hparams['train_data_path']
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
savedmodel_dir = os.path.join(output_dir, 'export/savedmodel')
model_export_path = os.path.join(savedmodel_dir, timestamp)
checkpoint_path = os.path.join(output_dir, 'checkpoints')
tensorboard_path = os.path.join(output_dir, 'tensorboard')
if tf.io.gfile.exists(output_dir):
tf.io.gfile.rmtree(output_dir)
model = build_dnn_model(nbuckets, nnsize, lr)
logging.info(model.summary())
trainds = create_train_dataset(train_data_path, batch_size)
evalds = create_eval_dataset(eval_data_path, batch_size)
steps_per_epoch = num_examples_to_train_on // (batch_size * num_evals)
checkpoint_cb = callbacks.ModelCheckpoint(
checkpoint_path,
save_weights_only=True,
verbose=1
)
tensorboard_cb = callbacks.TensorBoard(tensorboard_path)
history = model.fit(
trainds,
validation_data=evalds,
epochs=num_evals,
steps_per_epoch=max(1, steps_per_epoch),
verbose=2, # 0=silent, 1=progress bar, 2=one line per epoch
callbacks=[checkpoint_cb, tensorboard_cb]
)
# Exporting the model with default serving function.
tf.saved_model.save(model, model_export_path)
return history
If you look closely above, you'll notice a new function, train_and_evaluate
that wraps the code that actually trains the model. This allows us to parametrize the training by passing a dictionary of parameters to this function (e.g, batch_size
, num_examples_to_train_on
, train_data_path
etc.)
This is useful because the output directory, data paths and number of train steps will be different depending on whether we're training locally or in the cloud. Parametrizing allows us to use the same code for both.
We specify these parameters at run time via the command line. Which means we need to add code to parse command line parameters and invoke train_and_evaluate()
with those params. This is the job of the task.py
file.
In [ ]:
%%writefile taxifare/trainer/task.py
# TODO 1
import argparse
from trainer import model
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# TODO: Your code goes here
# TODO: Your code goes here
# TODO: Your code goes here
parser.add_argument(
"--eval_data_path",
help="GCS location pattern of eval files",
required=True
)
parser.add_argument(
"--nnsize",
help="Hidden layer sizes (provide space-separated sizes)",
nargs="+",
type=int,
default=[32, 8]
)
parser.add_argument(
"--num_evals",
help="Number of times to evaluate model on eval data training.",
type=int,
default=5
)
parser.add_argument(
"--num_examples_to_train_on",
help="Number of examples to train on.",
type=int,
default=100
)
parser.add_argument(
"--output_dir",
help="GCS location to write checkpoints and export models",
required=True
)
parser.add_argument(
"--train_data_path",
help="GCS location pattern of train files containing eval URLs",
required=True
)
parser.add_argument(
"--job-dir",
help="this model ignores this field, but it is required by gcloud",
default="junk"
)
args = parser.parse_args()
hparams = args.__dict__
hparams.pop("job-dir", None)
model.train_and_evaluate(hparams)
In [ ]:
%%bash
EVAL_DATA_PATH=./taxifare/tests/data/taxi-valid*
TRAIN_DATA_PATH=./taxifare/tests/data/taxi-train*
OUTPUT_DIR=./taxifare-model
test ${OUTPUT_DIR} && rm -rf ${OUTPUT_DIR}
export PYTHONPATH=${PYTHONPATH}:${PWD}/taxifare
python3 -m trainer.task \
--eval_data_path $EVAL_DATA_PATH \
--output_dir $OUTPUT_DIR \
--train_data_path $TRAIN_DATA_PATH \
--batch_size 5 \
--num_examples_to_train_on 100 \
--num_evals 1 \
--nbuckets 10 \
--lr 0.001 \
--nnsize 32 8
Once the code works in standalone mode locally, you can run it on Cloud AI Platform. To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname]
and simply specify some additional parameters for AI Platform Training Service:
The arguments before -- \
are for AI Platform Training Service.
The arguments after -- \
are sent to our task.py
.
Because this is on the entire dataset, it will take a while. You can monitor the job from the GCP console in the Cloud AI Platform section.
In [ ]:
%%bash
# TODO 2
# Output directory and jobID
OUTDIR=gs://${BUCKET}/taxifare/trained_model_$(date -u +%y%m%d_%H%M%S)
JOBID=taxifare_$(date -u +%y%m%d_%H%M%S)
echo ${OUTDIR} ${REGION} ${JOBID}
gsutil -m rm -rf ${OUTDIR}
# Model and training hyperparameters
BATCH_SIZE=50
NUM_EXAMPLES_TO_TRAIN_ON=100
NUM_EVALS=100
NBUCKETS=10
LR=0.001
NNSIZE="32 8"
# GCS paths
GCS_PROJECT_PATH=gs://$BUCKET/taxifare
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/taxi-train*
EVAL_DATA_PATH=$DATA_PATH/taxi-valid*
gcloud ai-platform jobs submit training $JOBID \
--module-name= # TODO: Your code goes here
--package-path= # TODO: Your code goes here
--staging-bucket= # TODO: Your code goes here
--python-version= # TODO: Your code goes here
--runtime-version= # TODO: Your code goes here
--region= # TODO: Your code goes here
-- \
--eval_data_path # TODO: Your code goes here
--output_dir # TODO: Your code goes here
--train_data_path # TODO: Your code goes here
--batch_size # TODO: Your code goes here
--num_examples_to_train_on # TODO: Your code goes here
--num_evals # TODO: Your code goes here
--nbuckets # TODO: Your code goes here
--lr # TODO: Your code goes here
--nnsize # TODO: Your code goes here
AI Platform Training also supports training in custom containers, allowing users to bring their own Docker containers with any pre-installed ML framework or algorithm to run on AI Platform Training.
In this last section, we'll see how to submit a Cloud training job using a customized Docker image.
Containerizing our ./taxifare/trainer
package involves 3 steps:
./taxifare
The Dockerfile
specifies
pip install /trainer
)ENTRYPOINT
line)
In [ ]:
%%writefile ./taxifare/Dockerfile
FROM gcr.io/deeplearning-platform-release/tf2-cpu
COPY . /code
RUN apt-get update && \
apt-get install --yes python3-pip && \
pip3 install /code
RUN python3 -m pip install --upgrade --quiet cloudml-hypertune
ENTRYPOINT ["python3", "/code/trainer/task.py"]
In [ ]:
!gcloud auth configure-docker
In [ ]:
%%bash
PROJECT_DIR=$(cd ./taxifare && pwd)
PROJECT_ID=$(gcloud config list project --format "value(core.project)")
IMAGE_NAME=taxifare_training_container
DOCKERFILE=$PROJECT_DIR/Dockerfile
IMAGE_URI=gcr.io/$PROJECT_ID/$IMAGE_NAME
docker build $PROJECT_DIR -f $DOCKERFILE -t $IMAGE_URI
docker push $IMAGE_URI
Remark: If you prefer to build the container image from the command line, we have written a script for that ./taxifare/scripts/build.sh
. This script reads its configuration from the file ./taxifare/scripts/env.sh
. You can configure these arguments the way you want in that file. You can also simply type make build
from within ./taxifare
to build the image (which will invoke the build script). Similarly, we wrote the script ./taxifare/scripts/push.sh
to push the Docker image, which you can also trigger by typing make push
from within ./taxifare
.
To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname]
and simply specify some additional parameters for AI Platform Training Service:
The arguments before -- \
are for AI Platform Training Service.
The arguments after -- \
are sent to our task.py
.
You can track your job and view logs using cloud console.
In [ ]:
%%bash
PROJECT_ID=$(gcloud config list project --format "value(core.project)")
BUCKET=$PROJECT_ID
REGION="us-central1"
# Output directory and jobID
OUTDIR=gs://${BUCKET}/taxifare/trained_model
JOBID=taxifare_container_$(date -u +%y%m%d_%H%M%S)
echo ${OUTDIR} ${REGION} ${JOBID}
gsutil -m rm -rf ${OUTDIR}
# Model and training hyperparameters
BATCH_SIZE=50
NUM_EXAMPLES_TO_TRAIN_ON=100
NUM_EVALS=100
NBUCKETS=10
NNSIZE="32 8"
# AI-Platform machines to use for training
MACHINE_TYPE=n1-standard-4
SCALE_TIER=CUSTOM
# GCS paths.
GCS_PROJECT_PATH=gs://$BUCKET/taxifare
DATA_PATH=$GCS_PROJECT_PATH/data
TRAIN_DATA_PATH=$DATA_PATH/taxi-train*
EVAL_DATA_PATH=$DATA_PATH/taxi-valid*
IMAGE_NAME=taxifare_training_container
IMAGE_URI=gcr.io/$PROJECT_ID/$IMAGE_NAME
gcloud beta ai-platform jobs submit training $JOBID \
--staging-bucket=gs://$BUCKET \
--region=$REGION \
--master-image-uri=$IMAGE_URI \
--master-machine-type=$MACHINE_TYPE \
--scale-tier=$SCALE_TIER \
-- \
--eval_data_path $EVAL_DATA_PATH \
--output_dir $OUTDIR \
--train_data_path $TRAIN_DATA_PATH \
--batch_size $BATCH_SIZE \
--num_examples_to_train_on $NUM_EXAMPLES_TO_TRAIN_ON \
--num_evals $NUM_EVALS \
--nbuckets $NBUCKETS \
--nnsize $NNSIZE
Remark: If you prefer submitting your jobs for training on the AI-platform using the command line, we have written the ./taxifare/scripts/submit.sh
for you (that you can also invoke using make submit
from within ./taxifare
). As the other scripts, it reads it configuration variables from ./taxifare/scripts/env.sh
.
Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License