This notebook continues the Keras model we built earlier, now with additional feature engineering.
Each learning objective corresponds to a #TODO in this student lab notebook -- try to complete this notebook first and then review the solution notebook.
Let's start off with the Python imports that we need.
In [ ]:
!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst
In [ ]:
!pip install tensorflow==2.1 --user
Please ignore any compatibility warnings and errors. Make sure to restart your kernel so that the new TensorFlow version takes effect.
In [ ]:
%%bash
export PROJECT=$(gcloud config list project --format "value(core.project)")
echo "Your current GCP Project Name is: "$PROJECT
In [ ]:
import os, json, math, shutil
import datetime
import numpy as np
import logging
# SET TF ERROR LOG VERBOSITY
logging.getLogger("tensorflow").setLevel(logging.ERROR)
import tensorflow as tf
print(tf.version.VERSION)
PROJECT = "your-gcp-project-here" # REPLACE WITH YOUR PROJECT NAME
REGION = "us-central1" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1
# Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["REGION"] = REGION
os.environ["BUCKET"] = PROJECT # DEFAULT BUCKET WILL BE PROJECT ID
if PROJECT == "your-gcp-project-here":
print("Don't forget to update your PROJECT name! Currently:", PROJECT)
In [ ]:
%%bash
## Create new ML GCS bucket if it doesn't exist already...
exists=$(gsutil ls -d | grep -w gs://${PROJECT}-ml/)
if [ -n "$exists" ]; then
echo -e "Bucket exists, let's not recreate it."
else
echo "Creating a new GCS bucket."
gsutil mb -l ${REGION} gs://${PROJECT}-ml
echo -e "\nHere are your current buckets:"
gsutil ls
fi
In [ ]:
# Note that this cell is special. It's got a tag (you can view tags by clicking on the wrench icon on the left menu in Jupyter)
# These are parameters that we will configure so that we can schedule this notebook
DATADIR = '../../data'
OUTDIR = './trained_model'
NBUCKETS = 10 # for feature crossing
TRAIN_BATCH_SIZE = 32
NUM_TRAIN_EXAMPLES = 10000 * 5 # remember the training dataset repeats, so this will wrap around
NUM_EVALS = 5 # evaluate this many times
NUM_EVAL_EXAMPLES = 10000 # enough to get a reasonable sample, but not so much that it slows things down
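With these defaults, the training run below makes NUM_EVALS = 5 passes ("epochs") of steps_per_epoch = NUM_TRAIN_EXAMPLES // (TRAIN_BATCH_SIZE * NUM_EVALS) = 50000 // (32 * 5) = 312 steps each, so the model sees roughly 50,000 training examples in total (wrapping around thanks to repeat()).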
We will start with the CSV files that we wrote out in the first notebook of this sequence. So that you don't have to re-run that notebook, a copy is saved in ../../data (the DATADIR set above).
In [ ]:
if DATADIR[:5] == 'gs://':
!gsutil ls $DATADIR/*.csv
else:
!ls -l $DATADIR/*.csv
We wrote these cells in the third notebook of this sequence.
In [ ]:
CSV_COLUMNS = ['fare_amount', 'pickup_datetime',
'pickup_longitude', 'pickup_latitude',
'dropoff_longitude', 'dropoff_latitude',
'passenger_count', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0],['na'],[0.0],[0.0],[0.0],[0.0],[0.0],['na']]
In [ ]:
def features_and_labels(row_data):
for unwanted_col in ['key']: # keep the pickup_datetime!
row_data.pop(unwanted_col)
label = row_data.pop(LABEL_COLUMN)
return row_data, label # features, label
# load the training data
def load_dataset(pattern, batch_size=1, mode=tf.estimator.ModeKeys.EVAL):
pattern = '{}/{}'.format(DATADIR, pattern)
dataset = (
# TODO 1: Recall from earlier how you used tf.data to read the CSV files (no changes needed):
tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)
.map(features_and_labels) # features, label
)
if mode == tf.estimator.ModeKeys.TRAIN:
print("Repeating training dataset indefinitely")
dataset = dataset.shuffle(1000).repeat()
dataset = dataset.prefetch(1) # overlap input preprocessing with training by prefetching one batch (tf.data.experimental.AUTOTUNE also works here)
return dataset
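Before moving on, it can help to sanity-check the input pipeline by pulling one small batch and printing the parsed features and label. Below is a minimal, optional check, assuming the taxi-train CSV files are present in DATADIR.
In [ ]:
# Optional sanity check: inspect one parsed (features, label) batch from the pipeline.
tempds = load_dataset('taxi-train*', batch_size=2)
for features, label in tempds.take(1):
    for name, value in features.items():
        print(name, value.numpy())
    print('fare_amount (label):', label.numpy())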
In [ ]:
def parse_datetime(s):
if type(s) is not str:
s = s.numpy().decode('utf-8') # if it is a Tensor
return datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S %Z")
In [ ]:
for s in ['2012-07-05 14:18:00 UTC']:
print(s)
for ts in [parse_datetime(s), parse_datetime(tf.constant(s))]: # as string, as tensor
print(ts.weekday())
DAYS = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] # datetime.weekday() numbers Monday as 0
print(DAYS[ts.weekday()])
In [ ]:
## Add transformations
def euclidean(params):
lon1, lat1, lon2, lat2 = params
# TODO 2: Create two new features called londiff and latdiff
# These should be the differences lon2 - lon1 and lat2 - lat1 (the sign doesn't matter because they are squared below)
londiff =
latdiff =
return tf.sqrt(londiff*londiff + latdiff*latdiff)
DAYS = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] # datetime.weekday() numbers Monday as 0
def get_dayofweek(s):
ts = parse_datetime(s)
return DAYS[ts.weekday()]
@tf.function
def dayofweek(ts_in):
return tf.map_fn(
lambda s: tf.py_function(get_dayofweek, inp=[s], Tout=tf.string),
ts_in
)
@tf.function
def fare_thresh(x):
return 60 * tf.keras.activations.relu(x)
def transform(inputs, NUMERIC_COLS, STRING_COLS):
print("BEFORE TRANSFORMATION")
print("INPUTS:", inputs.keys())
# Pass-through columns
transformed = inputs.copy()
del transformed['pickup_datetime']
feature_columns = {
colname: tf.feature_column.numeric_column(colname)
for colname in NUMERIC_COLS
}
# scale the lat, lon values to be in 0, 1
if True:
for lon_col in ['pickup_longitude', 'dropoff_longitude']: # roughly in the range -78 to -70
transformed[lon_col] = tf.keras.layers.Lambda(
lambda x: (x+78)/8.0,
name='scale_{}'.format(lon_col)
)(inputs[lon_col])
for lat_col in ['pickup_latitude', 'dropoff_latitude']: # in range 37 to 45
transformed[lat_col] = tf.keras.layers.Lambda(
lambda x: (x-37)/8.0,
name='scale_{}'.format(lat_col)
)(inputs[lat_col])
# add Euclidean distance. It doesn't have to be an exact distance calculation because the NN will calibrate it
if True:
transformed['euclidean'] = tf.keras.layers.Lambda(euclidean, name='euclidean')([
inputs['pickup_longitude'],
inputs['pickup_latitude'],
inputs['dropoff_longitude'],
inputs['dropoff_latitude']
])
feature_columns['euclidean'] = tf.feature_column.numeric_column('euclidean')
# hour of day from a timestamp of the form '2010-02-08 09:17:00 UTC'
if True:
transformed['hourofday'] = tf.keras.layers.Lambda(
lambda x: tf.strings.to_number(tf.strings.substr(x, 11, 2), out_type=tf.dtypes.int32),
name='hourofday'
)(inputs['pickup_datetime'])
feature_columns['hourofday'] = tf.feature_column.indicator_column(
tf.feature_column.categorical_column_with_identity('hourofday', num_buckets=24))
if False:
# day of week is hard because there is no TensorFlow function for date handling
transformed['dayofweek'] = tf.keras.layers.Lambda(
lambda x: dayofweek(x),
name='dayofweek_pyfun'
)(inputs['pickup_datetime'])
transformed['dayofweek'] = tf.keras.layers.Reshape((), name='dayofweek')(transformed['dayofweek'])
feature_columns['dayofweek'] = tf.feature_column.indicator_column(
tf.feature_column.categorical_column_with_vocabulary_list(
'dayofweek', vocabulary_list = DAYS))
if True:
# featurecross lat, lon into nxn buckets, then embed
nbuckets = NBUCKETS
latbuckets = np.linspace(0, 1, nbuckets).tolist()
lonbuckets = np.linspace(0, 1, nbuckets).tolist()
b_plat = tf.feature_column.bucketized_column(feature_columns['pickup_latitude'], latbuckets)
b_dlat = tf.feature_column.bucketized_column(feature_columns['dropoff_latitude'], latbuckets)
b_plon = tf.feature_column.bucketized_column(feature_columns['pickup_longitude'], lonbuckets)
b_dlon = tf.feature_column.bucketized_column(feature_columns['dropoff_longitude'], lonbuckets)
ploc = tf.feature_column.crossed_column([b_plat, b_plon], nbuckets * nbuckets)
dloc = tf.feature_column.crossed_column([b_dlat, b_dlon], nbuckets * nbuckets)
pd_pair = tf.feature_column.crossed_column([ploc, dloc], nbuckets ** 4 )
feature_columns['pickup_and_dropoff'] = tf.feature_column.embedding_column(pd_pair, 100)
print("AFTER TRANSFORMATION")
print("TRANSFORMED:", transformed.keys())
print("FEATURES", feature_columns.keys())
return transformed, feature_columns
def rmse(y_true, y_pred):
return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))
def build_dnn_model():
# input layer is all float except for pickup_datetime which is a string
STRING_COLS = ['pickup_datetime']
NUMERIC_COLS = set(CSV_COLUMNS) - set([LABEL_COLUMN, 'key']) - set(STRING_COLS)
print(STRING_COLS)
print(NUMERIC_COLS)
inputs = {
colname : tf.keras.layers.Input(name=colname, shape=(), dtype='float32')
for colname in NUMERIC_COLS
}
inputs.update({
colname : tf.keras.layers.Input(name=colname, shape=(), dtype='string')
for colname in STRING_COLS
})
# transforms
transformed, feature_columns = transform(inputs, NUMERIC_COLS, STRING_COLS)
# TODO 3: Specify the dense feature layers for the DNN as inputs
# Tip: Refer to https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/keras/layers/DenseFeatures
dnn_inputs =
# two hidden layers of [32, 8], just like in the BQML DNN
h1 = tf.keras.layers.Dense(32, activation='relu', name='h1')(dnn_inputs)
h2 = tf.keras.layers.Dense(8, activation='relu', name='h2')(h1)
if False:
# The final output would normally have a linear activation because this is regression.
# However, we know something about the taxi problem: fares are positive and tend to be below $60.
# fare_thresh (60 * relu) encodes that prior. (You can verify it from the fare distribution in the training data.)
output = tf.keras.layers.Dense(1, activation=fare_thresh, name='fare')(h2)
else:
output = tf.keras.layers.Dense(1, name='fare')(h2)
model = tf.keras.models.Model(inputs, output)
model.compile(optimizer='adam', loss='mse', metrics=[rmse, 'mse'])
return model
model = build_dnn_model()
print(model.summary())
In [ ]:
tf.keras.utils.plot_model(model, 'dnn_model.png', show_shapes=False, rankdir='LR')
To train the model, call model.fit()
In [ ]:
trainds = load_dataset('taxi-train*', TRAIN_BATCH_SIZE, tf.estimator.ModeKeys.TRAIN)
evalds = load_dataset('taxi-valid*', 1000, tf.estimator.ModeKeys.EVAL).take(NUM_EVAL_EXAMPLES//10000) # during training, evaluate on 1/10 of the final evaluation set (one batch of 1,000 examples)
steps_per_epoch = NUM_TRAIN_EXAMPLES // (TRAIN_BATCH_SIZE * NUM_EVALS)
shutil.rmtree('{}/checkpoints/'.format(OUTDIR), ignore_errors=True)
checkpoint_path = '{}/checkpoints/taxi'.format(OUTDIR)
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path,
save_weights_only=True,
verbose=1)
history = model.fit(trainds,
validation_data=evalds,
epochs=NUM_EVALS,
steps_per_epoch=steps_per_epoch,
verbose=2, # 0=silent, 1=progress bar, 2=one line per epoch
callbacks=[cp_callback])
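Because the callback writes weights-only checkpoints, the latest weights can later be restored into a freshly built model. Here is a minimal, optional sketch; it is not needed for the rest of the lab.
In [ ]:
# Optional: restore the most recent weights-only checkpoint into a new model instance.
restored_model = build_dnn_model()
restored_model.load_weights(checkpoint_path)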
In [ ]:
# plot
import matplotlib.pyplot as plt
nrows = 1
ncols = 2
fig = plt.figure(figsize=(10, 5))
for idx, key in enumerate(['loss', 'rmse']):
ax = fig.add_subplot(nrows, ncols, idx+1)
plt.plot(history.history[key])
plt.plot(history.history['val_{}'.format(key)])
plt.title('model {}'.format(key))
plt.ylabel(key)
plt.xlabel('epoch')
plt.legend(['train', 'validation'], loc='upper left');
In [ ]:
evalds = load_dataset('taxi-valid*', 1000, tf.estimator.ModeKeys.EVAL).take(NUM_EVAL_EXAMPLES//1000)
model.evaluate(evalds)
Although we get an RMSE of around 10 (your result will differ because of random initialization), remember that we trained on only a small subset of the data. We need a larger training dataset before drawing conclusions about this model.
In [ ]:
# TODO 4: Make example predictions. Experiment with different passenger_counts and pickup times and re-run.
model.predict({
'pickup_longitude': tf.convert_to_tensor([-73.982683]),
'pickup_latitude': tf.convert_to_tensor([40.742104]),
'dropoff_longitude': tf.convert_to_tensor([-73.983766]),
'dropoff_latitude': tf.convert_to_tensor([40.755174]),
'passenger_count': tf.convert_to_tensor([3.0]),
'pickup_datetime': tf.convert_to_tensor(['2010-02-08 09:17:00 UTC'], dtype=tf.string),
}, steps=1)
However, this is not realistic, because we can't expect client code to have a model object in memory. We'll have to export our model to a file, and expect client code to instantiate the model from that exported file.
In [ ]:
import shutil, os, datetime
OUTPUT_DIR = os.path.join(OUTDIR, 'export/savedmodel')
if OUTPUT_DIR[:5] != 'gs://':
shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
EXPORT_PATH = os.path.join(OUTPUT_DIR, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
# TODO 5: Export the model in preparation for serving later
# Specify the model and export path to save to
# Tip: Refer to: https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/saved_model/save
tf. # <--- complete
In [ ]:
!saved_model_cli show --tag_set serve --signature_def serving_default --dir {EXPORT_PATH}
In [ ]:
!find {EXPORT_PATH}
os.environ['EXPORT_PATH'] = EXPORT_PATH
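As noted above, client code should instantiate the model from the exported files rather than rely on an in-memory model object. One way to confirm the export works is to load the SavedModel back and call its default serving signature. Below is a minimal sketch, assuming TODO 5 saved the model to EXPORT_PATH.
In [ ]:
# Load the exported SavedModel and invoke its serving signature with a single example.
loaded = tf.saved_model.load(EXPORT_PATH)
serving_fn = loaded.signatures['serving_default']
print(serving_fn(
    pickup_longitude=tf.constant([-73.982683]),
    pickup_latitude=tf.constant([40.742104]),
    dropoff_longitude=tf.constant([-73.983766]),
    dropoff_latitude=tf.constant([40.755174]),
    passenger_count=tf.constant([3.0]),
    pickup_datetime=tf.constant(['2010-02-08 09:17:00 UTC'])
))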
In [ ]:
%%bash
PROJECT=${PROJECT}
BUCKET=${PROJECT}-ml
REGION=us-east1
MODEL_NAME=taxifare
VERSION_NAME=v2
if [[ $(gcloud ai-platform models list --format='value(name)' | grep $MODEL_NAME) ]]; then
echo "$MODEL_NAME already exists"
else
# create model
echo "Creating $MODEL_NAME"
gcloud ai-platform models create --regions=$REGION $MODEL_NAME
fi
if [[ $(gcloud ai-platform versions list --model $MODEL_NAME --format='value(name)' | grep $VERSION_NAME) ]]; then
echo "Deleting already existing $MODEL_NAME:$VERSION_NAME ... "
gcloud ai-platform versions delete --model=$MODEL_NAME $VERSION_NAME
echo "Please run this cell again if you don't see a Creating message ... "
sleep 10
fi
# create model
echo "Creating $MODEL_NAME:$VERSION_NAME"
gcloud ai-platform versions create --model=$MODEL_NAME $VERSION_NAME --async \
--framework=tensorflow --python-version=3.7 --runtime-version=2.1 \
--origin=$EXPORT_PATH --staging-bucket=gs://$BUCKET
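Because the version is created with --async, it can take a few minutes before it is usable. You can poll its state (it should eventually show READY) before sending prediction requests, for example:
In [ ]:
# Check the deployment status of the new model version.
!gcloud ai-platform versions describe v2 --model taxifare --format='value(state)'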
In this notebook, we have looked at how to implement a custom Keras model using feature columns.
In [ ]:
%%writefile repro.json
{"pickup_longitude": -73.982683, "pickup_latitude": 40.742104, "dropoff_longitude": -73.983766, "dropoff_latitude": 40.755174, "passenger_count": 3.0, "pickup_datetime": "2010-02-08 09:17:00 UTC"}
In [ ]:
!gcloud ai-platform predict --model taxifare --json-instances repro.json --version v2
Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.