In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This notebook demonstrates how to use Jupyter/Colab notebooks for TFX iterative development. Here, we walk through the Chicago Taxi example in an interactive notebook.
Working in an interactive notebook is a useful way to become familiar with the structure of a TFX pipeline. It's also useful as a lightweight development environment for your own pipelines, but you should be aware that interactive notebooks differ from production deployments in how they are orchestrated and how they access metadata artifacts.
In a production deployment of TFX you will use an orchestrator such as Apache Airflow, Kubeflow, or Apache Beam. In an interactive notebook the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells.
In a production deployment of TFX you will access metadata through the ML Metadata (MLMD) API. MLMD stores metadata properties in a database such as MySQL, and stores the metadata payloads in a persistent store such as your filesystem. In an interactive notebook, both properties and payloads are stored in the /tmp directory on the Jupyter notebook or Colab server.
First, install the necessary packages, download data, import modules and set up paths.
Note: Because of updates to some of the packages, you must use the button at the bottom of this cell's output to restart the runtime. After the restart, rerun this cell.
In [0]:
!pip install -q -U \
tensorflow==2.0.0 \
tfx==0.15.0 \
pyarrow==0.14.1
!pip install -U grpcio==1.24.3
In [0]:
import os
import pprint
import tempfile
import urllib
import tensorflow as tf
pp = pprint.PrettyPrinter()
import tfx
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import evaluator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.utils.dsl_utils import external_input
from tensorflow.core.example import example_pb2
from tensorflow_metadata.proto.v0 import anomalies_pb2
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2
import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
Check the versions
In [0]:
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
print('TFT version: {}'.format(tft.__version__))
print('TFDV version: {}'.format(tfdv.__version__))
print('TFMA version: {}'.format(tfma.VERSION_STRING))
Download the sample dataset for use in our TFX pipeline.
The data comes from the Taxi Trips dataset released by the City of Chicago. You will develop a binary classification model to predict whether a customer will tip more than 20% of the fare.
The columns in the dataset are:
pickup_community_area | fare | trip_start_month
trip_start_hour | trip_start_day | trip_start_timestamp
pickup_latitude | pickup_longitude | dropoff_latitude
dropoff_longitude | trip_miles | pickup_census_tract
dropoff_census_tract | payment_type | company
trip_seconds | dropoff_community_area | tips
In [0]:
# Download the example data.
_data_root = tempfile.mkdtemp(prefix='tfx-data')
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)
Take a quick look at the CSV file.
In [0]:
!head {_data_filepath}
An interactive context is used to provide global context when running a TFX pipeline in a notebook without using a runner or orchestrator such as Apache Airflow or Kubeflow. This style of development is only useful when developing the code for a pipeline, and cannot currently be used to deploy a working pipeline to production.
In [0]:
# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext.
context = InteractiveContext()
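By default everything lands in a temporary directory, but if you want artifacts and metadata to survive the session, both locations can be pinned explicitly. A minimal sketch (left commented out so it does not replace the context created above; the paths are hypothetical placeholders):
In [0]:
# Hedged sketch: persist the pipeline root and the MLMD database at fixed paths.
# from tfx.orchestration import metadata
# context = InteractiveContext(
#     pipeline_root='/var/tmp/tfx_pipeline_root',  # hypothetical path
#     metadata_connection_config=metadata.sqlite_metadata_connection_config(
#         '/var/tmp/tfx_metadata/metadata.db'))  # hypothetical path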
In the cells that follow you will construct TFX components and run each one interactively within the InteractiveContext to obtain ExecutionResult
objects. This mirrors the process of an orchestrator running components in a TFX DAG based on when the dependencies for each component are met.
In [0]:
# Use the packaged CSV input data.
input_data = external_input(_data_root)
# Brings data into the pipeline or otherwise joins/converts training data.
example_gen = CsvExampleGen(input=input_data)
context.run(example_gen)
ExampleGen's outputs include 2 artifacts: the training examples and the eval examples (by default, split 2/3 training, 1/3 eval):
In [0]:
for artifact in example_gen.outputs['examples'].get():
print(artifact.split, artifact.uri)
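The 2/3–1/3 split is just the default; ExampleGen also accepts an output configuration. A hedged sketch (not run here) requesting a 3:1 train/eval hash-bucket split instead:
In [0]:
# Hedged sketch: override the default 2:1 split with a 3:1 split.
# from tfx.proto import example_gen_pb2
# output_config = example_gen_pb2.Output(
#     split_config=example_gen_pb2.SplitConfig(splits=[
#         example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
#         example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
#     ]))
# example_gen = CsvExampleGen(input=input_data, output_config=output_config)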
Take a peek at the output training examples to see what they look like. Use a TFRecordDataset to read these files, and TFDV's TFExampleDecoder to check the results:
In [0]:
train_uri = example_gen.outputs['examples'].get()[0].uri
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
decoder = tfdv.TFExampleDecoder()
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = decoder.decode(serialized_example)
pp.pprint(example)
In [0]:
# Computes statistics over data for visualization and example validation.
statistics_gen = StatisticsGen(
examples=example_gen.outputs['examples'])
context.run(statistics_gen)
Again, let's take a peek at the output training artifact. Note that this time it is a TFRecord file containing a single record with a serialized DatasetFeatureStatisticsList
protobuf:
In [0]:
train_uri = statistics_gen.outputs['statistics'].get()[0].uri
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames)
for tfrecord in dataset.take(1):
serialized_example = tfrecord.numpy()
stats = statistics_pb2.DatasetFeatureStatisticsList()
stats.ParseFromString(serialized_example)
The statistics can be visualized using the tfdv.visualize_statistics()
function:
In [0]:
tfdv.visualize_statistics(stats)
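TFDV can also overlay statistics from two datasets, which is handy for spotting skew between splits. A hedged sketch, assuming you load the eval-split statistics (the second artifact, index 1) into eval_stats the same way stats was loaded above:
In [0]:
# Hedged sketch: compare train and eval statistics side by side.
# tfdv.visualize_statistics(
#     lhs_statistics=stats, rhs_statistics=eval_stats,
#     lhs_name='TRAIN', rhs_name='EVAL')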
In [0]:
# Generates schema based on statistics files.
infer_schema = SchemaGen(statistics=statistics_gen.outputs['statistics'])
context.run(infer_schema)
The generated artifact is just a schema.pbtxt
containing a text representation of a schema_pb2.Schema
protobuf:
In [0]:
train_uri = infer_schema.outputs['schema'].get()[0].uri
schema_filename = os.path.join(train_uri, "schema.pbtxt")
schema = tfx.utils.io_utils.parse_pbtxt_file(file_name=schema_filename,
message=schema_pb2.Schema())
It can be visualized using tfdv.display_schema()
:
In [0]:
tfdv.display_schema(schema)
The ExampleValidator
performs anomaly detection, based on the statistics from StatisticsGen and the schema from SchemaGen. It looks for problems such as missing values, values of the wrong type, or categorical values outside of the domain of acceptable values.
Create an ExampleValidator component and run it.
In [0]:
# Performs anomaly detection based on statistics and data schema.
validate_stats = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=infer_schema.outputs['schema'])
context.run(validate_stats)
The output artifact of ExampleValidator
is an anomalies.pbtxt
file describing an anomalies_pb2.Anomalies
protobuf:
In [0]:
train_uri = validate_stats.outputs['anomalies'].get()[0].uri
anomalies_filename = os.path.join(train_uri, "anomalies.pbtxt")
anomalies = tfx.utils.io_utils.parse_pbtxt_file(
file_name=anomalies_filename,
message=anomalies_pb2.Anomalies())
This can be visualized using the tfdv.display_anomalies()
function. Did it find any anomalies?
In [0]:
tfdv.display_anomalies(anomalies)
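If anomalies do appear and you decide the data is in fact acceptable, the usual remedy is to relax the schema rather than change the data, then re-run ExampleValidator. A hedged sketch using TFDV's schema helpers (the feature name and threshold are illustrative):
In [0]:
# Hedged sketch: tolerate a small fraction of out-of-domain 'company' values.
# Feature name and threshold are illustrative, not from this pipeline's output.
# tfdv.get_feature(schema, 'company').distribution_constraints.min_domain_mass = 0.9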
The Transform
component performs data transformations and feature engineering. The results include an input TensorFlow graph which is used during both training and serving to preprocess the data before training or inference. This graph becomes part of the SavedModel that is the result of model training. Since the same input graph is used for both training and serving, the preprocessing will always be the same, and only needs to be written once.
The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering you may need for your data and model. It requires code files that define the processing needed.
Define some constants and functions for both the Transform
component and the Trainer
component. Define them in a Python module, in this case saved to disk using the %%writefile
magic command since you are working in a notebook:
In [0]:
_constants_module_file = 'chicago_taxi_constants.py'
In [0]:
%%writefile {_constants_module_file}
# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]
CATEGORICAL_FEATURE_KEYS = [
'trip_start_hour', 'trip_start_day', 'trip_start_month',
'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
'dropoff_community_area'
]
DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']
# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10
BUCKET_FEATURE_KEYS = [
'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
'dropoff_longitude'
]
# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000
# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10
VOCAB_FEATURE_KEYS = [
'payment_type',
'company',
]
# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'
def transformed_name(key):
return key + '_xf'
Now define a module containing the preprocessing_fn()
function that will be passed to the Transform
component:
In [0]:
_transform_module_file = 'chicago_taxi_transform.py'
In [0]:
%%writefile {_transform_module_file}
import tensorflow_transform as tft
import tensorflow as tf
from tensorflow_transform.tf_metadata import schema_utils
from chicago_taxi_constants import *
def _transformed_names(keys):
return [transformed_name(key) for key in keys]
# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
return schema_utils.schema_as_feature_spec(schema).feature_spec
def _gzip_reader_fn(filenames):
"""Small utility returning a record reader that can read gzip'ed files."""
return tf.data.TFRecordDataset(
filenames,
compression_type='GZIP')
def _fill_in_missing(x):
"""Replace missing values in a SparseTensor.
Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
Args:
x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
in the second dimension.
Returns:
A rank 1 tensor where missing values of `x` have been filled in.
"""
default_value = '' if x.dtype == tf.string else 0
return tf.squeeze(
tf.sparse.to_dense(
tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
default_value),
axis=1)
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
outputs = {}
for key in DENSE_FLOAT_FEATURE_KEYS:
# Preserve this feature as a dense float, setting nan's to the mean.
outputs[transformed_name(key)] = tft.scale_to_z_score(
_fill_in_missing(inputs[key]))
for key in VOCAB_FEATURE_KEYS:
# Build a vocabulary for this feature.
outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(
_fill_in_missing(inputs[key]),
top_k=VOCAB_SIZE,
num_oov_buckets=OOV_SIZE)
for key in BUCKET_FEATURE_KEYS:
outputs[transformed_name(key)] = tft.bucketize(
_fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,
always_return_num_quantiles=False)
for key in CATEGORICAL_FEATURE_KEYS:
outputs[transformed_name(key)] = _fill_in_missing(inputs[key])
# Was this passenger a big tipper?
taxi_fare = _fill_in_missing(inputs[FARE_KEY])
tips = _fill_in_missing(inputs[LABEL_KEY])
outputs[transformed_name(LABEL_KEY)] = tf.where(
tf.math.is_nan(taxi_fare),
tf.cast(tf.zeros_like(taxi_fare), tf.int64),
# Test if the tip was > 20% of the fare.
tf.cast(
tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))
return outputs
Create and run the Transform
component, referring to the files that were created above.
In [0]:
# Performs transformations and feature engineering in training and serving.
transform = Transform(
examples=example_gen.outputs['examples'],
schema=infer_schema.outputs['schema'],
module_file=_transform_module_file)
context.run(transform)
The Transform
component has 2 types of outputs:
- transform_graph is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
- transformed_examples represents the preprocessed training and evaluation data.
In [0]:
transform.outputs
Take a peek at the transform_graph
artifact. It points to a directory containing 3 subdirectories.
In [0]:
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)
The transform_fn
subdirectory contains the actual preprocessing graph. The metadata
subdirectory contains the schema of the original data. The transformed_metadata
subdirectory contains the schema of the preprocessed data.
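TFT's TFTransformOutput wrapper gives a convenient view into this directory. A short sketch, assuming train_uri still points at the transform_graph artifact from the cell above:
In [0]:
# Load the transform output and print the post-transform feature spec.
tft_output = tft.TFTransformOutput(train_uri)
pp.pprint(tft_output.transformed_feature_spec())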
Take a look at some of the transformed examples and check that they are indeed processed as intended.
In [0]:
train_uri = transform.outputs['transformed_examples'].get()[1].uri
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
decoder = tfdv.TFExampleDecoder()
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = decoder.decode(serialized_example)
pp.pprint(example)
In [0]:
# Setup paths.
_trainer_module_file = 'chicago_taxi_trainer.py'
In [0]:
%%writefile {_trainer_module_file}
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils
from chicago_taxi_constants import *
def transformed_names(keys):
return [transformed_name(key) for key in keys]
# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
return schema_utils.schema_as_feature_spec(schema).feature_spec
def _gzip_reader_fn(filenames):
"""Small utility returning a record reader that can read gzip'ed files."""
return tf.data.TFRecordDataset(
filenames,
compression_type='GZIP')
def _build_estimator(config, hidden_units=None, warm_start_from=None):
"""Build an estimator for predicting taxi tips
Args:
config: tf.estimator.RunConfig defining the runtime environment for the
estimator (including model_dir).
hidden_units: [int], the layer sizes of the DNN (input layer first)
warm_start_from: Optional directory to warm start from.
Returns:
The estimator that will be used for training and eval.
"""
real_valued_columns = [
tf.feature_column.numeric_column(key, shape=())
for key in transformed_names(DENSE_FLOAT_FEATURE_KEYS)
]
categorical_columns = [
tf.feature_column.categorical_column_with_identity(
key, num_buckets=VOCAB_SIZE + OOV_SIZE, default_value=0)
for key in transformed_names(VOCAB_FEATURE_KEYS)
]
categorical_columns += [
tf.feature_column.categorical_column_with_identity(
key, num_buckets=FEATURE_BUCKET_COUNT, default_value=0)
for key in transformed_names(BUCKET_FEATURE_KEYS)
]
categorical_columns += [
tf.feature_column.categorical_column_with_identity(
key,
num_buckets=num_buckets,
default_value=0) for key, num_buckets in zip(
transformed_names(CATEGORICAL_FEATURE_KEYS),
MAX_CATEGORICAL_FEATURE_VALUES)
]
return tf.estimator.DNNLinearCombinedClassifier(
config=config,
linear_feature_columns=categorical_columns,
dnn_feature_columns=real_valued_columns,
dnn_hidden_units=hidden_units or [100, 70, 50, 25],
warm_start_from=warm_start_from)
def _example_serving_receiver_fn(tf_transform_graph, schema):
"""Build the serving in inputs.
Args:
tf_transform_graph: A TFTransformOutput.
schema: the schema of the input data.
Returns:
Tensorflow graph which parses examples, applying tf-transform to them.
"""
raw_feature_spec = _get_raw_feature_spec(schema)
raw_feature_spec.pop(LABEL_KEY)
raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
raw_feature_spec, default_batch_size=None)
serving_input_receiver = raw_input_fn()
transformed_features = tf_transform_graph.transform_raw_features(
serving_input_receiver.features)
return tf.estimator.export.ServingInputReceiver(
transformed_features, serving_input_receiver.receiver_tensors)
def _eval_input_receiver_fn(tf_transform_graph, schema):
"""Build everything needed for the tf-model-analysis to run the model.
Args:
tf_transform_graph: A TFTransformOutput.
schema: the schema of the input data.
Returns:
EvalInputReceiver function, which contains:
- Tensorflow graph which parses raw untransformed features, applies the
tf-transform preprocessing operators.
- Set of raw, untransformed features.
- Label against which predictions will be compared.
"""
# Notice that the inputs are raw features, not transformed features here.
raw_feature_spec = _get_raw_feature_spec(schema)
serialized_tf_example = tf.compat.v1.placeholder(
dtype=tf.string, shape=[None], name='input_example_tensor')
# Add a parse_example operator to the tensorflow graph, which will parse
# raw, untransformed, tf examples.
features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)
# Now that we have our raw examples, process them through the tf-transform
# function computed during the preprocessing step.
transformed_features = tf_transform_graph.transform_raw_features(
features)
# The key name MUST be 'examples'.
receiver_tensors = {'examples': serialized_tf_example}
# NOTE: Model is driven by transformed features (since training works on the
# materialized output of TFT), but slicing will happen on raw features.
features.update(transformed_features)
return tfma.export.EvalInputReceiver(
features=features,
receiver_tensors=receiver_tensors,
labels=transformed_features[transformed_name(LABEL_KEY)])
def _input_fn(filenames, tf_transform_graph, batch_size=200):
"""Generates features and labels for training or evaluation.
Args:
filenames: [str] list of TFRecord files containing transformed examples.
tf_transform_graph: A TFTransformOutput.
batch_size: int First dimension size of the Tensors returned by input_fn
Returns:
A (features, indices) tuple where features is a dictionary of
Tensors, and indices is a single Tensor of label indices.
"""
transformed_feature_spec = (
tf_transform_graph.transformed_feature_spec().copy())
dataset = tf.data.experimental.make_batched_features_dataset(
filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)
transformed_features = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
# We pop the label because we do not want to use it as a feature while we're
# training.
return transformed_features, transformed_features.pop(
transformed_name(LABEL_KEY))
# TFX will call this function
def trainer_fn(hparams, schema):
"""Build the estimator using the high level API.
Args:
hparams: Holds hyperparameters used to train the model as name/value pairs.
schema: Holds the schema of the training examples.
Returns:
A dict of the following:
- estimator: The estimator that will be used for training and eval.
- train_spec: Spec for training.
- eval_spec: Spec for eval.
- eval_input_receiver_fn: Input function for eval.
"""
# Number of nodes in the first layer of the DNN
first_dnn_layer_size = 100
num_dnn_layers = 4
dnn_decay_factor = 0.7
train_batch_size = 40
eval_batch_size = 40
tf_transform_graph = tft.TFTransformOutput(hparams.transform_output)
train_input_fn = lambda: _input_fn(
hparams.train_files,
tf_transform_graph,
batch_size=train_batch_size)
eval_input_fn = lambda: _input_fn(
hparams.eval_files,
tf_transform_graph,
batch_size=eval_batch_size)
train_spec = tf.estimator.TrainSpec(
train_input_fn,
max_steps=hparams.train_steps)
serving_receiver_fn = lambda: _example_serving_receiver_fn(
tf_transform_graph, schema)
exporter = tf.estimator.FinalExporter('chicago-taxi', serving_receiver_fn)
eval_spec = tf.estimator.EvalSpec(
eval_input_fn,
steps=hparams.eval_steps,
exporters=[exporter],
name='chicago-taxi-eval')
run_config = tf.estimator.RunConfig(
save_checkpoints_steps=999, keep_checkpoint_max=1)
run_config = run_config.replace(model_dir=hparams.serving_model_dir)
estimator = _build_estimator(
# Construct layer sizes with exponential decay
hidden_units=[
max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
for i in range(num_dnn_layers)
],
config=run_config,
warm_start_from=hparams.warm_start_from)
# Create an input receiver for TFMA processing
receiver_fn = lambda: _eval_input_receiver_fn(
tf_transform_graph, schema)
return {
'estimator': estimator,
'train_spec': train_spec,
'eval_spec': eval_spec,
'eval_input_receiver_fn': receiver_fn
}
Create and run the Trainer
component.
In [0]:
# Uses user-provided Python function that implements a model using TensorFlow.
trainer = Trainer(
module_file=_trainer_module_file,
transformed_examples=transform.outputs['transformed_examples'],
schema=infer_schema.outputs['schema'],
transform_graph=transform.outputs['transform_graph'],
train_args=trainer_pb2.TrainArgs(num_steps=10000),
eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)
Take a peek at the trained model which was exported from Trainer
.
In [0]:
train_uri = trainer.outputs['model'].get()[0].uri
serving_model_path = os.path.join(train_uri, 'serving_model_dir', 'export', 'chicago-taxi')
latest_serving_model_path = os.path.join(serving_model_path, max(os.listdir(serving_model_path)))
exported_model = tf.saved_model.load(latest_serving_model_path)
In [0]:
exported_model.graph.get_operations()[:10] + ["..."]
In [0]:
%load_ext tensorboard
In [0]:
%tensorboard --bind_all --logdir {os.path.join(train_uri, 'serving_model_dir')}
The Evaluator
component analyzes model performance using the TensorFlow Model Analysis library. It runs inference requests on particular subsets of the test dataset, based on which slices
are defined by the developer. Knowing which slices should be analyzed requires domain knowledge of what is important in this particular use case or domain.
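A FeatureSlicingSpec can carry several slice definitions at once: an empty SingleSlicingSpec requests the overall (unsliced) metrics, and listing multiple columns in one spec crosses them. A hedged sketch for illustration:
In [0]:
# Hedged sketch: overall metrics plus a cross of two columns.
# evaluator_pb2.FeatureSlicingSpec(specs=[
#     evaluator_pb2.SingleSlicingSpec(),  # empty spec: metrics over all data
#     evaluator_pb2.SingleSlicingSpec(
#         column_for_slicing=['trip_start_hour', 'trip_start_day'])
# ])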
Create and run an Evaluator component.
In [0]:
# Uses TFMA to compute evaluation statistics over features of a model.
model_analyzer = Evaluator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'],
feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[
evaluator_pb2.SingleSlicingSpec(
column_for_slicing=['trip_start_day'])
]))
context.run(model_analyzer)
In [0]:
model_analyzer.outputs
Use the Evaluator results to generate model performance data which can be visualized. First create evaluation input data.
In [0]:
import csv
BASE_DIR = tempfile.mkdtemp()
reader = csv.DictReader(open(_data_filepath))
examples = []
for line in reader:
example = tf.train.Example()
for feature in schema.feature:
key = feature.name
if len(line[key]) > 0:
if feature.type == schema_pb2.FLOAT:
example.features.feature[key].float_list.value[:] = [float(line[key])]
elif feature.type == schema_pb2.INT:
example.features.feature[key].int64_list.value[:] = [int(line[key])]
elif feature.type == schema_pb2.BYTES:
example.features.feature[key].bytes_list.value[:] = [line[key].encode('utf8')]
else:
if feature.type == schema_pb2.FLOAT:
example.features.feature[key].float_list.value[:] = []
elif feature.type == schema_pb2.INT:
example.features.feature[key].int64_list.value[:] = []
elif feature.type == schema_pb2.BYTES:
example.features.feature[key].bytes_list.value[:] = []
examples.append(example)
TFRecord_file = os.path.join(BASE_DIR, 'train_data.rio')
with tf.io.TFRecordWriter(TFRecord_file) as writer:
for example in examples:
writer.write(example.SerializeToString())
writer.flush()
writer.close()
!ls {TFRecord_file}
Run the analysis of a particular slice of data.
In [0]:
def run_and_render(eval_model=None, slice_list=None, slice_idx=0):
"""Runs the model analysis and renders the slicing metrics
Args:
eval_model: An instance of tf.saved_model saved with evaluation data
slice_list: A list of tfma.slicer.SingleSliceSpec giving the slices
slice_idx: An integer index into slice_list specifying the slice to use
Returns:
A SlicingMetricsViewer object if in Jupyter notebook; None if in Colab.
"""
eval_result = tfma.run_model_analysis(eval_shared_model=eval_model,
data_location=TFRecord_file,
file_format='tfrecords',
slice_spec=slice_list,
output_path='sample_data',
extractors=None)
return tfma.view.render_slicing_metrics(eval_result, slicing_spec=slice_list[slice_idx] if slice_list else None)
# Load the TFMA results for the first training run
# This will take a minute
eval_model_base_dir_0 = os.path.join(train_uri, 'eval_model_dir')
eval_model_dir_0 = os.path.join(eval_model_base_dir_0,
max(os.listdir(eval_model_base_dir_0)))
eval_shared_model_0 = tfma.default_eval_shared_model(
eval_saved_model_path=eval_model_dir_0)
# Slice our data by the trip_start_hour feature
slices = [tfma.slicer.SingleSliceSpec(columns=['trip_start_hour'])]
run_and_render(eval_model=eval_shared_model_0, slice_list=slices, slice_idx=0)
Print the slicing metrics.
In [0]:
evaluation_uri = model_analyzer.outputs['output'].get()[0].uri
eval_result = tfma.load_eval_result(evaluation_uri)
print('{}\n\nslicing_metrics:\n'.format(eval_result))
for metric in eval_result.slicing_metrics:
pp.pprint(metric)
Examine the output data.
In [0]:
eval_path_uri = model_analyzer.outputs['output'].get()[0].uri
tfrecord_filenames = [os.path.join(eval_path_uri, name)
for name in os.listdir(eval_path_uri)]
pp.pprint(tfrecord_filenames)
dataset = tf.data.TFRecordDataset(tfrecord_filenames)
pp.pprint(dataset)
The ModelValidator
component validates your candidate model against the previously deployed model (if any), or against a baseline value, using criteria that you define. If the new model scores better than the previous model it will be "blessed" by ModelValidator, approving it for deployment.
In [0]:
# Performs quality validation of a candidate model (compared to a baseline).
model_validator = ModelValidator(
examples=example_gen.outputs['examples'],
model=trainer.outputs['model'])
context.run(model_validator)
Examine the output of ModelValidator.
In [0]:
model_validator.outputs
In [0]:
blessing_uri = model_validator.outputs['blessing'].get()[0].uri
!ls -l {blessing_uri}
In [0]:
# Setup serving path
_serving_model_dir = os.path.join(tempfile.mkdtemp(),
'serving_model/chicago_taxi_simple')
Create and run a Pusher component.
In [0]:
# Checks whether the model passed the validation steps and pushes the model
# to a file destination if check passed.
pusher = Pusher(
model=trainer.outputs['model'],
model_blessing=model_validator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=_serving_model_dir)))
context.run(pusher)
Examine the output of Pusher.
In [0]:
pusher.outputs
In [0]:
push_uri = pusher.outputs['pushed_model'].get()[0].uri
latest_version = max(os.listdir(push_uri))
latest_version_path = os.path.join(push_uri, latest_version)
model = tf.saved_model.load(latest_version_path)
In [0]:
for item in model.signatures.items():
pp.pprint(item)
Now that we have a trained model that has been blessed by ModelValidator, and pushed to our deployment target by Pusher, we can load it into TensorFlow Serving and start serving inference requests.
We'll use the command line utility saved_model_cli
to look at the MetaGraphDefs (the models) and SignatureDefs (the methods you can call) in our SavedModel. See this discussion of the SavedModel CLI in the TensorFlow Guide.
In [0]:
latest_pushed_model = os.path.join(_serving_model_dir, max(os.listdir(_serving_model_dir)))
!saved_model_cli show --dir {latest_pushed_model} --all
That tells us a lot about our model! In this case we just trained our model, so we already know the inputs and outputs, but if we didn't this would be important information. It doesn't tell us everything, but it's a great start.
We're preparing to install TensorFlow Serving using Aptitude since this Colab runs in a Debian environment. We'll add the tensorflow-model-server
package to the list of packages that Aptitude knows about. Note that we're running as root.
Note: This example is running TensorFlow Serving natively, but you can also run it in a Docker container, which is one of the easiest ways to get started using TensorFlow Serving.
In [0]:
# This is the same as you would do from your command line, but without the [arch=amd64], and no sudo
# You would instead do:
# echo "deb [arch=amd64] http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | sudo tee /etc/apt/sources.list.d/tensorflow-serving.list && \
# curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | sudo apt-key add -
!echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | tee /etc/apt/sources.list.d/tensorflow-serving.list && \
curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -
!apt update
In [0]:
!apt-get install tensorflow-model-server
This is where we start running TensorFlow Serving and load our model. After it loads we can start making inference requests using REST. There are some important parameters:
- rest_api_port: The port that you'll use for REST requests.
- model_name: You'll use this in the URL of REST requests. It can be anything.
- model_base_path: This is the path to the directory where you've saved your model. Note that this base_path should not include the model version directory, which is why we split it off below.
In [0]:
os.environ["MODEL_DIR"] = os.path.split(latest_pushed_model)[0]
In [0]:
%%bash --bg
nohup tensorflow_model_server \
--rest_api_port=8501 \
--model_name=chicago_taxi_simple \
--model_base_path="${MODEL_DIR}" >server.log 2>&1
In [0]:
!tail server.log
Our example data is stored in a CSV file on disk. We first have to read the file and decode the examples from CSV, and then encode these as Example protos to feed to TensorFlow Serving.
A few notes:
- The regress and classify APIs are higher-level and thus encouraged over predict; here we use the predict API to showcase the more involved route.
- While the regress and classify APIs expect and can parse tf.Example, the predict API expects an arbitrary TensorProto. This means we will have to construct the tf.Example proto using coders from TensorFlow Transform.
- The REST API encodes the serialized tf.Example using Base64.
- In production use of the predict API, you should strongly consider using the gRPC API surface.
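For reference, the request body assembled by the helper below has this shape (Base64 payloads abbreviated):
In [0]:
# Shape of the TF Serving REST predict request built below (values illustrative):
# {"instances": [{"b64": "CrgC..."}, {"b64": "CrcC..."}]}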
In [0]:
import base64
import json
import requests
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import schema_utils
from tensorflow_transform import coders as tft_coders
from chicago_taxi_constants import *
def _get_raw_feature_spec(schema):
return schema_utils.schema_as_feature_spec(schema).feature_spec
def _make_proto_coder(schema):
raw_feature_spec = _get_raw_feature_spec(schema)
raw_schema = dataset_schema.from_feature_spec(raw_feature_spec)
return tft_coders.ExampleProtoCoder(raw_schema)
def _make_csv_coder(schema, column_names):
"""Return a coder for tf.transform to read csv files."""
raw_feature_spec = _get_raw_feature_spec(schema)
parsing_schema = dataset_schema.from_feature_spec(raw_feature_spec)
return tft_coders.CsvCoder(column_names, parsing_schema)
def make_serialized_examples(examples_csv_file, num_examples, schema):
"""Parses examples from CSV file and returns seralized proto examples."""
filtered_features = [
feature for feature in schema.feature if feature.name != LABEL_KEY
]
del schema.feature[:]
schema.feature.extend(filtered_features)
columns = tfx.utils.io_utils.load_csv_column_names(examples_csv_file)
csv_coder = _make_csv_coder(schema, columns)
proto_coder = _make_proto_coder(schema)
input_file = open(examples_csv_file, 'r')
input_file.readline() # skip header line
serialized_examples = []
for _ in range(num_examples):
one_line = input_file.readline()
if not one_line:
print('End of example file reached')
break
one_example = csv_coder.decode(one_line)
serialized_example = proto_coder.encode(one_example)
serialized_examples.append(serialized_example)
return serialized_examples
In [0]:
def do_inference(server_addr, model_name, serialized_examples):
"""Sends requests to the model and prints the results.
Args:
server_addr: network address of model server in "host:port" format
model_name: name of the model as understood by the model server
serialized_examples: serialized examples of data to do inference on
"""
parsed_server_addr = server_addr.split(':')
host = parsed_server_addr[0]
port = parsed_server_addr[1]
json_examples = []
for serialized_example in serialized_examples:
# The encoding follows the guidelines in:
# https://www.tensorflow.org/tfx/serving/api_rest
example_bytes = base64.b64encode(serialized_example).decode('utf-8')
predict_request = '{ "b64": "%s" }' % example_bytes
json_examples.append(predict_request)
json_request = '{ "instances": [' + ','.join(map(str, json_examples)) + ']}'
server_url = 'http://' + host + ':' + port + '/v1/models/' + model_name + ':predict'
response = requests.post(
server_url, data=json_request, timeout=5.0)
response.raise_for_status()
prediction = response.json()
print(json.dumps(prediction, indent=4))
In [0]:
serialized_examples = make_serialized_examples(
examples_csv_file=_data_filepath,
num_examples=3,
schema=schema)
do_inference(server_addr='127.0.0.1:8501',
model_name='chicago_taxi_simple',
serialized_examples=serialized_examples)