Learning Objectives
In this notebook we'll use what we learned about feature columns to build a Wide & Deep model. Recall that the idea behind Wide & Deep models is to combine two ways of learning, memorization and generalization, by training a wide linear model and a deep neural network together.
The Wide part of the model handles memorization. Here, we train a linear model on a wide set of crossed features and learn how those feature combinations correlate with the label. The Deep part of the model handles generalization, where we use embedding vectors for features. The best embeddings are then learned through the training process. While both of these methods can work well alone, Wide & Deep models excel by combining them.
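To make the combination concrete, here is a minimal pure-Python sketch of how a wide (linear) output and a deep (neural network) output can be summed into one prediction. The function names, weights, and inputs are all made up for illustration; the estimator we use later in this notebook learns its weights during training and is considerably more elaborate.

```python
def wide_part(active_cross_ids, wide_weights, bias=0.0):
    """Linear model over sparse features: add up the weights of the active crossed features."""
    return bias + sum(wide_weights.get(i, 0.0) for i in active_cross_ids)

def deep_part(dense_inputs, hidden_weights, output_weights):
    """One hidden ReLU layer followed by a single linear output unit."""
    hidden = [
        max(0.0, sum(w * x for w, x in zip(row, dense_inputs)))
        for row in hidden_weights
    ]
    return sum(w * h for w, h in zip(output_weights, hidden))

def wide_and_deep(active_cross_ids, dense_inputs, wide_weights, hidden_weights, output_weights):
    """The combined prediction is the sum of the two parts."""
    return wide_part(active_cross_ids, wide_weights) + deep_part(
        dense_inputs, hidden_weights, output_weights
    )

# Hypothetical weights, chosen only so the arithmetic is easy to follow
wide_weights = {3: 0.5, 7: -0.2}            # one weight per crossed-feature bucket
hidden_weights = [[0.5, 0.25], [-1.0, 0.0]]  # two hidden units, two dense inputs
output_weights = [2.0, 3.0]

prediction = wide_and_deep([3, 7], [1.0, 2.0], wide_weights, hidden_weights, output_weights)
print(prediction)
```

The wide part can only reward or penalize feature combinations it has a weight for, while the deep part produces an output for any dense input, which is the memorization versus generalization split described above.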
In [ ]:
import tensorflow as tf
import numpy as np
import shutil
print(tf.__version__)
In [ ]:
!gsutil cp gs://cloud-training-demos/taxifare/small/*.csv .
!ls -l *.csv
In [ ]:
CSV_COLUMN_NAMES = ["fare_amount","dayofweek","hourofday","pickuplon","pickuplat","dropofflon","dropofflat"]
CSV_DEFAULTS = [[0.0],[1],[0],[-74.0],[40.0],[-74.0],[40.7]]
def read_dataset(csv_path):
    def _parse_row(row):
        # Decode the CSV row into list of TF tensors
        fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)
        # Pack the result into a dictionary
        features = dict(zip(CSV_COLUMN_NAMES, fields))
        # NEW: Add engineered features
        features = add_engineered_features(features)
        # Separate the label from the features
        label = features.pop("fare_amount") # remove label from features and store
        return features, label

    # Create a dataset containing the text lines.
    dataset = tf.data.Dataset.list_files(file_pattern = csv_path) # (i.e. data_file_*.csv)
    dataset = dataset.flat_map(map_func = lambda filename: tf.data.TextLineDataset(filenames = filename).skip(count = 1))
    # Parse each CSV row into correct (features,label) format for Estimator API
    dataset = dataset.map(map_func = _parse_row)
    return dataset

def train_input_fn(csv_path, batch_size = 128):
    #1. Convert CSV into tf.data.Dataset with (features,label) format
    dataset = read_dataset(csv_path)
    #2. Shuffle, repeat, and batch the examples.
    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)
    return dataset

def eval_input_fn(csv_path, batch_size = 128):
    #1. Convert CSV into tf.data.Dataset with (features,label) format
    dataset = read_dataset(csv_path)
    #2. Batch the examples.
    dataset = dataset.batch(batch_size = batch_size)
    return dataset
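The parsing step above can be hard to picture because it runs inside the TensorFlow graph. As a rough analogy, here is a pure-Python sketch of what happens to a single CSV line: each field is converted to the type of its default, an empty field falls back to the default, and the label is popped out of the feature dictionary. The sample row is invented for illustration, the `parse_row` helper is hypothetical, and the engineered features are left out to keep the sketch short.

```python
import csv
import io

COLUMN_NAMES = ["fare_amount", "dayofweek", "hourofday",
                "pickuplon", "pickuplat", "dropofflon", "dropofflat"]
DEFAULTS = [0.0, 1, 0, -74.0, 40.0, -74.0, 40.7]

def parse_row(line):
    """Turn one CSV line into a (features, label) pair."""
    raw = next(csv.reader(io.StringIO(line)))
    # Convert each field to the type of its default; use the default if the field is empty
    fields = [
        type(default)(value) if value != "" else default
        for value, default in zip(raw, DEFAULTS)
    ]
    features = dict(zip(COLUMN_NAMES, fields))
    label = features.pop("fare_amount")  # the label must not stay among the features
    return features, label

# Made-up row with an empty dropofflon field
features, label = parse_row("12.5,3,17,-73.98,40.75,,40.78")
print(label)
print(features)
```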
For the Wide columns, we will create feature columns of crossed features. To do this, we'll create a collection of TensorFlow feature columns to pass to the tf.feature_column.crossed_column constructor. The Deep columns will consist of numeric columns and any embedding columns we want to create.
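Before building the real columns, it may help to see bucketizing and crossing on plain numbers. The sketch below is a simplified stand-in and not how TensorFlow implements it: `bucketize` uses a binary search over the boundaries (16 boundaries give 17 buckets, since values can fall below the first or above the last boundary), and `cross` assigns every (latitude bucket, longitude bucket) pair its own index. The real crossed_column hashes the pair into hash_bucket_size buckets instead, so different pairs can collide. The helper names and sample coordinates are assumptions made for this example.

```python
import bisect

NBUCKETS = 16  # number of boundaries, the same value the notebook uses

def linspace(start, stop, num):
    """Evenly spaced values from start to stop inclusive."""
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num)]

lat_boundaries = linspace(38.0, 42.0, NBUCKETS)
lon_boundaries = linspace(-76.0, -72.0, NBUCKETS)

def bucketize(value, boundaries):
    """Index of the bucket that value falls into (0 .. len(boundaries))."""
    return bisect.bisect_right(boundaries, value)

def cross(lat_bucket, lon_bucket, num_lon_buckets):
    """Exact index for the pair; a simplification of the hashed cross."""
    return lat_bucket * num_lon_buckets + lon_bucket

# Illustrative pickup point
lat_bucket = bucketize(40.75, lat_boundaries)
lon_bucket = bucketize(-73.98, lon_boundaries)
cell_id = cross(lat_bucket, lon_bucket, len(lon_boundaries) + 1)
print(lat_bucket, lon_bucket, cell_id)
```

Each `cell_id` corresponds to one rectangle on the map, which is what lets a linear model learn a separate weight per location.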
In [ ]:
# 1. One hot encode dayofweek and hourofday
fc_dayofweek = tf.feature_column.categorical_column_with_identity(key = "dayofweek", num_buckets = 7)
fc_hourofday = tf.feature_column.categorical_column_with_identity(key = "hourofday", num_buckets = 24)
# 2. Bucketize latitudes and longitudes
NBUCKETS = 16
latbuckets = np.linspace(start = 38.0, stop = 42.0, num = NBUCKETS).tolist()
lonbuckets = np.linspace(start = -76.0, stop = -72.0, num = NBUCKETS).tolist()
fc_bucketized_plat = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "pickuplat"), boundaries = latbuckets)
fc_bucketized_plon = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "pickuplon"), boundaries = lonbuckets)
fc_bucketized_dlat = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "dropofflat"), boundaries = latbuckets)
fc_bucketized_dlon = tf.feature_column.bucketized_column(source_column = tf.feature_column.numeric_column(key = "dropofflon"), boundaries = lonbuckets)
# 3. Cross features to get combination of day and hour
fc_crossed_day_hr = tf.feature_column.crossed_column(keys = [fc_dayofweek, fc_hourofday], hash_bucket_size = 24 * 7)
fc_crossed_dloc = tf.feature_column.crossed_column(keys = [fc_bucketized_dlat, fc_bucketized_dlon], hash_bucket_size = NBUCKETS * NBUCKETS)
fc_crossed_ploc = tf.feature_column.crossed_column(keys = [fc_bucketized_plat, fc_bucketized_plon], hash_bucket_size = NBUCKETS * NBUCKETS)
fc_crossed_pd_pair = tf.feature_column.crossed_column(keys = [fc_crossed_dloc, fc_crossed_ploc], hash_bucket_size = NBUCKETS**4)
We also add our engineered features that we used previously.
In [ ]:
def add_engineered_features(features):
    features["dayofweek"] = features["dayofweek"] - 1 # subtract one since our days of week are 1-7 instead of 0-6
    features["latdiff"] = features["pickuplat"] - features["dropofflat"] # North/South
    features["londiff"] = features["pickuplon"] - features["dropofflon"] # East/West
    features["euclidean_dist"] = tf.sqrt(x = features["latdiff"]**2 + features["londiff"]**2)
    return features
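To check the arithmetic on a single ride, here is a pure-Python counterpart of the function above. The name `add_engineered_features_py` and the sample ride are invented for this sketch, and unlike the TensorFlow version it returns a new dictionary rather than modifying its argument. Note that the distance is measured in degrees, not kilometers, so it is only a rough proxy for trip length.

```python
import math

def add_engineered_features_py(features):
    """Plain-Python version of the engineered features, for a single ride."""
    out = dict(features)  # work on a copy so the caller's dictionary is unchanged
    out["dayofweek"] = features["dayofweek"] - 1  # 1-7 becomes 0-6
    out["latdiff"] = features["pickuplat"] - features["dropofflat"]  # North/South
    out["londiff"] = features["pickuplon"] - features["dropofflon"]  # East/West
    out["euclidean_dist"] = math.sqrt(out["latdiff"] ** 2 + out["londiff"] ** 2)
    return out

# Made-up ride, chosen so the differences form a 3-4-5 triangle
ride = {"dayofweek": 1, "pickuplat": 40.0, "pickuplon": -74.0,
        "dropofflat": 40.3, "dropofflon": -74.4}
engineered = add_engineered_features_py(ride)
print(engineered["latdiff"], engineered["londiff"], engineered["euclidean_dist"])
```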
Next we gather the lists of wide and deep feature columns we'll pass to our Wide & Deep model in TensorFlow. To do this, we'll create a function get_wide_deep, which uses the columns defined above: crossed feature columns and sparse feature columns for the wide columns, and embedding feature columns and numeric feature columns for the deep columns.
In [ ]:
def get_wide_deep():
    # Wide columns are sparse, have linear relationship with the output
    wide_columns = [
        # Feature crosses
        fc_crossed_day_hr, fc_crossed_dloc,
        fc_crossed_ploc, fc_crossed_pd_pair,
        # Sparse columns
        fc_dayofweek, fc_hourofday
    ]
    # Continuous columns are deep, have a complex relationship with the output
    deep_columns = [
        # Embedding_column to "group" together ...
        tf.feature_column.embedding_column(categorical_column = fc_crossed_pd_pair, dimension = 10),
        tf.feature_column.embedding_column(categorical_column = fc_crossed_day_hr, dimension = 10),
        # Numeric columns
        tf.feature_column.numeric_column(key = "pickuplat"),
        tf.feature_column.numeric_column(key = "pickuplon"),
        tf.feature_column.numeric_column(key = "dropofflon"),
        tf.feature_column.numeric_column(key = "dropofflat"),
        tf.feature_column.numeric_column(key = "latdiff"),
        tf.feature_column.numeric_column(key = "londiff"),
        tf.feature_column.numeric_column(key = "euclidean_dist"),
        tf.feature_column.indicator_column(categorical_column = fc_crossed_day_hr),
    ]
    return wide_columns, deep_columns
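The deep columns above feed the same day/hour cross to the network in two forms: as an embedding and as an indicator (one-hot) column. The pure-Python sketch below contrasts the two representations for one bucket id. The helper names are made up, and the table is filled with small random numbers only as a placeholder; in the real model the embedding values are learned during training.

```python
import random

def make_embedding_table(num_buckets, dimension, seed=0):
    """One small dense vector per bucket; random values stand in for learned ones."""
    rng = random.Random(seed)
    return [[rng.uniform(-0.1, 0.1) for _ in range(dimension)] for _ in range(num_buckets)]

def embed(bucket_id, table):
    """An embedding lookup is just picking the row for this bucket."""
    return table[bucket_id]

def one_hot(bucket_id, num_buckets):
    """An indicator column is a vector of zeros with a single one."""
    return [1.0 if i == bucket_id else 0.0 for i in range(num_buckets)]

NUM_DAY_HR_BUCKETS = 24 * 7  # matches hash_bucket_size of the day/hour cross
day_hr_table = make_embedding_table(NUM_DAY_HR_BUCKETS, dimension=10)

bucket_id = 42  # arbitrary example bucket
dense = embed(bucket_id, day_hr_table)
sparse = one_hot(bucket_id, NUM_DAY_HR_BUCKETS)
print(len(dense), len(sparse))
```

The embedding compresses 168 possible buckets into 10 numbers, which matters even more for the pickup/dropoff pair cross, where the number of buckets is far larger.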
In [ ]:
def serving_input_receiver_fn():
    receiver_tensors = {
        'dayofweek' : tf.placeholder(dtype = tf.int32, shape = [None]), # shape is vector to allow batch of requests
        'hourofday' : tf.placeholder(dtype = tf.int32, shape = [None]),
        'pickuplon' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'pickuplat' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'dropofflat' : tf.placeholder(dtype = tf.float32, shape = [None]),
        'dropofflon' : tf.placeholder(dtype = tf.float32, shape = [None]),
    }
    # Pass a shallow copy: add_engineered_features modifies its argument in place,
    # and receiver_tensors should contain only the raw input placeholders
    features = add_engineered_features(receiver_tensors.copy()) # 'features' is what is passed on to the model
    return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)
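The serving function has to apply the same feature engineering as training while still describing only the raw inputs a client sends. One detail worth keeping in mind is that a helper which adds keys to the dictionary it receives also changes the caller's dictionary, because both names refer to the same object. The sketch below shows the effect with plain numbers; `add_features_in_place` and the sample values are hypothetical stand-ins, not part of the notebook.

```python
def add_features_in_place(features):
    """Adds a derived key to the dictionary it was given and returns that same dictionary."""
    features["latdiff"] = features["pickuplat"] - features["dropofflat"]
    return features

# Case 1: pass the dictionary directly, so inputs and features are the same object
request_inputs = {"pickuplat": 40.75, "dropofflat": 40.70}
aliased = add_features_in_place(request_inputs)
print(aliased is request_inputs)        # the raw inputs now also hold the derived key
print(sorted(request_inputs))

# Case 2: pass a shallow copy, so the raw inputs stay as they were
fresh_inputs = {"pickuplat": 40.75, "dropofflat": 40.70}
separate = add_features_in_place(dict(fresh_inputs))
print(sorted(fresh_inputs))
print(sorted(separate))
```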
In [ ]:
%%time
OUTDIR = "taxi_trained_wd/500"
shutil.rmtree(path = OUTDIR, ignore_errors = True) # start fresh each time
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file
tf.logging.set_verbosity(v = tf.logging.INFO) # so loss is printed during training
# Collect the wide and deep columns from above
wide_columns, deep_columns = get_wide_deep()
model = tf.estimator.DNNLinearCombinedRegressor(
    model_dir = OUTDIR,
    linear_feature_columns = wide_columns,
    dnn_feature_columns = deep_columns,
    dnn_hidden_units = [10,10], # specify neural architecture
    config = tf.estimator.RunConfig(
        tf_random_seed = 1, # for reproducibility
        save_checkpoints_steps = 100 # checkpoint every N steps
    )
)

# Add custom evaluation metric
def my_rmse(labels, predictions):
    pred_values = tf.squeeze(input = predictions["predictions"], axis = -1)
    return {"rmse": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)}

model = tf.contrib.estimator.add_metrics(estimator = model, metric_fn = my_rmse)

train_spec = tf.estimator.TrainSpec(
    input_fn = lambda: train_input_fn("./taxi-train.csv"),
    max_steps = 500)

exporter = tf.estimator.FinalExporter(name = "exporter", serving_input_receiver_fn = serving_input_receiver_fn) # export SavedModel once at the end of training
# Note: alternatively use tf.estimator.BestExporter to export at every checkpoint that has lower loss than the previous checkpoint

eval_spec = tf.estimator.EvalSpec(
    input_fn = lambda: eval_input_fn("./taxi-valid.csv"),
    steps = None,
    start_delay_secs = 1, # wait at least N seconds before first evaluation (default 120)
    throttle_secs = 1, # wait at least N seconds before each subsequent evaluation (default 600)
    exporters = exporter) # export SavedModel once at the end of training

tf.estimator.train_and_evaluate(estimator = model, train_spec = train_spec, eval_spec = eval_spec)
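The custom rmse metric is evaluated over the whole validation set one batch at a time, so it has to be accumulated rather than computed per batch. The pure-Python sketch below shows the idea: keep a running sum of squared errors and a running count, and take the square root of their ratio at the end. The `StreamingRMSE` class and the numbers are invented for illustration and only mirror the general behavior of a streaming metric.

```python
import math

class StreamingRMSE:
    """Accumulates squared error across batches and reports RMSE over everything seen."""

    def __init__(self):
        self.total_squared_error = 0.0
        self.count = 0

    def update(self, labels, predictions):
        for y, p in zip(labels, predictions):
            self.total_squared_error += (y - p) ** 2
            self.count += 1

    def result(self):
        return math.sqrt(self.total_squared_error / self.count)

metric = StreamingRMSE()
metric.update([10.0, 12.0], [9.0, 14.0])  # squared errors 1 and 4
metric.update([8.0], [8.0])               # squared error 0
rmse = metric.result()                    # sqrt((1 + 4 + 0) / 3)
print(rmse)
```

Averaging the per-batch RMSE values instead would generally give a different number, because the square root is taken before the batches are combined.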
In [ ]:
%%time
OUTDIR = "taxi_trained_wd/5000"
shutil.rmtree(path = OUTDIR, ignore_errors = True) # start fresh each time
tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file
tf.logging.set_verbosity(v = tf.logging.INFO) # so loss is printed during training
# Collect the wide and deep columns from above
wide_columns, deep_columns = get_wide_deep()
model = tf.estimator.DNNLinearCombinedRegressor(
    model_dir = OUTDIR,
    linear_feature_columns = wide_columns,
    dnn_feature_columns = deep_columns,
    dnn_hidden_units = [10,10], # specify neural architecture
    config = tf.estimator.RunConfig(
        tf_random_seed = 1, # for reproducibility
        save_checkpoints_steps = 100 # checkpoint every N steps
    )
)

# Add custom evaluation metric
def my_rmse(labels, predictions):
    pred_values = tf.squeeze(input = predictions["predictions"], axis = -1)
    return {"rmse": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)}

model = tf.contrib.estimator.add_metrics(estimator = model, metric_fn = my_rmse)

train_spec = tf.estimator.TrainSpec(
    input_fn = lambda: train_input_fn("./taxi-train.csv"),
    max_steps = 5000)

exporter = tf.estimator.FinalExporter(name = "exporter", serving_input_receiver_fn = serving_input_receiver_fn) # export SavedModel once at the end of training
# Note: alternatively use tf.estimator.BestExporter to export at every checkpoint that has lower loss than the previous checkpoint

eval_spec = tf.estimator.EvalSpec(
    input_fn = lambda: eval_input_fn("./taxi-valid.csv"),
    steps = None,
    start_delay_secs = 1, # wait at least N seconds before first evaluation (default 120)
    throttle_secs = 1, # wait at least N seconds before each subsequent evaluation (default 600)
    exporters = exporter) # export SavedModel once at the end of training

tf.estimator.train_and_evaluate(estimator = model, train_spec = train_spec, eval_spec = eval_spec)
Our RMSE is better but still not as good as the DNN we built. It looks like RMSE may still be decreasing, but training is getting slow, so we should move to the cloud if we want to train longer.
Also we haven't explored our hyperparameters much. Is our neural architecture of two layers with 10 nodes each optimal?
In the next notebook we'll explore this.
Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License