In [1]:
BUCKET = 'cloud-training-demos-ml'
PROJECT = 'cloud-training-demos'
REGION = 'us-central1'
In [2]:
import os
os.environ['BUCKET'] = BUCKET
os.environ['PROJECT'] = PROJECT
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = '1.13'
In [ ]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION
In [ ]:
%%bash
if ! gsutil ls | grep -q gs://${BUCKET}/babyweight/preproc; then
gsutil mb -l ${REGION} gs://${BUCKET}
# copy canonical set of preprocessed files if you didn't do previous notebook
gsutil -m cp -R gs://cloud-training-demos/babyweight gs://${BUCKET}
fi
In [ ]:
%%bash
gsutil ls gs://${BUCKET}/babyweight/preproc/*-00000*
In [ ]:
%%writefile babyweight/trainer/task.py
import argparse
import json
import os
from . import model
import tensorflow as tf
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--bucket',
help = 'GCS path to data. We assume that data is in gs://BUCKET/babyweight/preproc/',
required = True
)
parser.add_argument(
'--output_dir',
help = 'GCS location to write checkpoints and export models',
required = True
)
parser.add_argument(
'--batch_size',
help = 'Number of examples to compute gradient over.',
type = int,
default = 512
)
parser.add_argument(
'--job-dir',
help = 'this model ignores this field, but it is required by gcloud',
default = 'junk'
)
parser.add_argument(
'--nnsize',
help = 'Hidden layer sizes to use for DNN feature columns -- provide space-separated layers',
nargs = '+',
type = int,
default=[128, 32, 4]
)
parser.add_argument(
'--nembeds',
help = 'Embedding size of a cross of n key real-valued parameters',
type = int,
default = 3
)
## TODO 1: add the new arguments here
parser.add_argument(
'--train_examples',
help = 'Number of examples (in thousands) to run the training job over. If this is more than actual # of examples available, it cycles through them. So specifying 1000 here when you have only 100k examples makes this 10 epochs.',
type = int,
default = 5000
)
parser.add_argument(
'--pattern',
help = 'Specify a pattern that has to be in input files. For example 00001-of will process only one shard',
default = 'of'
)
parser.add_argument(
'--eval_steps',
help = 'Positive number of steps for which to evaluate model. Default to None, which means to evaluate until input_fn raises an end-of-input exception',
type = int,
default = None
)
## parse all arguments
args = parser.parse_args()
arguments = args.__dict__
# unused args provided by service
arguments.pop('job_dir', None)
arguments.pop('job-dir', None)
## assign the arguments to the model variables
output_dir = arguments.pop('output_dir')
model.BUCKET = arguments.pop('bucket')
model.BATCH_SIZE = arguments.pop('batch_size')
model.TRAIN_STEPS = (arguments.pop('train_examples') * 1000) / model.BATCH_SIZE
model.EVAL_STEPS = arguments.pop('eval_steps')
print ("Will train for {} steps using batch_size={}".format(model.TRAIN_STEPS, model.BATCH_SIZE))
model.PATTERN = arguments.pop('pattern')
model.NEMBEDS= arguments.pop('nembeds')
model.NNSIZE = arguments.pop('nnsize')
print ("Will use DNN size of {}".format(model.NNSIZE))
# Append trial_id to path if we are doing hptuning
# This code can be removed if you are not using hyperparameter tuning
output_dir = os.path.join(
output_dir,
json.loads(
os.environ.get('TF_CONFIG', '{}')
).get('task', {}).get('trial', '')
)
# Run the training job
model.train_and_evaluate(output_dir)
In [ ]:
%%writefile babyweight/trainer/model.py
import shutil
import numpy as np
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.INFO)
BUCKET = None # set from task.py
PATTERN = 'of' # gets all files
# Determine CSV, label, and key columns
CSV_COLUMNS = 'weight_pounds,is_male,mother_age,plurality,gestation_weeks,key'.split(',')
LABEL_COLUMN = 'weight_pounds'
KEY_COLUMN = 'key'
# Set default values for each CSV column
DEFAULTS = [[0.0], ['null'], [0.0], ['null'], [0.0], ['nokey']]
# Define some hyperparameters
TRAIN_STEPS = 10000
EVAL_STEPS = None
BATCH_SIZE = 512
NEMBEDS = 3
NNSIZE = [64, 16, 4]
# Create an input function reading a file using the Dataset API
# Then provide the results to the Estimator API
def read_dataset(prefix, mode, batch_size):
def _input_fn():
def decode_csv(value_column):
columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)
features = dict(zip(CSV_COLUMNS, columns))
label = features.pop(LABEL_COLUMN)
return features, label
# Use prefix to create file path
file_path = 'gs://{}/babyweight/preproc/{}*{}*'.format(BUCKET, prefix, PATTERN)
# Create list of files that match pattern
file_list = tf.gfile.Glob(file_path)
# Create dataset from file list
dataset = (tf.data.TextLineDataset(file_list) # Read text file
.map(decode_csv)) # Transform each elem by applying decode_csv fn
if mode == tf.estimator.ModeKeys.TRAIN:
num_epochs = None # indefinitely
dataset = dataset.shuffle(buffer_size = 10 * batch_size)
else:
num_epochs = 1 # end-of-input after this
dataset = dataset.repeat(num_epochs).batch(batch_size)
return dataset.make_one_shot_iterator().get_next()
return _input_fn
# Define feature columns
def get_wide_deep():
# Define column types
is_male,mother_age,plurality,gestation_weeks = \
[\
tf.feature_column.categorical_column_with_vocabulary_list('is_male',
['True', 'False', 'Unknown']),
tf.feature_column.numeric_column('mother_age'),
tf.feature_column.categorical_column_with_vocabulary_list('plurality',
['Single(1)', 'Twins(2)', 'Triplets(3)',
'Quadruplets(4)', 'Quintuplets(5)','Multiple(2+)']),
tf.feature_column.numeric_column('gestation_weeks')
]
# Discretize
age_buckets = tf.feature_column.bucketized_column(mother_age,
boundaries=np.arange(15,45,1).tolist())
gestation_buckets = tf.feature_column.bucketized_column(gestation_weeks,
boundaries=np.arange(17,47,1).tolist())
# Sparse columns are wide, have a linear relationship with the output
wide = [is_male,
plurality,
age_buckets,
gestation_buckets]
# Feature cross all the wide columns and embed into a lower dimension
crossed = tf.feature_column.crossed_column(wide, hash_bucket_size=20000)
embed = tf.feature_column.embedding_column(crossed, NEMBEDS)
# Continuous columns are deep, have a complex relationship with the output
deep = [mother_age,
gestation_weeks,
embed]
return wide, deep
# Create serving input function to be able to serve predictions later using provided inputs
def serving_input_fn():
feature_placeholders = {
'is_male': tf.placeholder(tf.string, [None]),
'mother_age': tf.placeholder(tf.float32, [None]),
'plurality': tf.placeholder(tf.string, [None]),
'gestation_weeks': tf.placeholder(tf.float32, [None]),
KEY_COLUMN: tf.placeholder_with_default(tf.constant(['nokey']), [None])
}
features = {
key: tf.expand_dims(tensor, -1)
for key, tensor in feature_placeholders.items()
}
return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)
# create metric for hyperparameter tuning
def my_rmse(labels, predictions):
pred_values = predictions['predictions']
return {'rmse': tf.metrics.root_mean_squared_error(labels, pred_values)}
# Create estimator to train and evaluate
def train_and_evaluate(output_dir):
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file
wide, deep = get_wide_deep()
EVAL_INTERVAL = 300 # seconds
## TODO 2a: set the save_checkpoints_secs to the EVAL_INTERVAL
run_config = tf.estimator.RunConfig(save_checkpoints_secs = EVAL_INTERVAL,
keep_checkpoint_max = 3)
## TODO 2b: change the dnn_hidden_units to NNSIZE
estimator = tf.estimator.DNNLinearCombinedRegressor(
model_dir = output_dir,
linear_feature_columns = wide,
dnn_feature_columns = deep,
dnn_hidden_units = NNSIZE,
config = run_config)
# illustrates how to add an extra metric
estimator = tf.contrib.estimator.add_metrics(estimator, my_rmse)
# for batch prediction, you need a key associated with each instance
estimator = tf.contrib.estimator.forward_features(estimator, KEY_COLUMN)
## TODO 2c: Set the third argument of read_dataset to BATCH_SIZE
## TODO 2d: and set max_steps to TRAIN_STEPS
train_spec = tf.estimator.TrainSpec(
input_fn = read_dataset('train', tf.estimator.ModeKeys.TRAIN, BATCH_SIZE),
max_steps = TRAIN_STEPS)
exporter = tf.estimator.LatestExporter('exporter', serving_input_fn, exports_to_keep=None)
## TODO 2e: Lastly, set steps equal to EVAL_STEPS
eval_spec = tf.estimator.EvalSpec(
input_fn = read_dataset('eval', tf.estimator.ModeKeys.EVAL, 2**15), # no need to batch in eval
steps = EVAL_STEPS,
start_delay_secs = 60, # start evaluating after N seconds
throttle_secs = EVAL_INTERVAL, # evaluate every N seconds
exporters = exporter)
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
In [ ]:
In [ ]:
%%bash
echo "bucket=${BUCKET}"
rm -rf babyweight_trained
export PYTHONPATH=${PYTHONPATH}:${PWD}/babyweight
python -m trainer.task \
--bucket=${BUCKET} \
--output_dir=babyweight_trained \
--job-dir=./tmp \
--pattern="00000-of-" --train_examples=1 --eval_steps=1
In [ ]:
In [ ]:
%%writefile inputs.json
{"key": "b1", "is_male": "True", "mother_age": 26.0, "plurality": "Single(1)", "gestation_weeks": 39}
{"key": "g1", "is_male": "False", "mother_age": 26.0, "plurality": "Single(1)", "gestation_weeks": 39}
In [ ]:
In [ ]:
%%bash
MODEL_LOCATION=$(ls -d $(pwd)/babyweight_trained/export/exporter/* | tail -1)
echo $MODEL_LOCATION1
gcloud ai-platform local predict --model-dir=$MODEL_LOCATION --json-instances=inputs.json
In [ ]:
In [ ]:
%%bash
OUTDIR=gs://${BUCKET}/babyweight/trained_model
JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=$(pwd)/babyweight/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=STANDARD_1 \
--runtime-version=$TFVERSION \
-- \
--bucket=${BUCKET} \
--output_dir=${OUTDIR} \
--train_examples=200000
In [ ]:
%%writefile hyperparam.yaml
trainingInput:
scaleTier: STANDARD_1
hyperparameters:
hyperparameterMetricTag: rmse
goal: MINIMIZE
maxTrials: 20
maxParallelTrials: 5
enableTrialEarlyStopping: True
params:
- parameterName: batch_size
type: INTEGER
minValue: 8
maxValue: 512
scaleType: UNIT_LOG_SCALE
- parameterName: nembeds
type: INTEGER
minValue: 3
maxValue: 30
scaleType: UNIT_LINEAR_SCALE
- parameterName: nnsize
type: INTEGER
minValue: 64
maxValue: 512
scaleType: UNIT_LOG_SCALE
In [ ]:
%%bash
OUTDIR=gs://${BUCKET}/babyweight/hyperparam
JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=$(pwd)/babyweight/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=STANDARD_1 \
--config=hyperparam.yaml \
--runtime-version=$TFVERSION \
-- \
--bucket=${BUCKET} \
--output_dir=${OUTDIR} \
--eval_steps=10 \
--train_examples=20000
In [ ]:
%%bash
OUTDIR=gs://${BUCKET}/babyweight/trained_model_tuned
JOBNAME=babyweight_$(date -u +%y%m%d_%H%M%S)
echo $OUTDIR $REGION $JOBNAME
gsutil -m rm -rf $OUTDIR
gcloud ai-platform jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=$(pwd)/babyweight/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=STANDARD_1 \
--runtime-version=$TFVERSION \
-- \
--bucket=${BUCKET} \
--output_dir=${OUTDIR} \
--train_examples=20000 --batch_size=35 --nembeds=16 --nnsize=281