Introducing tf.estimator.train_and_evaluate()

Learning Objectives

  • Introduce a new type of input function (serving_input_receiver_fn()) that supports remote access to our model via a REST API
  • Use the tf.estimator.train_and_evaluate() method to periodically evaluate during training
  • Practice using TensorBoard to visualize training and evaluation loss curves

Introduction

In this notebook, we'll see how to use the train_and_evaluate method within tf.estimator to train and evaluate our machine learning model.

Run the following cell and reset the session if needed:


In [ ]:
import tensorflow as tf
import shutil
print(tf.__version__)

Train and Evaluate Input Functions

We'll use the same train and evaluation input functions that we created before.


In [ ]:
CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]

In [ ]:
def parse_row(row):
    fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)
    features = dict(zip(CSV_COLUMN_NAMES, fields))
    label = features.pop("fare_amount")
    return features, label

def read_dataset(csv_path):
    dataset = tf.data.TextLineDataset(filenames = csv_path).skip(count = 1) # skip header
    dataset = dataset.map(map_func = parse_row)
    return dataset

def train_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)
    return dataset

def eval_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.batch(batch_size = batch_size)
    return dataset
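
To sanity-check these input functions, we can pull a single batch through one of them. This is an optional sketch (it assumes ./taxi-train.csv exists locally); in TF 1.x graph mode we need a one-shot iterator and a session to materialize the values.


In [ ]:
# Pull one batch through the training input function and inspect it
dataset = train_input_fn("./taxi-train.csv", batch_size = 2)
features, label = dataset.make_one_shot_iterator().get_next()
with tf.Session() as sess:
    feats, lab = sess.run([features, label])
    print(feats["hourofday"], lab) # two hour-of-day values and two fare amounts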

Feature Columns

We also create the feature columns for the model, the same way as before.


In [ ]:
FEATURE_NAMES = CSV_COLUMN_NAMES[1:] # all but first column

feature_cols = [tf.feature_column.numeric_column(key = k) for k in FEATURE_NAMES]
print(feature_cols)

Serving Input Receiver Function

In a prior notebook we used the estimator.predict() function to get taxifare predictions. This worked fine because we had done our model training on the same machine.

However, in a production setting this won't usually be the case. Our clients may be remote web servers, mobile apps, and more. Instead of having to ship our model files to every client, it would be better to host our model in one place and make it remotely accessible for prediction requests using a REST API.

The TensorFlow solution for this is a project called TF Serving, which is part of the larger TensorFlow Extended (TFX) platform that extends TensorFlow for production environments.

The interface between TensorFlow and TF Serving is a serving_input_receiver_fn(). It has two jobs:

  • To add tf.placeholder ops to the graph to specify what type of tensors TF Serving should receive during inference requests. The placeholders are specified as a dictionary object
  • To add any additional ops needed to convert data from the client into the tensors expected by the model.

The function must return a tf.estimator.export.ServingInputReceiver object, which packages the placeholders and the necessary transformations together.

In the cell below, we implement a serving_input_receiver_fn function that returns an instance of tf.estimator.export.ServingInputReceiver(features, receiver_tensors). Have a look at the documentation for TensorFlow's ServingInputReceiver. Here, receiver_tensors is a dictionary describing the JSON object received by the Cloud ML Engine API, and features is a dictionary with the same structure as the feature dictionary accepted by our estimator.

We keep things simple by assuming that the API receives a JSON object that already has the correct structure (i.e. features = receiver_tensors).


In [ ]:
def serving_input_receiver_fn():
    receiver_tensors = {
        'dayofweek' : tf.placeholder(dtype = tf.int32, shape = [None]),
        'hourofday' : tf.placeholder(dtype = tf.int32, shape = [None]),
        'pickuplon' : tf.placeholder(dtype = tf.float32, shape = [None]), 
        'pickuplat' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'dropofflat' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'dropofflon' : tf.placeholder(dtype = tf.float32, shape = [None]),
    }
    
    features = receiver_tensors 
    return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)
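
For reference, a prediction request to the served model will contain instances whose fields match these placeholder names. The payload below is purely illustrative (the values are made up, not taken from the dataset):


In [ ]:
# Hypothetical example of the kind of JSON payload this receiver expects
example_request = {
    "instances": [
        {
            "dayofweek": 1,
            "hourofday": 12,
            "pickuplon": -73.99,
            "pickuplat": 40.75,
            "dropofflon": -73.98,
            "dropofflat": 40.76
        }
    ]
}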

Train and Evaluate

One issue with the previous notebooks is that we only evaluated on our validation data once training was complete. This means we couldn't tell at what point overfitting began. What we really want is to evaluate at specified intervals during the training phase.

The Estimator API way of doing this is to replace estimator.train() and estimator.evaluate() with tf.estimator.train_and_evaluate(), which runs an evaluation after every training checkpoint. However, by default TensorFlow only checkpoints once every 10 minutes. Since our total training takes less time than that, we'd end up with the same behavior as before: a single evaluation at the end of training.

To remedy this, we specify in the tf.estimator.RunConfig() that TensorFlow should checkpoint every 100 steps.

The default evaluation metric average_loss is MSE, but we want RMSE. Previously we just took the square root of the final average_loss. However, it would be better to calculate RMSE not just at the end, but at every intermediate checkpoint, and plot its change over time in TensorBoard. tf.contrib.estimator.add_metrics() allows us to do this: we wrap our estimator with it and provide a custom evaluation function.

train_and_evaluate() also allows us to use our serving_input_receiver_fn() to export our models in the SavedModel format required by TF Serving.

Note: Training will be slower than in the last notebook because we are now evaluating after every 100 train steps; previously we didn't evaluate until training finished.

In the cell below, we create an instance of tf.estimator.RunConfig named config and pass to its constructor information concerning:

  • the directory where we want the trained model and its checkpoints to be saved
  • the random seed which we want to be set to 1
  • the cadence at which we want the model to create checkpoints (every 100 steps)

In [ ]:
OUTDIR = "taxi_trained"

config = tf.estimator.RunConfig(
    model_dir = OUTDIR,
    tf_random_seed = 1,
    save_checkpoints_steps = 100
)

Next we create a DNNRegressor model with two layers of 10 neurons each using the RunConfig instance and the feature_cols list you just created.

Note that we do not pass the model directory directly to the estimator constructor, since that info should already be wrapped into the RunConfig instance.


In [ ]:
model = tf.estimator.DNNRegressor(
  hidden_units = [10,10], 
  feature_columns = feature_cols, 
  config = config
)

Adding custom evaluation metrics

If we want to add a custom evaluation metric (one not included automatically with the canned DNNRegressor estimator), we can do that by wrapping our model with our custom metric function using tf.contrib.estimator.add_metrics(). We will implement a my_rmse function that

  • takes as input a tensor of labels and a tensor of predictions
  • returns a dictionary with the single key rmse and with value the root mean square error between the labels and the predictions

You can have a look at this blog post by Lak Lakshmanan on "How to extend a canned TensorFlow Estimator" for more information.


In [ ]:
def my_rmse(labels, predictions):  
    pred_values = tf.squeeze(input = predictions["predictions"], axis = -1)
    return {
        "rmse": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)
    }
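
As a quick sanity check, we can run my_rmse on toy values (a sketch, not part of the lab). Note that tf.metrics functions return a (value, update_op) pair backed by local variables, so we initialize local variables and run the update op before reading the value. Errors of +1 and -1 give an RMSE of 1.0.


In [ ]:
with tf.Graph().as_default():
    labels = tf.constant([5.0, 10.0])
    predictions = {"predictions": tf.constant([[6.0], [9.0]])}
    value_op, update_op = my_rmse(labels, predictions)["rmse"]
    with tf.Session() as sess:
        sess.run(tf.local_variables_initializer()) # metrics use local variables
        sess.run(update_op)
        print(sess.run(value_op)) # 1.0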

Run the following cell to add the custom metric you defined above to the model:


In [ ]:
model = tf.contrib.estimator.add_metrics(estimator = model, metric_fn = my_rmse)

Next we'll create an instance of a tf.estimator.TrainSpec using the train_input_fn defined at the top of this notebook and train our model with max_steps of 500. Note that the training data is loaded from ./taxi-train.csv.


In [ ]:
train_spec = tf.estimator.TrainSpec(
    input_fn = lambda: train_input_fn("./taxi-train.csv"),
    max_steps = 500
)

Finally, we create an exporter using the serving_input_receiver_fn defined at the beginning of this notebook.

The trained model will be exported to the 'export/exporter' subdirectory of our output directory.

Use tf.estimator.FinalExporter to create the exporter instance.

Note: You may alternatively use tf.estimator.BestExporter, which exports whenever a checkpoint improves on the best evaluation loss so far, instead of exporting only the final checkpoint. A sketch follows the next cell.


In [ ]:
exporter = tf.estimator.FinalExporter(name = "exporter", serving_input_receiver_fn = serving_input_receiver_fn)
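
As mentioned above, a BestExporter could be used instead. A minimal sketch, assuming we only want to keep the single best model by evaluation loss (the name and exports_to_keep are illustrative choices, not used in the rest of this notebook):


In [ ]:
# Exports only checkpoints that beat the best evaluation loss seen so far
best_exporter = tf.estimator.BestExporter(
    name = "best_exporter",
    serving_input_receiver_fn = serving_input_receiver_fn,
    exports_to_keep = 1
)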

In the cell below, we create an instance of an EvalSpec, specifying that

  • the data should be loaded from ./taxi-valid.csv during evaluation (use the correct input function!)
  • the exporter you defined above should be used
  • the first evaluation should start after 1 second of training
  • and then be repeated every 1 second

Note: We use the checkpoint setting above because we want to evaluate after every checkpoint. As long as checkpoints are more than 1 second apart, this ensures the throttling never kicks in.


In [ ]:
eval_spec = tf.estimator.EvalSpec(
    input_fn = lambda: eval_input_fn("./taxi-valid.csv"),
    steps = None,
    start_delay_secs = 1,
    throttle_secs = 1,
    exporters = exporter,
)

Run the following cell to start the training and evaluation as you specified them above:


In [ ]:
tf.logging.set_verbosity(tf.logging.INFO) 
shutil.rmtree(path = OUTDIR, ignore_errors = True)
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file

tf.estimator.train_and_evaluate(estimator = model, 
                                train_spec = train_spec, 
                                eval_spec = eval_spec)

Inspect the export folder

Now in the output directory, in addition to the checkpoint files, you'll see a subfolder called 'export'. This contains one or more models in the SavedModel format, which is compatible with TF Serving. In the next notebook we will deploy the SavedModel behind a production-grade REST API.


In [ ]:
!ls -R taxi_trained/export
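
We can also smoke-test the export locally before deploying it. This is a sketch, not part of the lab: it assumes training above produced at least one timestamped SavedModel under export/exporter, and uses tf.contrib.predictor to load it.


In [ ]:
import os

# Pick the most recent timestamped SavedModel directory
export_base = "taxi_trained/export/exporter"
latest = os.path.join(export_base, sorted(os.listdir(export_base))[-1])

# Load the SavedModel and run a prediction with illustrative values
predict_fn = tf.contrib.predictor.from_saved_model(export_dir = latest)
print(predict_fn({
    "dayofweek": [1],
    "hourofday": [12],
    "pickuplon": [-73.99],
    "pickuplat": [40.75],
    "dropofflon": [-73.98],
    "dropofflat": [40.76]
}))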

Monitoring with TensorBoard

TensorBoard is a web UI that allows us to visualize various aspects of our model, including the training and evaluation loss curves. Although you won't see the loss curves yet, it is best to launch TensorBoard before you start training so that you can watch them update during a long-running training process.

To get TensorBoard to work within a Deep Learning VM or Colab, we need a tunnel connection to our local machine. We'll set this up with ngrok, creating a tunnel to the virtual machine's port 6006. We can then view the TensorBoard results by following the link ngrok provides after we execute the following cell.


In [ ]:
get_ipython().system_raw(
    "tensorboard --logdir {} --host 0.0.0.0 --port 6006 &"
    .format(OUTDIR)
)

get_ipython().system_raw("./assets/ngrok http 6006 &")

In [ ]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

TensorBoard cleanup

To close the tunnel connection to TensorBoard, we can find the PIDs for ngrok and TensorBoard and stop them.


In [ ]:
# this will kill the processes for TensorBoard
!ps aux | grep tensorboard | awk '{print $2}' | xargs kill

In [ ]:
# this will kill the processes for ngrok
!ps aux | grep ngrok | awk '{print $2}' | xargs kill

Challenge exercise

Modify your solution to the challenge exercise in c_dataset.ipynb appropriately.

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License