Learning Objectives
Up until now we've been limited in our model architectures to premade estimators. But what if we want more control over the model? We can use the popular Keras API to create a custom model. Keras is a high-level API to build and train deep learning models. It is user-friendly, modular and makes writing custom building blocks of Tensorflow code much easier.
Once we've built a Keras model, we then convert it to an estimator using tf.keras.estimator.model_to_estimator()
This gives us access to all the flexibility of Keras for creating deep learning models, but also the production readiness of the estimator framework!
In [ ]:
import tensorflow as tf
import numpy as np
import shutil
# Show the installed TensorFlow version. The cells below call tf.placeholder,
# tf.logging and tf.train.AdamOptimizer, so confirm the version provides them.
print(tf.__version__)
For the most part, we can use the same train and evaluation input functions that we had in previous labs. Note the function create_feature_keras_input
below. We will use this to create the first layer of the model. This function is called in turn during the train_input_fn
and eval_input_fn
as well.
In [ ]:
# Column order of the CSV files; "fare_amount" is the label popped off in read_dataset.
CSV_COLUMN_NAMES = [
    "fare_amount",
    "dayofweek",
    "hourofday",
    "pickuplon",
    "pickuplat",
    "dropofflon",
    "dropofflat",
]
# One default per column, in the same order. The Python type of each default
# (float vs. int) is significant, so the int defaults must stay ints.
CSV_DEFAULTS = [
    [0.0],    # fare_amount
    [1],      # dayofweek
    [0],      # hourofday
    [-74.0],  # pickuplon
    [40.0],   # pickuplat
    [-74.0],  # dropofflon
    [40.7],   # dropofflat
]
def read_dataset(csv_path):
    """Build an unbatched dataset of (features, label) pairs from CSV files.

    Args:
        csv_path: File pattern passed to tf.data.Dataset.list_files
            (i.e. data_file_*.csv).

    Returns:
        A tf.data.Dataset whose elements are (features, label), where
        features is a dict keyed by column name (plus the engineered
        features) and label is the "fare_amount" column.
    """
    def read_lines_without_first(filename):
        # The first line of every file is dropped (presumably a header row).
        return tf.data.TextLineDataset(filenames=filename).skip(count=1)

    def parse_row(row):
        # Decode one CSV line into one tensor per column.
        columns = tf.decode_csv(records=row, record_defaults=CSV_DEFAULTS)
        # Name the columns, then add the engineered features.
        parsed = add_engineered_features(dict(zip(CSV_COLUMN_NAMES, columns)))
        # Split the label off so that it is not fed to the model as a feature.
        fare = parsed.pop("fare_amount")
        return parsed, fare

    matching_files = tf.data.Dataset.list_files(file_pattern=csv_path)
    text_lines = matching_files.flat_map(map_func=read_lines_without_first)
    # Parse each CSV row into the (features, label) format.
    return text_lines.map(map_func=parse_row)
def create_feature_keras_input(features, label):
    """Turn a features dict into the single input tensor for the Keras model.

    Args:
        features: Dict of feature tensors keyed by feature name.
        label: Passed through untouched (callers may pass None).

    Returns:
        Tuple of (tensor produced by tf.feature_column.input_layer, label).
    """
    columns = create_feature_columns()
    dense_input = tf.feature_column.input_layer(
        features=features,
        feature_columns=columns,
    )
    return dense_input, label
def train_input_fn(csv_path, batch_size=128):
    """Input pipeline for training: shuffled, endlessly repeated, batched.

    Args:
        csv_path: File pattern of the training CSV files.
        batch_size: Number of examples per batch.

    Returns:
        A tf.data.Dataset of (feature tensor, label) batches.
    """
    # 1. CSV -> (features, label) examples.
    examples = read_dataset(csv_path)
    # 2. Shuffle, repeat without limit (count=None), then batch.
    shuffled = examples.shuffle(buffer_size=1000)
    repeated = shuffled.repeat(count=None)
    batches = repeated.batch(batch_size=batch_size)
    # 3. Collapse each batch's features dict into the single Keras input tensor.
    return batches.map(map_func=create_feature_keras_input)
def eval_input_fn(csv_path, batch_size=128):
    """Input pipeline for evaluation: one pass, no shuffling, batched.

    Args:
        csv_path: File pattern of the evaluation CSV files.
        batch_size: Number of examples per batch.

    Returns:
        A tf.data.Dataset of (feature tensor, label) batches.
    """
    return (
        read_dataset(csv_path)                      # 1. CSV -> (features, label)
        .batch(batch_size=batch_size)               # 2. batch the examples
        .map(map_func=create_feature_keras_input)   # 3. single tensor for Keras
    )
