The purpose of this tutorial is to explain the details of how to create a premade (canned) TensorFlow Estimator, how training and evaluation work with different configurations, and how the model is exported for serving.
In [ ]:
COLAB = False
try:
    from google.colab import auth
    auth.authenticate_user()
    COLAB = True
except ImportError:
    pass

RANDOM_SEED = 19831006
In [ ]:
import os
import math
import multiprocessing
from datetime import datetime

import pandas as pd
import tensorflow as tf

print("TensorFlow : {}".format(tf.__version__))
tf.enable_eager_execution()
print("Eager Execution Enabled: {}".format(tf.executing_eagerly()))
The task is to predict whether a person's income exceeds $50K/yr based on census data. The data is also known as the "Census Income" (Adult) dataset.
In [ ]:
DATA_DIR = 'data'
!mkdir -p $DATA_DIR
!gsutil cp gs://cloud-samples-data/ml-engine/census/data/adult.data.csv $DATA_DIR
!gsutil cp gs://cloud-samples-data/ml-engine/census/data/adult.test.csv $DATA_DIR
In [ ]:
TRAIN_DATA_FILE = os.path.join(DATA_DIR, 'adult.data.csv')
EVAL_DATA_FILE = os.path.join(DATA_DIR, 'adult.test.csv')
In [ ]:
!wc -l $TRAIN_DATA_FILE
!wc -l $EVAL_DATA_FILE
The training data includes 32,561 records, while the evaluation data includes 16,278 records.
In [ ]:
HEADER = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
          'marital_status', 'occupation', 'relationship', 'race', 'gender',
          'capital_gain', 'capital_loss', 'hours_per_week',
          'native_country', 'income_bracket']

pd.read_csv(TRAIN_DATA_FILE, names=HEADER).head()
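As a quick sanity check on class balance (a sketch using the same read; the Adult dataset is imbalanced, with roughly three quarters of the examples in the <=50K class):

In [ ]:
# Sketch: inspect the label distribution of the training data.
pd.read_csv(TRAIN_DATA_FILE, names=HEADER)['income_bracket'].value_counts(normalize=True)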
In [ ]:
HEADER = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
          'marital_status', 'occupation', 'relationship', 'race', 'gender',
          'capital_gain', 'capital_loss', 'hours_per_week',
          'native_country', 'income_bracket']

HEADER_DEFAULTS = [[0], [''], [0], [''], [0], [''], [''], [''], [''], [''],
                   [0], [0], [0], [''], ['']]

NUMERIC_FEATURE_NAMES = ['age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']

CATEGORICAL_FEATURE_WITH_VOCABULARY = {
    'workclass': ['State-gov', 'Self-emp-not-inc', 'Private', 'Federal-gov', 'Local-gov', '?', 'Self-emp-inc', 'Without-pay', 'Never-worked'],
    'relationship': ['Not-in-family', 'Husband', 'Wife', 'Own-child', 'Unmarried', 'Other-relative'],
    'gender': ['Male', 'Female'],
    'marital_status': ['Never-married', 'Married-civ-spouse', 'Divorced', 'Married-spouse-absent', 'Separated', 'Married-AF-spouse', 'Widowed'],
    'race': ['White', 'Black', 'Asian-Pac-Islander', 'Amer-Indian-Eskimo', 'Other'],
    'education': ['Bachelors', 'HS-grad', '11th', 'Masters', '9th', 'Some-college', 'Assoc-acdm', 'Assoc-voc', '7th-8th', 'Doctorate', 'Prof-school', '5th-6th', '10th', '1st-4th', 'Preschool', '12th'],
}

CATEGORICAL_FEATURE_WITH_HASH_BUCKETS = {
    'native_country': 60,
    'occupation': 20
}

FEATURE_NAMES = (NUMERIC_FEATURE_NAMES
                 + list(CATEGORICAL_FEATURE_WITH_VOCABULARY.keys())
                 + list(CATEGORICAL_FEATURE_WITH_HASH_BUCKETS.keys()))

TARGET_NAME = 'income_bracket'
TARGET_LABELS = [' <=50K', ' >50K']  # The leading space is intentional: the target is not stripped by process_features.
WEIGHT_COLUMN_NAME = 'fnlwgt'  # Census sampling weight, used to weight examples during training and evaluation.
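Hash bucketing avoids maintaining a vocabulary for higher-cardinality features such as native_country and occupation: each string is hashed into one of N buckets, at the cost of possible collisions. A quick eager sketch of the idea (the occupation values are illustrative; the feature column uses a similar fingerprint-based hash):

In [ ]:
# Sketch: hash example occupation strings into 20 buckets (eager mode).
# Distinct strings may collide in the same bucket.
print(tf.strings.to_hash_bucket_fast(['Exec-managerial', 'Craft-repair', 'Sales'], num_buckets=20))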
In [ ]:
def process_features(features, target):
    # Strip whitespace from the string features (values in the CSV have leading spaces).
    for feature_name in (list(CATEGORICAL_FEATURE_WITH_VOCABULARY.keys())
                         + list(CATEGORICAL_FEATURE_WITH_HASH_BUCKETS.keys())):
        features[feature_name] = tf.strings.strip(features[feature_name])

    # Derive a new feature from the capital gain/loss columns.
    features['capital_total'] = features['capital_gain'] - features['capital_loss']

    return features, target


def make_input_fn(file_pattern, batch_size, num_epochs=1, shuffle=False):

    def _input_fn():
        dataset = tf.data.experimental.make_csv_dataset(
            file_pattern=file_pattern,
            batch_size=batch_size,
            column_names=HEADER,
            column_defaults=HEADER_DEFAULTS,
            label_name=TARGET_NAME,
            field_delim=',',
            use_quote_delim=True,
            header=False,
            num_epochs=num_epochs,
            shuffle=shuffle,
            shuffle_buffer_size=(5 * batch_size),
            shuffle_seed=RANDOM_SEED,
            num_parallel_reads=multiprocessing.cpu_count(),
            sloppy=True,
        )
        return dataset.map(process_features).cache()

    return _input_fn
In [ ]:
# Note: this cell requires eager execution, enabled via tf.enable_eager_execution() at the top.
dataset = make_input_fn(TRAIN_DATA_FILE, batch_size=1)()
for features, target in dataset.take(1):
    print("Input Features:")
    for key in features:
        print("{}: {}".format(key, features[key]))
    print("")
    print("Target:")
    print(target)
The model uses two groups of feature columns: base feature columns (numeric columns, categorical columns with a vocabulary, and categorical columns with hash buckets) and extended feature columns derived from them (bucketized, crossed, and embedding columns). Sparse categorical columns feed the wide, linear part of the model, while dense columns feed the deep, DNN part; see the minimal pairing sketch below.
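As a minimal sketch of that pairing, a single categorical column can feed the wide part directly and the deep part through an embedding (toy vocabulary; the full set is built in the next cell):

In [ ]:
# Sketch: one categorical feature feeding both parts of a wide-and-deep model.
toy_column = tf.feature_column.categorical_column_with_vocabulary_list(
    'relationship', ['Husband', 'Wife', 'Own-child'])

toy_wide = toy_column  # The linear part consumes the sparse column directly.
toy_deep = tf.feature_column.embedding_column(toy_column, dimension=2)  # The DNN part needs a dense representation.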
In [ ]:
def create_feature_columns():
    wide_columns = []
    deep_columns = []

    # Create numeric columns.
    for column in NUMERIC_FEATURE_NAMES:
        numeric_column = tf.feature_column.numeric_column(column)
        deep_columns.append(numeric_column)

    # Create categorical columns with a vocabulary.
    for column in CATEGORICAL_FEATURE_WITH_VOCABULARY:
        vocabulary = CATEGORICAL_FEATURE_WITH_VOCABULARY[column]
        categorical_column = tf.feature_column.categorical_column_with_vocabulary_list(
            column, vocabulary)
        wide_columns.append(categorical_column)

        # Create embeddings of the categorical columns, sized to roughly sqrt(vocabulary size).
        embed_size = int(math.sqrt(len(vocabulary)))
        embedding_column = tf.feature_column.embedding_column(
            categorical_column, embed_size)
        deep_columns.append(embedding_column)

    # Create categorical columns with hashing.
    for column in CATEGORICAL_FEATURE_WITH_HASH_BUCKETS:
        hash_column = tf.feature_column.categorical_column_with_hash_bucket(
            column,
            hash_bucket_size=CATEGORICAL_FEATURE_WITH_HASH_BUCKETS[column])
        wide_columns.append(hash_column)

        # Create indicator (multi-hot) versions of the hashed columns for the deep part.
        indicator_column = tf.feature_column.indicator_column(hash_column)
        deep_columns.append(indicator_column)

    # Create a bucketized column from age (the first numeric, hence first deep, column).
    age_bucketized = tf.feature_column.bucketized_column(
        deep_columns[0], boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60]
    )
    wide_columns.append(age_bucketized)

    # Create a crossed column.
    education_X_occupation = tf.feature_column.crossed_column(
        ['education', 'occupation'], hash_bucket_size=int(1e4))
    wide_columns.append(education_X_occupation)

    # Create an embedding for the crossed column.
    education_X_occupation_embedded = tf.feature_column.embedding_column(
        education_X_occupation, dimension=10)
    deep_columns.append(education_X_occupation_embedded)

    return wide_columns, deep_columns
In [ ]:
wide_columns, deep_columns = create_feature_columns()

print("Wide columns:")
for column in wide_columns:
    print(column)

print("")
print("Deep columns:")
for column in deep_columns:
    print(column)
In [ ]:
def create_estimator(params, run_config):
    wide_columns, deep_columns = create_feature_columns()

    estimator = tf.estimator.DNNLinearCombinedClassifier(
        n_classes=len(TARGET_LABELS),
        label_vocabulary=TARGET_LABELS,
        weight_column=WEIGHT_COLUMN_NAME,
        dnn_feature_columns=deep_columns,
        dnn_optimizer=tf.train.AdamOptimizer(
            learning_rate=params.learning_rate),
        dnn_hidden_units=params.hidden_units,
        dnn_dropout=params.dropout,
        dnn_activation_fn=tf.nn.relu,
        batch_norm=True,
        linear_feature_columns=wide_columns,
        linear_optimizer='Ftrl',
        config=run_config
    )

    return estimator
run_experiment below deletes the contents of model_dir before training unless resume=True, so each run starts from scratch. Keep resume=False (or delete the model_dir directory yourself) if you don't want training to warm-start from previously saved checkpoints; a usage sketch for resuming follows the function.
In [ ]:
def run_experiment(estimator, params, run_config,
                   resume=False, train_hooks=None, exporters=None):

    # Note: epochs and steps_per_epoch are globals, set in the parameters cell below.
    print("Resume training: {}".format(resume))
    print("Epochs: {}".format(epochs))
    print("Batch size: {}".format(params.batch_size))
    print("Steps per epoch: {}".format(steps_per_epoch))
    print("Training steps: {}".format(params.max_steps))
    print("Learning rate: {}".format(params.learning_rate))
    print("Hidden Units: {}".format(params.hidden_units))
    print("Dropout probability: {}".format(params.dropout))
    print("Save a checkpoint and evaluate after {} step(s)".format(run_config.save_checkpoints_steps))
    print("Keep the last {} checkpoint(s)".format(run_config.keep_checkpoint_max))
    print("")

    tf.logging.set_verbosity(tf.logging.INFO)

    if not resume:
        if tf.gfile.Exists(run_config.model_dir):
            print("Removing previous artefacts...")
            tf.gfile.DeleteRecursively(run_config.model_dir)
    else:
        print("Resuming training...")

    # Create the train spec.
    train_spec = tf.estimator.TrainSpec(
        input_fn=make_input_fn(
            TRAIN_DATA_FILE,
            batch_size=params.batch_size,
            num_epochs=None,  # Run until max_steps is reached.
            shuffle=True
        ),
        max_steps=params.max_steps,
        hooks=train_hooks
    )

    # Create the eval spec.
    eval_spec = tf.estimator.EvalSpec(
        input_fn=make_input_fn(
            EVAL_DATA_FILE,
            batch_size=params.batch_size,
        ),
        exporters=exporters,
        start_delay_secs=0,
        throttle_secs=0,
        steps=None  # Set a value to limit the number of evaluation steps.
    )

    time_start = datetime.utcnow()
    print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))
    print(".......................................")

    # Run train and evaluate.
    tf.estimator.train_and_evaluate(
        estimator=estimator,
        train_spec=train_spec,
        eval_spec=eval_spec)

    time_end = datetime.utcnow()
    print(".......................................")
    print("Experiment finished at {}".format(time_end.strftime("%H:%M:%S")))
    print("")
    time_elapsed = time_end - time_start
    print("Experiment elapsed time: {} seconds".format(time_elapsed.total_seconds()))
By default, the estimator saves a checkpoint every 600 seconds and keeps the 5 most recent checkpoints. To change this behaviour, set the following parameters in the run_config: save_checkpoints_steps (or save_checkpoints_secs) to control how often checkpoints are written, and keep_checkpoint_max to set the number of checkpoints to keep.
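For example, a time-based alternative to the step-based configuration used in this tutorial (a sketch; the interval is illustrative, and the two save_checkpoints_* settings are mutually exclusive):

In [ ]:
# Sketch: checkpoint every 10 minutes instead of every N steps,
# keeping the 5 most recent checkpoints (the TF 1.x defaults).
run_config = tf.estimator.RunConfig(
    save_checkpoints_secs=600,
    keep_checkpoint_max=5,
    model_dir=model_dir
)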
In [ ]:
class Parameters():
    pass

MODELS_LOCATION = 'gs://ksalama-gcs-cloudml/others/models/census'
MODEL_NAME = 'dnn_classifier'
model_dir = os.path.join(MODELS_LOCATION, MODEL_NAME)
os.environ['MODEL_DIR'] = model_dir

TRAIN_DATA_SIZE = 32561

params = Parameters()
params.learning_rate = 0.001
params.hidden_units = [128, 128, 128]
params.dropout = 0.15
params.batch_size = 128

# Set the number of steps with respect to epochs.
epochs = 5
steps_per_epoch = int(math.ceil(TRAIN_DATA_SIZE / float(params.batch_size)))
params.max_steps = steps_per_epoch * epochs

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=steps_per_epoch,  # Save a checkpoint and evaluate the model after each epoch.
    keep_checkpoint_max=3,  # Keep the 3 most recently produced checkpoints.
    model_dir=model_dir,
    save_summary_steps=100,  # Summary steps for TensorBoard.
    log_step_count_steps=50
)
In [ ]:
if COLAB:
    from tensorboardcolab import *
    TensorBoardColab(graph_path=model_dir)
In [ ]:
estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config)
In [ ]:
print(model_dir)
!gsutil ls {model_dir}
In [ ]:
def make_serving_input_receiver_fn():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        dtype = tf.float32 if feature_name in NUMERIC_FEATURE_NAMES else tf.string
        inputs[feature_name] = tf.placeholder(shape=[None], dtype=dtype)
    # What is wrong here? This receiver does not apply process_features, so
    # serving-time inputs are not preprocessed the way training inputs are
    # (training/serving skew). A corrected version appears later in the tutorial.
    return tf.estimator.export.build_raw_serving_input_receiver_fn(inputs)
In [ ]:
export_dir = os.path.join(model_dir, 'export')

# Delete the export directory if it exists.
if tf.gfile.Exists(export_dir):
    tf.gfile.DeleteRecursively(export_dir)

# Export the estimator as a SavedModel.
estimator.export_savedmodel(
    export_dir_base=export_dir,
    serving_input_receiver_fn=make_serving_input_receiver_fn()
)
In [ ]:
!gsutil ls gs://ksalama-gcs-cloudml/others/models/census/dnn_classifier/export/1552582374
In [ ]:
%%bash
saved_models_base=${MODEL_DIR}/export/
saved_model_dir=$(gsutil ls ${saved_models_base} | tail -n 1)
saved_model_cli show --dir=${saved_model_dir} --all
In [ ]:
export_dir = os.path.join(model_dir, 'export')
saved_model_dir = os.path.join(export_dir, tf.gfile.ListDirectory(export_dir)[-1])
print(saved_model_dir)
print("")

predictor_fn = tf.contrib.predictor.from_saved_model(
    export_dir=saved_model_dir,
    signature_def_key="predict"
)

output = predictor_fn(
    {
        'age': [34.0],
        'workclass': ['Private'],
        'education': ['Doctorate'],
        'education_num': [10.0],
        'marital_status': ['Married-civ-spouse'],
        'occupation': ['Prof-specialty'],
        'relationship': ['Husband'],
        'race': ['White'],
        'gender': ['Male'],
        'capital_gain': [0.0],
        'capital_loss': [0.0],
        'hours_per_week': [40.0],
        'native_country': ['Egyptian']
    }
)

print(output)
When using an exporter, saved models are exported under <model_dir>/export/<exporter_name>. The BestExporter below exports a model only when its evaluation result improves on the best seen so far, as decided by compare_fn (here, highest accuracy); with name="estimate", the exports land under export/estimate/.
In [ ]:
def _accuracy_bigger(best_eval_result, current_eval_result):
    metric = 'accuracy'
    return best_eval_result[metric] < current_eval_result[metric]

params.max_steps = 1000
params.hidden_units = [128, 128]
params.dropout = 0

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=200,
    keep_checkpoint_max=1,
    model_dir=model_dir,
    log_step_count_steps=50
)

exporter = tf.estimator.BestExporter(
    compare_fn=_accuracy_bigger,
    event_file_pattern='eval_{}/*.tfevents.*'.format(datetime.utcnow().strftime("%H%M%S")),
    name="estimate",  # Saved models are exported under /export/estimate/.
    serving_input_receiver_fn=make_serving_input_receiver_fn(),
    exports_to_keep=1
)

estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config, exporters=[exporter])
In [ ]:
!gsutil ls {model_dir}/export/estimate
In [ ]:
params.max_steps = 1000000
params.hidden_units = [128, 128]
params.dropout = 0

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=500,
    keep_checkpoint_max=1,
    model_dir=model_dir,
    log_step_count_steps=100
)

estimator = create_estimator(params, run_config)

# Stop training when the eval accuracy stops increasing;
# the hook checks every 500 training steps.
early_stopping_hook = tf.contrib.estimator.stop_if_no_increase_hook(
    estimator,
    'accuracy',
    max_steps_without_increase=100,
    run_every_secs=None,
    run_every_steps=500
)

run_experiment(estimator, params, run_config, exporters=[exporter], train_hooks=[early_stopping_hook])
In [ ]:
strategy = None

num_gpus = len([device_name for device_name in tf.contrib.eager.list_devices()
                if '/device:GPU' in device_name])
print("GPUs available: {}".format(num_gpus))

if num_gpus > 1:
    strategy = tf.distribute.MirroredStrategy()
    # Keep the effective (global) batch size constant across replicas.
    params.batch_size = int(math.ceil(params.batch_size / num_gpus))

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=200,
    model_dir=model_dir,
    train_distribute=strategy
)

estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config)
In [ ]:
def metric_fn(labels, predictions):
    metrics = {}

    # Convert the string labels to class indices, then to one-hot vectors.
    label_index = tf.contrib.lookup.index_table_from_tensor(tf.constant(TARGET_LABELS)).lookup(labels)
    one_hot_labels = tf.one_hot(label_index, len(TARGET_LABELS))

    metrics['micro_accuracy'] = tf.metrics.mean_per_class_accuracy(
        labels=label_index,
        predictions=predictions['class_ids'],
        num_classes=2)

    metrics['f1_score'] = tf.contrib.metrics.f1_score(
        labels=one_hot_labels,
        predictions=predictions['probabilities'])

    return metrics

params.max_steps = 1

estimator = create_estimator(params, run_config)
estimator = tf.contrib.estimator.add_metrics(estimator, metric_fn)
run_experiment(estimator, params, run_config)
In [ ]:
# Forward the 'row_identifier' input through to the prediction output,
# so each prediction can be joined back to the input row that produced it.
estimator = tf.contrib.estimator.forward_features(estimator, keys="row_identifier")
In [ ]:
def make_serving_input_receiver_fn():
    inputs = {}
    for feature_name in FEATURE_NAMES:
        dtype = tf.float32 if feature_name in NUMERIC_FEATURE_NAMES else tf.string
        inputs[feature_name] = tf.placeholder(shape=[None], dtype=dtype)

    # Apply the same preprocessing used at training time, fixing the
    # training/serving skew noted earlier.
    processed_inputs, _ = process_features(inputs, None)

    # Add a placeholder for the forwarded row identifier.
    processed_inputs['row_identifier'] = tf.placeholder(shape=[None], dtype=tf.string)

    return tf.estimator.export.build_raw_serving_input_receiver_fn(processed_inputs)

export_dir = os.path.join(model_dir, 'export')

if tf.gfile.Exists(export_dir):
    tf.gfile.DeleteRecursively(export_dir)

estimator.export_savedmodel(
    export_dir_base=export_dir,
    serving_input_receiver_fn=make_serving_input_receiver_fn()
)
In [ ]:
%%bash
saved_models_base=${MODEL_DIR}/export/
saved_model_dir=$(gsutil ls ${saved_models_base} | tail -n 1)
saved_model_cli show --dir=${saved_model_dir} --all
In [ ]:
export_dir = os.path.join(model_dir, 'export')
saved_model_dir = os.path.join(export_dir, tf.gfile.ListDirectory(export_dir)[-1])
print(saved_model_dir)
print("")

predictor_fn = tf.contrib.predictor.from_saved_model(
    export_dir=saved_model_dir,
    signature_def_key="predict"
)

output = predictor_fn(
    {
        'row_identifier': ['key0123'],
        'age': [34.0],
        'workclass': ['Private'],
        'education': ['Doctorate'],
        'education_num': [10.0],
        'marital_status': ['Married-civ-spouse'],
        'occupation': ['Prof-specialty'],
        'relationship': ['Husband'],
        'race': ['White'],
        'gender': ['Male'],
        'capital_gain': [0.0],
        'capital_loss': [0.0],
        'hours_per_week': [40.0],
        'native_country': ['Egyptian']
    }
)

print(output)
In [ ]:
def create_estimator(params, run_config):
    wide_columns, deep_columns = create_feature_columns()

    def _update_optimizer(initial_learning_rate, decay_steps):
        # Alternative: exponential decay.
        # learning_rate = tf.train.exponential_decay(
        #     initial_learning_rate,
        #     global_step=tf.train.get_global_step(),
        #     decay_steps=decay_steps,
        #     decay_rate=0.9
        # )

        # Cosine decay with warm restarts: the learning rate follows a cosine
        # curve down to alpha, then restarts; each period is t_mul times longer.
        learning_rate = tf.train.cosine_decay_restarts(
            initial_learning_rate,
            tf.train.get_global_step(),
            first_decay_steps=50,
            t_mul=2.0,
            m_mul=1.0,
            alpha=0.0,
        )

        # Log the learning rate to TensorBoard.
        tf.summary.scalar('learning_rate', learning_rate)

        return tf.train.AdamOptimizer(learning_rate=learning_rate)

    estimator = tf.estimator.DNNLinearCombinedClassifier(
        n_classes=len(TARGET_LABELS),
        label_vocabulary=TARGET_LABELS,
        weight_column=WEIGHT_COLUMN_NAME,
        dnn_feature_columns=deep_columns,
        # The optimizer is passed as a callable so the decayed learning rate
        # is created in the graph at model construction time.
        dnn_optimizer=lambda: _update_optimizer(params.learning_rate, params.max_steps),
        dnn_hidden_units=params.hidden_units,
        dnn_dropout=params.dropout,
        batch_norm=True,
        linear_feature_columns=wide_columns,
        linear_optimizer='Ftrl',
        config=run_config
    )

    return estimator
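To see the shape of the schedule, here is a pure-Python sketch of the cosine-decay-with-restarts formula (the same math as tf.train.cosine_decay_restarts, evaluated at a few illustrative steps):

In [ ]:
# Sketch: evaluate the cosine-decay-with-restarts schedule at a few steps.
def cosine_restarts(lr0, step, first_decay_steps, t_mul=2.0, m_mul=1.0, alpha=0.0):
    period, start, m_fac = first_decay_steps, 0, 1.0
    while step >= start + period:  # Find the restart period containing this step.
        start += period
        period = int(period * t_mul)
        m_fac *= m_mul
    completed = (step - start) / float(period)
    cosine_decayed = 0.5 * m_fac * (1.0 + math.cos(math.pi * completed))
    return lr0 * ((1.0 - alpha) * cosine_decayed + alpha)

for step in [0, 25, 49, 50, 75, 149, 150]:
    print(step, round(cosine_restarts(0.1, step, first_decay_steps=50), 4))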
In [ ]:
params.learning_rate = 0.1
params.max_steps = 1000

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=200,
    model_dir=model_dir,
)

if COLAB:
    from tensorboardcolab import *
    TensorBoardColab(graph_path=model_dir)

estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config)
Author: Khalid Salama
Disclaimer: This is not an official Google product. The sample code is provided for educational purposes.
Copyright 2019 Google LLC
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.