Custom Estimator with Keras

Learning Objectives

  • Learn how to create a custom estimator using tf.keras

Introduction

Up until now we've been limited in our model architectures to premade estimators. But what if we want more control over the model? We can use the popular Keras API to create a custom model. Keras is a high-level API for building and training deep learning models. It is user-friendly and modular, and it makes writing custom building blocks of TensorFlow code much easier.

Once we've built a Keras model, we convert it to an estimator using tf.keras.estimator.model_to_estimator(). This gives us access to all the flexibility of Keras for creating deep learning models, along with the production readiness of the estimator framework!


In [ ]:
import tensorflow as tf
import numpy as np
import shutil
print(tf.__version__)

Train and Evaluate input functions

For the most part, we can use the same train and evaluation input functions that we had in previous labs. Note the function create_feature_keras_input below: it converts the dictionary of features into a single dense tensor, which becomes the input to the first layer of the model. Both train_input_fn and eval_input_fn call it as their final step.


In [ ]:
CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]

def read_dataset(csv_path):
    def parse_row(row):
        # Decode the CSV row into list of TF tensors
        fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)

        # Pack the result into a dictionary
        features = dict(zip(CSV_COLUMN_NAMES, fields))
        
        # NEW: Add engineered features
        features = add_engineered_features(features)
        
        # Separate the label from the features
        label = features.pop("fare_amount") # remove label from features and store

        return features, label
    
    # Create a dataset containing the text lines.
    dataset = tf.data.Dataset.list_files(file_pattern = csv_path) # (i.e. data_file_*.csv)
    dataset = dataset.flat_map(map_func = lambda filename: tf.data.TextLineDataset(filenames = filename).skip(count = 1))

    # Parse each CSV row into correct (features,label) format for Estimator API
    dataset = dataset.map(map_func = parse_row)
    
    return dataset
  
def create_feature_keras_input(features, label):
    features = tf.feature_column.input_layer(features = features, feature_columns = create_feature_columns())
    return features, label

def train_input_fn(csv_path, batch_size = 128):
    #1. Convert CSV into tf.data.Dataset with (features, label) format
    dataset = read_dataset(csv_path)
      
    #2. Shuffle, repeat, and batch the examples.
    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)
    
    #3. Create single feature tensor for input to Keras Model
    dataset = dataset.map(map_func = create_feature_keras_input)
   
    return dataset

def eval_input_fn(csv_path, batch_size = 128):
    #1. Convert CSV into tf.data.Dataset with (features, label) format
    dataset = read_dataset(csv_path)

    #2.Batch the examples.
    dataset = dataset.batch(batch_size = batch_size)
    
    #3. Create single feature tensor for input to Keras Model
    dataset = dataset.map(map_func = create_feature_keras_input)
   
    return dataset
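
To make the row-parsing step concrete, here is a plain-Python sketch of what parse_row does to a single CSV line. It uses the standard csv module in place of tf.decode_csv, so it is an analogy rather than the TensorFlow code path. The sample row is made up for illustration, and parse_row_py and CSV_TYPES are hypothetical names standing in for the dtypes implied by CSV_DEFAULTS.

```python
import csv

CSV_COLUMN_NAMES = ["fare_amount", "dayofweek", "hourofday",
                    "pickuplon", "pickuplat", "dropofflon", "dropofflat"]
# Hypothetical: Python types standing in for the dtypes implied by CSV_DEFAULTS
CSV_TYPES = [float, int, int, float, float, float, float]

def parse_row_py(row):
    # Split the text line into string fields
    fields = next(csv.reader([row]))
    # Convert each field to the type of its column
    values = [cast(field) for cast, field in zip(CSV_TYPES, fields)]
    # Pack the result into a dictionary keyed by column name
    features = dict(zip(CSV_COLUMN_NAMES, values))
    # Separate the label from the features
    label = features.pop("fare_amount")
    return features, label

# Made-up row for illustration only
sample_row = "12.5,3,17,-73.98,40.75,-73.95,40.78"
features, label = parse_row_py(sample_row)
print(label)
print(sorted(features))
```

After the pop, the features dictionary no longer contains fare_amount, which is what keeps the label from leaking into the model inputs.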

Feature Engineering

We'll use the same engineered features that we had in previous labs.


In [ ]:
def add_engineered_features(features):
    features["dayofweek"] = features["dayofweek"] - 1 # subtract one since our days of week are 1-7 instead of 0-6
    
    features["latdiff"] = features["pickuplat"] - features["dropofflat"] # North/South
    features["londiff"] = features["pickuplon"] - features["dropofflon"] # East/West
    features["euclidean_dist"] = tf.sqrt(x = features["latdiff"]**2 + features["londiff"]**2)

    return features
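
As a quick sanity check on the arithmetic, the same feature engineering can be sketched in plain Python for a single ride. This is a minimal illustration using math.sqrt in place of tf.sqrt. The function name add_engineered_features_py and the ride values are made up for the example.

```python
import math

def add_engineered_features_py(features):
    features = dict(features)  # work on a copy so the input is left untouched
    # Days of week arrive as 1-7; shift to 0-6
    features["dayofweek"] = features["dayofweek"] - 1
    # Difference in latitude (north/south) and longitude (east/west)
    features["latdiff"] = features["pickuplat"] - features["dropofflat"]
    features["londiff"] = features["pickuplon"] - features["dropofflon"]
    # Straight-line distance, measured in degrees
    features["euclidean_dist"] = math.sqrt(
        features["latdiff"] ** 2 + features["londiff"] ** 2)
    return features

# Made-up ride for illustration only
ride = {"dayofweek": 1, "hourofday": 8,
        "pickuplon": -74.0, "pickuplat": 40.0,
        "dropofflon": -74.03, "dropofflat": 40.04}
engineered = add_engineered_features_py(ride)
print(engineered["dayofweek"], engineered["euclidean_dist"])
```

Note that euclidean_dist is expressed in degrees of latitude and longitude rather than in kilometers or miles, so it is only a rough proxy for trip length.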

In [ ]:
def create_feature_columns():
    # One hot encode dayofweek and hourofday
    fc_dayofweek = tf.feature_column.categorical_column_with_identity(key = "dayofweek", num_buckets = 7)
    fc_hourofday = tf.feature_column.categorical_column_with_identity(key = "hourofday", num_buckets = 24)

    # Cross features to get combination of day and hour
    fc_day_hr = tf.feature_column.crossed_column(keys = [fc_dayofweek, fc_hourofday], hash_bucket_size = 24 * 7)

    # Bucketize latitudes and longitudes
    NBUCKETS = 16
    latbuckets = np.linspace(start = 38.0, stop = 42.0, num = NBUCKETS).tolist()
    lonbuckets = np.linspace(start = -76.0, stop = -72.0, num = NBUCKETS).tolist()
    fc_bucketized_plat = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "pickuplat"), boundaries = latbuckets)
    fc_bucketized_plon = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "pickuplon"), boundaries = lonbuckets)
    fc_bucketized_dlat = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "dropofflat"), boundaries = latbuckets)
    fc_bucketized_dlon = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "dropofflon"), boundaries = lonbuckets)

    feature_columns = [
        #1. Engineered using tf.feature_column module
        tf.feature_column.indicator_column(categorical_column = fc_day_hr), # 168 columns
        fc_bucketized_plat, # 16 + 1 = 17 columns
        fc_bucketized_plon, # 16 + 1 = 17 columns
        fc_bucketized_dlat, # 16 + 1 = 17 columns
        fc_bucketized_dlon, # 16 + 1 = 17 columns
        #2. Engineered in input functions
        tf.feature_column.numeric_column(key = "latdiff"), # 1 column
        tf.feature_column.numeric_column(key = "londiff"), # 1 column
        tf.feature_column.numeric_column(key = "euclidean_dist") # 1 column
    ]
  
    return feature_columns
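
The "16 + 1 = 17 columns" comments follow from how bucketing works: 16 boundaries split the number line into 17 intervals, and each value is one-hot encoded by the interval it falls into. The sketch below imitates that behavior in plain Python. It assumes each boundary belongs to the bucket above it, as the tf.feature_column.bucketized_column documentation describes. The helpers linspace, bucket_index and one_hot are hypothetical stand-ins, not TensorFlow functions.

```python
import bisect

NBUCKETS = 16

def linspace(start, stop, num):
    # Evenly spaced values including both endpoints (stand-in for np.linspace)
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num)]

def bucket_index(value, boundaries):
    # Number of boundaries <= value, so a value equal to a boundary
    # lands in the bucket above that boundary
    return bisect.bisect_right(boundaries, value)

def one_hot(index, depth):
    return [1 if i == index else 0 for i in range(depth)]

latbuckets = linspace(38.0, 42.0, NBUCKETS)
num_buckets = len(latbuckets) + 1          # 16 boundaries -> 17 buckets

idx = bucket_index(40.75, latbuckets)      # a latitude inside the range
encoded = one_hot(idx, num_buckets)

# Width of the dense input: crossed day/hour + four bucketized columns + three numeric
total_columns = 24 * 7 + num_buckets * 4 + 3
print(num_buckets, idx, total_columns)
```

Values below the first boundary or above the last one still get a bucket of their own (index 0 and index 16 here), which is why out-of-range coordinates do not raise an error.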

Calculate the number of feature columns that will be input to our Keras model


In [ ]:
num_feature_columns = 168 + (16 + 1) * 4 + 3
print("num_feature_columns = {}".format(num_feature_columns))

Build Custom Keras Model

Now we can begin building our Keras model. The TensorFlow Keras guide gives a fuller explanation of the API.

Exercise 1

Complete the code in the cell below to add a sequence of dense layers using the Keras Sequential API. Create a model consisting of six layers that use relu as the activation function. Have a look at the documentation for tf.keras.layers.Dense to see which arguments to provide.


In [ ]:
def create_keras_model():
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.InputLayer(input_shape = (num_feature_columns,), name = "dense_input"))
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here

    def rmse(y_true, y_pred): # Root Mean Squared Error
        return tf.sqrt(x = tf.reduce_mean(input_tensor = tf.square(x = y_pred - y_true)))

    model.compile(
        optimizer = tf.train.AdamOptimizer(),
        loss = "mean_squared_error",
        metrics = [rmse])
  
    return model
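
The rmse metric above takes the square root of the mean squared error over whatever examples it is given. A plain-Python sketch of the same formula shows one consequence worth keeping in mind: because the square root is not linear, averaging RMSE values computed per batch is generally not the same as the RMSE over all examples. The numbers below are made up, and whether a given Keras version aggregates a custom metric by averaging per-batch values is an assumption to verify.

```python
import math

def rmse(y_true, y_pred):
    # Root Mean Squared Error over one collection of examples
    squared_errors = [(p - t) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Two made-up batches of (true fares, predicted fares)
batch_1 = ([10.0, 12.0], [11.0, 13.0])   # errors of 1 and 1
batch_2 = ([20.0, 8.0], [23.0, 5.0])     # errors of 3 and -3

per_batch = [rmse(*batch_1), rmse(*batch_2)]
mean_of_batches = sum(per_batch) / len(per_batch)

# RMSE computed once over all four examples
overall = rmse(batch_1[0] + batch_2[0], batch_1[1] + batch_2[1])

print(per_batch, mean_of_batches, overall)
```

Here the mean of the per-batch values is smaller than the RMSE over all examples, so a batch-averaged metric can read slightly optimistic when error sizes vary across batches.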

Serving input function

Once we've constructed our model in Keras, we next create the serving input function. This is also similar to what we have done in previous labs. Note that we use our create_feature_keras_input function again so that the same feature engineering is applied during inference.


In [ ]:
# Create serving input function
def serving_input_fn():
    feature_placeholders = {
        "dayofweek": tf.placeholder(dtype = tf.int32, shape = [None]),
        "hourofday": tf.placeholder(dtype = tf.int32, shape = [None]),
        "pickuplon": tf.placeholder(dtype = tf.float32, shape = [None]),
        "pickuplat": tf.placeholder(dtype = tf.float32, shape = [None]),
        "dropofflon": tf.placeholder(dtype = tf.float32, shape = [None]),
        "dropofflat": tf.placeholder(dtype = tf.float32, shape = [None]),
    }
  
    features = {key: tensor for key, tensor in feature_placeholders.items()}
  
    # Perform our feature engineering during inference as well
    features, _ = create_feature_keras_input((add_engineered_features(features)), None)
    
    return tf.estimator.export.ServingInputReceiver(features = {"dense_input": features}, receiver_tensors = feature_placeholders)
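
Because the feature engineering happens inside serving_input_fn, a client only sends the six raw fields named in feature_placeholders. The sketch below builds and checks such a request in plain Python. It assumes the exported model is served behind an endpoint that accepts a JSON body of the form {"instances": [...]}, as TensorFlow Serving's REST API does. The instance values and the validate_instance helper are hypothetical.

```python
import json

# Keys of feature_placeholders in serving_input_fn
SERVING_KEYS = ["dayofweek", "hourofday", "pickuplon",
                "pickuplat", "dropofflon", "dropofflat"]

def validate_instance(instance):
    # Report keys the model expects but did not receive, and keys it does not know
    missing = [key for key in SERVING_KEYS if key not in instance]
    extra = [key for key in instance if key not in SERVING_KEYS]
    return missing, extra

# Made-up raw ride: dayofweek is still 1-7 here, and there is no
# fare_amount, latdiff, londiff or euclidean_dist
instance = {"dayofweek": 3, "hourofday": 17,
            "pickuplon": -73.98, "pickuplat": 40.75,
            "dropofflon": -73.95, "dropofflat": 40.78}

missing, extra = validate_instance(instance)
payload = json.dumps({"instances": [instance]})
print(missing, extra)
print(payload)
```

Sending an already-shifted dayofweek (0-6) or a precomputed distance would be a mistake, since the serving graph applies those transformations itself.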

Train and Evaluate

To train our model, we can use train_and_evaluate as we have before. Note that we use tf.keras.estimator.model_to_estimator to create our estimator. It takes as arguments the compiled Keras model, the OUTDIR, and optionally a tf.estimator.RunConfig. Have a look at the documentation for tf.keras.estimator.model_to_estimator to make sure you understand how the arguments are used.

Exercise 2

Complete the code below to create an estimator out of the Keras model we built above.


In [ ]:
def train_and_evaluate(output_dir):
    tf.logging.set_verbosity(v = tf.logging.INFO) # so loss is printed during training
        
    estimator = tf.keras.estimator.model_to_estimator(
        # TODO: Your code goes here
    )

    train_spec = tf.estimator.TrainSpec(
        input_fn = lambda: train_input_fn(csv_path = "./taxi-train.csv"),
        max_steps = 500)

    exporter = tf.estimator.LatestExporter(name = 'exporter', serving_input_receiver_fn = serving_input_fn)

    eval_spec = tf.estimator.EvalSpec(
        input_fn = lambda: eval_input_fn(csv_path = "./taxi-valid.csv"),
        steps = None,
        start_delay_secs = 10, # wait at least N seconds before first evaluation (default 120)
        throttle_secs = 10, # wait at least N seconds before each subsequent evaluation (default 600)
        exporters = exporter) # export a SavedModel after each evaluation (LatestExporter keeps only the most recent exports)

    tf.estimator.train_and_evaluate(
        estimator = estimator, 
        train_spec = train_spec, 
        eval_spec = eval_spec)

In [ ]:
%%time
OUTDIR = "taxi_trained"
shutil.rmtree(path = OUTDIR, ignore_errors = True) # start fresh each time
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file

train_and_evaluate(OUTDIR)

Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License