Learning Objectives
Use serving_input_receiver_fn() to export the model in the SavedModel format for TF Serving, which supports remote access to our model via REST API.
Use the tf.estimator.train_and_evaluate() method to periodically evaluate during training.
In [ ]:
import tensorflow as tf
import shutil
print(tf.__version__)
In [ ]:
CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]
In [ ]:
def parse_row(row):
    fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)
    features = dict(zip(CSV_COLUMN_NAMES, fields))
    label = features.pop("fare_amount")
    return features, label

def read_dataset(csv_path):
    dataset = tf.data.TextLineDataset(filenames = csv_path).skip(count = 1) # skip header
    dataset = dataset.map(map_func = parse_row)
    return dataset

def train_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)
    return dataset

def eval_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.batch(batch_size = batch_size)
    return dataset
In [ ]:
FEATURE_NAMES = CSV_COLUMN_NAMES[1:] # all but first column
feature_cols = [tf.feature_column.numeric_column(key = k) for k in FEATURE_NAMES]
feature_cols
In a prior notebook we used the estimator.predict() function to get taxi fare predictions. This worked fine because we had done our model training on the same machine.
However, in a production setting this won't usually be the case. Our clients may be remote web servers, mobile apps and more. Instead of having to ship our model files to every client, it would be better to host our model in one place, and make it remotely accessible for prediction requests using a REST API.
The TensorFlow solution for this is a project called TF Serving, which is part of the larger TensorFlow Extended (TFX) platform that extends TensorFlow for production environments.
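As a rough illustration of what a remote prediction request could carry, the sketch below builds a JSON body in the row-oriented "instances" layout used by TF Serving's REST predict endpoint. The helper name build_predict_request and the feature values are made up for illustration; only the feature keys come from the columns defined earlier in this notebook.

```python
import json

# Feature names used by the model in this notebook (all columns except the label).
FEATURE_NAMES = ["dayofweek", "hourofday", "pickuplon", "pickuplat",
                 "dropofflon", "dropofflat"]


def build_predict_request(rows):
    """Serialize a list of feature dicts into a JSON request body.

    Hypothetical helper: every row must supply all of the model's features.
    """
    for row in rows:
        missing = set(FEATURE_NAMES) - set(row)
        if missing:
            raise ValueError("missing features: {}".format(sorted(missing)))
    return json.dumps({"instances": rows})


# Illustrative values only, not taken from the dataset.
example_row = {"dayofweek": 1, "hourofday": 17, "pickuplon": -73.98,
               "pickuplat": 40.75, "dropofflon": -73.95, "dropofflat": 40.78}
body = build_predict_request([example_row])
print(body)
```

A client would send a body like this over HTTP; the server side is what the rest of this notebook prepares.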
The interface between TensorFlow and TF Serving is a serving_input_receiver_fn(). It has two jobs:
To add tf.placeholders to the graph to specify what type of tensors TF Serving should receive during inference requests. The placeholders are specified as a dictionary object.
To apply any transformations needed to turn the received tensors into the features expected by the model.
The function must return a tf.estimator.export.ServingInputReceiver object, which packages the placeholders and the necessary transformations together.
In the cell below, implement a serving_input_receiver_fn function that returns an instance of tf.estimator.export.ServingInputReceiver(features, receiver_tensors). Have a look at the documentation for TensorFlow's ServingInputReceiver. Here receiver_tensors is a dictionary describing the JSON object received by the Cloud ML Engine API, and features is a dictionary that has the same structure as the feature dictionary accepted by our estimator.
Here we keep things simple by assuming that the API receives a JSON object that already has the correct structure (i.e. features = receiver_tensors):
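Before writing the placeholders, it can help to list what each one would need to describe. The sketch below is plain Python, not TensorFlow code: it derives a hypothetical spec (dtype name and shape) for each feature from the CSV defaults defined earlier, assuming the placeholder dtypes should mirror those defaults and that a shape of [None] stands for a batch of unknown size. The function name placeholder_specs and the spec layout are assumptions made for illustration.

```python
CSV_COLUMN_NAMES = ["fare_amount", "dayofweek", "hourofday", "pickuplon",
                    "pickuplat", "dropofflon", "dropofflat"]
CSV_DEFAULTS = [[0.0], [1], [0], [-74.0], [40.0], [-74.0], [40.7]]


def placeholder_specs(column_names, defaults, label="fare_amount"):
    """Describe one placeholder per feature column (the label is skipped)."""
    specs = {}
    for name, default in zip(column_names, defaults):
        if name == label:
            continue  # the label is not sent at inference time
        # Assumption: integer defaults map to int32, everything else to float32.
        dtype = "int32" if isinstance(default[0], int) else "float32"
        specs[name] = {"dtype": dtype, "shape": [None]}
    return specs


specs = placeholder_specs(CSV_COLUMN_NAMES, CSV_DEFAULTS)
for name, spec in specs.items():
    print(name, spec)
```

Each entry corresponds to one key of the receiver_tensors dictionary; since features = receiver_tensors here, the same keys are what the estimator sees.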
In [ ]:
def serving_input_receiver_fn():
    receiver_tensors = # TODO: Your code goes here
    features = receiver_tensors
    return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)
One issue with the previous notebooks is we only evaluate on our validation data once training is complete. This means we can't tell at what point overfitting began. What we really want is to evaluate at specified intervals during the training phase.
The Estimator API way of doing this is to replace estimator.train() and estimator.evaluate() with tf.estimator.train_and_evaluate(). This causes an evaluation to be done after every training checkpoint. However, by default TensorFlow only checkpoints once every 10 minutes. Since our total training time is shorter than that, we'd end up with the same behavior as before: just one evaluation at the end of training.
To remedy this we specify in the tf.estimator.RunConfig() that TensorFlow should checkpoint every 100 steps.
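To get a feel for how many evaluations this produces, the arithmetic can be sketched in plain Python. It assumes one evaluation per periodic checkpoint and the 500 training steps used later in this notebook, and it ignores any checkpoint TensorFlow may write at the very start of training as well as evaluation throttling. The function name checkpoint_steps is hypothetical.

```python
def checkpoint_steps(max_steps, save_checkpoints_steps):
    """List the training steps at which a periodic checkpoint is due."""
    return list(range(save_checkpoints_steps, max_steps + 1, save_checkpoints_steps))


steps = checkpoint_steps(max_steps=500, save_checkpoints_steps=100)
print(steps)       # [100, 200, 300, 400, 500]
print(len(steps))  # 5
```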
The default evaluation metric average_loss is MSE, but we want RMSE. Previously we just took the square root of the final average_loss. However, it would be better if we could calculate RMSE not just at the end, but for every intermediate checkpoint and plot the change over time in TensorBoard. tf.contrib.estimator.add_metrics() allows us to do this. We wrap our estimator with it, and provide a custom evaluation function.
train_and_evaluate() also allows us to use our serving_input_receiver_fn() to export our models in the SavedModel format required by TF Serving.
Note: Training will be slower than the last notebook because we are now evaluating after every 100 train steps. Previously we didn't evaluate until training finished.
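In the cell below, create an instance of tf.estimator.RunConfig named config and pass to its constructor information concerning the directory where the trained model should be saved (OUTDIR) and how often TensorFlow should checkpoint (every 100 steps).
To remind yourself what arguments tf.estimator.RunConfig takes, have a look at the documentation.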
In [ ]:
OUTDIR = "taxi_trained"
config = tf.estimator.RunConfig(
# TODO: Your code goes here
)
In the cell below, create a DNNRegressor model with two layers of 10 neurons each using the RunConfig instance and the feature_cols list you just created.
Note that we do not need to pass the model directory directly to the estimator constructor, since that info should already be wrapped into the RunConfig instance.
In [ ]:
model = tf.estimator.DNNRegressor(
# TODO: Your code goes here
)
If we want to add a custom evaluation metric (one not included automatically with the canned DNNRegressor estimator), we can do that by wrapping our model with our custom metric function using the contrib function add_metrics. We will implement a my_rmse function that takes as input a tensor of labels and a tensor of predictions, and returns a dictionary with the key rmse whose value is the root mean square error between the labels and the predictions.
You can have a look at the blog post by Lak Lakshmanan on "How to extend a canned TensorFlow Estimator" for more information.
Implement a my_rmse function that takes as input a tensor of labels and a tensor of predictions, and returns a dictionary with the key rmse whose value is the root mean square error between the labels and the predictions.
Hint: Have a look at the TensorFlow documentation for tf.metrics.root_mean_squared_error. You will have to apply a preliminary step to predictions before you can compute the RMSE. In fact, you may notice that you get a shape error if you try to use the prediction values as is. It may help to use tf.squeeze. Have a closer look at what tf.squeeze does in the docs.
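The shape issue in the hint can be illustrated without TensorFlow. The sketch below uses nested Python lists to stand in for tensors, assuming the model emits one-element rows (shape [batch, 1]) while the labels are flat (shape [batch]). The helper names squeeze_last_axis and rmse and all numbers are made up for illustration; this is not the TensorFlow solution to the exercise.

```python
import math


def squeeze_last_axis(nested):
    """Turn [[a], [b], ...] (shape [n, 1]) into [a, b, ...] (shape [n])."""
    flat = []
    for row in nested:
        if len(row) != 1:
            raise ValueError("expected inner lists of length 1")
        flat.append(row[0])
    return flat


def rmse(labels, predictions):
    """Root mean squared error between two equally long flat lists."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


labels = [10.0, 12.0, 8.0]                 # shape [3]
raw_predictions = [[11.0], [10.0], [8.0]]  # shape [3, 1]
pred_values = squeeze_last_axis(raw_predictions)
print(rmse(labels, pred_values))
```

Once both inputs have the same shape, the metric is just the square root of the mean squared difference.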
In [ ]:
def my_rmse(labels, predictions):
    pred_values = # TODO: Your code goes here
    return {
        "rmse": # TODO: Your code goes here
    }
Run the following cell to add the custom metric you defined above to the model:
In [ ]:
model = tf.contrib.estimator.add_metrics(estimator = model, metric_fn = my_rmse)
In the cell block below, create an instance of a tf.estimator.TrainSpec using the train_input_fn defined at the top of this file and with a max_steps of 500. Note, the training data should be loaded from ./taxi-train.csv. See the details of how to implement a TensorFlow TrainSpec in the documentation.
Hint: You may need to use a lambda function to pass the training input function correctly.
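The reason for the lambda is that the input function is called later by the framework, which does not know about the CSV path. The sketch below shows the idea in plain Python, with a stand-in train_input_fn that merely echoes its arguments and a hypothetical call_input_fn that mimics a caller invoking the function with no arguments. functools.partial is shown as one alternative to a lambda.

```python
import functools


def train_input_fn(csv_path, batch_size=128):
    """Stand-in for the notebook's input function; it just echoes its arguments."""
    return {"csv_path": csv_path, "batch_size": batch_size}


def call_input_fn(input_fn):
    """Mimic a framework that calls input_fn itself, with no arguments."""
    return input_fn()


# Wrap the two-argument function so it can be called with no arguments.
via_lambda = call_input_fn(lambda: train_input_fn("./taxi-train.csv"))
via_partial = call_input_fn(
    functools.partial(train_input_fn, "./taxi-train.csv", batch_size=64))
print(via_lambda)
print(via_partial)
```

Passing train_input_fn("./taxi-train.csv") directly would call the function immediately and hand over its result rather than a callable.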
In [ ]:
train_spec = tf.estimator.TrainSpec(
input_fn = # TODO: Your code goes here
max_steps = # TODO: Your code goes here
)
Next, create an exporter using the serving_input_receiver_fn defined at the beginning of this notebook. You want to export the trained model and its checkpoints in the './exporter' subdirectory. Use tf.estimator.FinalExporter to create the exporter instance. Have a look at the documentation for FinalExporter to ensure proper usage.
Note: You may alternatively use tf.estimator.BestExporter to export whenever a checkpoint achieves a lower loss than the best one seen so far, instead of exporting only the last checkpoint.
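The difference between the two export policies can be pictured with a small plain-Python simulation. This is a simplified sketch, not the exporters' actual implementation: it assumes the best-model policy compares each evaluation loss against the lowest loss seen so far, and the loss values are invented for illustration.

```python
def final_export_steps(eval_losses):
    """Export only after the last evaluation (FinalExporter-like behavior)."""
    return [max(eval_losses)] if eval_losses else []


def best_export_steps(eval_losses):
    """Export whenever the loss beats the best seen so far (BestExporter-like)."""
    exported, best = [], None
    for step in sorted(eval_losses):
        loss = eval_losses[step]
        if best is None or loss < best:
            exported.append(step)
            best = loss
    return exported


# Made-up evaluation losses keyed by training step, for illustration only.
eval_losses = {100: 12.4, 200: 10.1, 300: 10.6, 400: 9.8, 500: 9.9}
final_steps = final_export_steps(eval_losses)
best_steps = best_export_steps(eval_losses)
print(final_steps)
print(best_steps)
```

With these numbers the final policy exports the step-500 model even though step 400 had the lower loss, which is the situation a best-model exporter is meant to handle.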
In [ ]:
exporter = # TODO: Your code goes here
In the cell below, create an instance of an EvalSpec in which you specify that the validation data should be loaded from ./taxi-valid.csv during evaluation (use the correct input function!) and that the exporter you defined above should be used. Look at the documentation for tf.estimator.EvalSpec to help.
Note: We use the checkpoint setting above because we want to evaluate after every checkpoint. As long as checkpoints are more than 1 second apart, this ensures the throttling never kicks in.
In [ ]:
eval_spec = # TODO: Your code goes here
Finally we use tf.estimator.train_and_evaluate to start the training and evaluation as you specified them above. Complete the code in the cell below, providing the necessary arguments. Have a look at the documentation for the train_and_evaluate method to make sure you pass everything it needs.
In [ ]:
tf.logging.set_verbosity(tf.logging.INFO)
shutil.rmtree(path = OUTDIR, ignore_errors = True)
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file
tf.estimator.train_and_evaluate(# TODO: Your code goes here
)
Now in the output directory, in addition to the checkpoint files, you'll see a subfolder called 'export'. This contains one or more models in the SavedModel format, which is compatible with TF Serving. In the next notebook we will deploy the SavedModel behind a production-grade REST API.
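If several exports accumulate, you may want to pick the most recent one programmatically. The sketch below assumes each export lives in its own subdirectory named with a numeric timestamp, under a folder named after the exporter ("exporter" here). It runs against a throwaway directory tree rather than the real taxi_trained folder, and the timestamps and the helper name latest_export_dir are made up.

```python
import os
import tempfile


def latest_export_dir(export_base):
    """Return the newest timestamp-named subdirectory of export_base, or None."""
    candidates = [d for d in os.listdir(export_base)
                  if d.isdigit() and os.path.isdir(os.path.join(export_base, d))]
    if not candidates:
        return None
    return os.path.join(export_base, max(candidates, key=int))


# Demonstrate on a temporary tree that imitates the assumed export layout.
with tempfile.TemporaryDirectory() as tmp:
    base = os.path.join(tmp, "export", "exporter")
    for stamp in ("1560000000", "1560000500"):
        os.makedirs(os.path.join(base, stamp))
    newest = latest_export_dir(base)
    newest_name = os.path.basename(newest)
    print(newest_name)
```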
In [ ]:
!ls -R taxi_trained/export
TensorBoard is a web UI that allows us to visualize various aspects of our model, including the training and evaluation loss curves. Although you won't see the loss curves yet, it is best to launch TensorBoard before you start training so that you may see them update during a long running training process.
To get TensorBoard to work within a Deep Learning VM or Colab, we need to create a tunnel connection to your local machine. We'll use ngrok to open a tunnel to our virtual machine's port 6006. We can then view the TensorBoard results by following the link provided by ngrok after executing the following cells.
In [ ]:
get_ipython().system_raw(
    "tensorboard --logdir {} --host 0.0.0.0 --port 6006 &"
    .format(OUTDIR)
)
get_ipython().system_raw("../assets/ngrok http 6006 &")
In [ ]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
"import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"
In [ ]:
# this will kill the processes for Tensorboard
!ps aux | grep tensorboard | awk '{print $2}' | xargs kill
In [ ]:
# this will kill the processes for ngrok
!ps aux | grep ngrok | awk '{print $2}' | xargs kill
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License