Copyright © 2019 The TensorFlow Authors.

In [0]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

TensorFlow Extended (TFX) Workshop

Run this notebook in Colab

Running a simple pipeline manually in a Colab Notebook

This notebook demonstrates how to use Jupyter/Colab notebooks for TFX iterative development. Here, we walk through the Chicago Taxi example in an interactive notebook.

Working in an interactive notebook is a useful way to become familiar with the structure of a TFX pipeline. It also serves as a lightweight development environment for your own pipelines, but you should be aware that there are differences in the way interactive notebooks are orchestrated and in how they access metadata artifacts.

Orchestration

In a production deployment of TFX you will use an orchestrator such as Apache Airflow, Kubeflow, or Apache Beam. In an interactive notebook the notebook itself is the orchestrator, running each TFX component as you execute the notebook cells.

Metadata

In a production deployment of TFX you will access metadata through the ML Metadata (MLMD) API. MLMD stores metadata properties in a database such as MySQL, and stores the metadata payloads in a persistent store such as on your filesystem. In an interactive notebook, both properties and payloads are stored in the /tmp directory on the Jupyter notebook or Colab server.
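
If you want an interactive session to use a persistent store instead, a minimal sketch (the database path here is hypothetical) of an MLMD connection config looks like this; it could then be passed as metadata_connection_config when constructing the InteractiveContext below:


In [0]:
from ml_metadata.proto import metadata_store_pb2

# Hypothetical SQLite-backed MLMD store; replace the path with your own.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = '/path/to/metadata.db'
connection_config.sqlite.connection_mode = (
    metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE)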

Setup

First, install the necessary packages, download data, import modules and set up paths.

Install TFX and TensorFlow

Note

Because of package updates, you must use the button at the bottom of the output of this cell to restart the runtime. After the restart, you should rerun this cell.


In [0]:
!pip install -q -U \
  tensorflow==2.0.0 \
  tfx==0.15.0 \
  pyarrow==0.14.1
!pip install -U grpcio==1.24.3

Import packages

Import necessary packages, including standard TFX component classes.


In [0]:
import os
import pprint
import tempfile
import urllib

import tensorflow as tf
pp = pprint.PrettyPrinter()

import tfx
from tfx.components.evaluator.component import Evaluator
from tfx.components.example_gen.csv_example_gen.component import CsvExampleGen
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import evaluator_pb2
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.utils.dsl_utils import external_input

from tensorflow.core.example import example_pb2
from tensorflow_metadata.proto.v0 import anomalies_pb2
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_metadata.proto.v0 import statistics_pb2

import tensorflow_data_validation as tfdv
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma

Check the versions


In [0]:
print('TensorFlow version: {}'.format(tf.__version__))
print('TFX version: {}'.format(tfx.__version__))
print('TFT version: {}'.format(tft.__version__))
print('TFDV version: {}'.format(tfdv.__version__))
print('TFMA version: {}'.format(tfma.VERSION_STRING))

Download example data

Download the sample dataset for use in our TFX pipeline.

The data comes from the Taxi Trips dataset released by the City of Chicago. You will develop a binary classification model to predict whether or not customers will tip their taxi drivers more than 20% of the fare. For example, a $2.50 tip on a $10.00 fare exceeds 0.2 × $10.00 = $2.00, so that trip counts as a big tip.

The columns in the dataset are:

pickup_community_area, fare, trip_start_month
trip_start_hour, trip_start_day, trip_start_timestamp
pickup_latitude, pickup_longitude, dropoff_latitude
dropoff_longitude, trip_miles, pickup_census_tract
dropoff_census_tract, payment_type, company
trip_seconds, dropoff_community_area, tips

In [0]:
# Download the example data.
_data_root = tempfile.mkdtemp(prefix='tfx-data')
DATA_PATH = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(DATA_PATH, _data_filepath)

Take a quick look at the CSV file.


In [0]:
!head {_data_filepath}

Create the InteractiveContext

An interactive context is used to provide global context when running a TFX pipeline in a notebook without using a runner or orchestrator such as Apache Airflow or Kubeflow. This style of development is only useful when developing the code for a pipeline, and cannot currently be used to deploy a working pipeline to production.


In [0]:
# Here, we create an InteractiveContext using default parameters. This will
# use a temporary directory with an ephemeral ML Metadata database instance.
# To use your own pipeline root or database, the optional properties
# `pipeline_root` and `metadata_connection_config` may be passed to
# InteractiveContext.
context = InteractiveContext()

Run TFX Components Interactively


In the cells that follow you will construct TFX components and run each one interactively within the InteractiveContext to obtain ExecutionResult objects. This mirrors the process of an orchestrator running components in a TFX DAG based on when the dependencies for each component are met.

The ExampleGen Component

The first step in any ML development process is to ingest the training and test datasets. The ExampleGen component brings data into the TFX pipeline.

Create an ExampleGen component and run it.


In [0]:
# Use the packaged CSV input data.
input_data = external_input(_data_root)

# Brings data into the pipeline or otherwise joins/converts training data.
example_gen = CsvExampleGen(input=input_data)
context.run(example_gen)

ExampleGen's outputs include 2 artifacts: the training examples and the eval examples (by default, split 2/3 training, 1/3 eval):


In [0]:
for artifact in example_gen.outputs['examples'].get():
  print(artifact.split, artifact.uri)

Take a peek at the output training examples to see what they look like.

  1. Get the URI of the output artifact representing the training examples, which is a directory
  2. Get the list of files in this directory (all compressed TFRecord files), and create a TFRecordDataset to read these files
  3. Iterate over the first 3 records and decode them using a TFExampleDecoder to check the results:

In [0]:
train_uri = example_gen.outputs['examples'].get()[0].uri
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
decoder = tfdv.TFExampleDecoder()
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = decoder.decode(serialized_example)
  pp.pprint(example)

The StatisticsGen Component

The StatisticsGen component computes descriptive statistics for your dataset. The statistics that it generates can be visualized for review, and are used for example validation and to infer a schema.

Create a StatisticsGen component and run it.


In [0]:
# Computes statistics over data for visualization and example validation.
statistics_gen = StatisticsGen(
    input_data=example_gen.outputs['examples'])
context.run(statistics_gen)

Again, let's take a peek at the output training artifact. Note that this time it is a TFRecord file containing a single record with a serialized DatasetFeatureStatisticsList protobuf:


In [0]:
train_uri = statistics_gen.outputs['statistics'].get()[0].uri
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames)
for tfrecord in dataset.take(1):
  serialized_example = tfrecord.numpy()
  stats = statistics_pb2.DatasetFeatureStatisticsList()
  stats.ParseFromString(serialized_example)

The statistics can be visualized using the tfdv.visualize_statistics() function:


In [0]:
tfdv.visualize_statistics(stats)

The SchemaGen Component

The SchemaGen component generates a schema for your data based on the statistics from StatisticsGen. It tries to infer the data types of each of your features, and the ranges of legal values for categorical features.

Create a SchemaGen component and run it.


In [0]:
# Generates schema based on statistics files.
infer_schema = SchemaGen(statistics=statistics_gen.outputs['statistics'])
context.run(infer_schema)

The generated artifact is just a schema.pbtxt containing a text representation of a schema_pb2.Schema protobuf:


In [0]:
train_uri = infer_schema.outputs['schema'].get()[0].uri
schema_filename = os.path.join(train_uri, "schema.pbtxt")
schema = tfx.utils.io_utils.parse_pbtxt_file(file_name=schema_filename,
                                             message=schema_pb2.Schema())

It can be visualized using tfdv.display_schema():


In [0]:
tfdv.display_schema(schema)

The ExampleValidator Component

The ExampleValidator performs anomaly detection, based on the statistics from StatisticsGen and the schema from SchemaGen. It looks for problems such as missing values, values of the wrong type, or categorical values outside of the domain of acceptable values.

Create an ExampleValidator component and run it.


In [0]:
# Performs anomaly detection based on statistics and data schema.
validate_stats = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=infer_schema.outputs['schema'])
context.run(validate_stats)

The output artifact of ExampleValidator is an anomalies.pbtxt file describing an anomalies_pb2.Anomalies protobuf:


In [0]:
train_uri = validate_stats.outputs['anomalies'].get()[0].uri
anomalies_filename = os.path.join(train_uri, "anomalies.pbtxt")
anomalies = tfx.utils.io_utils.parse_pbtxt_file(
    file_name=anomalies_filename,
    message=anomalies_pb2.Anomalies())

This can be visualized using the tfdv.display_anomalies() function. Did it find any anomalies?


In [0]:
tfdv.display_anomalies(anomalies)

The Transform Component

The Transform component performs data transformations and feature engineering. The results include an input TensorFlow graph which is used during both training and serving to preprocess the data before training or inference. This graph becomes part of the SavedModel that is the result of model training. Since the same input graph is used for both training and serving, the preprocessing will always be the same, and only needs to be written once.

The Transform component requires more code than many other components because of the arbitrary complexity of the feature engineering that you may need for the data and/or model that you're working with. It requires code files to be available which define the processing needed.

Define some constants and functions for both the Transform component and the Trainer component. Define them in a Python module, in this case saved to disk using the %%writefile magic command since you are working in a notebook:


In [0]:
_constants_module_file = 'chicago_taxi_constants.py'

In [0]:
%%writefile {_constants_module_file}

# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]

CATEGORICAL_FEATURE_KEYS = [
    'trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area'
]

DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10

BUCKET_FEATURE_KEYS = [
    'pickup_latitude', 'pickup_longitude', 'dropoff_latitude',
    'dropoff_longitude'
]

# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000

# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10

VOCAB_FEATURE_KEYS = [
    'payment_type',
    'company',
]

# Keys
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

def transformed_name(key):
  return key + '_xf'

Now define a module containing the preprocessing_fn() function that will be passed to the Transform component:


In [0]:
_transform_module_file = 'chicago_taxi_transform.py'

In [0]:
%%writefile {_transform_module_file}

import tensorflow_transform as tft
import tensorflow as tf
from tensorflow_transform.tf_metadata import schema_utils
from chicago_taxi_constants import *


def _transformed_names(keys):
  return [transformed_name(key) for key in keys]


# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')


def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.
  Args:
    x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
      in the second dimension.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)


def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.
  Args:
    inputs: map from feature keys to raw not-yet-transformed features.
  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}
  for key in DENSE_FLOAT_FEATURE_KEYS:
    # Preserve this feature as a dense float, setting nan's to the mean.
    outputs[transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in VOCAB_FEATURE_KEYS:
    # Build a vocabulary for this feature.
    outputs[transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=VOCAB_SIZE,
        num_oov_buckets=OOV_SIZE)

  for key in BUCKET_FEATURE_KEYS:
    outputs[transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), FEATURE_BUCKET_COUNT,
        always_return_num_quantiles=False)

  for key in CATEGORICAL_FEATURE_KEYS:
    outputs[transformed_name(key)] = _fill_in_missing(inputs[key])

  # Was this passenger a big tipper?
  taxi_fare = _fill_in_missing(inputs[FARE_KEY])
  tips = _fill_in_missing(inputs[LABEL_KEY])
  outputs[transformed_name(LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Test if the tip was > 20% of the fare.
      tf.cast(
          tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))

  return outputs

Create and run the Transform component, referring to the files that were created above.


In [0]:
# Performs transformations and feature engineering in training and serving.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    module_file=_transform_module_file)
context.run(transform)

The Transform component has 2 types of outputs:

  • transform_graph is the graph that can perform the preprocessing operations (this graph will be included in the serving and evaluation models).
  • transformed_examples represents the preprocessed training and evaluation data.

In [0]:
transform.outputs

Take a peek at the transform_graph artifact. It points to a directory containing 3 subdirectories.


In [0]:
train_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(train_uri)

The transform_fn subdirectory contains the actual preprocessing graph. The metadata subdirectory contains the schema of the original data. The transformed_metadata subdirectory contains the schema of the preprocessed data.
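
As a quick cross-check (a sketch, reusing the train_uri from the cell above), you can also load this artifact with tft.TFTransformOutput and inspect the post-transform feature spec:


In [0]:
# Load the transform graph artifact and print the transformed feature spec.
tf_transform_output = tft.TFTransformOutput(train_uri)
pp.pprint(tf_transform_output.transformed_feature_spec())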

Take a look at some of the transformed examples and check that they are indeed processed as intended.


In [0]:
train_uri = transform.outputs['transformed_examples'].get()[1].uri
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
decoder = tfdv.TFExampleDecoder()
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = decoder.decode(serialized_example)
  pp.pprint(example)

The Trainer Component

The Trainer component trains models using TensorFlow.

Create a Python module containing a trainer_fn function, which must return an estimator. If you prefer creating a Keras model, you can do so and then convert it to an estimator using tf.keras.estimator.model_to_estimator().
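
For reference, here is a minimal sketch of that conversion (the tiny model is purely illustrative, not the architecture used in this workshop):


In [0]:
# Hypothetical Keras model; the inputs and layers are illustrative only.
keras_inputs = tf.keras.layers.Input(shape=(10,), name='features')
keras_outputs = tf.keras.layers.Dense(1, activation='sigmoid')(keras_inputs)
keras_model = tf.keras.Model(inputs=keras_inputs, outputs=keras_outputs)
keras_model.compile(optimizer='adam', loss='binary_crossentropy')

# Convert the compiled Keras model to an estimator of the kind trainer_fn
# is expected to return.
keras_estimator = tf.keras.estimator.model_to_estimator(keras_model=keras_model)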


In [0]:
# Setup paths.
_trainer_module_file = 'chicago_taxi_trainer.py'

In [0]:
%%writefile {_trainer_module_file}

import tensorflow as tf
import tensorflow_model_analysis as tfma
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from chicago_taxi_constants import *


def transformed_names(keys):
  return [transformed_name(key) for key in keys]


# Tf.Transform considers these features as "raw"
def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec


def _gzip_reader_fn(filenames):
  """Small utility returning a record reader that can read gzip'ed files."""
  return tf.data.TFRecordDataset(
      filenames,
      compression_type='GZIP')


def _build_estimator(config, hidden_units=None, warm_start_from=None):
  """Build an estimator for predicting taxi tips

  Args:
    config: tf.estimator.RunConfig defining the runtime environment for the
      estimator (including model_dir).
    hidden_units: [int], the layer sizes of the DNN (input layer first)
    warm_start_from: Optional directory to warm start from.

  Returns:
    The estimator that will be used for training and eval.
  """
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in transformed_names(DENSE_FLOAT_FEATURE_KEYS)
  ]
  categorical_columns = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=VOCAB_SIZE + OOV_SIZE, default_value=0)
      for key in transformed_names(VOCAB_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=FEATURE_BUCKET_COUNT, default_value=0)
      for key in transformed_names(BUCKET_FEATURE_KEYS)
  ]
  categorical_columns += [
      tf.feature_column.categorical_column_with_identity(
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              transformed_names(CATEGORICAL_FEATURE_KEYS),
              MAX_CATEGORICAL_FEATURE_VALUES)
  ]
  return tf.estimator.DNNLinearCombinedRegressor(
      config=config,
      linear_feature_columns=categorical_columns,
      dnn_feature_columns=real_valued_columns,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25],
      warm_start_from=warm_start_from)


def _example_serving_receiver_fn(tf_transform_graph, schema):
  """Build the serving in inputs.

  Args:
    tf_transform_graph: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    Tensorflow graph which parses examples, applying tf-transform to them.
  """
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_feature_spec.pop(LABEL_KEY)

  raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
      raw_feature_spec, default_batch_size=None)
  serving_input_receiver = raw_input_fn()

  transformed_features = tf_transform_graph.transform_raw_features(
      serving_input_receiver.features)

  return tf.estimator.export.ServingInputReceiver(
      transformed_features, serving_input_receiver.receiver_tensors)


def _eval_input_receiver_fn(tf_transform_graph, schema):
  """Build everything needed for the tf-model-analysis to run the model.

  Args:
    tf_transform_graph: A TFTransformOutput.
    schema: the schema of the input data.

  Returns:
    EvalInputReceiver function, which contains:
      - Tensorflow graph which parses raw untransformed features, applies the
        tf-transform preprocessing operators.
      - Set of raw, untransformed features.
      - Label against which predictions will be compared.
  """
  # Notice that the inputs are raw features, not transformed features here.
  raw_feature_spec = _get_raw_feature_spec(schema)

  serialized_tf_example = tf.compat.v1.placeholder(
      dtype=tf.string, shape=[None], name='input_example_tensor')

  # Add a parse_example operator to the tensorflow graph, which will parse
  # raw, untransformed, tf examples.
  features = tf.io.parse_example(serialized_tf_example, raw_feature_spec)

  # Now that we have our raw examples, process them through the tf-transform
  # function computed during the preprocessing step.
  transformed_features = tf_transform_graph.transform_raw_features(
      features)

  # The key name MUST be 'examples'.
  receiver_tensors = {'examples': serialized_tf_example}

  # NOTE: Model is driven by transformed features (since training works on the
  # materialized output of TFT), but slicing will happen on raw features.
  features.update(transformed_features)

  return tfma.export.EvalInputReceiver(
      features=features,
      receiver_tensors=receiver_tensors,
      labels=transformed_features[transformed_name(LABEL_KEY)])


def _input_fn(filenames, tf_transform_graph, batch_size=200):
  """Generates features and labels for training or evaluation.

  Args:
    filenames: [str] list of gzipped TFRecord files to read data from.
    tf_transform_graph: A TFTransformOutput.
    batch_size: int First dimension size of the Tensors returned by input_fn

  Returns:
    A (features, indices) tuple where features is a dictionary of
      Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_graph.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      filenames, batch_size, transformed_feature_spec, reader=_gzip_reader_fn)

  transformed_features = dataset.make_one_shot_iterator().get_next()
  # We pop the label because we do not want to use it as a feature while we're
  # training.
  return transformed_features, transformed_features.pop(
      transformed_name(LABEL_KEY))


# TFX will call this function
def trainer_fn(hparams, schema):
  """Build the estimator using the high level API.
  Args:
    hparams: Holds hyperparameters used to train the model as name/value pairs.
    schema: Holds the schema of the training examples.
  Returns:
    A dict of the following:
      - estimator: The estimator that will be used for training and eval.
      - train_spec: Spec for training.
      - eval_spec: Spec for eval.
      - eval_input_receiver_fn: Input function for eval.
  """
  # Number of nodes in the first layer of the DNN
  first_dnn_layer_size = 100
  num_dnn_layers = 4
  dnn_decay_factor = 0.7

  train_batch_size = 40
  eval_batch_size = 40

  tf_transform_graph = tft.TFTransformOutput(hparams.transform_output)

  train_input_fn = lambda: _input_fn(
      hparams.train_files,
      tf_transform_graph,
      batch_size=train_batch_size)

  eval_input_fn = lambda: _input_fn(
      hparams.eval_files,
      tf_transform_graph,
      batch_size=eval_batch_size)

  train_spec = tf.estimator.TrainSpec(
      train_input_fn,
      max_steps=hparams.train_steps)

  serving_receiver_fn = lambda: _example_serving_receiver_fn(
      tf_transform_graph, schema)

  exporter = tf.estimator.FinalExporter('chicago-taxi', serving_receiver_fn)
  eval_spec = tf.estimator.EvalSpec(
      eval_input_fn,
      steps=hparams.eval_steps,
      exporters=[exporter],
      name='chicago-taxi-eval')

  run_config = tf.estimator.RunConfig(
      save_checkpoints_steps=999, keep_checkpoint_max=1)

  run_config = run_config.replace(model_dir=hparams.serving_model_dir)

  estimator = _build_estimator(
      # Construct layer sizes with exponential decay
      hidden_units=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layers)
      ],
      config=run_config,
      warm_start_from=hparams.warm_start_from)

  # Create an input receiver for TFMA processing
  receiver_fn = lambda: _eval_input_receiver_fn(
      tf_transform_graph, schema)

  return {
      'estimator': estimator,
      'train_spec': train_spec,
      'eval_spec': eval_spec,
      'eval_input_receiver_fn': receiver_fn
  }

Create and run the Trainer component.


In [0]:
# Uses user-provided Python function that implements a model using TensorFlow.
trainer = Trainer(
    module_file=_trainer_module_file,
    transformed_examples=transform.outputs['transformed_examples'],
    schema=infer_schema.outputs['schema'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
context.run(trainer)

Take a peek at the trained model which was exported from Trainer.


In [0]:
train_uri = trainer.outputs['model'].get()[0].uri
serving_model_path = os.path.join(train_uri, 'serving_model_dir', 'export', 'chicago-taxi')
latest_serving_model_path = os.path.join(serving_model_path, max(os.listdir(serving_model_path)))
exported_model = tf.saved_model.load(latest_serving_model_path)

In [0]:
exported_model.graph.get_operations()[:10] + ["..."]

Analyze Training with TensorBoard

Use TensorBoard to analyze the model training that was done in Trainer, and see how well our model trained.


In [0]:
%load_ext tensorboard

In [0]:
%tensorboard --bind_all --logdir {os.path.join(train_uri, 'serving_model_dir')}

The Evaluator Component

The Evaluator component analyzes model performance using the TensorFlow Model Analysis library. It runs inference requests on particular subsets of the test dataset, based on the slices defined by the developer. Knowing which slices should be analyzed requires domain knowledge of what is important in this particular use case or domain.

Create and run an Evaluator component.


In [0]:
# Uses TFMA to compute evaluation statistics over features of a model.
model_analyzer = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    feature_slicing_spec=evaluator_pb2.FeatureSlicingSpec(specs=[
        evaluator_pb2.SingleSlicingSpec(
            column_for_slicing=['trip_start_hour'])
    ]))
context.run(model_analyzer)

In [0]:
model_analyzer.outputs

Use the Evaluator results to generate model performance data which can be visualized. First create evaluation input data.


In [0]:
import csv

BASE_DIR = tempfile.mkdtemp()

reader = csv.DictReader(open(_data_filepath))
examples = []
for line in reader:
  example = tf.train.Example()
  for feature in schema.feature:
    key = feature.name
    if len(line[key]) > 0:
      if feature.type == schema_pb2.FLOAT:
        example.features.feature[key].float_list.value[:] = [float(line[key])]
      elif feature.type == schema_pb2.INT:
        example.features.feature[key].int64_list.value[:] = [int(line[key])]
      elif feature.type == schema_pb2.BYTES:
        example.features.feature[key].bytes_list.value[:] = [line[key].encode('utf8')]
    else:
      if feature.type == schema_pb2.FLOAT:
        example.features.feature[key].float_list.value[:] = []
      elif feature.type == schema_pb2.INT:
        example.features.feature[key].int64_list.value[:] = []
      elif feature.type == schema_pb2.BYTES:
        example.features.feature[key].bytes_list.value[:] = []
  examples.append(example)

TFRecord_file = os.path.join(BASE_DIR, 'train_data.rio')
with tf.io.TFRecordWriter(TFRecord_file) as writer:
  for example in examples:
    writer.write(example.SerializeToString())

!ls {TFRecord_file}

Run the analysis of a particular slice of data.


In [0]:
def run_and_render(eval_model=None, slice_list=None, slice_idx=0):
  """Runs the model analysis and renders the slicing metrics

  Args:
      eval_model: An instance of tf.saved_model saved with evaluation data
      slice_list: A list of tfma.slicer.SingleSliceSpec giving the slices
      slice_idx: An integer index into slice_list specifying the slice to use

  Returns:
      A SlicingMetricsViewer object if in Jupyter notebook; None if in Colab.
  """
  eval_result = tfma.run_model_analysis(eval_shared_model=eval_model,
                                        data_location=TFRecord_file,
                                        file_format='tfrecords',
                                        slice_spec=slice_list,
                                        output_path='sample_data',
                                        extractors=None)
  return tfma.view.render_slicing_metrics(eval_result, slicing_spec=slice_list[slice_idx] if slice_list else None)

# Load the TFMA results for the first training run
# This will take a minute
eval_model_base_dir_0 = os.path.join(train_uri, 'eval_model_dir')
eval_model_dir_0 = os.path.join(eval_model_base_dir_0,
                                max(os.listdir(eval_model_base_dir_0)))
eval_shared_model_0 = tfma.default_eval_shared_model(
    eval_saved_model_path=eval_model_dir_0)

# Slice our data by the trip_start_hour feature
slices = [tfma.slicer.SingleSliceSpec(columns=['trip_start_hour'])]

run_and_render(eval_model=eval_shared_model_0, slice_list=slices, slice_idx=0)

Print the slicing metrics.


In [0]:
evaluation_uri = model_analyzer.outputs['output'].get()[0].uri
eval_result = tfma.load_eval_result(evaluation_uri)

print('{}\n\nslicing_metrics:\n'.format(eval_result))

for metric in eval_result.slicing_metrics:
  pp.pprint(metric)

Examine the output data.


In [0]:
eval_path_uri = model_analyzer.outputs['output'].get()[0].uri
tfrecord_filenames = [os.path.join(eval_path_uri, name)
                      for name in os.listdir(eval_path_uri)]
pp.pprint(tfrecord_filenames)
dataset = tf.data.TFRecordDataset(tfrecord_filenames)
pp.pprint(dataset)

The ModelValidator Component

The ModelValidator component performs validation of your candidate model compared to the previously deployed model (if any) using criteria that you define, or to a baseline value. If the new model scores better than the previous model it will be "blessed" by ModelValidator, approving it for deployment.


In [0]:
# Performs quality validation of a candidate model (compared to a baseline).
model_validator = ModelValidator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'])
context.run(model_validator)

Examine the output of ModelValidator.


In [0]:
model_validator.outputs

In [0]:
blessing_uri = model_validator.outputs['blessing'].get()[0].uri
!ls -l {blessing_uri}

The Pusher Component

The Pusher component checks whether a model has been "blessed", and if so, deploys it to production by pushing the model to a well-known file destination.


In [0]:
# Setup serving path
_serving_model_dir = os.path.join(tempfile.mkdtemp(),
                                  'serving_model/chicago_taxi_simple')

Create and run a Pusher component.


In [0]:
# Checks whether the model passed the validation steps and pushes the model
# to a file destination if check passed.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=model_validator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
context.run(pusher)

Examine the output of Pusher.


In [0]:
pusher.outputs

In [0]:
push_uri = pusher.outputs['pushed_model'].get()[0].uri
latest_version = max(os.listdir(push_uri))
latest_version_path = os.path.join(push_uri, latest_version)
model = tf.saved_model.load(latest_version_path)

In [0]:
for item in model.signatures.items():
  pp.pprint(item)

TensorFlow Serving

Now that we have a trained model that has been blessed by ModelValidator, and pushed to our deployment target by Pusher, we can load it into TensorFlow Serving and start serving inference requests.

Examine your saved model

We'll use the command line utility saved_model_cli to look at the MetaGraphDefs (the models) and SignatureDefs (the methods you can call) in our SavedModel. See this discussion of the SavedModel CLI in the TensorFlow Guide.


In [0]:
latest_pushed_model = os.path.join(_serving_model_dir, max(os.listdir(_serving_model_dir)))
!saved_model_cli show --dir {latest_pushed_model} --all

That tells us a lot about our model! In this case we just trained our model, so we already know the inputs and outputs, but if we didn't this would be important information. It doesn't tell us everything, but it's a great start.

Add TensorFlow Serving distribution URI as a package source:

We're preparing to install TensorFlow Serving using Aptitude since this Colab runs in a Debian environment. We'll add the tensorflow-model-server package to the list of packages that Aptitude knows about. Note that we're running as root.

Note: This example is running TensorFlow Serving natively, but you can also run it in a Docker container, which is one of the easiest ways to get started using TensorFlow Serving.


In [0]:
# This is the same as you would do from your command line, but without the [arch=amd64], and no sudo
# You would instead do:
# echo "deb [arch=amd64] http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | sudo tee /etc/apt/sources.list.d/tensorflow-serving.list && \
# curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | sudo apt-key add -

!echo "deb http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | tee /etc/apt/sources.list.d/tensorflow-serving.list && \
curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | apt-key add -
!apt update

Install TensorFlow Serving

This is all you need: one command line! Please note that running TensorFlow Serving in a Docker container is also a great option, with a lot of advantages.


In [0]:
!apt-get install tensorflow-model-server

Start running TensorFlow Serving

This is where we start running TensorFlow Serving and load our model. After it loads we can start making inference requests using REST. There are some important parameters:

  • rest_api_port: The port that you'll use for REST requests.
  • model_name: You'll use this in the URL of REST requests. It can be anything.
  • model_base_path: This is the path to the directory where you've saved your model. Note that this base_path should not include the model version directory, which is why we split it off below.

In [0]:
os.environ["MODEL_DIR"] = os.path.split(latest_pushed_model)[0]

In [0]:
%%bash --bg 
nohup tensorflow_model_server \
  --rest_api_port=8501 \
  --model_name=chicago_taxi_simple \
  --model_base_path="${MODEL_DIR}" >server.log 2>&1

In [0]:
!tail server.log
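
As an additional sanity check (assuming the server came up on port 8501 as configured above), you can query TensorFlow Serving's model status endpoint over REST:


In [0]:
# Ask the server whether the model loaded successfully.
!curl http://localhost:8501/v1/models/chicago_taxi_simple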

Prepare data for inference requests

Our example data is stored in a CSV file on disk. We first have to read the file and decode the examples from CSV, and then encode these as Example protos to feed to TensorFlow Serving.

A few notes:

  • The regress and classify APIs are higher-level and generally preferred over predict; here we use the predict API to showcase the more involved route.
  • While the regress and classify APIs expect and can parse tf.Example, the predict API expects an arbitrary TensorProto. This means we will have to construct the serialized tf.Example protos ourselves, using coders from TensorFlow Transform.
  • The REST API surface accepts JSON, which uses UTF-8 encoding. Thus, to access the model via REST, we will encode our serialized tf.Example using Base64.
  • This is quite complicated; in general, if you are using the predict API, you should strongly consider using the gRPC API surface instead.

In [0]:
import base64
import json
import requests

from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import schema_utils

from tensorflow_transform import coders as tft_coders

from chicago_taxi_constants import * 

def _get_raw_feature_spec(schema):
  return schema_utils.schema_as_feature_spec(schema).feature_spec

def _make_proto_coder(schema):
  raw_feature_spec = _get_raw_feature_spec(schema)
  raw_schema = dataset_schema.from_feature_spec(raw_feature_spec)
  return tft_coders.ExampleProtoCoder(raw_schema)

def _make_csv_coder(schema, column_names):
  """Return a coder for tf.transform to read csv files."""
  raw_feature_spec = _get_raw_feature_spec(schema)
  parsing_schema = dataset_schema.from_feature_spec(raw_feature_spec)
  return tft_coders.CsvCoder(column_names, parsing_schema)

def make_serialized_examples(examples_csv_file, num_examples, schema):
  """Parses examples from CSV file and returns seralized proto examples."""
  filtered_features = [
      feature for feature in schema.feature if feature.name != LABEL_KEY
  ]
  del schema.feature[:]
  schema.feature.extend(filtered_features)

  columns = tfx.utils.io_utils.load_csv_column_names(examples_csv_file)
  csv_coder = _make_csv_coder(schema, columns)
  proto_coder = _make_proto_coder(schema)

  input_file = open(examples_csv_file, 'r')
  input_file.readline()  # skip header line

  serialized_examples = []
  for _ in range(num_examples):
    one_line = input_file.readline()
    if not one_line:
      print('End of example file reached')
      break
    one_example = csv_coder.decode(one_line)

    serialized_example = proto_coder.encode(one_example)
    serialized_examples.append(serialized_example)
  
  return serialized_examples

Perform Inference on example data

Prepare the example data using the utility defined above and batch all requests together to send a single REST API call to TensorFlow Serving.


In [0]:
def do_inference(server_addr, model_name, serialized_examples):
  """Sends requests to the model and prints the results.
  Args:
    server_addr: network address of model server in "host:port" format
    model_name: name of the model as understood by the model server
    serialized_examples: serialized examples of data to do inference on
  """
  parsed_server_addr = server_addr.split(':')

  host=parsed_server_addr[0]
  port=parsed_server_addr[1]
  json_examples = []
  
  for serialized_example in serialized_examples:
    # The encoding follows the guidelines in:
    # https://www.tensorflow.org/tfx/serving/api_rest
    example_bytes = base64.b64encode(serialized_example).decode('utf-8')
    predict_request = '{ "b64": "%s" }' % example_bytes
    json_examples.append(predict_request)

  json_request = '{ "instances": [' + ','.join(map(str, json_examples)) + ']}'

  server_url = 'http://' + host + ':' + port + '/v1/models/' + model_name + ':predict'
  response = requests.post(
      server_url, data=json_request, timeout=5.0)
  response.raise_for_status()
  prediction = response.json()
  print(json.dumps(prediction, indent=4))

In [0]:
serialized_examples = make_serialized_examples(
    examples_csv_file=_data_filepath, 
    num_examples=3, 
    schema=schema)

do_inference(server_addr='127.0.0.1:8501', 
     model_name='chicago_taxi_simple',
     serialized_examples=serialized_examples)