In [ ]:
def add_engineered_features(features):
    """Add derived features to a features dict, in place, and return it.

    "dayofweek" is shifted down by one, and "latdiff", "londiff" and
    "euclidean_dist" are added.

    Args:
        features: Dict containing "dayofweek", "pickuplat", "pickuplon",
            "dropofflat" and "dropofflon". It is modified in place.

    Returns:
        The same dict object, with the engineered entries set.
    """
    # Days of week arrive as 1-7; shift them to 0-6.
    features["dayofweek"] = features["dayofweek"] - 1

    # Latitude difference is the North/South displacement.
    lat_delta = features["pickuplat"] - features["dropofflat"]
    features["latdiff"] = lat_delta

    # Longitude difference is the East/West displacement.
    lon_delta = features["pickuplon"] - features["dropofflon"]
    features["londiff"] = lon_delta

    # Straight-line distance in coordinate space.
    features["euclidean_dist"] = tf.sqrt(x=lat_delta ** 2 + lon_delta ** 2)
    return features
In [ ]:
def create_feature_columns():
    """Return the list of feature columns that feed the Keras input layer.

    The list holds, in order: the indicator of the day-of-week x hour-of-day
    cross, the four bucketized coordinates (pickuplon, pickuplat, dropofflon,
    dropofflat), and the three numeric features added by
    add_engineered_features (latdiff, londiff, euclidean_dist).
    """
    # Identity-categorical columns for day of week and hour of day.
    day_column = tf.feature_column.categorical_column_with_identity(
        key="dayofweek", num_buckets=7)
    hour_column = tf.feature_column.categorical_column_with_identity(
        key="hourofday", num_buckets=24)

    # Cross the two to capture each combination of day and hour.
    day_hour_cross = tf.feature_column.crossed_column(
        keys=[day_column, hour_column], hash_bucket_size=24 * 7)

    # Bucket boundaries for latitudes and longitudes.
    NBUCKETS = 16
    lat_boundaries = np.linspace(start=38.0, stop=42.0, num=NBUCKETS).tolist()
    lon_boundaries = np.linspace(start=-76.0, stop=-72.0, num=NBUCKETS).tolist()

    def bucketized(key, boundaries):
        # Wrap the raw numeric column `key` in a bucketized column.
        return tf.feature_column.bucketized_column(
            source_column=tf.feature_column.numeric_column(key=key),
            boundaries=boundaries)

    # Each variable is named after the key it wraps, and longitudes are paired
    # with longitude boundaries, latitudes with latitude boundaries.
    pickup_lon = bucketized("pickuplon", lon_boundaries)
    pickup_lat = bucketized("pickuplat", lat_boundaries)
    dropoff_lon = bucketized("dropofflon", lon_boundaries)
    dropoff_lat = bucketized("dropofflat", lat_boundaries)

    return [
        # 1. Engineered using the tf.feature_column module
        tf.feature_column.indicator_column(categorical_column=day_hour_cross),  # 168 columns
        pickup_lon,   # 16 + 1 = 17 columns
        pickup_lat,   # 16 + 1 = 17 columns
        dropoff_lon,  # 16 + 1 = 17 columns
        dropoff_lat,  # 16 + 1 = 17 columns
        # 2. Engineered in the input functions
        tf.feature_column.numeric_column(key="latdiff"),         # 1 column
        tf.feature_column.numeric_column(key="londiff"),         # 1 column
        tf.feature_column.numeric_column(key="euclidean_dist"),  # 1 column
    ]
Calculate the number of feature columns that will be input to our Keras model
In [ ]:
# Width of the dense input tensor; must stay in sync with create_feature_columns().
num_feature_columns = sum((
    168,           # indicator of the day-of-week x hour-of-day cross
    (16 + 1) * 4,  # four bucketized coordinates, 17 buckets each
    3,             # latdiff, londiff, euclidean_dist
))
print("num_feature_columns = {}".format(num_feature_columns))
Now we can begin building our Keras model. Have a look at the guide here to see more explanation.
Complete the code in the cell below to add a sequence of dense layers using Keras's Sequential
API. Create a model which consists of six layers with relu
as activation function. Have a look at the documentation for tf.keras.layers.Dense to see which arguments to provide.
In [ ]:
def create_keras_model():
    """Build and compile the Keras Sequential model for this lab.

    The model starts with an input layer of width num_feature_columns. The
    dense layers are left for the reader to add in the TODO slots below. The
    model is compiled with an Adam optimizer, mean squared error loss and an
    RMSE metric.

    Returns:
        The compiled tf.keras.Sequential model.
    """
    model = tf.keras.Sequential()
    # The layer name "dense_input" is also the feature key that
    # serving_input_fn uses in its ServingInputReceiver.
    model.add(tf.keras.layers.InputLayer(input_shape = (num_feature_columns,), name = "dense_input"))
    # Exercise: add the dense layers here, one model.add(...) call per slot.
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here
    # TODO: Your code goes here
    def rmse(y_true, y_pred): # Root Mean Squared Error
        # Square root of the mean squared difference between predictions and labels.
        return tf.sqrt(x = tf.reduce_mean(input_tensor = tf.square(x = y_pred - y_true)))
    model.compile(
        optimizer = tf.train.AdamOptimizer(),
        loss = "mean_squared_error",
        metrics = [rmse])
    return model
