Learning Objectives
- Create a serving_input_receiver_fn(), which supports remote access to our model via a REST API
- Use the tf.estimator.train_and_evaluate() method to periodically evaluate during training
In [ ]:
import tensorflow as tf
import shutil
print(tf.__version__)
In [ ]:
CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]
In [ ]:
def parse_row(row):
    fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)
    features = dict(zip(CSV_COLUMN_NAMES, fields))
    label = features.pop("fare_amount")
    return features, label

def read_dataset(csv_path):
    dataset = tf.data.TextLineDataset(filenames = csv_path).skip(count = 1) # skip header
    dataset = dataset.map(map_func = parse_row)
    return dataset

def train_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)
    return dataset

def eval_input_fn(csv_path, batch_size = 128):
    dataset = read_dataset(csv_path)
    dataset = dataset.batch(batch_size = batch_size)
    return dataset
In [ ]:
FEATURE_NAMES = CSV_COLUMN_NAMES[1:] # all but first column
feature_cols = [tf.feature_column.numeric_column(key = k) for k in FEATURE_NAMES]
print(feature_cols)
In a prior notebook we used the estimator.predict() function to get taxifare predictions. This worked fine because we had done our model training on the same machine.
However, in a production setting this won't usually be the case. Our clients may be remote web servers, mobile apps and more. Instead of having to ship our model files to every client, it would be better to host our model in one place and make it remotely accessible for prediction requests using a REST API.
The TensorFlow solution for this is a project called TF Serving, which is part of the larger TensorFlow Extended (TFX) platform that extends TensorFlow for production environments.
The interface between TensorFlow and TF Serving is a serving_input_receiver_fn(). It has two jobs:
- add tf.placeholders to the graph to specify what type of tensors TF Serving should receive during inference requests; the placeholders are specified as a dictionary object
- return a tf.estimator.export.ServingInputReceiver object, which packages the placeholders and the necessary transformations together
In the cell below, we implement a serving_input_receiver_fn function that returns an instance of tf.estimator.export.ServingInputReceiver(features, receiver_tensors). Have a look at the documentation for TensorFlow's ServingInputReceiver. Here, receiver_tensors is a dictionary describing the JSON object received by the Cloud ML Engine API, and features is a dictionary with the same structure as the feature dictionary accepted by our estimator.
We keep things simple by assuming that the API receives a JSON object that already has the correct structure (i.e. features = receiver_tensors).
In [ ]:
def serving_input_receiver_fn():
    receiver_tensors = {
        'dayofweek' : tf.placeholder(dtype = tf.int32, shape = [None]),
        'hourofday' : tf.placeholder(dtype = tf.int32, shape = [None]),
        'pickuplon' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'pickuplat' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'dropofflat' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'dropofflon' : tf.placeholder(dtype = tf.float32, shape = [None]),
    }
    features = receiver_tensors
    return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)
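The keys in receiver_tensors are exactly the fields a client sends when it requests a prediction. For intuition, here is a minimal sketch of what such a request body could look like; the feature values are made up, and the exact wire format (the "instances" wrapping, batching, etc.) depends on the serving frontend you deploy behind.
In [ ]:
import json

# Hypothetical prediction request body: each instance carries the same keys as receiver_tensors.
example_request = {
    "instances": [
        {"dayofweek": 4, "hourofday": 17,
         "pickuplon": -73.99, "pickuplat": 40.75,
         "dropofflon": -73.98, "dropofflat": 40.76}
    ]
}
print(json.dumps(example_request, indent = 2))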
One issue with the previous notebooks is that we only evaluate on our validation data once training is complete. This means we can't tell at what point overfitting began. What we really want is to evaluate at specified intervals during the training phase.
The Estimator API way of doing this is to replace estimator.train() and estimator.evaluate() with estimator.train_and_evaluate(). This causes an evaluation to be done after every training checkpoint. However, by default TensorFlow only checkpoints once every 10 minutes. Since this is longer than our total training time, we'd end up with the same behavior as before, which is just one evaluation at the end of training.
To remedy this, we specify in the tf.estimator.RunConfig() that TensorFlow should checkpoint every 100 steps.
The default evaluation metric average_loss is MSE, but we want RMSE. Previously we just took the square root of the final average_loss. However, it would be better if we could calculate RMSE not just at the end, but for every intermediate checkpoint, and plot the change over time in TensorBoard. tf.contrib.estimator.add_metrics() allows us to do this: we wrap our estimator with it and provide a custom evaluation function.
train_and_evaluate() also allows us to use our serving_input_receiver_fn() to export our models in the SavedModel format required by TF Serving.
Note: Training will be slower than in the last notebook because we are now evaluating after every 100 train steps. Previously we didn't evaluate until training finished.
In the cell below, we create an instance of tf.estimator.RunConfig named config and pass to its constructor information concerning:
- the directory where the trained model and its checkpoints will be saved
- the random seed, which we fix for reproducibility
- how often the model should be checkpointed (every 100 steps here)
In [ ]:
OUTDIR = "taxi_trained"
config = tf.estimator.RunConfig(
    model_dir = OUTDIR,
    tf_random_seed = 1,
    save_checkpoints_steps = 100
)
Next we create a DNNRegressor model with two layers of 10 neurons each, using the RunConfig instance and the feature_cols list we created above.
Note that we do not pass the model directory directly to the estimator constructor, since that information is already wrapped into the RunConfig instance.
In [ ]:
model = tf.estimator.DNNRegressor(
    hidden_units = [10, 10],
    feature_columns = feature_cols,
    config = config
)
If we want to add a custom evaluation metric (one not included automatically with the canned DNNRegressor estimator), we can do that by wrapping our model with our custom metric function using the contrib function tf.contrib.estimator.add_metrics. We will implement a my_rmse function that:
- takes as input a tensor of labels and a tensor of predictions
- returns a dictionary with the single key rmse, whose value is the root mean square error between the labels and the predictions

You can have a look at this blog post by Lak Lakshmanan on "How to extend a canned TensorFlow Estimator" for more information.
In [ ]:
def my_rmse(labels, predictions):
    pred_values = tf.squeeze(input = predictions["predictions"], axis = -1)
    return {
        "rmse": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)
    }
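If you'd like to sanity-check my_rmse on its own before wiring it into the estimator, the optional sketch below (not part of the lab, using made-up values) runs it in a throwaway session. tf.metrics.root_mean_squared_error returns a (value, update_op) pair backed by local variables, so we initialize those and run the update op before reading the value.
In [ ]:
# Optional sanity check of my_rmse with hypothetical values.
with tf.Graph().as_default(), tf.Session() as sess:
    labels = tf.constant([10.0, 12.0])
    predictions = {"predictions": tf.constant([[9.0], [13.0]])}  # shape [batch, 1], like the estimator's output
    rmse_value, update_op = my_rmse(labels, predictions)["rmse"]
    sess.run(tf.local_variables_initializer())  # metric state lives in local variables
    sess.run(update_op)
    print(sess.run(rmse_value))  # errors are -1 and +1, so expect ~1.0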
Run the following cell to add the custom metric you defined above to the model:
In [ ]:
model = tf.contrib.estimator.add_metrics(estimator = model, metric_fn = my_rmse)
Next we'll create an instance of a tf.estimator.TrainSpec using the train_input_fn defined at the top of this notebook, and train our model with a max_steps of 500. Note that the training data is loaded from ./taxi-train.csv.
In [ ]:
train_spec = tf.estimator.TrainSpec(
    input_fn = lambda: train_input_fn("./taxi-train.csv"),
    max_steps = 500
)
Finally, we create an exporter using the serving_input_receiver_fn defined at the beginning of this notebook. We want to export the trained model and its checkpoints under the 'exporter' subdirectory. We use tf.estimator.FinalExporter to create the exporter instance.
Note: You may alternatively use tf.estimator.BestExporter to export only checkpoints that achieve a lower evaluation loss than any previous checkpoint, instead of exporting just the last checkpoint.
In [ ]:
exporter = tf.estimator.FinalExporter(name = "exporter", serving_input_receiver_fn = serving_input_receiver_fn)
In the cell below, we create an instance of tf.estimator.EvalSpec in which we specify that the data in ./taxi-valid.csv is read during evaluation (use the correct input function!), along with the exporter we just created and a 1-second value for both start_delay_secs and throttle_secs.
Note: We use the 1-second throttle because we want to evaluate after every checkpoint. As long as checkpoints are more than 1 second apart, this ensures the throttling never kicks in.
In [ ]:
eval_spec = tf.estimator.EvalSpec(
    input_fn = lambda: eval_input_fn("./taxi-valid.csv"),
    steps = None,
    start_delay_secs = 1,
    throttle_secs = 1,
    exporters = exporter,
)
Run the following cell to start the training and evaluation as you specified them above:
In [ ]:
tf.logging.set_verbosity(tf.logging.INFO)
shutil.rmtree(path = OUTDIR, ignore_errors = True)
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file
tf.estimator.train_and_evaluate(estimator = model,
                                train_spec = train_spec,
                                eval_spec = eval_spec)
Now in the output directory, in addition to the checkpoint files, you'll see a subfolder called 'export'. This contains one or more models in the SavedModel format, which is compatible with TF Serving. In the next notebook we will deploy the SavedModel behind a production-grade REST API.
In [ ]:
!ls -R taxi_trained/export
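Before deploying, you can optionally smoke-test the export locally. This is a rough sketch, not part of the lab: it assumes tf.contrib.predictor is available in this TF 1.x environment and that the canned estimator exported a 'predict' signature; the feature values are made up.
In [ ]:
import os

# Optional local smoke test of the newest export (hypothetical feature values).
export_base = "taxi_trained/export/exporter"
latest_export = os.path.join(export_base, sorted(os.listdir(export_base))[-1])  # newest timestamped directory

predict_fn = tf.contrib.predictor.from_saved_model(latest_export, signature_def_key = "predict")
print(predict_fn({
    "dayofweek": [4], "hourofday": [17],
    "pickuplon": [-73.99], "pickuplat": [40.75],
    "dropofflon": [-73.98], "dropofflat": [40.76],
}))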
TensorBoard is a web UI that allows us to visualize various aspects of our model, including the training and evaluation loss curves. Although you won't see the loss curves yet, it is best to launch TensorBoard before you start training so that you may see them update during a long-running training process.
To get TensorBoard to work within a Deep Learning VM or Colab, we need to create a tunnel connection to your local machine. To do this we'll use ngrok to tunnel to our virtual machine's port 6006. We can view the TensorBoard results by following the link provided by ngrok after executing the following cell.
In [ ]:
get_ipython().system_raw(
    "tensorboard --logdir {} --host 0.0.0.0 --port 6006 &"
    .format(OUTDIR)
)
get_ipython().system_raw("./assets/ngrok http 6006 &")
In [ ]:
!curl -s http://localhost:4040/api/tunnels | python3 -c \
"import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"
In [ ]:
# this will kill the processes for Tensorboard
!ps aux | grep tensorboard | awk '{print $2}' | xargs kill
In [ ]:
# this will kill the processes for ngrok
!ps aux | grep ngrok | awk '{print $2}' | xargs kill
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License