This notebook illustrates distributed training on Cloud AI Platform. It uses Keras and requires TensorFlow 2.1.
In [ ]:
# Ensure the right version of TensorFlow is installed.
!pip freeze | grep tensorflow==2.1
Set environment variables so that we can use them throughout the notebook. Change the bucket, project, and region below to match your own.
In [2]:
# change these to try this notebook out
BUCKET = 'cloud-training-demos-ml' # Replace with your bucket name
PROJECT = 'cloud-training-demos' # Replace with your project-id
REGION = 'us-central1'
In [ ]:
import os
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = "2.1"
os.environ["PYTHONVERSION"] = "3.7"
Check that the Google BigQuery library is installed and if not, install it.
In [1]:
%%bash
sudo pip freeze | grep google-cloud-bigquery==1.6.1 || \
sudo pip install google-cloud-bigquery==1.6.1
In [2]:
import os
from google.cloud import bigquery
In [ ]:
%%bash
export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: "$PROJECT
Our dataset is hosted in BigQuery. The CDC's Natality data has details on US births from 1969 to 2008 and is a publicly available dataset, meaning anyone with a GCP account has access.
The natality dataset is relatively large at almost 138 million rows and 31 columns, but simple to understand. weight_pounds is the target, the continuous value we'll train a model to predict.
In [ ]:
%%bash
# Create a BigQuery dataset for babyweight if it doesn't exist
datasetexists=$(bq ls -d | grep -w babyweight)
if [ -n "$datasetexists" ]; then
echo -e "BigQuery dataset already exists, let's not recreate it."
else
echo "Creating BigQuery dataset titled: babyweight"
bq --location=US mk --dataset \
--description "Babyweight" \
$PROJECT:babyweight
echo "Here are your current datasets:"
bq ls
fi
## Create GCS bucket if it doesn't exist already...
exists=$(gsutil ls -d | grep -w gs://${BUCKET}/)
if [ -n "$exists" ]; then
echo -e "Bucket exists, let's not recreate it."
else
echo "Creating a new GCS bucket."
gsutil mb -l ${REGION} gs://${BUCKET}
echo "Here are your current buckets:"
gsutil ls
fi
Since there is already a publicly available dataset, we can simply create the training and evaluation data tables using this raw input data. First we are going to create a subset of the data, limiting our columns to weight_pounds, is_male, mother_age, plurality, and gestation_weeks, as well as applying some simple filtering and adding a column to hash on for repeatable splitting.
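The idea of hashing on year and month for repeatable splitting can be sketched in plain Python. This is only an illustration: stand_in_fingerprint is a hypothetical helper built on SHA-256, not BigQuery's FARM_FINGERPRINT, so the actual bucket assignments will differ from those in BigQuery. What carries over is the principle: every row from the same year and month always lands in the same split, and roughly three of every four hash buckets go to training.

```python
import hashlib


def stand_in_fingerprint(text):
    """Return a signed 64-bit hash of text.

    Assumption: this is a stand-in for illustration only, NOT the
    FARM_FINGERPRINT function that BigQuery provides.
    """
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], byteorder="big", signed=True)


def assign_split(year, month):
    """Mirror the split rule: bucket < 3 is train, bucket == 3 is eval."""
    # Same concatenation as CONCAT(CAST(year AS STRING), CAST(month AS STRING)).
    hashmonth = stand_in_fingerprint(str(year) + str(month))
    # BigQuery's MOD keeps the sign of the dividend, so ABS(MOD(h, 4))
    # is equivalent to abs(h) % 4 in Python.
    bucket = abs(hashmonth) % 4
    return "train" if bucket < 3 else "eval"


# Every (year, month) pair gets exactly one, repeatable assignment.
splits = {
    (year, month): assign_split(year, month)
    for year in range(2001, 2009)
    for month in range(1, 13)
}
```

Because the split depends only on the hashed key, rerunning the queries reproduces the same train and eval tables.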
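We have some preprocessing and filtering we would like to do to get our data in the right format for training.
Preprocessing:
Cast is_male from BOOL to STRING.
Cast plurality from INTEGER to STRING, where [1, 2, 3, 4, 5] becomes ["Single(1)", "Twins(2)", "Triplets(3)", "Quadruplets(4)", "Quintuplets(5)"].
Add a hash column, hashmonth, by hashing on year and month.
Filtering:
Keep only rows where year is greater than 2000.
Keep only rows where weight_pounds is greater than 0.
Keep only rows where mother_age is greater than 0.
Keep only rows where plurality is greater than 0.
Keep only rows where gestation_weeks is greater than 0.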
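As a rough illustration of the preprocessing and filtering rules above, here is a minimal pure-Python sketch applied to a single row represented as a dictionary. The helper names keep_row and preprocess_row and the sample row are hypothetical; the notebook itself does this work in SQL. The lowercase "true"/"false" strings follow BigQuery's documented BOOL-to-STRING cast.

```python
PLURALITY_LABELS = {
    1: "Single(1)",
    2: "Twins(2)",
    3: "Triplets(3)",
    4: "Quadruplets(4)",
    5: "Quintuplets(5)",
}


def _positive(value):
    """A missing value fails the filter, like NULL > 0 in SQL."""
    return value is not None and value > 0


def keep_row(row):
    """Apply the five filtering rules to one row."""
    return (
        row["year"] is not None and row["year"] > 2000
        and _positive(row["weight_pounds"])
        and _positive(row["mother_age"])
        and _positive(row["plurality"])
        and _positive(row["gestation_weeks"])
    )


def preprocess_row(row):
    """Convert is_male and plurality to strings; pass other columns through."""
    return {
        "weight_pounds": row["weight_pounds"],
        "is_male": "true" if row["is_male"] else "false",
        "mother_age": row["mother_age"],
        # Values outside 1..5 map to None, like a CASE with no ELSE branch.
        "plurality": PLURALITY_LABELS.get(row["plurality"]),
        "gestation_weeks": row["gestation_weeks"],
    }


# Hypothetical sample row, for illustration only.
sample = {"year": 2005, "month": 7, "weight_pounds": 7.5, "is_male": True,
          "mother_age": 30, "plurality": 2, "gestation_weeks": 39}
processed = preprocess_row(sample) if keep_row(sample) else None
```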
In [6]:
%%bigquery
CREATE OR REPLACE TABLE
babyweight.babyweight_data AS
SELECT
weight_pounds,
CAST(is_male AS STRING) AS is_male,
mother_age,
CASE
WHEN plurality = 1 THEN "Single(1)"
WHEN plurality = 2 THEN "Twins(2)"
WHEN plurality = 3 THEN "Triplets(3)"
WHEN plurality = 4 THEN "Quadruplets(4)"
WHEN plurality = 5 THEN "Quintuplets(5)"
END AS plurality,
gestation_weeks,
FARM_FINGERPRINT(
CONCAT(
CAST(year AS STRING),
CAST(month AS STRING)
)
) AS hashmonth
FROM
publicdata.samples.natality
WHERE
year > 2000
AND weight_pounds > 0
AND mother_age > 0
AND plurality > 0
AND gestation_weeks > 0
In [7]:
%%bigquery
CREATE OR REPLACE TABLE
babyweight.babyweight_augmented_data AS
SELECT
weight_pounds,
is_male,
mother_age,
plurality,
gestation_weeks,
hashmonth
FROM
babyweight.babyweight_data
UNION ALL
SELECT
weight_pounds,
"Unknown" AS is_male,
mother_age,
CASE
WHEN plurality = "Single(1)" THEN plurality
ELSE "Multiple(2+)"
END AS plurality,
gestation_weeks,
hashmonth
FROM
babyweight.babyweight_data
In [8]:
%%bigquery
CREATE OR REPLACE TABLE
babyweight.babyweight_data_train AS
SELECT
weight_pounds,
is_male,
mother_age,
plurality,
gestation_weeks
FROM
babyweight.babyweight_augmented_data
WHERE
ABS(MOD(hashmonth, 4)) < 3
In [9]:
%%bigquery
CREATE OR REPLACE TABLE
babyweight.babyweight_data_eval AS
SELECT
weight_pounds,
is_male,
mother_age,
plurality,
gestation_weeks
FROM
babyweight.babyweight_augmented_data
WHERE
ABS(MOD(hashmonth, 4)) = 3
In [10]:
%%bigquery
-- LIMIT 0 is a free query; this allows us to check that the table exists.
SELECT * FROM babyweight.babyweight_data_train
LIMIT 0
In [11]:
%%bigquery
-- LIMIT 0 is a free query; this allows us to check that the table exists.
SELECT * FROM babyweight.babyweight_data_eval
LIMIT 0
Use the BigQuery Python API to export our train and eval tables to Google Cloud Storage in CSV format, to be used later for TensorFlow/Keras training. We use the same dataset as above and repeat the export for both the training and evaluation tables.
In [14]:
# Construct a BigQuery client object.
client = bigquery.Client()
dataset_name = "babyweight"
# Create dataset reference object
dataset_ref = client.dataset(
    dataset_id=dataset_name, project=client.project)
# Export both train and eval tables
for step in ["train", "eval"]:
    destination_uri = os.path.join(
        "gs://", BUCKET, dataset_name, "data", "{}*.csv".format(step))
    table_name = "babyweight_data_{}".format(step)
    table_ref = dataset_ref.table(table_name)
    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.
    print("Exported {}:{}.{} to {}".format(
        client.project, dataset_name, table_name, destination_uri))
In [15]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/data/*.csv
In [ ]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/data/*000000000000.csv
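The listing commands above rely on how the export names its files: the * in the destination URI is replaced by a zero-padded 12-digit shard number, so the first shard of the training export ends in train000000000000.csv. The following sketch shows that naming under the assumption of a hypothetical bucket called my-bucket; export_uri and shard_name are illustrative helpers, not part of the BigQuery client library.

```python
def export_uri(bucket, dataset_name, step):
    """Build the wildcard destination URI used for one export."""
    return "gs://{}/{}/data/{}*.csv".format(bucket, dataset_name, step)


def shard_name(uri, shard_index):
    """Replace the wildcard with a zero-padded 12-digit shard number."""
    return uri.replace("*", "{:012d}".format(shard_index))


# "my-bucket" is a hypothetical bucket name used only for illustration.
train_uri = export_uri("my-bucket", "babyweight", "train")
first_train_shard = shard_name(train_uri, 0)
```

A pattern such as train*.csv therefore matches every shard, which is why the training job later receives the wildcard path rather than a single file name.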
In [ ]:
%%bash
mkdir -p babyweight/trainer
touch babyweight/trainer/__init__.py
We then use the %%writefile magic to write the contents of the cell below to a file called task.py in the babyweight/trainer folder.
The cell below writes the file babyweight/trainer/task.py, which sets up our training job. Here is where we determine which parameters of our model to pass as flags during training, using the argparse module. Look at how batch_size is passed to the model in the code below. Use this as an example to parse arguments for the following variables:
nnsize, which represents the hidden layer sizes to use for the DNN feature columns
nembeds, which represents the embedding size of a cross of n key real-valued parameters
train_examples, which represents the number of examples (in thousands) to run the training job over
eval_steps, which represents the positive number of steps for which to evaluate the model
Be sure to include a default value for the parsed arguments above and specify the type if necessary.
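As a small, self-contained illustration of the pattern described above, the sketch below defines three of the flags and parses a hand-written argument list instead of the real command line. The build_parser helper and the argument values are assumptions made for this example; the full task.py defines more flags.

```python
import argparse


def build_parser():
    """Create a parser with a few of the flags used by the training job."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--batch_size",
        help="Number of examples to compute gradient over.",
        type=int,
        default=512,
    )
    parser.add_argument(
        "--nnsize",
        help="Hidden layer sizes, provided as space-separated integers.",
        nargs="+",  # one or more values are collected into a list
        type=int,
        default=[128, 32, 4],
    )
    parser.add_argument(
        "--eval_steps",
        help="Number of evaluation steps; None means no explicit limit.",
        type=int,
        default=None,
    )
    return parser


# Parse an explicit list rather than sys.argv so the sketch runs anywhere.
args = build_parser().parse_args(["--nnsize", "64", "16", "--batch_size", "32"])
defaults = build_parser().parse_args([])
```

Here args.nnsize is the list [64, 16], and any flag that is not supplied falls back to its default.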
In [ ]:
%%writefile babyweight/trainer/task.py
import argparse
import json
import os
from trainer import model
import tensorflow as tf
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--job-dir",
        help="this model ignores this field, but it is required by gcloud",
        default="junk"
    )
    parser.add_argument(
        "--train_data_path",
        help="GCS location of training data",
        required=True
    )
    parser.add_argument(
        "--eval_data_path",
        help="GCS location of evaluation data",
        required=True
    )
    parser.add_argument(
        "--output_dir",
        help="GCS location to write checkpoints and export models",
        required=True
    )
    parser.add_argument(
        "--batch_size",
        help="Number of examples to compute gradient over.",
        type=int,
        default=512
    )
    parser.add_argument(
        "--nnsize",
        help="Hidden layer sizes for DNN -- provide space-separated layers",
        nargs="+",
        type=int,
        default=[128, 32, 4]
    )
    parser.add_argument(
        "--nembeds",
        help="Embedding size of a cross of n key real-valued parameters",
        type=int,
        default=3
    )
    parser.add_argument(
        "--num_epochs",
        help="Number of epochs to train the model.",
        type=int,
        default=10
    )
    parser.add_argument(
        "--train_examples",
        help="""Number of examples (in thousands) to run the training job over.
        If this is more than actual # of examples available, it cycles through
        them. So specifying 1000 here when you have only 100k examples makes
        this 10 epochs.""",
        type=int,
        default=5000
    )
    parser.add_argument(
        "--eval_steps",
        help="""Positive number of steps for which to evaluate model. Default
        to None, which means to evaluate until input_fn raises an end-of-input
        exception""",
        type=int,
        default=None
    )
    # Parse all arguments
    args = parser.parse_args()
    arguments = args.__dict__
    # Unused args provided by service
    arguments.pop("job_dir", None)
    arguments.pop("job-dir", None)
    # Modify some arguments
    arguments["train_examples"] *= 1000
    # Append trial_id to path if we are doing hptuning
    # This code can be removed if you are not using hyperparameter tuning
    arguments["output_dir"] = os.path.join(
        arguments["output_dir"],
        json.loads(
            os.environ.get("TF_CONFIG", "{}")
        ).get("task", {}).get("trial", "")
    )
    # Run the training job
    model.train_and_evaluate(arguments)
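The step in task.py that appends the trial ID to the output directory can be tried in isolation. The sketch below is a simplified stand-in: output_dir_for_trial is a hypothetical helper, the TF_CONFIG strings are made-up examples, and posixpath is used so the result is the same on any platform (on Linux, os.path.join behaves identically).

```python
import json
import posixpath


def output_dir_for_trial(output_dir, tf_config):
    """Append the hyperparameter-tuning trial ID, if any, to output_dir.

    tf_config is the raw TF_CONFIG string, or None when it is not set.
    """
    config = json.loads(tf_config or "{}")
    trial = config.get("task", {}).get("trial", "")
    return posixpath.join(output_dir, trial)


# Hypothetical values, for illustration only.
base = "gs://my-bucket/babyweight/trained_model"
with_trial = output_dir_for_trial(base, '{"task": {"trial": "3"}}')
without_trial = output_dir_for_trial(base, None)
```

When no trial is present, joining with an empty string leaves the directory unchanged apart from a trailing slash, so each tuning trial gets its own subdirectory while a plain training run writes straight to the given output directory.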
In the same way, we can write the model that we developed in the previous notebooks to the file model.py.
To create our model.py, we'll use the code we wrote for the Wide & Deep model. Look back at your 9_keras_wide_and_deep_babyweight notebook and copy/paste the necessary code from that notebook into its place in the cell below.
In [ ]:
%%writefile babyweight/trainer/model.py
import datetime
import os
import shutil
import numpy as np
import tensorflow as tf
import hypertune
# Determine CSV, label, and key columns
CSV_COLUMNS = ["weight_pounds",
"is_male",
"mother_age",
"plurality",
"gestation_weeks"]
LABEL_COLUMN = "weight_pounds"
# Set default values for each CSV column.
# Treat is_male and plurality as strings.
DEFAULTS = [[0.0], ["null"], [0.0], ["null"], [0.0]]
def features_and_labels(row_data):
    """Splits features and labels from feature dictionary.
    Args:
        row_data: Dictionary of CSV column names and tensor values.
    Returns:
        Dictionary of feature tensors and label tensor.
    """
    label = row_data.pop(LABEL_COLUMN)
    return row_data, label  # features, label
def load_dataset(pattern, batch_size=1, mode='eval'):
    """Loads dataset using the tf.data API from CSV files.
    Args:
        pattern: str, file pattern to glob into list of files.
        batch_size: int, the number of examples per batch.
        mode: 'train' | 'eval' to determine if training or evaluating.
    Returns:
        `Dataset` object.
    """
    print("mode = {}".format(mode))
    # Make a CSV dataset. num_epochs=1 reads the files once; without it the
    # dataset repeats forever, so evaluation would never reach end of input.
    dataset = tf.data.experimental.make_csv_dataset(
        file_pattern=pattern,
        batch_size=batch_size,
        column_names=CSV_COLUMNS,
        column_defaults=DEFAULTS,
        num_epochs=1)
    # Map dataset to features and label
    dataset = dataset.map(map_func=features_and_labels)  # features, label
    # Shuffle and repeat for training
    if mode == 'train':
        dataset = dataset.shuffle(buffer_size=1000).repeat()
    # Overlap input preparation with training; let tf.data pick the buffer size
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    return dataset
def create_input_layers():
    """Creates dictionary of input layers for each feature.
    Returns:
        Dictionary of `tf.keras.layers.Input` layers for each feature.
    """
    deep_inputs = {
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="float32")
        for colname in ["mother_age", "gestation_weeks"]
    }
    wide_inputs = {
        colname: tf.keras.layers.Input(
            name=colname, shape=(), dtype="string")
        for colname in ["is_male", "plurality"]
    }
    inputs = {**wide_inputs, **deep_inputs}
    return inputs
def categorical_fc(name, values):
    """Helper function to wrap categorical feature by indicator column.
    Args:
        name: str, name of feature.
        values: list, list of strings of categorical values.
    Returns:
        Categorical and indicator column of categorical feature.
    """
    cat_column = tf.feature_column.categorical_column_with_vocabulary_list(
        key=name, vocabulary_list=values)
    ind_column = tf.feature_column.indicator_column(
        categorical_column=cat_column)
    return cat_column, ind_column
def create_feature_columns(nembeds):
    """Creates wide and deep dictionaries of feature columns from inputs.
    Args:
        nembeds: int, number of dimensions to embed categorical column down to.
    Returns:
        Wide and deep dictionaries of feature columns.
    """
    deep_fc = {
        colname: tf.feature_column.numeric_column(key=colname)
        for colname in ["mother_age", "gestation_weeks"]
    }
    wide_fc = {}
    is_male, wide_fc["is_male"] = categorical_fc(
        "is_male", ["True", "False", "Unknown"])
    plurality, wide_fc["plurality"] = categorical_fc(
        "plurality", ["Single(1)", "Twins(2)", "Triplets(3)",
                      "Quadruplets(4)", "Quintuplets(5)", "Multiple(2+)"])
    # Bucketize the float fields. This makes them wide
    age_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["mother_age"],
        boundaries=np.arange(15, 45, 1).tolist())
    wide_fc["age_buckets"] = tf.feature_column.indicator_column(
        categorical_column=age_buckets)
    gestation_buckets = tf.feature_column.bucketized_column(
        source_column=deep_fc["gestation_weeks"],
        boundaries=np.arange(17, 47, 1).tolist())
    wide_fc["gestation_buckets"] = tf.feature_column.indicator_column(
        categorical_column=gestation_buckets)
    # Cross all the wide columns, have to do the crossing before we one-hot
    crossed = tf.feature_column.crossed_column(
        keys=[age_buckets, gestation_buckets],
        hash_bucket_size=1000)
    deep_fc["crossed_embeds"] = tf.feature_column.embedding_column(
        categorical_column=crossed, dimension=nembeds)
    return wide_fc, deep_fc
def get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units):
    """Creates model architecture and returns outputs.
    Args:
        wide_inputs: Dense tensor used as inputs to wide side of model.
        deep_inputs: Dense tensor used as inputs to deep side of model.
        dnn_hidden_units: List of integers where length is number of hidden
            layers and ith element is the number of neurons at ith layer.
    Returns:
        Dense tensor output from the model.
    """
    # Hidden layers for the deep side
    layers = [int(x) for x in dnn_hidden_units]
    deep = deep_inputs
    for layerno, numnodes in enumerate(layers):
        deep = tf.keras.layers.Dense(
            units=numnodes,
            activation="relu",
            name="dnn_{}".format(layerno+1))(deep)
    deep_out = deep
    # Linear model for the wide side
    wide_out = tf.keras.layers.Dense(
        units=10, activation="relu", name="linear")(wide_inputs)
    # Concatenate the two sides
    both = tf.keras.layers.concatenate(
        inputs=[deep_out, wide_out], name="both")
    # Final output is a linear activation because this is regression
    output = tf.keras.layers.Dense(
        units=1, activation="linear", name="weight")(both)
    return output
def rmse(y_true, y_pred):
    """Calculates RMSE evaluation metric.
    Args:
        y_true: tensor, true labels.
        y_pred: tensor, predicted labels.
    Returns:
        Tensor with value of RMSE between true and predicted labels.
    """
    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))
def build_wide_deep_model(dnn_hidden_units=[64, 32], nembeds=3):
    """Builds wide and deep model using Keras Functional API.
    Args:
        dnn_hidden_units: List of integers, number of neurons in each hidden
            layer of the deep side.
        nembeds: int, number of dimensions to embed the crossed column down to.
    Returns:
        `tf.keras.models.Model` object.
    """
    # Create input layers
    inputs = create_input_layers()
    # Create feature columns for both wide and deep
    wide_fc, deep_fc = create_feature_columns(nembeds)
    # The constructor for DenseFeatures takes a list of numeric columns
    # The Functional API in Keras requires: LayerConstructor()(inputs)
    wide_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=wide_fc.values(), name="wide_inputs")(inputs)
    deep_inputs = tf.keras.layers.DenseFeatures(
        feature_columns=deep_fc.values(), name="deep_inputs")(inputs)
    # Get output of model given inputs
    output = get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units)
    # Build model and compile it all together
    model = tf.keras.models.Model(inputs=inputs, outputs=output)
    model.compile(optimizer="adam", loss="mse", metrics=[rmse, "mse"])
    return model
def train_and_evaluate(args):
    """Trains, evaluates, and exports the model, then reports the tuning metric.
    Args:
        args: Dictionary of parsed command-line arguments from task.py.
    """
    model = build_wide_deep_model(args["nnsize"], args["nembeds"])
    print("Here is our Wide-and-Deep architecture so far:\n")
    model.summary()  # summary() prints the architecture itself
    trainds = load_dataset(
        args["train_data_path"],
        args["batch_size"],
        'train')
    evalds = load_dataset(
        args["eval_data_path"], 1000, 'eval')
    if args["eval_steps"]:
        evalds = evalds.take(count=args["eval_steps"])
    num_batches = args["batch_size"] * args["num_epochs"]
    steps_per_epoch = args["train_examples"] // num_batches
    checkpoint_path = os.path.join(args["output_dir"], "checkpoints/babyweight")
    cp_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_path, verbose=1, save_weights_only=True)
    history = model.fit(
        trainds,
        validation_data=evalds,
        epochs=args["num_epochs"],
        steps_per_epoch=steps_per_epoch,
        verbose=2,  # 0=silent, 1=progress bar, 2=one line per epoch
        callbacks=[cp_callback])
    EXPORT_PATH = os.path.join(
        args["output_dir"], datetime.datetime.now().strftime("%Y%m%d%H%M%S"))
    tf.saved_model.save(
        obj=model, export_dir=EXPORT_PATH)  # with default serving function
    hp_metric = history.history['val_rmse'][-1]
    hpt = hypertune.HyperTune()
    hpt.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag='rmse',
        metric_value=hp_metric,
        global_step=args['num_epochs'])
    print("Exported trained model to {}".format(EXPORT_PATH))
To submit to the Cloud we use gcloud ai-platform jobs submit training [jobname] and specify some additional parameters for the AI Platform Training Service, such as the region, the job directory, and the runtime and Python versions.
Below the -- \ line, we add the arguments for our task.py file.
In [ ]:
%%bash
OUTDIR=gs://${BUCKET}/babyweight/trained_model
JOBID=babyweight_$(date -u +%y%m%d_%H%M%S)
gcloud ai-platform jobs submit training ${JOBID} \
--region=${REGION} \
--module-name=trainer.task \
--package-path=$(pwd)/babyweight/trainer \
--job-dir=${OUTDIR} \
--staging-bucket=gs://${BUCKET} \
--master-machine-type=n1-standard-8 \
--scale-tier=CUSTOM \
--runtime-version=${TFVERSION} \
--python-version=${PYTHONVERSION} \
-- \
--train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \
--eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \
--output_dir=${OUTDIR} \
--num_epochs=10 \
--train_examples=10000 \
--eval_steps=100 \
--batch_size=32 \
--nembeds=8
The training job should complete within 15 to 20 minutes. You do not need to wait for this training job to finish before moving forward in the notebook, but you will need a trained model.
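To see how the flags passed to the job translate into training steps, the arithmetic used in model.py and task.py can be worked through by hand. The steps_per_epoch helper below is a hypothetical name for this sketch; it reproduces the calculation with the values from the command above (train_examples=10000, batch_size=32, num_epochs=10).

```python
def steps_per_epoch(train_examples_thousands, batch_size, num_epochs):
    """Reproduce the step calculation from task.py and model.py."""
    # task.py multiplies the flag by 1000 to get the number of examples.
    train_examples = train_examples_thousands * 1000
    # model.py spreads those examples evenly over all epochs.
    return train_examples // (batch_size * num_epochs)


steps = steps_per_epoch(10000, 32, 10)
# Total examples processed over the whole run.
total_examples = steps * 32 * 10
```

With these values, the job runs 31,250 steps in each of the 10 epochs, for 10,000,000 examples in total. In this scheme train_examples is the budget for the whole run, not for a single epoch.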
Let's check the directory structure of the outputs of our trained model in the folder we exported to. We'll want to deploy the saved_model.pb within the timestamped directory, as well as the variable values in the variables folder. We therefore need the path of the timestamped directory so that Cloud AI Platform's model deployment service can find everything within it.
In [10]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/trained_model
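Since the export directory is named with a %Y%m%d%H%M%S timestamp, the most recent export is the one whose name sorts last. The sketch below picks it out of a listing; latest_export_dir is a hypothetical helper and the sample listing is made up for illustration, assuming the one-path-per-line form that gsutil ls prints.

```python
import re


def latest_export_dir(listing, base_dir):
    """Return the newest timestamped directory under base_dir, or None."""
    base = base_dir.rstrip("/")
    # Match only direct children named with a 14-digit timestamp.
    pattern = re.compile(re.escape(base) + r"/(\d{14})/?$")
    candidates = []
    for line in listing:
        path = line.strip()
        match = pattern.match(path)
        if match:
            candidates.append((match.group(1), path))
    if not candidates:
        return None
    # Fixed-width timestamps sort chronologically as plain strings.
    return max(candidates)[1]


# Hypothetical listing, for illustration only.
sample_listing = [
    "gs://my-bucket/babyweight/trained_model/",
    "gs://my-bucket/babyweight/trained_model/checkpoints/",
    "gs://my-bucket/babyweight/trained_model/20200301101500/",
    "gs://my-bucket/babyweight/trained_model/20200301120000/",
]
latest = latest_export_dir(
    sample_listing, "gs://my-bucket/babyweight/trained_model")
```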
Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License