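This lab gives you an introductory, end-to-end experience of training and prediction on AI Platform. It uses a census dataset to train a model locally, train it in the cloud on AI Platform, deploy the trained model, and request online predictions from it.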
In [ ]:
import os
The relevant data files, adult.data.csv and adult.test.csv, are hosted in a public Cloud Storage bucket.
You can read the files directly from Cloud Storage or copy them to your local environment. For this lab you will download the samples for local training, and later upload them to your own Cloud Storage bucket for cloud training.
Run the following command to download the data to a local file directory and set variables that point to the downloaded data files:
In [ ]:
%%bash
mkdir -p data
gsutil -m cp gs://cloud-samples-data/ml-engine/census/data/* data/
In [ ]:
%%bash
export TRAIN_DATA=$(pwd)/data/adult.data.csv
export EVAL_DATA=$(pwd)/data/adult.test.csv
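One caveat, offered as an observation about how notebooks generally behave rather than something the lab states: each %%bash cell runs in its own shell process, so variables set with export are not visible in later cells. A minimal sketch of an alternative, assuming the files were copied to ./data as above, is to set the same variables from Python through os.environ. The child Python process below is only a stand-in for a later %%bash cell.

```python
import os
import subprocess
import sys

# Assumption: the CSV files were copied into ./data by the gsutil cell above.
data_dir = os.path.join(os.getcwd(), "data")
os.environ["TRAIN_DATA"] = os.path.join(data_dir, "adult.data.csv")
os.environ["EVAL_DATA"] = os.path.join(data_dir, "adult.test.csv")

# Child processes inherit os.environ, so the value set here is visible to
# them. A child Python process stands in for a later %%bash cell.
child = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['TRAIN_DATA'])"],
    capture_output=True, text=True, check=True)
seen_by_child = child.stdout.strip()
print(seen_by_child)
```

This is the same mechanism the lab uses later when it sets PROJECT, BUCKET_NAME, and REGION through os.environ.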
Inspect the data by looking at the first few rows:
In [ ]:
%%bash
head data/adult.data.csv
In [ ]:
%%bash
mkdir -p trainer
touch trainer/__init__.py
In [ ]:
%%writefile trainer/util.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
from six.moves import urllib
import tempfile
import numpy as np
import pandas as pd
import tensorflow as tf
# Storage directory
DATA_DIR = os.path.join(tempfile.gettempdir(), 'census_data')
# Download options.
DATA_URL = (
'https://storage.googleapis.com/cloud-samples-data/ai-platform/census'
'/data')
TRAINING_FILE = 'adult.data.csv'
EVAL_FILE = 'adult.test.csv'
TRAINING_URL = '%s/%s' % (DATA_URL, TRAINING_FILE)
EVAL_URL = '%s/%s' % (DATA_URL, EVAL_FILE)
# These are the features in the dataset.
# Dataset information: https://archive.ics.uci.edu/ml/datasets/census+income
_CSV_COLUMNS = [
'age', 'workclass', 'fnlwgt', 'education', 'education_num',
'marital_status', 'occupation', 'relationship', 'race', 'gender',
'capital_gain', 'capital_loss', 'hours_per_week', 'native_country',
'income_bracket'
]
# This is the label (target) we want to predict.
_LABEL_COLUMN = 'income_bracket'
# These are columns we will not use as features for training. There are many
# reasons not to use certain attributes of data for training. Perhaps their
# values are noisy or inconsistent, or perhaps they encode bias that we do not
# want our model to learn. For a deep dive into the features of this Census
# dataset and the challenges they pose, see the Introduction to ML Fairness
# Notebook: https://colab.research.google.com/github/google/eng-edu/blob
# /master/ml/cc/exercises/intro_to_fairness.ipynb
UNUSED_COLUMNS = ['fnlwgt', 'education', 'gender']
_CATEGORICAL_TYPES = {
'workclass': pd.api.types.CategoricalDtype(categories=[
'Federal-gov', 'Local-gov', 'Never-worked', 'Private', 'Self-emp-inc',
'Self-emp-not-inc', 'State-gov', 'Without-pay'
]),
'marital_status': pd.api.types.CategoricalDtype(categories=[
'Divorced', 'Married-AF-spouse', 'Married-civ-spouse',
'Married-spouse-absent', 'Never-married', 'Separated', 'Widowed'
]),
'occupation': pd.api.types.CategoricalDtype([
'Adm-clerical', 'Armed-Forces', 'Craft-repair', 'Exec-managerial',
'Farming-fishing', 'Handlers-cleaners', 'Machine-op-inspct',
'Other-service', 'Priv-house-serv', 'Prof-specialty', 'Protective-serv',
'Sales', 'Tech-support', 'Transport-moving'
]),
'relationship': pd.api.types.CategoricalDtype(categories=[
'Husband', 'Not-in-family', 'Other-relative', 'Own-child', 'Unmarried',
'Wife'
]),
'race': pd.api.types.CategoricalDtype(categories=[
'Amer-Indian-Eskimo', 'Asian-Pac-Islander', 'Black', 'Other', 'White'
]),
'native_country': pd.api.types.CategoricalDtype(categories=[
'Cambodia', 'Canada', 'China', 'Columbia', 'Cuba', 'Dominican-Republic',
'Ecuador', 'El-Salvador', 'England', 'France', 'Germany', 'Greece',
'Guatemala', 'Haiti', 'Holand-Netherlands', 'Honduras', 'Hong',
'Hungary',
'India', 'Iran', 'Ireland', 'Italy', 'Jamaica', 'Japan', 'Laos',
'Mexico',
'Nicaragua', 'Outlying-US(Guam-USVI-etc)', 'Peru', 'Philippines',
'Poland',
'Portugal', 'Puerto-Rico', 'Scotland', 'South', 'Taiwan', 'Thailand',
'Trinadad&Tobago', 'United-States', 'Vietnam', 'Yugoslavia'
]),
'income_bracket': pd.api.types.CategoricalDtype(categories=[
'<=50K', '>50K'
])
}
def _download_and_clean_file(filename, url):
    """Downloads data from url, and makes changes to match the CSV format.

    The CSVs may use spaces after the comma delimiters (non-standard) or
    include rows which do not represent well-formed examples. This function
    strips out some of these problems.

    Args:
      filename: filename to save url to
      url: URL of resource to download
    """
    temp_file, _ = urllib.request.urlretrieve(url)
    with tf.io.gfile.GFile(temp_file, 'r') as temp_file_object:
        with tf.io.gfile.GFile(filename, 'w') as file_object:
            for line in temp_file_object:
                line = line.strip()
                line = line.replace(', ', ',')
                if not line or ',' not in line:
                    continue
                if line[-1] == '.':
                    line = line[:-1]
                line += '\n'
                file_object.write(line)
    tf.io.gfile.remove(temp_file)


def download(data_dir):
    """Downloads census data if it is not already present.

    Args:
      data_dir: directory where we will access/save the census data
    """
    tf.io.gfile.makedirs(data_dir)

    training_file_path = os.path.join(data_dir, TRAINING_FILE)
    if not tf.io.gfile.exists(training_file_path):
        _download_and_clean_file(training_file_path, TRAINING_URL)

    eval_file_path = os.path.join(data_dir, EVAL_FILE)
    if not tf.io.gfile.exists(eval_file_path):
        _download_and_clean_file(eval_file_path, EVAL_URL)

    return training_file_path, eval_file_path


def preprocess(dataframe):
    """Converts categorical features to numeric. Removes unused columns.

    Args:
      dataframe: Pandas dataframe with raw data

    Returns:
      Dataframe with preprocessed data
    """
    dataframe = dataframe.drop(columns=UNUSED_COLUMNS)

    # Convert integer valued (numeric) columns to floating point
    numeric_columns = dataframe.select_dtypes(['int64']).columns
    dataframe[numeric_columns] = dataframe[numeric_columns].astype('float32')

    # Convert categorical columns to numeric
    cat_columns = dataframe.select_dtypes(['object']).columns
    dataframe[cat_columns] = dataframe[cat_columns].apply(lambda x: x.astype(
        _CATEGORICAL_TYPES[x.name]))
    dataframe[cat_columns] = dataframe[cat_columns].apply(
        lambda x: x.cat.codes)
    return dataframe


def standardize(dataframe):
    """Scales numerical columns using their means and standard deviation to get
    z-scores: the mean of each numerical column becomes 0, and the standard
    deviation becomes 1. This can help the model converge during training.

    Args:
      dataframe: Pandas dataframe

    Returns:
      Input dataframe with the numerical columns scaled to z-scores
    """
    dtypes = list(zip(dataframe.dtypes.index, map(str, dataframe.dtypes)))
    # Normalize numeric columns.
    for column, dtype in dtypes:
        if dtype == 'float32':
            dataframe[column] -= dataframe[column].mean()
            dataframe[column] /= dataframe[column].std()
    return dataframe


def load_data():
    """Loads data into preprocessed (train_x, train_y, eval_x, eval_y)
    dataframes.

    Returns:
      A tuple (train_x, train_y, eval_x, eval_y), where train_x and eval_x are
      Pandas dataframes with features for training and train_y and eval_y are
      numpy arrays with the corresponding labels.
    """
    # Download Census dataset: Training and eval csv files.
    training_file_path, eval_file_path = download(DATA_DIR)

    # This census data uses the value '?' for missing entries. We use
    # na_values to find '?' and set it to NaN.
    # https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html
    train_df = pd.read_csv(training_file_path, names=_CSV_COLUMNS,
                           na_values='?')
    eval_df = pd.read_csv(eval_file_path, names=_CSV_COLUMNS, na_values='?')

    train_df = preprocess(train_df)
    eval_df = preprocess(eval_df)

    # Split train and eval data with labels. The pop method copies and removes
    # the label column from the dataframe.
    train_x, train_y = train_df, train_df.pop(_LABEL_COLUMN)
    eval_x, eval_y = eval_df, eval_df.pop(_LABEL_COLUMN)

    # Join train_x and eval_x to normalize on overall means and standard
    # deviations. Then separate them again.
    all_x = pd.concat([train_x, eval_x], keys=['train', 'eval'])
    all_x = standardize(all_x)
    train_x, eval_x = all_x.xs('train'), all_x.xs('eval')

    # Reshape label columns for use with tf.data.Dataset
    train_y = np.asarray(train_y).astype('float32').reshape((-1, 1))
    eval_y = np.asarray(eval_y).astype('float32').reshape((-1, 1))

    return train_x, train_y, eval_x, eval_y
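To make the cleaning rules in _download_and_clean_file concrete, here is a minimal standalone sketch that applies the same per-line logic without TensorFlow or a download. The helper name clean_line and the shortened sample rows are hypothetical and exist only for illustration.

```python
def clean_line(line):
    """Applies the per-line rules used in _download_and_clean_file.

    Returns the cleaned, newline-terminated line, or None if the line
    would be skipped.
    """
    line = line.strip()
    line = line.replace(', ', ',')   # drop the space after each comma
    if not line or ',' not in line:  # skip blank lines and non-CSV lines
        return None
    if line[-1] == '.':              # drop a trailing period after the label
        line = line[:-1]
    return line + '\n'


# Hypothetical, shortened rows (the real files have 15 columns).
samples = [
    "39, State-gov, 77516, Bachelors, <=50K\n",
    "25, Private, 226802, 11th, <=50K.\n",
    "|1x3 Cross validator\n",
    "\n",
]
cleaned = [clean_line(s) for s in samples]
for before, after in zip(samples, cleaned):
    print(repr(before), '->', repr(after))
```

The first two rows are normalized to comma-only separators, while the header-like row and the blank row are dropped.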
The second file, called model.py, defines the input function and the model architecture. In this example, we use the tf.data API for the data pipeline and create the model using the Keras Sequential API. We define a DNN with an input layer and 3 additional hidden layers, all using the ReLU activation function. Since the task is binary classification, the output layer uses the sigmoid activation.
In [ ]:
%%writefile trainer/model.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
def input_fn(features, labels, shuffle, num_epochs, batch_size):
    """Generates an input function to be used for model training.

    Args:
      features: numpy array of features used for training or inference
      labels: numpy array of labels for each example
      shuffle: boolean for whether to shuffle the data or not (set True for
        training, False for evaluation)
      num_epochs: number of epochs to provide the data for
      batch_size: batch size for training

    Returns:
      A tf.data.Dataset that can provide data to the Keras model for training
      or evaluation
    """
    if labels is None:
        inputs = features
    else:
        inputs = (features, labels)
    dataset = tf.data.Dataset.from_tensor_slices(inputs)

    if shuffle:
        dataset = dataset.shuffle(buffer_size=len(features))

    # We call repeat after shuffling, rather than before, to prevent separate
    # epochs from blending together.
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    return dataset


def create_keras_model(input_dim, learning_rate):
    """Creates Keras Model for Binary Classification.

    The single output node + Sigmoid activation makes this a Logistic
    Regression.

    Args:
      input_dim: How many features the input has
      learning_rate: Learning rate for training

    Returns:
      The compiled Keras model (still needs to be trained)
    """
    Dense = tf.keras.layers.Dense
    model = tf.keras.Sequential(
        [
            Dense(100, activation=tf.nn.relu, kernel_initializer='uniform',
                  input_shape=(input_dim,)),
            Dense(75, activation=tf.nn.relu),
            Dense(50, activation=tf.nn.relu),
            Dense(25, activation=tf.nn.relu),
            Dense(1, activation=tf.nn.sigmoid)
        ])

    # Custom Optimizer:
    # https://www.tensorflow.org/api_docs/python/tf/keras/optimizers/RMSprop
    optimizer = tf.keras.optimizers.RMSprop(learning_rate=learning_rate)

    # Compile Keras model
    model.compile(
        loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return model
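As a quick sanity check on the architecture in create_keras_model, the number of trainable parameters can be worked out by hand: each Dense layer has one weight per input-output pair plus one bias per output unit. The sketch below assumes input_dim is 11, which is the 15 CSV columns minus the 3 unused columns and the label; the helper name dense_param_count is hypothetical.

```python
def dense_param_count(input_dim, layer_units):
    """Counts weights and biases for a stack of fully connected layers."""
    total = 0
    fan_in = input_dim
    for units in layer_units:
        total += fan_in * units + units  # weight matrix plus bias vector
        fan_in = units
    return total


# Assumption: 15 CSV columns - 3 unused columns - 1 label column = 11.
input_dim = 11
layer_units = [100, 75, 50, 25, 1]  # layer sizes from create_keras_model
n_params = dense_param_count(input_dim, layer_units)
print(n_params)  # 13876
```

If the assumption about input_dim holds, this figure should agree with the total that the model's summary() method reports.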
The last file, called task.py, trains on data loaded and preprocessed in util.py. Because the training code runs inside a tf.distribute.MirroredStrategy() scope, it can train in a distributed fashion. The trained model is then saved in the TensorFlow SavedModel format.
In [ ]:
%%writefile trainer/task.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import os
from . import model
from . import util
import tensorflow as tf
def get_args():
    """Argument parser.

    Returns:
      argparse.Namespace with the parsed arguments.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--job-dir',
        type=str,
        required=True,
        help='local or GCS location for writing checkpoints and exporting '
             'models')
    parser.add_argument(
        '--num-epochs',
        type=int,
        default=20,
        help='number of times to go through the data, default=20')
    parser.add_argument(
        '--batch-size',
        default=128,
        type=int,
        help='number of records to read during each training step, '
             'default=128')
    parser.add_argument(
        '--learning-rate',
        default=.01,
        type=float,
        help='learning rate for gradient descent, default=.01')
    parser.add_argument(
        '--verbosity',
        choices=['DEBUG', 'ERROR', 'FATAL', 'INFO', 'WARN'],
        default='INFO')
    # Unknown arguments are ignored rather than raising an error.
    args, _ = parser.parse_known_args()
    return args


def train_and_evaluate(args):
    """Trains and evaluates the Keras model.

    Uses the Keras model defined in model.py and trains on data loaded and
    preprocessed in util.py. Saves the trained model in TensorFlow SavedModel
    format to the path defined in part by the --job-dir argument.

    Args:
      args: parsed arguments - see get_args() for details
    """
    train_x, train_y, eval_x, eval_y = util.load_data()

    # dimensions
    num_train_examples, input_dim = train_x.shape
    num_eval_examples = eval_x.shape[0]

    # Create the Keras Model
    keras_model = model.create_keras_model(
        input_dim=input_dim, learning_rate=args.learning_rate)

    # Pass a numpy array by passing DataFrame.values
    training_dataset = model.input_fn(
        features=train_x.values,
        labels=train_y,
        shuffle=True,
        num_epochs=args.num_epochs,
        batch_size=args.batch_size)

    # Pass a numpy array by passing DataFrame.values
    validation_dataset = model.input_fn(
        features=eval_x.values,
        labels=eval_y,
        shuffle=False,
        num_epochs=args.num_epochs,
        batch_size=num_eval_examples)

    # Setup Learning Rate decay.
    lr_decay_cb = tf.keras.callbacks.LearningRateScheduler(
        lambda epoch: args.learning_rate + 0.02 * (0.5 ** (1 + epoch)),
        verbose=True)

    # Setup TensorBoard callback.
    tensorboard_cb = tf.keras.callbacks.TensorBoard(
        os.path.join(args.job_dir, 'keras_tensorboard'),
        histogram_freq=1)

    # Train model
    keras_model.fit(
        training_dataset,
        steps_per_epoch=int(num_train_examples / args.batch_size),
        epochs=args.num_epochs,
        validation_data=validation_dataset,
        validation_steps=1,
        verbose=1,
        callbacks=[lr_decay_cb, tensorboard_cb])

    export_path = os.path.join(args.job_dir, 'keras_export')
    tf.keras.models.save_model(keras_model, export_path)
    print('Model exported to: {}'.format(export_path))


if __name__ == '__main__':
    strategy = tf.distribute.MirroredStrategy()
    with strategy.scope():
        args = get_args()
        tf.compat.v1.logging.set_verbosity(args.verbosity)
        train_and_evaluate(args)
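The learning rate schedule passed to LearningRateScheduler in task.py is easier to read once a few values are written out. The following sketch evaluates the same expression for the first epochs, assuming the default --learning-rate of 0.01; the function name scheduled_learning_rate is hypothetical.

```python
def scheduled_learning_rate(base_lr, epoch):
    """Same expression as the lambda given to LearningRateScheduler."""
    return base_lr + 0.02 * (0.5 ** (1 + epoch))


base_lr = 0.01  # default value of --learning-rate in get_args()
schedule = [scheduled_learning_rate(base_lr, epoch) for epoch in range(5)]
for epoch, lr in enumerate(schedule):
    print('epoch {}: lr = {:.6f}'.format(epoch, lr))
```

The extra term halves every epoch, so the rate starts at twice the base value and decays toward the base learning rate without dropping below it.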
NOTE: When you run the same training job on AI Platform later in the lab, you'll see that the command is not much different from the local one.
Specify an output directory and set a MODEL_DIR variable to hold the trained model, then run the training job locally with the following command. By default, verbose logging is turned off; you can enable it by setting the --verbosity flag to DEBUG:
In [ ]:
%%bash
MODEL_DIR=output
gcloud ai-platform local train \
--module-name trainer.task \
--package-path trainer/ \
--job-dir $MODEL_DIR \
-- \
--train-files $TRAIN_DATA \
--eval-files $EVAL_DATA \
--train-steps 1000 \
--eval-steps 100
Check if the output has been written to the output folder:
In [ ]:
%%bash
ls output/keras_export/
To receive valid and useful predictions, you must preprocess input for prediction in the same way that training data was preprocessed. In a production system, you may want to create a preprocessing pipeline that can be used identically at training time and prediction time.
For this exercise, use the training package's data-loading code to select a random sample from the evaluation data. This data is in the form that was used to evaluate accuracy after each epoch of training, so it can be used to send test predictions without further preprocessing.
Run the following snippet of code to preprocess the raw data from the adult.test.csv file. Here, we are grabbing 5 examples to run predictions on:
In [ ]:
from trainer import util
_, _, eval_x, eval_y = util.load_data()
prediction_input = eval_x.sample(5)
prediction_targets = eval_y[prediction_input.index]
Check the numerical representation of the features by printing the preprocessed data:
In [ ]:
print(prediction_input)
Notice that categorical fields, like occupation, have already been converted to integers (with the same mapping that was used for training). Numerical fields, like age, have been scaled to a z-score. Some fields have been dropped from the original data.
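The two transformations described above can be reproduced on a handful of values with the standard library alone. This is a sketch for intuition, not the lab's code: the ages are made up, the helper names are hypothetical, and the real pipeline computes means and standard deviations over the combined training and evaluation data.

```python
import statistics


def z_scores(values):
    """Scales values to mean 0 and standard deviation 1."""
    mean = statistics.mean(values)
    # Sample standard deviation (n - 1), the default of pandas Series.std().
    std = statistics.stdev(values)
    return [(v - mean) / std for v in values]


def category_codes(values, categories):
    """Maps each value to its position in categories, or -1 if absent.

    This mirrors pandas .cat.codes, which uses -1 for missing values.
    """
    return [categories.index(v) if v in categories else -1 for v in values]


ages = [25.0, 38.0, 28.0, 44.0]  # hypothetical ages
scaled_ages = z_scores(ages)
print(scaled_ages)

# Category list copied from the 'race' entry of _CATEGORICAL_TYPES.
races = ['Amer-Indian-Eskimo', 'Asian-Pac-Islander', 'Black', 'Other', 'White']
codes = category_codes(['White', 'Black', None], races)
print(codes)  # [4, 2, -1]
```

Because the integer codes depend on the order of each category list, prediction inputs must be encoded with exactly the same lists that were used for training.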
Export the prediction input to a newline-delimited JSON file:
In [ ]:
import json
with open('test.json', 'w') as json_file:
for row in prediction_input.values.tolist():
json.dump(row, json_file)
json_file.write('\n')
Inspect the .json file:
In [ ]:
%%bash
cat test.json
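Newline-delimited JSON simply means one complete JSON value per line. As a small illustration, the sketch below writes two made-up rows in the same way as the cell above and then parses them back line by line; it writes to a temporary directory so it does not overwrite the lab's test.json.

```python
import json
import os
import tempfile

# Hypothetical feature rows standing in for prediction_input.values.tolist().
rows = [[0.5, 3, -1.2], [1.1, 0, 0.4]]

path = os.path.join(tempfile.mkdtemp(), 'test.json')
with open(path, 'w') as json_file:
    for row in rows:
        json.dump(row, json_file)  # one JSON array per instance
        json_file.write('\n')      # newline separates instances

# Reading back: each non-empty line is parsed independently.
with open(path) as json_file:
    instances = [json.loads(line) for line in json_file if line.strip()]
print(instances)
```

Each line corresponds to one prediction instance, so the number of lines in the file is the number of predictions you should expect back.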
Once you've trained your TensorFlow model, you can use it for prediction on new data. In this case, you've trained a census model to predict income category given some information about a person.
Run the following command to run prediction on the test.json file we created above:
Note: If you get a "Bad magic number in .pyc file" error, go to the terminal and run:
cd ../../usr/lib/google-cloud-sdk/lib/googlecloudsdk/command_lib/ml_engine/
sudo rm *.pyc
In [ ]:
%%bash
gcloud ai-platform local predict \
--model-dir output/keras_export/ \
--json-instances ./test.json
Since the model's last layer uses a sigmoid function for its activation, outputs between 0 and 0.5 represent negative predictions ("<=50K") and outputs between 0.5 and 1 represent positive ones (">50K").
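The thresholding rule can be written out as a short helper. This is a sketch with made-up output values; the function name to_income_bracket is hypothetical, and treating an output of exactly 0.5 as negative is an assumption, since the text above does not say which side that boundary falls on.

```python
def to_income_bracket(probability, threshold=0.5):
    """Maps a sigmoid output to one of the two income labels."""
    # Assumption: an output exactly equal to the threshold counts as '<=50K'.
    return '>50K' if probability > threshold else '<=50K'


outputs = [0.07, 0.48, 0.5, 0.93]  # hypothetical model outputs
labels = [to_income_bracket(p) for p in outputs]
print(labels)
```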
Now that you've validated your model by running it locally, you will get practice training on Cloud AI Platform.
Note: The initial job request will take several minutes to start, but subsequent jobs run more quickly. This enables quick iteration as you develop and validate your training job.
First, set the following variables:
In [ ]:
%%bash
export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: "${PROJECT}
In [ ]:
PROJECT = "YOUR_PROJECT_NAME" # Replace with your project name
BUCKET_NAME=PROJECT+"-aiplatform"
REGION="us-central1"
In [ ]:
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET_NAME"] = BUCKET_NAME
os.environ["REGION"] = REGION
os.environ["TFVERSION"] = "2.1"
os.environ["PYTHONVERSION"] = "3.7"
In [ ]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET_NAME}; then
gsutil mb -l ${REGION} gs://${BUCKET_NAME}
fi
gsutil cp -r data gs://$BUCKET_NAME/data
Set the TRAIN_DATA and EVAL_DATA variables to point to the files:
In [ ]:
%%bash
export TRAIN_DATA=gs://$BUCKET_NAME/data/adult.data.csv
export EVAL_DATA=gs://$BUCKET_NAME/data/adult.test.csv
Use gsutil again to copy the JSON test file test.json to your Cloud Storage bucket:
In [ ]:
%%bash
gsutil cp test.json gs://$BUCKET_NAME/data/test.json
Set the TEST_JSON variable to point to that file:
In [ ]:
%%bash
export TEST_JSON=gs://$BUCKET_NAME/data/test.json
Go back to the lab instructions and check your progress by testing the completed tasks:
- "Set up a Google Cloud Storage".
- "Upload the data files to your Cloud Storage bucket".
With a validated training job that runs in both single-instance and distributed mode, you're now ready to run a training job in the cloud. For this example, we will be requesting a single-instance training job.
Use the default BASIC scale tier to run a single-instance training job.
Select a name for the initial training run that distinguishes it from any subsequent training runs. For example, we can use date and time to compose the job id.
Specify a directory for output generated by AI Platform by setting an OUTPUT_PATH variable to include when requesting training and prediction jobs. The OUTPUT_PATH represents the fully qualified Cloud Storage location for model checkpoints, summaries, and exports. You can use the BUCKET_NAME variable you defined in a previous step. It's a good practice to use the job name as the output directory.
Run the following command to submit a training job in the cloud that uses a single process. This time, set the --verbosity flag to DEBUG so that you can inspect the full logging output and retrieve accuracy, loss, and other metrics. The output also contains a number of warning messages that you can ignore for the purposes of this sample:
In [ ]:
%%bash
JOB_ID=census_$(date -u +%y%m%d_%H%M%S)
OUTPUT_PATH=gs://$BUCKET_NAME/$JOB_ID
gcloud ai-platform jobs submit training $JOB_ID \
--job-dir $OUTPUT_PATH \
--runtime-version $TFVERSION \
--python-version $PYTHONVERSION \
--module-name trainer.task \
--package-path trainer/ \
--region $REGION \
-- \
--train-files $TRAIN_DATA \
--eval-files $EVAL_DATA \
--train-steps 1000 \
--eval-steps 100 \
--verbosity DEBUG
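The job ID and output path composed in the bash cell above can also be built in Python, which makes it straightforward to keep the value for later cells. The sketch below mirrors the date -u +%y%m%d_%H%M%S format; the helper name make_job_id is hypothetical and the bucket name is a placeholder to replace with your own.

```python
from datetime import datetime, timezone


def make_job_id(prefix='census', now=None):
    """Builds an ID like census_200102_030405 from a UTC timestamp."""
    now = now or datetime.now(timezone.utc)
    return '{}_{}'.format(prefix, now.strftime('%y%m%d_%H%M%S'))


bucket_name = 'YOUR_PROJECT_NAME-aiplatform'  # placeholder, replace as needed
job_id = make_job_id()
output_path = 'gs://{}/{}'.format(bucket_name, job_id)
print(job_id)
print(output_path)
```

Including the timestamp keeps job names unique across runs, and using the job name as the output directory keeps each run's artifacts separate.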
Set an environment variable with the jobId generated above:
In [ ]:
os.environ["JOB_ID"] = "YOUR_JOB_ID" # Replace with your job id
You can monitor the progress of your training job by streaming its logs on the command line:
gcloud ai-platform jobs stream-logs $JOB_ID
Or monitor it in the Console at AI Platform > Jobs.
Wait until your AI Platform training job is done. It is finished when you see a green check mark by the job name in the Cloud Console, or when you see the message Job completed successfully from the Cloud Shell command line.
Go back to the lab instructions and check your progress by testing the completed task:
- "Run a single-instance trainer in the cloud".
By deploying your trained model to AI Platform to serve online prediction requests, you get the benefit of scalable serving. This is useful if you expect your trained model to be hit with many prediction requests in a short period of time.
Create an AI Platform model:
In [ ]:
os.environ["MODEL_NAME"] = "census"
In [ ]:
%%bash
gcloud ai-platform models create $MODEL_NAME --regions=$REGION
Set the environment variable MODEL_BINARIES to the full path of your exported trained model binaries, $OUTPUT_PATH/keras_export/. You'll deploy this trained model.
Run the following command to create a version v1 of your model:
In [ ]:
%%bash
OUTPUT_PATH=gs://$BUCKET_NAME/$JOB_ID
MODEL_BINARIES=$OUTPUT_PATH/keras_export/
gcloud ai-platform versions create v1 \
--model $MODEL_NAME \
--origin $MODEL_BINARIES \
--runtime-version $TFVERSION \
--python-version $PYTHONVERSION
It may take several minutes to deploy your trained model. When done, you can see a list of your models using the models list command:
In [ ]:
%%bash
gcloud ai-platform models list
Go back to the lab instructions and check your progress by testing the completed tasks:
- "Create an AI Platform model".
- "Create a version v1 of your model".
You can now send prediction requests to your deployed model. The following command sends a prediction request using the test.json.
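The response contains one prediction per data entry in test.json: the model's sigmoid output, which is the estimated probability of the >50K label. Values above 0.5 therefore indicate a predicted income greater than 50,000 dollars, and values below 0.5 a predicted income of 50,000 dollars or less.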
In [ ]:
%%bash
gcloud ai-platform predict \
--model $MODEL_NAME \
--version v1 \
--json-instances ./test.json
Note: AI Platform supports batch prediction, too, but it's not included in this lab. See the documentation for more info.
Go back to the lab instructions to answer some multiple choice questions to reinforce your understanding of some of this lab's concepts.