TensorFlow Estimators Deep Dive

The purpose of this tutorial is to explain in detail how to create a premade TensorFlow estimator, how training and evaluation work with different configurations, and how the model is exported for serving. The tutorial covers the following points:

  1. Implementing an input function with the tf.data API.
  2. Creating feature columns.
  3. Creating a Wide and Deep model with a premade estimator.
  4. Configuring the train and evaluate parameters.
  5. Exporting trained models for serving.
  6. Implementing early stopping.
  7. Using a distribution strategy for multiple GPUs.
  8. Extending premade estimators.
  9. Using an adaptive learning rate.


In [ ]:
COLAB = False
try:
  from google.colab import auth
  auth.authenticate_user()
  COLAB = True
except ImportError:
  pass

RANDOM_SEED = 19831006

In [ ]:
import os
import math
import multiprocessing
import pandas as pd
from datetime import datetime

import tensorflow as tf
print("TensorFlow version: {}".format(tf.__version__))

tf.enable_eager_execution()
print("Eager Execution Enabled: {}".format(tf.executing_eagerly()))

Download Data

UCI Adult Dataset: https://archive.ics.uci.edu/ml/datasets/adult

Predict whether income exceeds $50K/yr based on census data. Also known as "Census Income" dataset.


In [ ]:
DATA_DIR = 'data'
!mkdir -p $DATA_DIR
!gsutil cp gs://cloud-samples-data/ml-engine/census/data/adult.data.csv $DATA_DIR
!gsutil cp gs://cloud-samples-data/ml-engine/census/data/adult.test.csv $DATA_DIR

In [ ]:
TRAIN_DATA_FILE = os.path.join(DATA_DIR, 'adult.data.csv')
EVAL_DATA_FILE = os.path.join(DATA_DIR, 'adult.test.csv')

In [ ]:
!wc -l $TRAIN_DATA_FILE
!wc -l $EVAL_DATA_FILE

The training data includes 32,561 records, while the evaluation data includes 16,278 records.


In [ ]:
HEADER = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
               'marital_status', 'occupation', 'relationship', 'race', 'gender',
               'capital_gain', 'capital_loss', 'hours_per_week',
               'native_country', 'income_bracket']
               
pd.read_csv(TRAIN_DATA_FILE, names=HEADER).head()

Dataset Metadata


In [ ]:
HEADER = ['age', 'workclass', 'fnlwgt', 'education', 'education_num',
               'marital_status', 'occupation', 'relationship', 'race', 'gender',
               'capital_gain', 'capital_loss', 'hours_per_week',
               'native_country', 'income_bracket']

HEADER_DEFAULTS = [[0], [''], [0], [''], [0], [''], [''], [''], [''], [''],
                       [0], [0], [0], [''], ['']]

NUMERIC_FEATURE_NAMES = ['age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week']
CATEGORICAL_FEATURE_WITH_VOCABULARY = {
  'workclass': ['State-gov', 'Self-emp-not-inc', 'Private', 'Federal-gov', 'Local-gov', '?', 'Self-emp-inc', 'Without-pay', 'Never-worked'],
  'relationship': ['Not-in-family', 'Husband', 'Wife', 'Own-child', 'Unmarried', 'Other-relative'],
  'gender': ['Male', 'Female'],
  'marital_status': ['Never-married', 'Married-civ-spouse', 'Divorced', 'Married-spouse-absent', 'Separated', 'Married-AF-spouse', 'Widowed'],
  'race': ['White', 'Black', 'Asian-Pac-Islander', 'Amer-Indian-Eskimo', 'Other'],
  'education': ['Bachelors', 'HS-grad', '11th', 'Masters', '9th', 'Some-college', 'Assoc-acdm', 'Assoc-voc', '7th-8th', 'Doctorate', 'Prof-school', '5th-6th', '10th', '1st-4th', 'Preschool', '12th'],
}

CATEGORICAL_FEATURE_WITH_HASH_BUCKETS = {
  'native_country': 60,
  'occupation': 20
}

FEATURE_NAMES = NUMERIC_FEATURE_NAMES \
    + list(CATEGORICAL_FEATURE_WITH_VOCABULARY.keys()) \
    + list(CATEGORICAL_FEATURE_WITH_HASH_BUCKETS.keys())
TARGET_NAME = 'income_bracket'
TARGET_LABELS = [' <=50K', ' >50K']
WEIGHT_COLUMN_NAME = 'fnlwgt'

1. Data Input Function


In [ ]:
def process_features(features, target):
  # Strip leading/trailing whitespace from the categorical (string) features.
  for feature_name in list(CATEGORICAL_FEATURE_WITH_VOCABULARY.keys()) + list(CATEGORICAL_FEATURE_WITH_HASH_BUCKETS.keys()):
    features[feature_name] = tf.strings.strip(features[feature_name])

  # Net capital: gain minus loss.
  features['capital_total'] = features['capital_gain'] - features['capital_loss']
  return features, target

def make_input_fn(file_pattern, batch_size, num_epochs=1, shuffle=False):

    def _input_fn():
        dataset = tf.data.experimental.make_csv_dataset(
            file_pattern=file_pattern,
            batch_size=batch_size,
            column_names=HEADER,
            column_defaults=HEADER_DEFAULTS,
            label_name=TARGET_NAME,
            field_delim=',',
            use_quote_delim=True,
            header=False,
            num_epochs=num_epochs,
            shuffle=shuffle,
            shuffle_buffer_size=(5 * batch_size),
            shuffle_seed=RANDOM_SEED,
            num_parallel_reads=multiprocessing.cpu_count(),
            sloppy=True,
        )
        return dataset.map(process_features).cache()
    
    return _input_fn

In [ ]:
# You need to run tf.enable_eager_execution() at the top.

dataset = make_input_fn(TRAIN_DATA_FILE, batch_size=1)()
for features, target in dataset.take(1):
  print "Input Features:"
  for key in features:
    print "{}:{}".format(key, features[key])

  print ""
  print "Target:"
  print target

2. Create Feature Columns


In [ ]:
def create_feature_columns():

    wide_columns = []
    deep_columns = []

    for column in NUMERIC_FEATURE_NAMES:
        # Create numeric columns.
        numeric_column = tf.feature_column.numeric_column(column)
        deep_columns.append(numeric_column)

    for column in CATEGORICAL_FEATURE_WITH_VOCABULARY:
        # Create categorical columns with a vocabulary.
        vocabulary = CATEGORICAL_FEATURE_WITH_VOCABULARY[column]
        categorical_column = tf.feature_column.categorical_column_with_vocabulary_list(
            column, vocabulary)
        wide_columns.append(categorical_column)

        # Create embeddings of the categorical columns.
        embed_size = int(math.sqrt(len(vocabulary)))
        embedding_column = tf.feature_column.embedding_column(
            categorical_column, embed_size)
        deep_columns.append(embedding_column)

    for column in CATEGORICAL_FEATURE_WITH_HASH_BUCKETS:
        # Create categorical columns with hashing.
        hash_column = tf.feature_column.categorical_column_with_hash_bucket(
            column,
            hash_bucket_size=CATEGORICAL_FEATURE_WITH_HASH_BUCKETS[column])
        wide_columns.append(hash_column)

        # Create indicators for the hashed columns.
        indicator_column = tf.feature_column.indicator_column(hash_column)
        deep_columns.append(indicator_column)

    # Create a bucketized column from the numeric 'age' column (the first deep column).
    age_bucketized = tf.feature_column.bucketized_column(
        deep_columns[0], boundaries=[18, 25, 30, 35, 40, 45, 50, 55, 60]
    )
    wide_columns.append(age_bucketized)

    # Create a crossed column of education and workclass.
    education_x_workclass = tf.feature_column.crossed_column(
        ['education', 'workclass'], hash_bucket_size=int(1e4))
    wide_columns.append(education_x_workclass)

    # Create embeddings for the crossed column.
    education_x_workclass_embedded = tf.feature_column.embedding_column(
        education_x_workclass, dimension=10)
    deep_columns.append(education_x_workclass_embedded)

    return wide_columns, deep_columns

In [ ]:
wide_columns, deep_columns = create_feature_columns()

print ""
print "Wide columns:"
for column in wide_columns:
  print column

print ""
print "Deep columns:"
for column in deep_columns:
  print column

3. Instantiate a Wide and Deep Estimator



In [ ]:
def create_estimator(params, run_config):
    
    wide_columns, deep_columns = create_feature_columns()
    
    estimator = tf.estimator.DNNLinearCombinedClassifier(

        n_classes=len(TARGET_LABELS),
        label_vocabulary=TARGET_LABELS,
        weight_column=WEIGHT_COLUMN_NAME,

        dnn_feature_columns=deep_columns,
        dnn_optimizer=tf.train.AdamOptimizer(
          learning_rate=params.learning_rate),
        dnn_hidden_units=params.hidden_units,
        dnn_dropout=params.dropout,
        dnn_activation_fn=tf.nn.relu,
        batch_norm=True,

        linear_feature_columns=wide_columns,
        linear_optimizer='Ftrl',

        config=run_config
    )
    
    return estimator

4. Implement Train and Evaluate Experiment

Delete the model_dir directory if you don't want a warm start.

  • If it is not deleted and you change the model architecture, restoring the existing checkpoint will fail.

TrainSpec

  • Set shuffle in the input_fn to True
  • Set num_epochs in the input_fn to None
  • Set max_steps. One batch (feed-forward pass & backpropagation) corresponds to 1 training step.

EvalSpec

  • Set shuffle in the input_fn to False
  • Set num_epochs in the input_fn to 1
  • Set steps to None if you want to use all the evaluation data.
  • Otherwise, set steps to the number of batches you want to use for evaluation, and set shuffle to True (see the sketch after this list).
  • Set start_delay_secs to 0 to start evaluation as soon as a checkpoint is produced.
  • Set throttle_secs to 0 to re-evaluate as soon as a new checkpoint is produced.
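
For illustration, here is a minimal sketch (not used by run_experiment below) of an EvalSpec that evaluates on a fixed number of shuffled batches rather than on the full evaluation set; batch_size=128 and steps=100 are arbitrary example values.

In [ ]:
# A sketch only: evaluate on a fixed number of shuffled batches.
sample_eval_spec = tf.estimator.EvalSpec(
    input_fn=make_input_fn(
        EVAL_DATA_FILE,
        batch_size=128,
        num_epochs=1,
        shuffle=True  # Shuffle so the sampled batches vary between evaluations.
    ),
    steps=100,           # Use only 100 batches per evaluation.
    start_delay_secs=0,  # Start evaluating as soon as the first checkpoint is produced.
    throttle_secs=0      # Re-evaluate whenever a new checkpoint is produced.
)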

In [ ]:
def run_experiment(estimator, params, run_config,
                   resume=False, train_hooks=None, exporters=None):

  # Note: epochs and steps_per_epoch are module-level variables set in the
  # parameters cell below.
  print("Resume training: {}".format(resume))
  print("Epochs: {}".format(epochs))
  print("Batch size: {}".format(params.batch_size))
  print("Steps per epoch: {}".format(steps_per_epoch))
  print("Training steps: {}".format(params.max_steps))
  print("Learning rate: {}".format(params.learning_rate))
  print("Hidden Units: {}".format(params.hidden_units))
  print("Dropout probability: {}".format(params.dropout))
  print("Save a checkpoint and evaluate after {} step(s)".format(run_config.save_checkpoints_steps))
  print("Keep the last {} checkpoint(s)".format(run_config.keep_checkpoint_max))
  print("")
  
  tf.logging.set_verbosity(tf.logging.INFO)

  if not resume: 
    if tf.gfile.Exists(run_config.model_dir):
      print "Removing previous artefacts..."
      tf.gfile.DeleteRecursively(run_config.model_dir)
  else:
    print "Resuming training..."

  # Create train specs.
  train_spec = tf.estimator.TrainSpec(
      input_fn = make_input_fn(
          TRAIN_DATA_FILE,
          batch_size=params.batch_size,
          num_epochs=None, # Run until the max_steps is reached.
          shuffle=True
      ),
      max_steps=params.max_steps,
      hooks=train_hooks
  )

  # Create eval specs.
  eval_spec = tf.estimator.EvalSpec(
      input_fn = make_input_fn(
          EVAL_DATA_FILE,
          batch_size=params.batch_size,     
      ),
      exporters=exporters,
      start_delay_secs=0,
      throttle_secs=0,
      steps=None # Evaluate on the full evaluation dataset; set a number to limit the evaluation steps.
  )
  
  time_start = datetime.utcnow()
  print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))
  print(".......................................")
  
  # Run train and evaluate.
  tf.estimator.train_and_evaluate(
    estimator=estimator,
    train_spec=train_spec, 
    eval_spec=eval_spec)

  time_end = datetime.utcnow()
  print(".......................................")
  print("Experiment finished at {}".format(time_end.strftime("%H:%M:%S")))
  print("")

  time_elapsed = time_end - time_start
  print("Experiment elapsed time: {} seconds".format(time_elapsed.total_seconds()))

Set Parameters and Run Configurations.

  • Set model_dir in the run_config
  • If the data size is known, the number of training steps for a given number of epochs is (training_size / batch_size) * epochs
  • By default, a checkpoint is saved every 600 seconds. That is, the model is evaluated only every 10 minutes.
  • To change this behaviour, set one of the following parameters in the run_config

    • save_checkpoints_secs: Save checkpoints every this many seconds (see the sketch after this list).
    • save_checkpoints_steps: Save checkpoints every this many steps.
  • Set the number of checkpoints to keep using keep_checkpoint_max
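
For illustration, here is a minimal sketch of the time-based alternative mentioned above: a RunConfig that saves checkpoints (and therefore triggers evaluation) on a schedule of seconds rather than steps. The 120-second interval is an arbitrary example value, and model_dir is omitted, so this config is not used by the cells below.

In [ ]:
# A sketch only: time-based checkpointing. The cell below uses the
# step-based variant (save_checkpoints_steps) instead.
sample_run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_secs=120,  # Save a checkpoint (and evaluate) every 2 minutes.
    keep_checkpoint_max=3       # Keep only the 3 most recent checkpoints.
)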


In [ ]:
class Parameters():
  pass

MODELS_LOCATION = 'gs://ksalama-gcs-cloudml/others/models/census'
MODEL_NAME = 'dnn_classifier'
model_dir = os.path.join(MODELS_LOCATION, MODEL_NAME)
os.environ['MODEL_DIR'] = model_dir

TRAIN_DATA_SIZE = 32561

params = Parameters()
params.learning_rate = 0.001
params.hidden_units = [128, 128, 128]
params.dropout = 0.15
params.batch_size =  128

# Set number of steps with respect to epochs.
epochs = 5
steps_per_epoch = int(math.ceil(TRAIN_DATA_SIZE / params.batch_size))
params.max_steps = steps_per_epoch * epochs

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=steps_per_epoch, # Save a checkpoint, and hence evaluate the model, after each epoch.
    keep_checkpoint_max=3, # Keep the 3 most recently produced checkpoints.
    model_dir=model_dir,
    save_summary_steps=100, # Summary steps for Tensorboard.
    log_step_count_steps=50
)

Run Experiment


In [ ]:
if COLAB:
  from tensorboardcolab import *
  TensorBoardColab(graph_path=model_dir)

In [ ]:
estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config)

In [ ]:
print(model_dir)
!gsutil ls {model_dir}

5. Export your trained model

Implement serving input receiver function


In [ ]:
def make_serving_input_receiver_fn():
  inputs = {}
  for feature_name in FEATURE_NAMES:
    dtype = tf.float32 if feature_name in NUMERIC_FEATURE_NAMES else tf.string
    inputs[feature_name] = tf.placeholder(shape=[None], dtype=dtype)
  
  # Note: process_features is not applied here, so the serving inputs are passed
  # to the model as-is (for example, strings must already be free of surrounding
  # whitespace). The extended serving function later in the notebook applies it.
  return tf.estimator.export.build_raw_serving_input_receiver_fn(inputs)

Export to saved_model


In [ ]:
export_dir = os.path.join(model_dir, 'export')

# Delete export directory if exists.
if tf.gfile.Exists(export_dir):
    tf.gfile.DeleteRecursively(export_dir)

# Export the estimator as a saved_model.
estimator.export_savedmodel(
    export_dir_base=export_dir,
    serving_input_receiver_fn=make_serving_input_receiver_fn()
)

In [ ]:
!gsutil ls gs://ksalama-gcs-cloudml/others/models/census/dnn_classifier/export/1552582374

In [ ]:
%%bash

saved_models_base=${MODEL_DIR}/export/
saved_model_dir=$(gsutil ls ${saved_models_base} | tail -n 1)
saved_model_cli show --dir=${saved_model_dir} --all

Test saved_model


In [ ]:
export_dir = os.path.join(model_dir, 'export')
saved_model_dir = os.path.join(export_dir, tf.gfile.ListDirectory(export_dir)[-1])
print(saved_model_dir)
print("")

predictor_fn = tf.contrib.predictor.from_saved_model(
    export_dir = saved_model_dir,
    signature_def_key="predict"
)

output = predictor_fn(
    {
        'age': [34.0],
        'workclass': ['Private'],
        'education': ['Doctorate'],
        'education_num': [10.0],
        'marital_status': ['Married-civ-spouse'],
        'occupation': ['Prof-specialty'],
        'relationship': ['Husband'],
        'race': ['White'],
        'gender': ['Male'],
        'capital_gain': [0.0], 
        'capital_loss': [0.0], 
        'hours_per_week': [40.0],
        'native_country':['Egyptian']
    }
)
print(output)

Export the Model during Training and Evaluation

Saved models are exported under the /export/ sub-directory of model_dir.

  • LatestExporter: exports a model after each evaluation (see the sketch after this list).
    • Specify the maximum number of exported models to keep using the exports_to_keep param.
  • FinalExporter: exports only the very last evaluated checkpoint of the model.
  • BestExporter: exports only when the newly evaluated checkpoint is better than any existing export.
    • Specify the maximum number of exported models to keep using the exports_to_keep param.
    • It uses the evaluation events stored under the eval folder.
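
For illustration, here is a minimal sketch of the other two exporters described above; the names 'latest' and 'final' and the exports_to_keep value are arbitrary example choices. The cell below uses BestExporter.

In [ ]:
# A sketch only: the cell below uses BestExporter instead.
latest_exporter = tf.estimator.LatestExporter(
    name='latest',  # Saved models would be exported under /export/latest/
    serving_input_receiver_fn=make_serving_input_receiver_fn(),
    exports_to_keep=3  # Keep only the 3 most recent exports.
)

final_exporter = tf.estimator.FinalExporter(
    name='final',  # Saved models would be exported under /export/final/
    serving_input_receiver_fn=make_serving_input_receiver_fn()
)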

In [ ]:
def _accuracy_bigger(best_eval_result, current_eval_result):
  
  metric = 'accuracy'
  return best_eval_result[metric] < current_eval_result[metric]


params.max_steps = 1000
params.hidden_units = [128, 128]
params.dropout = 0
run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=200,
    keep_checkpoint_max=1,
    model_dir=model_dir,
    log_step_count_steps=50
)

exporter = tf.estimator.BestExporter(
    compare_fn=_accuracy_bigger,
    event_file_pattern='eval_{}/*.tfevents.*'.format(datetime.utcnow().strftime("%H%M%S")),
    name="estimate", # Saved models are exported under /export/estimate/
    serving_input_receiver_fn=make_serving_input_receiver_fn(),
    exports_to_keep=1
)

estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config, exporters = [exporter])

In [ ]:
!gsutil ls {model_dir}/export/estimate

6. Implement Early Stopping


In [ ]:
params.max_steps = 1000000
params.hidden_units = [128, 128]
params.dropout = 0

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=500,
    keep_checkpoint_max=1,
    model_dir=model_dir,
    log_step_count_steps=100
)

# Recreate the estimator so that it picks up the new run_config.
estimator = create_estimator(params, run_config)

# Stop training when the evaluation accuracy stops increasing.
early_stopping_hook = tf.contrib.estimator.stop_if_no_increase_hook(
    estimator,
    'accuracy',
    max_steps_without_increase=100,
    run_every_secs=None,
    run_every_steps=500
)

run_experiment(estimator, params, run_config, exporters=[exporter], train_hooks=[early_stopping_hook])

7. Using Distribution Strategy for Utilising Multiple GPUs


In [ ]:
strategy = None
num_gpus = len([device_name for device_name in tf.contrib.eager.list_devices()
                if '/device:GPU' in device_name])

print "GPUs available: {}".format(num_gpus)

if num_gpus > 1:
    strategy = tf.distribute.MirroredStrategy()
    params.batch_size = int(math.ceil(params.batch_size / num_gpus))

run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=200,
    model_dir=model_dir,
    train_distribute=strategy
)

estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config)

8. Extending a Premade Estimator

Add an evaluation metric


In [ ]:
def metric_fn(labels, predictions):

  metrics = {}
  
  label_index = tf.contrib.lookup.index_table_from_tensor(tf.constant(TARGET_LABELS)).lookup(labels)
  one_hot_labels = tf.one_hot(label_index, len(TARGET_LABELS))

  metrics['mean_per_class_accuracy'] = tf.metrics.mean_per_class_accuracy(
    labels=label_index,
    predictions=predictions['class_ids'],
    num_classes=2)
  
  metrics['f1_score'] = tf.contrib.metrics.f1_score(
    labels=one_hot_labels,
    predictions=predictions['probabilities'])

  return metrics

params.max_steps = 1
estimator = create_estimator(params, run_config)
estimator = tf.contrib.estimator.add_metrics(estimator, metric_fn)
run_experiment(estimator, params, run_config)

Add Forward Features


In [ ]:
estimator = tf.contrib.estimator.forward_features(estimator, keys="row_identifier")

In [ ]:
def make_serving_input_receiver_fn():
  inputs = {}
  for feature_name in FEATURE_NAMES:
    dtype = tf.float32 if feature_name in NUMERIC_FEATURE_NAMES else tf.string
    inputs[feature_name] = tf.placeholder(shape=[None], dtype=dtype)
  
  processed_inputs, _ = process_features(inputs, None)
  processed_inputs['row_identifier'] = tf.placeholder(shape=[None], dtype=tf.string)
  return tf.estimator.export.build_raw_serving_input_receiver_fn(processed_inputs)

export_dir = os.path.join(model_dir, 'export')

if tf.gfile.Exists(export_dir):
    tf.gfile.DeleteRecursively(export_dir)
        
estimator.export_savedmodel(
    export_dir_base=export_dir,
    serving_input_receiver_fn=make_serving_input_receiver_fn()
)

In [ ]:
%%bash

saved_models_base=${MODEL_DIR}/export/
saved_model_dir=$(gsutil ls ${saved_models_base} | tail -n 1)
saved_model_cli show --dir=${saved_model_dir} --all

In [ ]:
export_dir = os.path.join(model_dir, 'export')
saved_model_dir = os.path.join(export_dir, tf.gfile.ListDirectory(export_dir)[-1])
print(saved_model_dir)
print("")

predictor_fn = tf.contrib.predictor.from_saved_model(
    export_dir = saved_model_dir,
    signature_def_key="predict"
)

output = predictor_fn(
    {   'row_identifier': ['key0123'],
        'age': [34.0],
        'workclass': ['Private'],
        'education': ['Doctorate'],
        'education_num': [10.0],
        'marital_status': ['Married-civ-spouse'],
        'occupation': ['Prof-specialty'],
        'relationship': ['Husband'],
        'race': ['White'],
        'gender': ['Male'],
        'capital_gain': [0.0], 
        'capital_loss': [0.0], 
        'hours_per_week': [40.0],
        'native_country':['Egyptian']
    }
)
print(output)

9. Adaptive Learning Rate


In [ ]:
def create_estimator(params, run_config):
  
  wide_columns, deep_columns = create_feature_columns()
  
  def _update_optimizer(initial_learning_rate, decay_steps):
    
    # learning_rate = tf.train.exponential_decay(
    #   initial_learning_rate,
    #   global_step=tf.train.get_global_step(),
    #   decay_steps=decay_steps,
    #   decay_rate=0.9
    # )

    learning_rate = tf.train.cosine_decay_restarts(
      initial_learning_rate,
      tf.train.get_global_step(),
      first_decay_steps=50,
      t_mul=2.0,
      m_mul=1.0,
      alpha=0.0,
    )

    tf.summary.scalar('learning_rate', learning_rate)

    return tf.train.AdamOptimizer(learning_rate=learning_rate)
    
  estimator = tf.estimator.DNNLinearCombinedClassifier(

    n_classes=len(TARGET_LABELS),
    label_vocabulary=TARGET_LABELS,
    weight_column=WEIGHT_COLUMN_NAME,

    dnn_feature_columns=deep_columns,
    dnn_optimizer=lambda: _update_optimizer(params.learning_rate, params.max_steps),
    dnn_hidden_units=params.hidden_units,
    dnn_dropout=params.dropout,
    batch_norm=True,
    
    linear_feature_columns=wide_columns,
    linear_optimizer='Ftrl',
    
    config=run_config
  )
  
  return estimator

In [ ]:
params.learning_rate = 0.1
params.max_steps = 1000
run_config = tf.estimator.RunConfig(
    tf_random_seed=RANDOM_SEED,
    save_checkpoints_steps=200,
    model_dir=model_dir,
)

if COLAB:
  from tensorboardcolab import *
  TensorBoardColab(graph_path=model_dir)

estimator = create_estimator(params, run_config)
run_experiment(estimator, params, run_config)
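
To make the warm-restart behaviour of the schedule visible, the sketch below evaluates tf.train.cosine_decay_restarts at a few hand-picked steps, using the same first_decay_steps=50 and t_mul=2.0 as in the estimator above. Depending on the TF 1.x minor version, the decay functions return either a tensor or a no-argument callable when eager execution is enabled, so both cases are handled.

In [ ]:
# A sketch only: print the cosine-decay-with-restarts learning rate at a few steps.
for step in [0, 25, 49, 50, 75, 99, 100, 150, 349]:
  lr = tf.train.cosine_decay_restarts(
      params.learning_rate, step, first_decay_steps=50, t_mul=2.0, m_mul=1.0)
  if callable(lr):  # Some TF 1.x versions return a callable under eager execution.
    lr = lr()
  print("step {:>4}: learning rate = {:.5f}".format(step, float(lr)))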

License

Author: Khalid Salama


Disclaimer: This is not an official Google product. The sample code is provided for educational purposes only.


Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.