Once we've constructed our model in Keras, we next create the serving input function. This is also similar to what we have done in previous labs. Note that we use our create_feature_keras_input
function again so that we perform our feature engineering during inference.
## Build Custom Keras Model
In [ ]:
# Create serving input function
def serving_input_fn():
    """Build the ServingInputReceiver used when exporting the model.

    Creates one placeholder per raw input feature, applies the same feature
    engineering as the training pipeline, and hands the resulting tensor to
    the model under the "dense_input" key.

    Returns:
        A tf.estimator.export.ServingInputReceiver whose receiver tensors are
        the raw placeholders.
    """
    # (name, dtype) of every raw feature expected at serving time.
    raw_inputs = [
        ("dayofweek", tf.int32),
        ("hourofday", tf.int32),
        ("pickuplon", tf.float32),
        ("pickuplat", tf.float32),
        ("dropofflon", tf.float32),
        ("dropofflat", tf.float32),
    ]
    feature_placeholders = {
        name: tf.placeholder(dtype=dtype, shape=[None])
        for name, dtype in raw_inputs
    }

    # Work on a copy: add_engineered_features modifies its argument in place,
    # and the receiver tensors must remain the raw placeholders.
    engineered = add_engineered_features(dict(feature_placeholders))

    # Perform our feature engineering during inference as well.
    dense_input, _ = create_feature_keras_input(engineered, None)

    return tf.estimator.export.ServingInputReceiver(
        features={"dense_input": dense_input},
        receiver_tensors=feature_placeholders,
    )
To train our model, we can use train_and_evaluate
as we have before. Note that we use tf.keras.estimator.model_to_estimator
to create our estimator. It takes as arguments the compiled Keras model, the OUTDIR, and optionally a tf.estimator.RunConfig. Have a look at the documentation for tf.keras.estimator.model_to_estimator to make sure you understand how arguments are used.
In [ ]:
def train_and_evaluate(output_dir):
    """Train and evaluate the Keras model through the Estimator API.

    Converts the Keras model into an estimator (left as a TODO for the lab),
    then runs tf.estimator.train_and_evaluate, training on ./taxi-train.csv
    and evaluating on ./taxi-valid.csv.

    Args:
        output_dir: Directory name supplied by the caller. Nothing in the
            visible body reads it yet; presumably it is meant to be passed to
            model_to_estimator in the TODO below -- confirm when completing it.
    """
    tf.logging.set_verbosity(v = tf.logging.INFO) # so loss is printed during training
    # Exercise: supply the arguments for the Keras-to-estimator conversion.
    estimator = tf.keras.estimator.model_to_estimator(
        # TODO: Your code goes here
    )
    # Training is capped at 500 steps. The lambda defers building the dataset
    # until the input function is actually invoked.
    train_spec = tf.estimator.TrainSpec(
        input_fn = lambda: train_input_fn(csv_path = "./taxi-train.csv"),
        max_steps = 500)
    exporter = tf.estimator.LatestExporter(name = 'exporter', serving_input_receiver_fn = serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
        input_fn = lambda: eval_input_fn(csv_path = "./taxi-valid.csv"),
        steps = None, # no step limit; eval_input_fn does not repeat, so the data is read once
        start_delay_secs = 10, # wait at least N seconds before first evaluation (default 120)
        throttle_secs = 10, # wait at least N seconds before each subsequent evaluation (default 600)
        exporters = exporter) # NOTE(review): LatestExporter exports after evaluations and keeps only the newest exports; FinalExporter is the one that exports once at the end -- confirm intent
    tf.estimator.train_and_evaluate(
        estimator = estimator,
        train_spec = train_spec,
        eval_spec = eval_spec)
In [ ]:
%%time
# Directory name handed to train_and_evaluate; it is deleted before every run.
OUTDIR = "taxi_trained"
shutil.rmtree(path = OUTDIR, ignore_errors = True) # start fresh each time
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file
# Kick off training and evaluation.
train_and_evaluate(OUTDIR)
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License